
#!/usr/bin/env python3
import json
import sys
import csv
import os.path
import time
import re
import datetime
import logging
import shutil
from dataclasses import dataclass, field
from typing import List, Tuple
@dataclass
class Config:
    """
    Basic class for the configuration of this script.

    - input_directory: we search for ldg and csv files recursively here
    - output_directory: for all input files we do
      name.replace(input_directory, output_directory)
    - mappings_directory: directory of CSV mapping files
    - csv_configs: configuration for the different input files; loaded as
      plain dicts from JSON and converted to CsvConfig objects in main()
    """
    input_directory: str
    output_directory: str
    mappings_directory: str
    csv_configs: List
@dataclass
class CsvConfig:
    """
    Class to define how to parse a certain CSV file. We use the
    file_match_regex attribute to decide whether to apply a config for a file.
    If multiple configs match a single file we raise an exception.

    - account1: account written into every transaction of a matching file
    - file_match_regex: regex matched (re.match) against the CSV file path
    - fields: one name per CSV column; empty names are ignored, and the
      names 'date', 'amount' and 'description' are required by the parser
    - input_date_format / output_date_format: strptime / strftime formats
    - skip: number of leading rows (headers) to skip
    - delimiter, quotechar: passed to csv.reader
    - currency: currency symbol written into every transaction
    """
    account1: str
    file_match_regex: str
    fields: List[str]
    input_date_format: str = "%m/%d/%Y"
    output_date_format: str = "%Y/%m/%d"
    skip: int = 1
    delimiter: str = ","
    quotechar: str = "\""
    currency: str = "$"
@dataclass
class CsvMapping:
    """
    Class that defines the account2 attribute for a CSV transaction.

    mapping_file: path of the mapping file this entry was read from
    account2: account assigned to transactions that match this mapping
    description_pattern: string or regex to match the description (the
                         mapping loader may store a compiled regex here)
    specifiers: additional conditions, written in the mapping file in the
                form attribute1=value1;attribute2=value2 and stored here
                as a list of (attribute, value) tuples
    """
    mapping_file: str
    account2: str
    description_pattern: str
    specifiers: List[Tuple[str, str]] = field(default_factory=list)
@dataclass
class Transaction:
    """
    Class for ledger transaction to render into ldg file.

    All values are kept as strings exactly as they are rendered. The
    instance is mutable on purpose: account1 and account2 are rewritten
    after the mappings have been applied.
    """
    currency: str
    debit: str
    credit: str
    date: str
    account1: str
    account2: str
    description: str
    csv_file: str
    row: str
# Template for one rendered ledger entry; `t` is a Transaction.
# NOTE(review): the listing this was recovered from had its whitespace
# collapsed. The posting indentation, the two-space gap between account and
# amount, and the leading blank line are reconstructed from the ledger file
# format -- confirm against existing .ldg output before relying on it.
LEDGER_TRANSACTION_TEMPLATE = """
{t.date} {t.description} ; {t.row}
    {t.account2}  {t.currency} {t.debit}
    {t.account1}  {t.currency} {t.credit}
"""
def get_files(input_directory):
    """ Gets files from directory recursively in lexicographic order.

    Returns the full paths (directory joined with file name) of all regular
    files below input_directory, sorted as strings. A directory that does
    not exist yields an empty list because os.walk produces nothing for it.
    """
    return sorted(os.path.join(subdir, name)
                  for subdir, _, names in os.walk(input_directory)
                  for name in names)
def get_mappings(mappings_directory: str) -> List[CsvMapping]:
    """ Load all mappings from the CSV files below mappings_directory.

    Each non-empty, non-comment row has the columns
    account2, description_pattern[, specifiers] and becomes one CsvMapping.
    Files are read in the order returned by get_files.
    """

    def parse_specifiers(s):
        """ This is a little extra magic I have introduced to specify
        mappings with more granularity. The argument s is a string in the form
            attribute1=value1;attribute2=value2;attribute3=value3
        and we want to get it into the form
            [(attribute1, value1), (attribute2, value2), (attribute3, value3)]
        """
        r = []
        for pair in s.split(';'):
            # Split on the first '=' only so values may contain '='.
            attr, value = pair.split("=", 1)
            r.append((attr, value))
        return r

    def get_mappings_from_file(csv_file):

        def row_to_mapping(row):
            pattern = row[1]
            # A pattern written as /.../ is a case-insensitive regex. The
            # length check keeps a lone "/" from becoming an empty regex
            # that would match every description.
            if (len(pattern) >= 2 and pattern.startswith("/")
                    and pattern.endswith("/")):
                row[1] = re.compile(pattern[1:-1], re.IGNORECASE)
            if len(row) == 3:
                # An empty specifier column means "no extra conditions".
                row[2] = parse_specifiers(row[2]) if row[2] else []
            return CsvMapping(csv_file, *row)

        with open(csv_file, 'r', newline='') as f:
            reader = csv.reader(f, delimiter=',', quotechar='"')
            # ignore empty lines and comments
            return [row_to_mapping(row) for row in reader
                    if row
                    if not row[0].startswith("#")]

    return [m
            for f in get_files(mappings_directory)
            for m in get_mappings_from_file(f)]
