306 lines
10 KiB
Python
306 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import sys
|
|
import csv
|
|
import os.path
|
|
import time
|
|
import re
|
|
import datetime
|
|
import logging
|
|
import shutil
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Tuple
|
|
|
|
|
|
@dataclass
|
|
class Config:
|
|
"""
|
|
Basic class for the configuration of this script.
|
|
- input_directory: we search for ldg and csv files recursively here
|
|
- output_directory: for all input files we do name.replace(input_directory,
|
|
output_directory)
|
|
- mappings_directory: directory of CSV mapping files
|
|
- csv_configs: configuration for the different input files
|
|
"""
|
|
input_directory: str
|
|
output_directory: str
|
|
mappings_directory: str
|
|
csv_configs: List
|
|
|
|
|
|
@dataclass
|
|
class CsvConfig:
|
|
"""
|
|
Class to define how to parse a certain CSV file. We use the
|
|
file_match_regex attribute to decide whether to apply a config for a file.
|
|
If multiple configs match a single file we raise an exception.
|
|
"""
|
|
account1: str
|
|
file_match_regex: str
|
|
fields: List[str]
|
|
input_date_format: str = "%m/%d/%Y"
|
|
output_date_format: str = "%Y/%m/%d"
|
|
skip: int = 1
|
|
delimiter: str = ","
|
|
quotechar: str = "\""
|
|
currency: str = "$"
|
|
|
|
|
|
@dataclass
|
|
class CsvMapping:
|
|
"""
|
|
Class that defines the account2 attribute for a CSV transaction.
|
|
description_pattern: string or regexes to match the description
|
|
specifiers: additonal conditions in the form
|
|
transaction_attribute=value;another_attribute=value2
|
|
"""
|
|
mapping_file: str
|
|
account2: str
|
|
description_pattern: str
|
|
specifiers: List[Tuple[str, str]] = field(default_factory=lambda: [])
|
|
|
|
|
|
@dataclass
|
|
class Transaction:
|
|
"""
|
|
Class for ledger transaction to render into ldg file.
|
|
"""
|
|
currency: str
|
|
debit: str
|
|
credit: str
|
|
date: str
|
|
account1: str
|
|
account2: str
|
|
description: str
|
|
csv_file: str
|
|
row: str
|
|
|
|
|
|
LEDGER_TRANSACTION_TEMPLATE = """
|
|
{t.date} {t.description} ; {t.row}
|
|
{t.account2} {t.currency} {t.debit}
|
|
{t.account1} {t.currency} {t.credit}
|
|
"""
|
|
|
|
|
|
def get_files(input_directory):
|
|
""" Gets files from directory recursively in lexigraphic order. """
|
|
return sorted([os.path.join(subdir, f)
|
|
for subdir, dirs, files in os.walk(input_directory)
|
|
for f in files])
|
|
|
|
|
|
def get_mappings(mappings_directory: str) -> List[CsvMapping]:
|
|
|
|
def parse_specifiers(s):
|
|
""" This is a little extra magic I have introduced to specify
|
|
mappings with more cranularity. The argument s is a string in the form
|
|
|
|
attribute1=value1;attribute2=value2;attribute3=value3
|
|
|
|
and we want to get it into the form
|
|
|
|
[(attribute1, value1), (attribute2, value2), (attribute3, value3)]
|
|
"""
|
|
r = []
|
|
for pair in s.split(';'):
|
|
attr, value = pair.split("=")
|
|
r.append((attr, value))
|
|
return r
|
|
|
|
def get_mappings_from_file(csv_file):
|
|
def row_to_mapping(row):
|
|
pattern = row[1]
|
|
if pattern.startswith("/") and pattern.endswith("/"):
|
|
row[1] = re.compile(pattern[1:-1], re.IGNORECASE)
|
|
if len(row) == 3 and row[2]:
|
|
row[2] = parse_specifiers(row[2])
|
|
return CsvMapping(csv_file, *row)
|
|
|
|
with open(csv_file, 'r') as f:
|
|
reader = csv.reader(f, delimiter=',', quotechar='"')
|
|
# ignore empty lines and comments
|
|
return [row_to_mapping(row) for row in reader
|
|
if row
|
|
if not row[0].startswith("#")]
|
|
return [m
|
|
for f in get_files(mappings_directory)
|
|
for m in get_mappings_from_file(f)]
|
|
|
|
|
|
def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
|
|
def date_to_date(date: str) -> str:
|
|
d = datetime.datetime.strptime(date, config.input_date_format)
|
|
return d.strftime(config.output_date_format)
|
|
|
|
def flip_sign(amount: str) -> str:
|
|
return amount[1:] if amount.startswith("-") else "-" + amount
|
|
|
|
def row_to_transaction(row, fields):
|
|
""" The user can configure the mapping of CSV fields to the three
|
|
required fields date, amount and description via the CsvConfig. """
|
|
t = {field: row[index] for index, field in fields}
|
|
amount = t['amount']
|
|
return Transaction(config.currency, flip_sign(amount), amount,
|
|
date_to_date(t['date']), config.account1,
|
|
"account2", t['description'], csv_file, ", ".join(row))
|
|
|
|
fields = [(i, f) for i, f in enumerate(config.fields) if f]
|
|
with open(csv_file, 'r') as f:
|
|
reader = csv.reader(f, delimiter=config.delimiter,
|
|
quotechar=config.quotechar)
|
|
for _ in range(config.skip):
|
|
next(reader)
|
|
transactions = [row_to_transaction(row, fields)
|
|
for row in reader if row]
|
|
return transactions
|
|
|
|
|
|
def apply_mappings(transactions: List[Transaction], mappings: List[CsvMapping]):
|
|
def make_equal_len(str_1, str_2):
|
|
max_len = max(len(str_1), len(str_2))
|
|
str_1 += " " * (max_len - len(str_1))
|
|
str_2 += " " * (max_len - len(str_2))
|
|
return (str_1, str_2)
|
|
|
|
def get_matching_mappings(transaction):
|
|
t = transaction
|
|
matching_mappings = []
|
|
for mapping in mappings:
|
|
pattern = mapping.description_pattern
|
|
if type(pattern) is str and pattern != transaction.description:
|
|
continue
|
|
elif type(pattern) is re.Pattern and not pattern.match(t.description):
|
|
continue
|
|
specifiers_match = True
|
|
for attr, value in mapping.specifiers:
|
|
if getattr(t, attr) != value:
|
|
specifiers_match = False
|
|
if not specifiers_match:
|
|
continue
|
|
matching_mappings.append(mapping)
|
|
return matching_mappings
|
|
|
|
def get_account2(transaction):
|
|
matching_mappings = get_matching_mappings(transaction)
|
|
if not matching_mappings:
|
|
logging.info(f"No match for {transaction}.")
|
|
return ""
|
|
elif len(matching_mappings) == 1:
|
|
return matching_mappings[0].account2
|
|
else:
|
|
logging.info(
|
|
f"\nMultiple matches for {transaction}. Picking first.")
|
|
for m in matching_mappings:
|
|
logging.info(f" {m}")
|
|
return matching_mappings[0].account2
|
|
|
|
unmatched_expenses = []
|
|
for t in transactions:
|
|
account2 = get_account2(t)
|
|
if not account2:
|
|
unmatched_expenses.append(t)
|
|
account2 = "expenses"
|
|
t.account1, t.account2 = make_equal_len(t.account1, account2)
|
|
return unmatched_expenses
|
|
|
|
|
|
def render_to_file(transactions: List[Transaction], csv_file: str, ledger_file: str):
|
|
content = "".join([LEDGER_TRANSACTION_TEMPLATE.format(t=t)
|
|
for t in transactions])
|
|
status = "no change"
|
|
if not os.path.isfile(ledger_file):
|
|
with open(ledger_file, 'w') as f:
|
|
f.write(content)
|
|
status = "new"
|
|
else:
|
|
with open(ledger_file, 'r') as f:
|
|
old_content = f.read()
|
|
f.close()
|
|
if old_content != content:
|
|
with open(ledger_file, 'w') as f:
|
|
f.write(content)
|
|
status = "update"
|
|
if status != "no change":
|
|
logging.info(f"{csv_file:30} -> {ledger_file:30} | {status}")
|
|
|
|
|
|
def write_mappings(unmatched_transactions: List[Transaction], mappings_directory: str):
|
|
""" Write mappings for unmatched expenses for update by the user. """
|
|
if not unmatched_transactions:
|
|
return
|
|
fn = os.path.join(mappings_directory, "unmatched.csv")
|
|
with open(fn, 'a') as f:
|
|
writer = csv.writer(f)
|
|
for t in unmatched_transactions:
|
|
e = ["expenses", t.description,
|
|
f"credit={t.credit};date={t.date}"]
|
|
writer.writerow(e)
|
|
|
|
|
|
def process_csv_file(csv_file, mappings: List[CsvMapping], config: Config):
|
|
def csv_to_ldg_filename(csv_file: str, config: Config) -> str :
|
|
r = csv_file
|
|
r = r.replace(config.input_directory, config.output_directory)
|
|
r = r.replace(".csv", ".ldg")
|
|
return r
|
|
|
|
def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
|
|
cs = [c for c in csv_configs
|
|
if re.match(c.file_match_regex, csv_file)]
|
|
if not cs:
|
|
raise Exception(f"No config for {csv_file=}.")
|
|
elif len(cs) > 1:
|
|
raise Exception(f"More than one config for {csv_file=}.")
|
|
return cs[0]
|
|
|
|
ledger_file = csv_to_ldg_filename(csv_file, config)
|
|
csv_config = get_csv_config(csv_file, config.csv_configs)
|
|
transactions = get_transactions(csv_file, csv_config)
|
|
unmatched_transactions = apply_mappings(transactions, mappings)
|
|
write_mappings(unmatched_transactions, config.mappings_directory)
|
|
render_to_file(transactions, csv_file, ledger_file)
|
|
|
|
|
|
def process_ldg_file(ldg_file: str, config: Config):
|
|
file_age = lambda file: time.time() - os.path.getmtime(file)
|
|
dest_file = ldg_file.replace(config.input_directory, config.output_directory)
|
|
status = "no change"
|
|
if not os.path.isfile(dest_file):
|
|
status = "new"
|
|
shutil.copy(ldg_file, dest_file)
|
|
if file_age(dest_file) > file_age(ldg_file):
|
|
shutil.copy(ldg_file, dest_file)
|
|
status = "update"
|
|
if status != "no change":
|
|
logging.info(f"{ldg_file:30} -> {dest_file:30} | {status}")
|
|
|
|
|
|
def main(config):
|
|
input_files = get_files(config.input_directory)
|
|
config.csv_configs = [CsvConfig(**c) for c in config.csv_configs]
|
|
mappings = get_mappings(config.mappings_directory)
|
|
for f in input_files:
|
|
if f.endswith(".csv"):
|
|
process_csv_file(f, mappings, config)
|
|
elif f.endswith(".ldg"):
|
|
process_ldg_file(f, config)
|
|
else:
|
|
m = f"Unsupported file type for '{f}'."
|
|
raise Exception(m)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(stream=sys.stdout,
|
|
level=logging.INFO,
|
|
format='%(message)s')
|
|
try:
|
|
config_file = sys.argv[1]
|
|
except IndexError:
|
|
config_file = "config.json"
|
|
with open(config_file, 'r') as f:
|
|
config = Config(**json.load(f))
|
|
main(config)
|
|
|