Refactor ledger processing to an explicit mapping, which will make automated classification easier

This commit is contained in:
2023-06-24 22:54:31 +02:00
parent b6de0e5514
commit ba0c906e3c
9 changed files with 421 additions and 8 deletions

60
src/models.py Normal file
View File

@@ -0,0 +1,60 @@
from pydantic import BaseModel, Extra
from typing import List
from pathlib import Path
from typing import List
class CsvConfig(BaseModel):
    """
    Class to define how to parse a certain CSV file. We use the
    file_match_regex attribute to decide whether to apply a config for a file.
    If multiple configs match a single file we raise an exception.
    """
    class Config:
        # Pydantic v1 config: reject unknown keys in the JSON config file.
        extra = Extra.forbid
    # Ledger account the parsed rows are booked against (the "own" account).
    account1: str
    # Applied via re.match against the CSV file name to select this config.
    file_match_regex: str
    # Positional mapping of CSV columns to logical field names; empty
    # entries mark columns to ignore. Must name 'date', 'amount' and
    # 'description' for transaction parsing to work.
    fields: List[str]
    # strptime format of dates as they appear in the CSV input.
    input_date_format: str = "%m/%d/%Y"
    # strftime format used when rendering dates into the ledger output.
    output_date_format: str = "%Y/%m/%d"
    # Number of leading (header) rows to skip in the CSV file.
    skip: int = 1
    delimiter: str = ","
    quotechar: str = "\""
    # Currency symbol rendered in front of amounts in the ledger output.
    currency: str = "$"
class Config(BaseModel):
    """
    Basic class for the configuration of this script.
    - input_directory: we search for ldg and csv files recursively here
    - mappings_file: JSON file holding the row -> account mapping
    - output_file: the generated ledger file (defaults to output.ldg)
    - csv_configs: configuration for the different input files
    """
    class Config:
        # Pydantic v1 config: reject unknown keys in the JSON config file.
        # (The inner name 'Config' is pydantic's convention, unrelated to
        # this outer Config model.)
        extra = Extra.forbid
    input_directory: Path
    mappings_file: Path
    output_file: Path = Path("output.ldg")
    csv_configs: List[CsvConfig]
class Transaction(BaseModel):
    """
    Class for ledger transaction to render into ldg file.
    """
    class Config:
        # Pydantic v1 config: reject unknown keyword arguments.
        extra = Extra.forbid
    # Currency symbol, copied from the matching CsvConfig.
    currency: str
    # Amount with flipped sign, posted to account2.
    debit: str
    # Amount as read from the CSV, posted to account1.
    credit: str
    # Date already converted to the configured output format.
    date: str
    # Own account from the CsvConfig.
    account1: str
    # Counter-account; starts as the placeholder "account2" and is later
    # replaced via the mappings file.
    account2: str
    description: str
    # Source CSV file this transaction was parsed from.
    csv_file: str
    # "<csv_file>, <joined raw row>" — used as the duplicate-detection and
    # mapping key.
    row: str

102
src/process.py Normal file
View File

@@ -0,0 +1,102 @@
import csv
import logging
import re
import sys
import datetime
import src.utils
import src.write
from src.models import Config, CsvConfig, Transaction
from typing import List, Dict
def process_ldg_files(config: Config):
    """Append the contents of every .ldg file under config.input_directory
    to config.output_file.

    The output file is opened once, outside the loop, instead of being
    reopened in append mode for every single input file.
    """
    with open(config.output_file, 'a') as f_out:
        for ldg_file in src.utils.get_ldg_files(config.input_directory):
            with open(ldg_file, 'r') as f_in:
                f_out.write(f_in.read())
def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
    """Return the unique CsvConfig whose file_match_regex matches csv_file.

    Terminates the program with exit code 1 when no config, or more than
    one config, matches the file name.
    """
    matching = [cfg for cfg in csv_configs
                if re.match(cfg.file_match_regex, csv_file)]
    if len(matching) == 1:
        return matching[0]
    if not matching:
        logging.critical(f"No CSV config for {csv_file}.")
    else:
        logging.critical(f"Multiple CSV configs for {csv_file}.")
    sys.exit(1)
