Update project structure and move to beancount

This commit is contained in:
2025-03-02 11:08:33 -05:00
parent 886bcdbdd1
commit 08c50e776e
17 changed files with 1844 additions and 296 deletions

113
src/toldg/utils.py Normal file
View File

@@ -0,0 +1,113 @@
import json
import logging
import os
import sys
from pathlib import Path
from typing import Dict, List
from pydantic import ValidationError
from toldg.models import Config, Transaction
def get_files(directory: Path, ending="") -> List[Path]:
"""Gets files from directory recursively in lexigraphic order."""
return [
Path(os.path.join(subdir, f))
for subdir, _, files in os.walk(directory)
for f in files
if f.endswith(ending)
]
def get_csv_files(directory: Path) -> List[Path]:
return get_files(directory, ".csv")
def get_ldg_files(directory: Path) -> List[Path]:
return get_files(directory, ".ldg")
def load_config() -> Config:
try:
config_file = Path(sys.argv[1])
except IndexError:
logging.critical("Provide configuration file as first argument.")
sys.exit(1)
try:
with open(config_file, "r") as f:
config = Config(**json.load(f))
except ValidationError as e:
logging.critical(f"Could not validate {config_file}.")
logging.info(e)
sys.exit(1)
except FileNotFoundError:
logging.critical(f"Could not find {config_file}.")
sys.exit(1)
return config
def category_to_bean(c: str) -> str:
sections = map(list, c.split(":"))
new_sections = []
for section in sections:
section[0] = section[0].upper()
new_sections.append("".join(section))
return ":".join(new_sections)
def write_meta(config: Config):
with open(config.output_file, "a") as f:
for category in config.categories:
f.write(f"2017-01-01 open {category_to_bean(category)}\n")
f.write("\n")
f.write('option "operating_currency" "USD"\n\n')
# Commodity section is not required for beancount
# for commodity in config.commodities:
# f.write(f"commodity {commodity}\n")
# f.write("\n")
def write_mappings(transactions: List[Transaction], mappings_file: Path):
mappings = {}
for t in transactions:
try:
mappings[t.account2.strip()].append(t.row)
except KeyError:
mappings[t.account2.strip()] = [t.row]
with open(mappings_file, "w") as f:
json.dump({k: sorted(v) for k, v in sorted(mappings.items())}, f, indent=4)
def read_mappings(mappings_file: Path) -> Dict[str, str]:
with open(mappings_file, "r") as f:
account2_to_rows = json.load(f)
return {
row: category for category, rows in account2_to_rows.items() for row in rows
}
def read_descriptions(descriptions_file: Path) -> Dict[str, str]:
"""I am basic so the description file is currently a double row based
format where the first row matches the CSV row and the second one is the
description."""
descriptions = {}
current_row = None
with open(descriptions_file, "r") as f:
for line in f.readlines():
if current_row is None:
current_row = line.rstrip("\n")
else:
descriptions[current_row] = line.rstrip("\n")
current_row = None
return descriptions
def remove_if_exists(output_file: Path):
try:
os.remove(output_file)
except OSError:
pass