ledgerpy/toldg.py

317 lines
10 KiB
Python
Raw Normal View History

2020-08-10 20:35:57 +02:00
#!/usr/bin/env python3
import json
import sys
import csv
import os.path
import time
import re
import datetime
import logging
import jinja2
import shutil
import tempfile
from dataclasses import dataclass, field
from typing import List, Tuple
@dataclass
class Config:
    """
    Basic class for the configuration of this script.
    - input_directory: we search for ldg and csv files recursively here
    - output_directory: for all input files we do name.replace(input_directory,
      output_directory)
    - mappings_directory: directory of CSV mapping files
    - csv_configs: configuration for the different input files
    """
    # Root directory scanned recursively for .csv and .ldg input files.
    input_directory: str
    # Mirror directory the generated/copied .ldg files are written into.
    output_directory: str
    # Directory holding the CSV mapping files (account2 rules).
    mappings_directory: str
    # Plain dicts straight from the JSON config file; main() converts each
    # of them into a CsvConfig instance before processing.
    csv_configs: List
@dataclass
class CsvConfig:
    """
    Class to define how to parse a certain CSV file. We use the
    file_match_regex attribute to decide whether to apply a config for a file.
    If multiple configs match a single file we raise an exception.
    """
    # Ledger account the whole CSV file is booked against (e.g. the bank
    # account the export came from).
    account1: str
    # Regex matched (re.match) against the CSV file path to pick this config.
    file_match_regex: str
    # Column names in file order; falsy entries mark ignored columns.
    # get_transactions() requires 'amount', 'date' and 'description' entries.
    fields: List[str]
    # strptime format of dates as they appear in the CSV input.
    input_date_format: str = "%m/%d/%Y"
    # strftime format for dates written into the ledger output.
    output_date_format: str = "%Y/%m/%d"
    # Number of header lines to skip at the top of the CSV file.
    skip: int = 1
    delimiter: str = ","
    quotechar: str = "\""
    # Currency symbol rendered in front of the amounts.
    currency: str = "$"
@dataclass
class CsvMapping:
    """
    Class that defines the account2 attribute for a CSV transaction.

    mapping_file: path of the mapping CSV file this rule was read from
    account2: counter account assigned when this mapping matches
    description_pattern: exact-match string (or, after loading, a compiled
        case-insensitive regex) applied to the transaction description
    specifiers: additional conditions parsed from the form
        transaction_attribute=value;another_attribute=value2
        into [(attribute, value), ...]
    """
    mapping_file: str
    account2: str
    description_pattern: str
    # `default_factory=list` is the idiomatic spelling of the former
    # `lambda: []` wrapper — each instance still gets its own empty list.
    specifiers: List[Tuple[str, str]] = field(default_factory=list)
@dataclass
class LdgTransaction:
    """
    Class for ledger transaction to render into ldg file.
    """
    # Currency symbol (taken from CsvConfig.currency).
    currency: str
    # Amount string with the sign flipped relative to the CSV value.
    debit: str
    # Amount string exactly as it appears in the CSV.
    credit: str
    # Transaction date, already converted to the output date format.
    date: str
    # Account the CSV file belongs to (CsvConfig.account1), right-padded to
    # the same width as account2 for column alignment.
    account1: str
    # Counter account resolved via the CSV mappings, padded like account1.
    account2: str
    description: str
    # Path of the CSV file this transaction was read from.
    csv_file: str
    # The original CSV row joined with ", "; rendered after ';' in the
    # default template.
    row: str
# Default Jinja2 template for a single transaction; rendered with `t` bound
# to an LdgTransaction.
# NOTE(review): the two posting lines appear flush-left here; ledger syntax
# normally requires postings indented under the date line — confirm whether
# the original template carried leading whitespace that got lost.
LEDGER_TRANSACTION_TEMPLATE = """
{{t.date}} {{t.description}} ; {{t.row}}
{{t.account2}} {{t.currency}} {{t.debit}}
{{t.account1}} {{t.currency}} {{t.credit}}
"""
def get_files(input_directory):
    """Return every file path under input_directory, recursively, in
    lexicographic order of the full path."""
    found = []
    for dirpath, _dirnames, filenames in os.walk(input_directory):
        found.extend(os.path.join(dirpath, name) for name in filenames)
    found.sort()
    return found
