diff --git a/.gitignore b/.gitignore
index 13d1490..6c3381c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
+# Ignore sensitive data
+gather.json
 # ---> Python
 # Byte-compiled / optimized / DLL files
 __pycache__/
diff --git a/README.md b/README.md
index 2cbb14e..a50ab1e 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,28 @@
 # ledgerpy
 
-Scripts to transform different input formats (CSV and OFX) into ledger accounting files. Includes mapping language to update transaction details automatically.
\ No newline at end of file
+Scripts to transform different input formats (CSV and OFX) into ledger
+accounting files. Includes mapping language to update transaction details
+automatically.
+
+There are other [scripts](https://github.com/ledger/ledger/wiki/CSV-Import) that
+attempt to handle the same use-cases. I have tried a couple of them, as well as
+hledger's integrated CSV import, and ran into issues or didn't like the
+usability. That's why I wrote my own scripts for my workflow. Probably not too
+useful for anybody else, but I included an example workspace to showcase how I
+use the scripts.
+
+## Dependencies
+
+- jinja2
+- ofxtools
+- python3.8 or higher
+
+## Todo
+
+- [ ] Write this readme
+- [ ] Create setup.py file
+- [ ] Use OFX parser from ofxtools instead of parsing the XML
+- [ ] Autoappend latest OFX data to CSV file
+- [ ] Include example workspace with mock data to demo my workflow
+
+
diff --git a/getofx.py b/getofx.py
new file mode 100644
index 0000000..57d4672
--- /dev/null
+++ b/getofx.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python3
+
+import datetime
+import ofxtools
+import json
+import logging
+import sys
+import csv
+import xml.etree.ElementTree as ET
+
+from ofxtools import OFXClient
+from ofxtools.Client import StmtRq, CcStmtEndRq, CcStmtRq
+from collections import namedtuple
+
+
+def get_transactions(data):
+    Transaction = namedtuple("Transaction",
+                             ["details", "date", "description",
+                              "amount", "type", "balance", "slip"])
+    root = ET.fromstring(data)
+    ts = []
+    for statement in root.iter("STMTTRN"):
+        description, date, amount = "", "", ""
+        for child in statement:
+            if child.tag == "TRNAMT":
+                amount = child.text
+            elif child.tag == "DTPOSTED":
+                d = datetime.datetime.strptime(child.text[:8], "%Y%m%d")
+                date = d.strftime("%m/%d/%Y")
+            elif child.tag == "NAME":
+                if description:
+                    description = child.text + " " + description
+                else:
+                    description = child.text
+            elif child.tag == "MEMO":
+                if description:
+                    description = description + " " + child.text
+                else:
+                    description = child.text
+        t = Transaction("-", date, description, amount, "-", "-", "-")
+        ts.append(t)
+    return ts
+
+
+def process_account(client, secret, year, name, accttype, acctid, csv_file):
+    dtstart = datetime.datetime(int(year), 1, 1, tzinfo=ofxtools.utils.UTC)
+    dtend = datetime.datetime(int(year), 12, 31, tzinfo=ofxtools.utils.UTC)
+
+    if accttype.upper() in ("CHECKING", "SAVINGS"):
+        rq = StmtRq(acctid=acctid, accttype=accttype.upper(),
+                    dtstart=dtstart, dtend=dtend)
+    else:
+        rq = CcStmtRq(acctid=acctid, dtstart=dtstart, dtend=dtend)
+
+    response = client.request_statements(secret, rq)
+    data = response.read().decode()
+    # with open(csv_file.replace(".csv", ".xml"), "w") as f:
+    #     f.write(data)
+    transactions = get_transactions(data)
+
+    with open(csv_file, "w") as f:
+        csv_writer = csv.writer(f)
+        csv_writer.writerow(["details", "date", "description",
+                             "amount", "type", "balance", "slip"])
+        for t in transactions:
+            csv_writer.writerow(t)
+            #if t.date.startswith(year):
+
+
+def get_client(url, userid, org, fid, clientuid, bankid, version, **kwargs):
+    return OFXClient(url, userid=userid, org=org, fid=fid,
+                     clientuid=clientuid, bankid=bankid, version=version,
+                     prettyprint=True)
+
+
+def main(config):
+    client = get_client(**config["client"])
+    year = config["year"]
+    secret = config["secret"]
+    for account in config["accounts"]:
+        name = account["name"]
+        logging.info(f"Processing {name}.")
+        process_account(client, secret, year, **account)
+
+
+if __name__ == "__main__":
+    try:
+        config_file = sys.argv[1]
+    except IndexError:
+        config_file = "gather.json"
+    with open(config_file, 'r') as f:
+        config = json.load(f)
+    main(config)
+
diff --git a/toldg.py b/toldg.py
new file mode 100644
index 0000000..6dc87a7
--- /dev/null
+++ b/toldg.py
@@ -0,0 +1,316 @@
+#!/usr/bin/env python3
+
+import json
+import sys
+import csv
+import os.path
+import time
+import re
+import datetime
+import logging
+import jinja2
+import shutil
+import tempfile
+from dataclasses import dataclass, field
+from typing import List, Tuple
+
+
+@dataclass
+class Config:
+    """
+    Basic class for the configuration of this script.
+    - input_directory: we search for ldg and csv files recursively here
+    - output_directory: for all input files we do name.replace(input_directory,
+      output_directory)
+    - mappings_directory: directory of CSV mapping files
+    - csv_configs: configuration for the different input files
+    """
+    input_directory: str
+    output_directory: str
+    mappings_directory: str
+    csv_configs: List
+
+
+@dataclass
+class CsvConfig:
+    """
+    Class to define how to parse a certain CSV file. We use the
+    file_match_regex attribute to decide whether to apply a config for a file.
+    If multiple configs match a single file we raise an exception.
+    """
+    account1: str
+    file_match_regex: str
+    fields: List[str]
+    input_date_format: str = "%m/%d/%Y"
+    output_date_format: str = "%Y/%m/%d"
+    skip: int = 1
+    delimiter: str = ","
+    quotechar: str = "\""
+    currency: str = "$"
+
+
+@dataclass
+class CsvMapping:
+    """
+    Class that defines the account2 attribute for a CSV transaction.
+    description_pattern: string or regexes to match the description
+    specifiers: additional conditions in the form
+      transaction_attribute=value;another_attribute=value2
+    """
+    mapping_file: str
+    account2: str
+    description_pattern: str
+    specifiers: List[Tuple[str, str]] = field(default_factory=lambda: [])
+
+
+@dataclass
+class LdgTransaction:
+    """
+    Class for ledger transaction to render into ldg file.
+    """
+    currency: str
+    debit: str
+    credit: str
+    date: str
+    account1: str
+    account2: str
+    description: str
+    csv_file: str
+    row: str
+
+
+LEDGER_TRANSACTION_TEMPLATE = """
+{{t.date}} {{t.description}} ; {{t.row}}
+    {{t.account2}} {{t.currency}} {{t.debit}}
+    {{t.account1}} {{t.currency}} {{t.credit}}
+
+"""
+
+
+def get_files(input_directory):
+    """ Gets files from directory recursively in lexicographic order. """
+    return sorted([os.path.join(subdir, f)
+                   for subdir, dirs, files in os.walk(input_directory)
+                   for f in files])
+
+
+def get_mappings(mappings_directory: str) -> List[CsvMapping]:
+
+    def parse_specifiers(s):
+        """ This is a little extra magic I have introduced to specify
+        mappings with more granularity. The argument s is a string in the form
+
+        attribute1=value1;attribute2=value2;attribute3=value3
+
+        and we want to get it into the form
+
+        [(attribute1, value1), (attribute2, value2), (attribute3, value3)]
+        """
+        r = []
+        for pair in s.split(';'):
+            attr, value = pair.split("=")
+            r.append((attr, value))
+        return r
+
+    def get_mappings_from_file(csv_file):
+        def row_to_mapping(row):
+            pattern = row[1]
+            if pattern.startswith("/") and pattern.endswith("/"):
+                row[1] = re.compile(pattern[1:-1], re.IGNORECASE)
+            if len(row) == 3 and row[2]:
+                row[2] = parse_specifiers(row[2])
+            return CsvMapping(csv_file, *row)
+
+        with open(csv_file, 'r') as f:
+            reader = csv.reader(f, delimiter=',', quotechar='"')
+            # ignore empty lines and comments
+            return [row_to_mapping(row) for row in reader
+                    if row
+                    if not row[0].startswith("#")]
+    return [m
+            for f in get_files(mappings_directory)
+            for m in get_mappings_from_file(f)]
+
+
+def get_transactions(csv_file, config: CsvConfig, mappings: List[CsvMapping]):
+    def date_to_date(date):
+        d = datetime.datetime.strptime(date, config.input_date_format)
+        return d.strftime(config.output_date_format)
+
+    def flip_sign(amount):
+        if amount.startswith("-"):
+            return amount[1:]
+        return "-" + amount
+
+    def make_equal_len(str_1, str_2):
+        max_len = max(len(str_1), len(str_2))
+        str_1 += " " * (max_len - len(str_1))
+        str_2 += " " * (max_len - len(str_2))
+        return (str_1, str_2)
+
+    def get_account2(transaction):
+        t = transaction
+        matching_mappings = []
+        for mapping in mappings:
+            pattern = mapping.description_pattern
+            if type(pattern) is str and pattern == transaction.description:
+                pass
+            elif type(pattern) is re.Pattern and pattern.match(t.description):
+                pass
+            else:
+                continue
+
+            specifiers_match = True
+            for attr, value in mapping.specifiers:
+                if getattr(t, attr) != value:
+                    specifiers_match = False
+
+            if specifiers_match:
+                matching_mappings.append(mapping)
+
+        if not matching_mappings:
+            logging.info(f"No match for {transaction}.")
+            e = f"expenses,{t.description},credit={t.credit};date={t.date}\n"
+            unmatched_expenses.append(e)
+            return "expenses"
+        elif len(matching_mappings) == 1:
+            return matching_mappings[0].account2
+        else:
+            logging.info(
+                f"\nMultiple matches for {transaction}. Picking first.")
+            for m in matching_mappings:
+                logging.info(f" {m}")
+            return matching_mappings[0].account2
+
+    def row_to_transaction(row):
+        t = {field: row[index] for index, field in fields}
+        amount = t['amount']
+        t = LdgTransaction(config.currency, flip_sign(amount), amount,
+                           date_to_date(t['date']), config.account1,
+                           "", t['description'], csv_file, ", ".join(row))
+        t.account1, t.account2 = make_equal_len(t.account1, get_account2(t))
+        return t
+
+    fields = [(index, field)
+              for index, field in enumerate(config.fields) if field]
+    unmatched_expenses = []
+    with open(csv_file, 'r') as f:
+        reader = csv.reader(f, delimiter=config.delimiter,
+                            quotechar=config.quotechar)
+        [next(reader) for _ in range(config.skip)]
+        transactions = [t
+                        for row in reader
+                        if row
+                        if (t := row_to_transaction(row))
+                        ]
+    return transactions, unmatched_expenses
+
+
+def render_to_file(transactions, csv_file, ledger_file, template_file=""):
+    if template_file:
+        dirname = os.path.dirname(template_file)
+        template_file = os.path.basename(template_file)
+        template_loader = jinja2.FileSystemLoader(searchpath=dirname)
+        template_env = jinja2.Environment(loader=template_loader)
+        template = template_env.get_template(template_file)
+    else:
+        template_env = jinja2.Environment(loader=jinja2.BaseLoader)
+        template = template_env.from_string(LEDGER_TRANSACTION_TEMPLATE)
+
+    # Write transactions into virtual file. We could just create a string
+    # object, but that doesn't work as nicely with the Jinja API plus I think
+    # this approach is faster.
+    tf = tempfile.SpooledTemporaryFile(mode='w+')
+    for t in transactions:
+        tf.write(template.render(t=t))
+    tf.seek(0)
+    new_ledger_content = tf.read()
+
+    status = "no change"
+    if not os.path.isfile(ledger_file):
+        with open(ledger_file, 'w') as f:
+            f.write(new_ledger_content)
+        status = "new"
+    else:
+        with open(ledger_file, 'r') as f:
+            old_ledger_content = f.read()
+            f.close()
+        if new_ledger_content != old_ledger_content:
+            with open(ledger_file, 'w') as f:
+                f.write(new_ledger_content)
+            status = "update"
+    logging.info(f"{csv_file:30} -> {ledger_file:30} | {status}")
+
+
+def main(config):
+    def file_age(file):
+        return time.time() - os.path.getmtime(file)
+
+    def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
+        cs = [c for c in csv_configs
+              if re.match(c.file_match_regex, csv_file)]
+        if not cs:
+            raise Exception(f"No config for {csv_file=}.")
+        elif len(cs) > 1:
+            raise Exception(f"More than one config for {csv_file=}.")
+        return cs[0]
+
+    def write_unmatched_expenses(unmatched_expenses, mappings_directory):
+        if not unmatched_expenses:
+            return
+        fn = os.path.join(mappings_directory, "unmatched.csv")
+        with open(fn, 'a') as f:
+            for e in unmatched_expenses:
+                f.write(e)
+
+    def csv_to_ldg_filename(csv_file: str, config: Config):
+        r = csv_file
+        r = r.replace(config.input_directory, config.output_directory)
+        r = r.replace(".csv", ".ldg")
+        return r
+
+    def process_csv_file(csv_file, mappings: List[CsvMapping], config: Config):
+        ledger_file = csv_to_ldg_filename(csv_file, config)
+        csv_config = get_csv_config(csv_file, config.csv_configs)
+
+        transactions, unmatched = get_transactions(
+            csv_file, csv_config, mappings)
+        write_unmatched_expenses(unmatched, config.mappings_directory)
+        render_to_file(transactions, csv_file, ledger_file)
+
+    def process_ldg_file(ldg_file: str, config: Config):
+        dest_file = ldg_file.replace(
+            config.input_directory, config.output_directory)
+        status = "no change"
+        if not os.path.isfile(dest_file):
+            status = "new"
+            shutil.copy(ldg_file, dest_file)
+        if file_age(dest_file) > file_age(ldg_file):
+            shutil.copy(ldg_file, dest_file)
+            status = "update"
+        logging.info(f"{ldg_file:30} -> {dest_file:30} | {status}")
+
+    input_files = get_files(config.input_directory)
+    config.csv_configs = [CsvConfig(**c) for c in config.csv_configs]
+    mappings = get_mappings(config.mappings_directory)
+    for f in input_files:
+        if f.endswith(".csv"):
+            process_csv_file(f, mappings, config)
+        elif f.endswith(".ldg"):
+            process_ldg_file(f, config)
+        else:
+            m = f"Unsupported file type for '{f}'."
+            raise Exception(m)
+
+
+if __name__ == "__main__":
+    logging.basicConfig(stream=sys.stdout,
+                        level=logging.DEBUG,
+                        format='%(message)s')
+    try:
+        config_file = sys.argv[1]
+    except IndexError:
+        config_file = "config.json"
+    with open(config_file, 'r') as f:
+        config = Config(**json.load(f))
+    main(config)