def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
    """ Parse csv_file into Transaction objects as described by config.

    The first config.skip rows are skipped and empty rows are ignored.
    account2 is set to the placeholder "account2"; it is meant to be
    replaced when the mappings are applied.
    """

    def date_to_date(date: str) -> str:
        """ Convert a date from the input format to the output format. """
        d = datetime.datetime.strptime(date, config.input_date_format)
        return d.strftime(config.output_date_format)

    def flip_sign(amount: str) -> str:
        """ Negate an amount given as a string without parsing it. """
        return amount[1:] if amount.startswith("-") else "-" + amount

    def row_to_transaction(row, fields):
        """ The user can configure the mapping of CSV fields to the three
        required fields date, amount and description via the CsvConfig. """
        t = {name: row[index] for index, name in fields}
        amount = t['amount']
        return Transaction(config.currency, flip_sign(amount), amount,
                           date_to_date(t['date']), config.account1,
                           "account2", t['description'], csv_file,
                           ", ".join(row))

    # (column index, field name) for every column with a non-empty name.
    fields = [(i, f) for i, f in enumerate(config.fields) if f]
    with open(csv_file, 'r', newline='') as f:
        reader = csv.reader(f, delimiter=config.delimiter,
                            quotechar=config.quotechar)
        for _ in range(config.skip):
            # The default keeps a file shorter than the header from
            # raising StopIteration.
            next(reader, None)
        transactions = [row_to_transaction(row, fields)
                        for row in reader if row]
    return transactions
def apply_mappings(transactions: List[Transaction], mappings: List[CsvMapping]):
    """ Set account2 on every transaction according to the mappings.

    The transactions are modified in place. A transaction without a matching
    mapping gets the account "expenses" and is collected in the returned
    list. account1 and account2 are padded with spaces to the same length,
    presumably so the two posting lines of a rendered entry line up.

    Returns the list of transactions for which no mapping matched.
    """

    def make_equal_len(str_1, str_2):
        """ Right-pad the shorter string with spaces. """
        max_len = max(len(str_1), len(str_2))
        return (str_1.ljust(max_len), str_2.ljust(max_len))

    def get_matching_mappings(transaction):
        """ Return all mappings whose pattern and specifiers match. """
        t = transaction
        matching_mappings = []
        for mapping in mappings:
            pattern = mapping.description_pattern
            # Plain strings must equal the description; compiled regexes
            # must match at its beginning (re.Pattern.match).
            if isinstance(pattern, str) and pattern != t.description:
                continue
            elif isinstance(pattern, re.Pattern) and not pattern.match(t.description):
                continue

            # Every (attribute, value) specifier must equal the attribute
            # of the same name on the transaction.
            specifiers_match = all(getattr(t, attr) == value
                                   for attr, value in mapping.specifiers)
            if not specifiers_match:
                continue
            matching_mappings.append(mapping)
        return matching_mappings

    def get_account2(transaction):
        """ Return the mapped account or "" when nothing matches. """
        matching_mappings = get_matching_mappings(transaction)
        if not matching_mappings:
            logging.info(f"No match for {transaction}.")
            return ""
        elif len(matching_mappings) == 1:
            return matching_mappings[0].account2
        else:
            logging.info(
                f"\nMultiple matches for {transaction}. Picking first.")
            for m in matching_mappings:
                logging.info(f" {m}")
            return matching_mappings[0].account2

    unmatched_expenses = []
    for t in transactions:
        account2 = get_account2(t)
        if not account2:
            unmatched_expenses.append(t)
            account2 = "expenses"
        t.account1, t.account2 = make_equal_len(t.account1, account2)
    return unmatched_expenses
def render_to_file(transactions: List[Transaction], csv_file: str, ledger_file: str):
    """ Render the transactions into ledger_file if its content changed.

    The file is written when it does not exist yet ("new") or when the
    rendered content differs from what is on disk ("update"). An unchanged
    file is left untouched. csv_file is only used for the log message.
    """

    def write(text):
        # Input files are searched recursively, so the matching output
        # sub-directory may not exist yet.
        out_dir = os.path.dirname(ledger_file)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        with open(ledger_file, 'w') as f:
            f.write(text)

    content = "".join(LEDGER_TRANSACTION_TEMPLATE.format(t=t)
                      for t in transactions)

    status = "no change"
    if not os.path.isfile(ledger_file):
        write(content)
        status = "new"
    else:
        with open(ledger_file, 'r') as f:
            old_content = f.read()
        if old_content != content:
            write(content)
            status = "update"

    if status != "no change":
        logging.info(f"{csv_file:30} -> {ledger_file:30} | {status}")
