#!/usr/bin/env python3
"""
Convert CSV files into ledger (.ldg) files.

The script walks an input directory, renders every ``.csv`` file into a
``.ldg`` file in the output directory (using per-file ``CsvConfig`` settings
and description-based account mappings), and copies every ``.ldg`` file over
unchanged. Transactions that match no mapping are appended to
``unmatched.csv`` in the mappings directory.
"""
import json
import sys
import csv
import os.path
import time
import re
import datetime
import logging
import shutil
from dataclasses import dataclass, field
from typing import List, Tuple, Union


@dataclass
class Config:
    """
    Basic class for the configuration of this script.

    - input_directory: we search for ldg and csv files recursively here
    - output_directory: for all input files the leading input_directory
      is replaced by output_directory
    - mappings_directory: directory of CSV mapping files
    - csv_configs: configuration for the different input files; main()
      converts plain dicts in this list into CsvConfig instances
    """
    input_directory: str
    output_directory: str
    mappings_directory: str
    csv_configs: List


@dataclass
class CsvConfig:
    """
    Class to define how to parse a certain CSV file.

    We use the file_match_regex attribute to decide whether to apply a
    config for a file. If multiple configs match a single file we raise an
    exception.

    - fields: one name per CSV column; empty names are ignored. The names
      'date', 'amount' and 'description' are required by get_transactions.
    - skip: number of leading rows to discard before parsing.
    """
    account1: str
    file_match_regex: str
    fields: List[str]
    input_date_format: str = "%m/%d/%Y"
    output_date_format: str = "%Y/%m/%d"
    skip: int = 1
    delimiter: str = ","
    quotechar: str = "\""
    currency: str = "$"


@dataclass
class CsvMapping:
    """
    Class that defines the account2 attribute for a CSV transaction.

    description_pattern: string or compiled regex to match the description
    specifiers: additional conditions in the form
                transaction_attribute=value;another_attribute=value2
                parsed into (attribute, value) tuples
    """
    mapping_file: str
    account2: str
    description_pattern: Union[str, re.Pattern]
    specifiers: List[Tuple[str, str]] = field(default_factory=list)


@dataclass
class Transaction:
    """
    Class for ledger transaction to render into ldg file.
    """
    currency: str
    debit: str
    credit: str
    date: str
    account1: str
    account2: str
    description: str
    csv_file: str
    row: str


# NOTE(review): postings are indented by two spaces and the account is
# separated from the amount by two spaces (ledger needs at least two spaces
# between account name and amount). The exact original spacing should be
# confirmed against version control.
LEDGER_TRANSACTION_TEMPLATE = """
{t.date} {t.description} ; {t.row}
  {t.account2}  {t.currency} {t.debit}
  {t.account1}  {t.currency} {t.credit}
"""


def _to_output_path(path: str, config: Config) -> str:
    """
    Map a path below config.input_directory to config.output_directory.

    Only the first occurrence is replaced so that a directory name that
    happens to repeat further down the path is left untouched.
    """
    return path.replace(config.input_directory, config.output_directory, 1)