def get_mappings(mappings_directory: str) -> List[CsvMapping]:
    """
    Load every mapping file found under mappings_directory (recursively)
    and return the combined list of CsvMapping objects.
    """
    def parse_specifiers(spec_string):
        # "attr1=v1;attr2=v2" -> [("attr1", "v1"), ("attr2", "v2")].
        # The inner unpack keeps the original ValueError on malformed pairs.
        return [(attr, value)
                for chunk in spec_string.split(';')
                for attr, value in [chunk.split("=")]]
    def load_one_file(mapping_file):
        def to_mapping(raw_row):
            cells = list(raw_row)
            pattern = cells[1]
            # A pattern wrapped in slashes is a case-insensitive regex.
            if pattern.startswith("/") and pattern.endswith("/"):
                cells[1] = re.compile(pattern[1:-1], re.IGNORECASE)
            # Optional non-empty third column: extra attr=value conditions.
            if len(cells) == 3 and cells[2]:
                cells[2] = parse_specifiers(cells[2])
            return CsvMapping(mapping_file, *cells)
        with open(mapping_file, 'r') as handle:
            rows = csv.reader(handle, delimiter=',', quotechar='"')
            # Skip blank lines and '#'-prefixed comment lines.
            return [to_mapping(row) for row in rows
                    if row and not row[0].startswith("#")]
    mappings = []
    for mapping_file in get_files(mappings_directory):
        mappings.extend(load_one_file(mapping_file))
    return mappings
def get_transactions(csv_file, config: CsvConfig, mappings: List[CsvMapping]):
    """
    Parse csv_file according to the rules in config and return a tuple
    (transactions, unmatched_expenses):
    - transactions: one LdgTransaction per non-empty CSV data row
    - unmatched_expenses: ready-to-edit mapping-file lines for rows that no
      mapping matched (those rows fall back to account2 = "expenses")
    """
    def date_to_date(date):
        # Re-format a date string from the input to the output date format.
        d = datetime.datetime.strptime(date, config.input_date_format)
        return d.strftime(config.output_date_format)
    def flip_sign(amount):
        # Purely textual sign flip: "-5.00" -> "5.00", "5.00" -> "-5.00".
        if amount.startswith("-"):
            return amount[1:]
        return "-" + amount
    def make_equal_len(str_1, str_2):
        # Right-pad both strings to a common width so the rendered ledger
        # columns (account name + amount) line up.
        max_len = max(len(str_1), len(str_2))
        str_1 += " " * (max_len - len(str_1))
        str_2 += " " * (max_len - len(str_2))
        return (str_1, str_2)
    def get_account2(transaction):
        # Resolve the counter account: a mapping applies when its pattern
        # equals (str) or regex-matches (re.Pattern) the description AND all
        # of its attribute=value specifiers hold on the transaction.
        t = transaction
        matching_mappings = []
        for mapping in mappings:
            pattern = mapping.description_pattern
            if type(pattern) is str and pattern == transaction.description:
                pass
            elif type(pattern) is re.Pattern and pattern.match(t.description):
                pass
            else:
                continue
            specifiers_match = True
            for attr, value in mapping.specifiers:
                if getattr(t, attr) != value:
                    specifiers_match = False
            if specifiers_match:
                matching_mappings.append(mapping)
        if not matching_mappings:
            # No rule matched: record a mapping-file line for later review
            # (side effect on the enclosing unmatched_expenses list) and
            # fall back to the generic "expenses" account.
            logging.info(f"No match for {transaction}.")
            e = f"expenses,{t.description},credit={t.credit};date={t.date}\n"
            unmatched_expenses.append(e)
            return "expenses"
        elif len(matching_mappings) == 1:
            return matching_mappings[0].account2
        else:
            # Ambiguous rules are tolerated: log them all, use the first.
            logging.info(
                f"\nMultiple matches for {transaction}. Picking first.")
            for m in matching_mappings:
                logging.info(f" {m}")
            return matching_mappings[0].account2
    def row_to_transaction(row):
        # Pull the configured columns out of the row, then build the
        # transaction; account1/account2 get padded to equal width.
        t = {field: row[index] for index, field in fields}
        amount = t['amount']
        t = LdgTransaction(config.currency, flip_sign(amount), amount,
                           date_to_date(t['date']), config.account1,
                           "", t['description'], csv_file, ", ".join(row))
        t.account1, t.account2 = make_equal_len(t.account1, get_account2(t))
        return t
    # (index, name) pairs for the configured, non-ignored CSV columns.
    fields = [(index, field)
              for index, field in enumerate(config.fields) if field]
    unmatched_expenses = []
    with open(csv_file, 'r') as f:
        reader = csv.reader(f, delimiter=config.delimiter,
                            quotechar=config.quotechar)
        # Discard the configured number of header lines.
        [next(reader) for _ in range(config.skip)]
        transactions = [t
                        for row in reader
                        if row
                        if (t := row_to_transaction(row))
                        ]
    return transactions, unmatched_expenses