def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
    """Parse one CSV file into Transaction objects according to config.

    Fixes over the naive version:
    - the file is opened with newline='' as required by the csv module so
      quoted fields containing newlines are parsed correctly;
    - skipping header rows uses next(reader, None) so a file shorter than
      config.skip no longer raises StopIteration.
    """
    def date_to_date(date: str) -> str:
        # Convert from the CSV's input date format to the ledger format.
        d = datetime.datetime.strptime(date, config.input_date_format)
        return d.strftime(config.output_date_format)

    def flip_sign(amount: str) -> str:
        # "-12.34" -> "12.34" and "12.34" -> "-12.34".
        return amount[1:] if amount.startswith("-") else "-" + amount

    def row_to_transaction(row, fields):
        """ The user can configure the mapping of CSV fields to the three
        required fields date, amount and description via the CsvConfig. """
        t = {field: row[index] for index, field in fields}
        amount = t['amount']
        return Transaction(
            currency=config.currency,
            debit=flip_sign(amount),
            credit=amount,
            date=date_to_date(t['date']),
            account1=config.account1,
            # Placeholder counter-account, later replaced by apply_mappings.
            account2="account2",
            description=t['description'],
            csv_file=csv_file,
            row=csv_file + ", " + ", ".join(row))

    # Only named (non-empty) entries of config.fields are extracted.
    fields = [(i, f) for i, f in enumerate(config.fields) if f]
    with open(csv_file, 'r', newline='') as f:
        reader = csv.reader(f, delimiter=config.delimiter,
                            quotechar=config.quotechar)
        for _ in range(config.skip):
            next(reader, None)
        transactions = [row_to_transaction(row, fields)
                        for row in reader if row]
    return transactions
def find_duplicates(transactions: List[Transaction]):
    """Terminate the program if two transactions share the same raw row."""
    seen = set()
    for transaction in transactions:
        row = transaction.row
        if row in seen:
            logging.critical(f"'{row}' is duplicated.")
            logging.critical("Exit because of duplicated transactions.")
            sys.exit(1)
        seen.add(row)
def apply_mappings(transactions: List[Transaction], mappings: Dict[str, str]):
    """Set account2 on every transaction whose row appears in mappings.

    Warns about transactions without a mapping and about mapping entries
    that matched no transaction.
    """
    unused = set(mappings)
    for transaction in transactions:
        row = transaction.row
        if row not in mappings:
            logging.warning(f"No mapping for '{transaction}'.")
            continue
        transaction.account2 = mappings[row]
        unused.discard(row)
    for row in unused:
        logging.warning(f"Unused mapping '{row}' -> {mappings[row]}.")
def process_csv_files(config: Config):
    """End-to-end CSV pipeline: parse, deduplicate, categorize, persist.

    The step order is the contract: duplicates are detected before
    mappings are applied, and the mappings file is rewritten before the
    ledger output is rendered.
    """
    csv_files = src.utils.get_csv_files(config.input_directory)
    transactions = []
    for csv_file in csv_files:
        # get_csv_config and Transaction.row work on the file name as a
        # string, so convert the Path up front.
        csv_file = str(csv_file)
        csv_config = get_csv_config(csv_file, config.csv_configs)
        transactions += get_transactions(csv_file, csv_config)
    find_duplicates(transactions)
    mappings = src.utils.read_mappings(config.mappings_file)
    apply_mappings(transactions, mappings)
    # NOTE(review): rewriting the mappings file here presumably surfaces
    # newly seen rows for classification on the next run — confirm intent.
    src.utils.write_mappings(transactions, config.mappings_file)
    src.write.render_to_file(transactions, config.output_file)

72
src/utils.py Normal file
View File