def write_mappings(unmatched_transactions: List[Transaction], mappings_directory: str):
    """ Write mappings for unmatched expenses for update by the user.

    One row per transaction is appended to "unmatched.csv" inside
    mappings_directory, in the mapping-file column order
    account2, description_pattern, specifiers. Nothing is written (and the
    file is not created) when there are no unmatched transactions.
    """
    if not unmatched_transactions:
        return
    fn = os.path.join(mappings_directory, "unmatched.csv")
    # newline='' is what the csv module asks for on files it writes to.
    with open(fn, 'a', newline='') as f:
        writer = csv.writer(f)
        for t in unmatched_transactions:
            # NOTE(review): the specifier column was cut off in the listing
            # this was recovered from. credit and date are used here to pin
            # the row to this one transaction -- confirm the intended set.
            e = ["expenses", t.description,
                 f"credit={t.credit};date={t.date}"]
            writer.writerow(e)
def process_csv_file(csv_file, mappings: List[CsvMapping], config: Config):
    """ Convert one CSV file into a ledger file in the output directory.

    Picks the CsvConfig for the file, parses its transactions, applies the
    mappings, records unmatched transactions in the mappings directory and
    renders the result.

    Raises Exception when no CsvConfig or more than one CsvConfig matches.
    """

    def csv_to_ldg_filename(csv_file: str, config: Config) -> str:
        # Only the first occurrence of the input directory and only the
        # file suffix are rewritten, so directory names that repeat the
        # input directory or contain ".csv" stay intact.
        r = csv_file.replace(config.input_directory,
                             config.output_directory, 1)
        root, ext = os.path.splitext(r)
        if ext == ".csv":
            r = root + ".ldg"
        return r

    def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
        cs = [c for c in csv_configs
              if re.match(c.file_match_regex, csv_file)]
        if not cs:
            raise Exception(f"No config for {csv_file=}.")
        elif len(cs) > 1:
            raise Exception(f"More than one config for {csv_file=}.")
        return cs[0]

    ledger_file = csv_to_ldg_filename(csv_file, config)
    csv_config = get_csv_config(csv_file, config.csv_configs)
    transactions = get_transactions(csv_file, csv_config)
    unmatched_transactions = apply_mappings(transactions, mappings)
    write_mappings(unmatched_transactions, config.mappings_directory)
    render_to_file(transactions, csv_file, ledger_file)
def process_ldg_file(ldg_file: str, config: Config):
    """ Copy a ledger file from the input to the output directory.

    The file is copied when the destination does not exist ("new") or when
    the destination was modified before the source ("update").
    """
    dest_file = ldg_file.replace(config.input_directory,
                                 config.output_directory, 1)
    status = "no change"
    if not os.path.isfile(dest_file):
        # Input files are searched recursively, so the matching output
        # sub-directory may not exist yet.
        dest_dir = os.path.dirname(dest_file)
        if dest_dir:
            os.makedirs(dest_dir, exist_ok=True)
        status = "new"
        shutil.copy(ldg_file, dest_file)
    elif os.path.getmtime(dest_file) < os.path.getmtime(ldg_file):
        # Comparing the modification times directly is the same test as
        # "destination is older than source", without two clock reads.
        shutil.copy(ldg_file, dest_file)
        status = "update"

    if status != "no change":
        logging.info(f"{ldg_file:30} -> {dest_file:30} | {status}")
def main(config):
    """ Process every file below config.input_directory.

    CSV files are converted to ledger files and .ldg files are copied.
    config.csv_configs is replaced in place by a list of CsvConfig objects.

    Raises Exception for a file that is neither .csv nor .ldg.
    """
    input_files = get_files(config.input_directory)
    # Entries arrive as plain dicts from the JSON config. Entries that are
    # already CsvConfig objects are kept, so main can be called again with
    # the same config object.
    config.csv_configs = [c if isinstance(c, CsvConfig) else CsvConfig(**c)
                          for c in config.csv_configs]
    mappings = get_mappings(config.mappings_directory)
    for f in input_files:
        if f.endswith(".csv"):
            process_csv_file(f, mappings, config)
        elif f.endswith(".ldg"):
            process_ldg_file(f, config)
        else:
            m = f"Unsupported file type for '{f}'."
            raise Exception(m)
if __name__ == "__main__":
    # The path of the JSON config file is the optional first command line
    # argument; without it "config.json" in the working directory is used.
    try:
        config_file = sys.argv[1]
    except IndexError:
        config_file = "config.json"
    with open(config_file, 'r') as f:
        config = Config(**json.load(f))
    # NOTE(review): the recovered listing ends after loading the config and
    # never hands it to main(); the call below is reconstructed. Also, no
    # logging configuration is visible here, so the logging.info status
    # lines stay hidden at the default level -- confirm both against the
    # original script.
    main(config)