def _ensure_parent_directory(path: str) -> None:
    """Create the directory that will contain `path` if it is missing."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)


def get_files(input_directory):
    """
    Gets files from directory recursively in lexicographic order.
    """
    return sorted(
        os.path.join(subdir, name)
        for subdir, _dirs, files in os.walk(input_directory)
        for name in files
    )


def get_mappings(mappings_directory: str) -> List[CsvMapping]:
    """
    Load all mappings from every file below mappings_directory.

    Each non-empty, non-comment row has the columns
    ``account2, description_pattern[, specifiers]``. A pattern wrapped in
    slashes (``/.../``) is compiled as a case-insensitive regex.
    """

    def parse_specifiers(s):
        """
        This is a little extra magic I have introduced to specify mappings
        with more granularity. The argument s is a string in the form

            attribute1=value1;attribute2=value2;attribute3=value3

        and we want to get it into the form

            [(attribute1, value1), (attribute2, value2), (attribute3, value3)]

        Empty segments (e.g. from a trailing ';') are ignored and only the
        first '=' splits a pair, so values may themselves contain '='.
        Raises ValueError for a segment without '='.
        """
        pairs = []
        for pair in s.split(";"):
            if not pair.strip():
                continue
            attr, separator, value = pair.partition("=")
            if not separator:
                raise ValueError(f"Invalid specifier '{pair}' in '{s}'.")
            pairs.append((attr, value))
        return pairs

    def get_mappings_from_file(csv_file):
        def row_to_mapping(row):
            columns = list(row)  # do not mutate the reader's row
            pattern = columns[1]
            # A lone "/" is a literal description, not an empty regex.
            if (len(pattern) >= 2 and pattern.startswith("/")
                    and pattern.endswith("/")):
                columns[1] = re.compile(pattern[1:-1], re.IGNORECASE)
            if len(columns) == 3:
                # An empty third column yields an empty specifier list.
                columns[2] = parse_specifiers(columns[2])
            return CsvMapping(csv_file, *columns)

        # newline='' is what the csv module expects for files it reads.
        with open(csv_file, 'r', newline='') as f:
            reader = csv.reader(f, delimiter=',', quotechar='"')
            # ignore empty lines and comments
            return [row_to_mapping(row)
                    for row in reader
                    if row and not row[0].startswith("#")]

    return [m
            for f in get_files(mappings_directory)
            for m in get_mappings_from_file(f)]


def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
    """
    Parse csv_file into Transaction objects according to config.

    account2 is set to the placeholder "account2"; apply_mappings fills in
    the real value. Raises KeyError if config.fields does not name 'date',
    'amount' and 'description', and ValueError if a date does not match
    config.input_date_format.
    """

    def date_to_date(date: str) -> str:
        d = datetime.datetime.strptime(date, config.input_date_format)
        return d.strftime(config.output_date_format)

    def flip_sign(amount: str) -> str:
        if amount.startswith("-"):
            return amount[1:]
        if amount.startswith("+"):
            # Avoid producing "-+5.00" for explicitly signed amounts.
            return "-" + amount[1:]
        return "-" + amount

    def row_to_transaction(row, fields):
        """
        The user can configure the mapping of CSV fields to the three
        required fields date, amount and description via the CsvConfig.
        """
        t = {name: row[index] for index, name in fields}
        # Surrounding blanks would otherwise end up between sign and digits.
        amount = t['amount'].strip()
        return Transaction(config.currency, flip_sign(amount), amount,
                           date_to_date(t['date']), config.account1,
                           "account2", t['description'], csv_file,
                           ", ".join(row))

    fields = [(index, name)
              for index, name in enumerate(config.fields) if name]
    with open(csv_file, 'r', newline='') as f:
        reader = csv.reader(f, delimiter=config.delimiter,
                            quotechar=config.quotechar)
        for _ in range(config.skip):
            # A file shorter than `skip` simply yields no transactions.
            next(reader, None)
        return [row_to_transaction(row, fields) for row in reader if row]


def apply_mappings(transactions: List[Transaction],
                   mappings: List[CsvMapping]):
    """
    Set account2 on every transaction based on the given mappings.

    The transactions are modified in place: account2 is assigned and
    account1/account2 are right-padded with spaces to equal length.
    Transactions without a matching mapping get account2 "expenses".

    Returns the list of transactions for which no mapping matched.
    """

    def description_matches(pattern, description):
        if isinstance(pattern, str):
            return pattern == description
        if isinstance(pattern, re.Pattern):
            return pattern.match(description) is not None
        return True

    def get_matching_mappings(transaction):
        return [
            mapping for mapping in mappings
            if description_matches(mapping.description_pattern,
                                   transaction.description)
            and all(getattr(transaction, attr) == value
                    for attr, value in mapping.specifiers)
        ]

    def get_account2(transaction):
        matching_mappings = get_matching_mappings(transaction)
        if not matching_mappings:
            logging.info(f"No match for {transaction}.")
            return ""
        if len(matching_mappings) > 1:
            logging.info(
                f"\nMultiple matches for {transaction}. Picking first.")
            for m in matching_mappings:
                logging.info(f"  {m}")
        return matching_mappings[0].account2

    unmatched_expenses = []
    for t in transactions:
        account2 = get_account2(t)
        if not account2:
            unmatched_expenses.append(t)
            account2 = "expenses"
        width = max(len(t.account1), len(account2))
        t.account1 = t.account1.ljust(width)
        t.account2 = account2.ljust(width)
    return unmatched_expenses


def render_to_file(transactions: List[Transaction], csv_file: str,
                   ledger_file: str):
    """
    Render transactions into ledger_file and log what happened.

    The file is only written when it does not exist yet ("new") or when
    its content differs from the rendered content ("update"). Missing
    parent directories of ledger_file are created.
    """
    content = "".join(LEDGER_TRANSACTION_TEMPLATE.format(t=t)
                      for t in transactions)
    if not os.path.isfile(ledger_file):
        status = "new"
    else:
        with open(ledger_file, 'r') as f:
            old_content = f.read()
        status = "no change" if old_content == content else "update"

    if status != "no change":
        _ensure_parent_directory(ledger_file)
        with open(ledger_file, 'w') as f:
            f.write(content)

    logging.info(f"{csv_file:30} -> {ledger_file:30} | {status}")


def write_mappings(unmatched_transactions: List[Transaction],
                   mappings_directory: str):
    """
    Write mappings for unmatched expenses for update by the user.

    Rows are appended to "unmatched.csv" in mappings_directory; nothing is
    written when there are no unmatched transactions.
    """
    if not unmatched_transactions:
        return
    fn = os.path.join(mappings_directory, "unmatched.csv")
    _ensure_parent_directory(fn)
    # newline='' lets csv.writer control line endings itself.
    with open(fn, 'a', newline='') as f:
        writer = csv.writer(f)
        for t in unmatched_transactions:
            e = ["expenses", t.description,
                 f"credit={t.credit};date={t.date}"]
            writer.writerow(e)


def process_csv_file(csv_file, mappings: List[CsvMapping], config: Config):
    """
    Convert one CSV file into its ledger file in the output directory.

    Raises Exception if zero or more than one CsvConfig matches csv_file.
    """

    def csv_to_ldg_filename(csv_file: str, config: Config) -> str:
        r = _to_output_path(csv_file, config)
        # Only swap the trailing extension, never a ".csv" inside the path.
        if r.endswith(".csv"):
            r = r[:-len(".csv")] + ".ldg"
        return r

    def get_csv_config(csv_file: str,
                       csv_configs: List[CsvConfig]) -> CsvConfig:
        cs = [c for c in csv_configs
              if re.match(c.file_match_regex, csv_file)]
        if not cs:
            raise Exception(f"No config for {csv_file=}.")
        elif len(cs) > 1:
            raise Exception(f"More than one config for {csv_file=}.")
        return cs[0]

    ledger_file = csv_to_ldg_filename(csv_file, config)
    csv_config = get_csv_config(csv_file, config.csv_configs)
    transactions = get_transactions(csv_file, csv_config)
    unmatched_transactions = apply_mappings(transactions, mappings)
    write_mappings(unmatched_transactions, config.mappings_directory)
    render_to_file(transactions, csv_file, ledger_file)


def process_ldg_file(ldg_file: str, config: Config):
    """
    Copy a ledger file from the input to the output directory.

    The file is copied when the destination is missing ("new") or older
    than the source ("update"); otherwise nothing is done.
    """
    dest_file = _to_output_path(ldg_file, config)
    if not os.path.isfile(dest_file):
        _ensure_parent_directory(dest_file)
        shutil.copy(ldg_file, dest_file)
        status = "new"
    elif os.path.getmtime(dest_file) < os.path.getmtime(ldg_file):
        shutil.copy(ldg_file, dest_file)
        status = "update"
    else:
        status = "no change"
    logging.info(f"{ldg_file:30} -> {dest_file:30} | {status}")


def main(config):
    """
    Process every file below config.input_directory.

    config.csv_configs is replaced by a list of CsvConfig instances.
    Raises Exception for files that are neither .csv nor .ldg.
    """
    input_files = get_files(config.input_directory)
    # Accept already constructed CsvConfig objects as well as plain dicts.
    config.csv_configs = [c if isinstance(c, CsvConfig) else CsvConfig(**c)
                          for c in config.csv_configs]
    mappings = get_mappings(config.mappings_directory)
    for f in input_files:
        if f.endswith(".csv"):
            process_csv_file(f, mappings, config)
        elif f.endswith(".ldg"):
            process_ldg_file(f, config)
        else:
            m = f"Unsupported file type for '{f}'."
            raise Exception(m)


if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout, level=logging.INFO,
                        format='%(message)s')
    try:
        config_file = sys.argv[1]
    except IndexError:
        config_file = "config.json"
    with open(config_file, 'r') as f:
        config = Config(**json.load(f))
    main(config)