@@ -0,0 +1,72 @@
import logging
import os
import sys
import logging
import json
from pathlib import Path
from typing import List, Dict
from src.models import Config, Transaction
from pydantic import ValidationError
def get_files(directory: Path, ending="") -> List[Path]:
    """Get files from directory recursively, in lexicographic order.

    os.walk yields directories and file names in arbitrary (OS-dependent)
    order, so the previous version did not actually deliver the ordering
    its docstring promised. Sorting dirs in place makes os.walk descend in
    sorted order; file names are sorted per directory.
    """
    result = []
    for subdir, dirs, files in os.walk(directory):
        dirs.sort()  # in-place sort steers os.walk's traversal order
        for f in sorted(files):
            if f.endswith(ending):
                result.append(Path(os.path.join(subdir, f)))
    return result
def get_csv_files(directory: Path) -> List[Path]:
    """Recursively collect all .csv files below directory."""
    return get_files(directory, ".csv")
def get_ldg_files(directory: Path) -> List[Path]:
    """Recursively collect all .ldg files below directory."""
    return get_files(directory, ".ldg")
def load_config() -> Config:
    """Load and validate the JSON config file named by the first CLI arg.

    Exits with status 1 on a missing argument, a missing file, invalid
    JSON, or a config that fails validation. The previous version let
    json.JSONDecodeError escape as a raw traceback, unlike every other
    failure mode here.
    """
    try:
        config_file = Path(sys.argv[1])
    except IndexError:
        logging.critical("Provide configuration file as first argument.")
        sys.exit(1)
    try:
        with open(config_file, 'r') as f:
            config = Config(**json.load(f))
    except ValidationError as e:
        logging.critical(f"Could not validate {config_file}.")
        logging.info(e)
        sys.exit(1)
    except json.JSONDecodeError as e:
        logging.critical(f"Could not parse {config_file} as JSON.")
        logging.info(e)
        sys.exit(1)
    except FileNotFoundError:
        logging.critical(f"Could not find {config_file}.")
        sys.exit(1)
    return config
def write_mappings(transactions: List[Transaction], mappings_file: Path):
    """Persist the account2 -> [rows] mapping of transactions as JSON.

    Keys are the stripped account2 names; each maps to the sorted list of
    raw transaction rows, and keys themselves are written sorted, so the
    output file is stable and diff-friendly. Uses dict.setdefault for
    grouping instead of the try/except KeyError idiom.
    """
    mappings: Dict[str, List[str]] = {}
    for t in transactions:
        mappings.setdefault(t.account2.strip(), []).append(t.row)
    with open(mappings_file, "w") as f:
        json.dump({k: sorted(v) for k, v in sorted(mappings.items())},
                  f, indent=4)
def read_mappings(mappings_file: Path) -> Dict[str, str]:
    """Load the mappings file and invert it: account2 -> [rows] stored on
    disk becomes row -> account2 for per-transaction lookup."""
    with open(mappings_file, 'r') as f:
        account2_to_rows = json.load(f)
    inverted = {}
    for category, rows in account2_to_rows.items():
        for row in rows:
            inverted[row] = category
    return inverted
def remove_if_exists(output_file: Path):
    """Best-effort delete of output_file; a missing file (or any other
    OS-level failure) is silently ignored."""
    try:
        os.unlink(output_file)
    except OSError:
        pass

17
src/write.py Normal file
View File

@@ -0,0 +1,17 @@
from pathlib import Path
from typing import List
from src.models import Transaction
# Template for one ledger entry: a date/description line (with the raw CSV
# row kept as a trailing ';' comment for traceability) followed by the two
# postings. Rendered via str.format(t=transaction).
# NOTE(review): ledger requires posting lines to start with whitespace;
# the paste may have stripped indentation inside this literal — confirm
# against the real file.
LEDGER_TRANSACTION_TEMPLATE = """
{t.date} {t.description} ; {t.row}
{t.account2} {t.currency} {t.debit}
{t.account1} {t.currency} {t.credit}
"""
def render_to_file(transactions: List[Transaction], ledger_file: Path):
    """Append every transaction, formatted via LEDGER_TRANSACTION_TEMPLATE,
    to ledger_file."""
    with open(ledger_file, 'a') as f:
        for transaction in transactions:
            f.write(LEDGER_TRANSACTION_TEMPLATE.format(t=transaction))