def render_to_file(transactions, csv_file, ledger_file, template_file=""):
    """
    Render transactions through a Jinja2 template and write the result to
    ledger_file — but only when the content actually changed (or the file
    does not exist yet), so untouched outputs keep their mtime.

    transactions: iterable of LdgTransaction (bound as `t` in the template)
    csv_file: source file path, used only for the log line
    ledger_file: destination path
    template_file: optional path to a custom template; falls back to
        LEDGER_TRANSACTION_TEMPLATE when empty
    """
    if template_file:
        dirname = os.path.dirname(template_file)
        template_file = os.path.basename(template_file)
        template_loader = jinja2.FileSystemLoader(searchpath=dirname)
        template_env = jinja2.Environment(loader=template_loader)
        template = template_env.get_template(template_file)
    else:
        template_env = jinja2.Environment(loader=jinja2.BaseLoader)
        template = template_env.from_string(LEDGER_TRANSACTION_TEMPLATE)
    # Build the new content in memory with a single join. This replaces the
    # previous SpooledTemporaryFile round trip, which was never closed
    # (resource leak) and added no value over a plain string.
    new_ledger_content = "".join(template.render(t=t) for t in transactions)
    status = "no change"
    if not os.path.isfile(ledger_file):
        with open(ledger_file, 'w') as f:
            f.write(new_ledger_content)
        status = "new"
    else:
        # The redundant f.close() inside the old `with` block is gone — the
        # context manager already closes the file.
        with open(ledger_file, 'r') as f:
            old_ledger_content = f.read()
        if new_ledger_content != old_ledger_content:
            with open(ledger_file, 'w') as f:
                f.write(new_ledger_content)
            status = "update"
    logging.info(f"{csv_file:30} -> {ledger_file:30} | {status}")
def main(config):
    """
    Process every file under config.input_directory:
    - *.csv files are parsed into transactions and rendered as .ldg files
      into config.output_directory
    - *.ldg files are copied verbatim into config.output_directory
    Any other file type raises an exception.
    """
    def file_age(file):
        # Seconds since the file was last modified.
        return time.time() - os.path.getmtime(file)
    def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
        # Exactly one config's file_match_regex must match the file path.
        cs = [c for c in csv_configs
              if re.match(c.file_match_regex, csv_file)]
        if not cs:
            raise Exception(f"No config for {csv_file=}.")
        elif len(cs) > 1:
            raise Exception(f"More than one config for {csv_file=}.")
        return cs[0]
    def write_unmatched_expenses(unmatched_expenses, mappings_directory):
        # Append unmatched descriptions to unmatched.csv in the mappings
        # directory so the user can turn them into rules later.
        if not unmatched_expenses:
            return
        fn = os.path.join(mappings_directory, "unmatched.csv")
        with open(fn, 'a') as f:
            for e in unmatched_expenses:
                f.write(e)
    def csv_to_ldg_filename(csv_file: str, config: Config):
        # NOTE(review): str.replace substitutes the first occurrence anywhere
        # in the path — safe only while directory names contain neither the
        # input_directory string twice nor ".csv".
        r = csv_file
        r = r.replace(config.input_directory, config.output_directory)
        r = r.replace(".csv", ".ldg")
        return r
    def process_csv_file(csv_file, mappings: List[CsvMapping], config: Config):
        # CSV pipeline: pick config, parse, record unmatched rows, render.
        ledger_file = csv_to_ldg_filename(csv_file, config)
        csv_config = get_csv_config(csv_file, config.csv_configs)
        transactions, unmatched = get_transactions(
            csv_file, csv_config, mappings)
        write_unmatched_expenses(unmatched, config.mappings_directory)
        render_to_file(transactions, csv_file, ledger_file)
    def process_ldg_file(ldg_file: str, config: Config):
        # Copy the ledger file when the destination is missing or older
        # (file_age is inversely related to mtime, hence the > comparison).
        dest_file = ldg_file.replace(
            config.input_directory, config.output_directory)
        status = "no change"
        if not os.path.isfile(dest_file):
            status = "new"
            shutil.copy(ldg_file, dest_file)
        if file_age(dest_file) > file_age(ldg_file):
            shutil.copy(ldg_file, dest_file)
            status = "update"
        logging.info(f"{ldg_file:30} -> {dest_file:30} | {status}")
    input_files = get_files(config.input_directory)
    # The JSON config holds plain dicts; promote them to CsvConfig instances.
    config.csv_configs = [CsvConfig(**c) for c in config.csv_configs]
    mappings = get_mappings(config.mappings_directory)
    for f in input_files:
        if f.endswith(".csv"):
            process_csv_file(f, mappings, config)
        elif f.endswith(".ldg"):
            process_ldg_file(f, config)
        else:
            m = f"Unsupported file type for '{f}'."
            raise Exception(m)
if __name__ == "__main__":
    # Bare-message logging to stdout; the script's log lines are the UI.
    logging.basicConfig(stream=sys.stdout,
                        level=logging.DEBUG,
                        format='%(message)s')
    # Optional first CLI argument overrides the default config path.
    config_file = sys.argv[1] if len(sys.argv) > 1 else "config.json"
    with open(config_file, 'r') as f:
        config = Config(**json.load(f))
    main(config)