generated from felixm/defaultpy
Add source info and sort transactions
This commit is contained in:
2003
poetry.lock
generated
2003
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,5 +1,7 @@
|
|||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
|
import beancount
|
||||||
|
import io
|
||||||
|
|
||||||
from rich.logging import RichHandler
|
from rich.logging import RichHandler
|
||||||
|
|
||||||
@@ -17,6 +19,38 @@ def init_logging():
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def load_and_write_back(filename):
|
||||||
|
entries, errors, options_map = beancount.loader.load_file(filename)
|
||||||
|
|
||||||
|
def beancount_entry_to_string(entry) -> str:
|
||||||
|
buf = io.StringIO()
|
||||||
|
# beancount.parser.printer.print_entry(entry, dcontext=options_map['dcontext'], file=buf)
|
||||||
|
beancount.parser.printer.print_entry(entry, file=buf)
|
||||||
|
return buf.getvalue().strip()
|
||||||
|
|
||||||
|
def is_transaction(entry) -> bool:
|
||||||
|
return isinstance(entry, beancount.core.data.Transaction)
|
||||||
|
|
||||||
|
prev_entry_was_transaction = False
|
||||||
|
if errors:
|
||||||
|
print(f"errors in generated '{filename}'")
|
||||||
|
for err in errors:
|
||||||
|
print(err)
|
||||||
|
else:
|
||||||
|
entries.sort(key=lambda e: e.date)
|
||||||
|
with open(filename, "w") as f:
|
||||||
|
f.write('option "operating_currency" "USD"\n')
|
||||||
|
for entry in entries:
|
||||||
|
if prev_entry_was_transaction:
|
||||||
|
f.write("\n")
|
||||||
|
elif not prev_entry_was_transaction and is_transaction(entry):
|
||||||
|
f.write("\n")
|
||||||
|
f.write(beancount_entry_to_string(entry))
|
||||||
|
f.write("\n")
|
||||||
|
prev_entry_was_transaction = is_transaction(entry)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
init_logging()
|
init_logging()
|
||||||
config = load_config()
|
config = load_config()
|
||||||
@@ -27,6 +61,7 @@ def main():
|
|||||||
write_meta(config)
|
write_meta(config)
|
||||||
process_ldg_files(config)
|
process_ldg_files(config)
|
||||||
process_csv_files(config)
|
process_csv_files(config)
|
||||||
|
load_and_write_back(config.output_file)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -81,4 +81,9 @@ class Transaction(BaseModel):
|
|||||||
description: str
|
description: str
|
||||||
csv_file: str
|
csv_file: str
|
||||||
row: str
|
row: str
|
||||||
|
index: int
|
||||||
mapping: Optional[Mapping] = None
|
mapping: Optional[Mapping] = None
|
||||||
|
|
||||||
|
def key(self):
|
||||||
|
return self.csv_file + ", " + self.row
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ import datetime
|
|||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
from typing import Any, Dict, List
|
|
||||||
|
|
||||||
import toldg.models
|
import toldg.models
|
||||||
import toldg.predict
|
import toldg.predict
|
||||||
@@ -19,7 +18,7 @@ def process_ldg_files(config: Config):
|
|||||||
f_out.write(f_in.read())
|
f_out.write(f_in.read())
|
||||||
|
|
||||||
|
|
||||||
def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
|
def get_csv_config(csv_file: str, csv_configs: list[CsvConfig]) -> CsvConfig:
|
||||||
cs = [c for c in csv_configs if re.match(c.file_match_regex, csv_file)]
|
cs = [c for c in csv_configs if re.match(c.file_match_regex, csv_file)]
|
||||||
if not cs:
|
if not cs:
|
||||||
logging.critical(f"No CSV config for {csv_file}.")
|
logging.critical(f"No CSV config for {csv_file}.")
|
||||||
@@ -30,7 +29,7 @@ def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
|
|||||||
return cs[0]
|
return cs[0]
|
||||||
|
|
||||||
|
|
||||||
def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
|
def get_transactions(csv_file: str, config: CsvConfig) -> list[Transaction]:
|
||||||
def date_to_date(date: str) -> str:
|
def date_to_date(date: str) -> str:
|
||||||
d = datetime.datetime.strptime(date, config.input_date_format)
|
d = datetime.datetime.strptime(date, config.input_date_format)
|
||||||
return d.strftime(config.output_date_format)
|
return d.strftime(config.output_date_format)
|
||||||
@@ -38,7 +37,7 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
|
|||||||
def flip_sign(amount: str) -> str:
|
def flip_sign(amount: str) -> str:
|
||||||
return amount[1:] if amount.startswith("-") else "-" + amount
|
return amount[1:] if amount.startswith("-") else "-" + amount
|
||||||
|
|
||||||
def row_to_transaction(row, fields):
|
def row_to_transaction(idx, row, fields):
|
||||||
"""The user can configure the mapping of CSV fields to the three
|
"""The user can configure the mapping of CSV fields to the three
|
||||||
required fields date, amount and description via the CsvConfig."""
|
required fields date, amount and description via the CsvConfig."""
|
||||||
t = {field: row[index] for index, field in fields}
|
t = {field: row[index] for index, field in fields}
|
||||||
@@ -52,7 +51,8 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
|
|||||||
account2=toldg.models.UNKNOWN_CATEGORY,
|
account2=toldg.models.UNKNOWN_CATEGORY,
|
||||||
description=t["description"],
|
description=t["description"],
|
||||||
csv_file=csv_file,
|
csv_file=csv_file,
|
||||||
row=csv_file + ", " + ", ".join(row),
|
row=", ".join(row),
|
||||||
|
index=idx,
|
||||||
)
|
)
|
||||||
|
|
||||||
fields = [(i, f) for i, f in enumerate(config.fields) if f]
|
fields = [(i, f) for i, f in enumerate(config.fields) if f]
|
||||||
@@ -60,15 +60,17 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
|
|||||||
reader = csv.reader(f, delimiter=config.delimiter, quotechar=config.quotechar)
|
reader = csv.reader(f, delimiter=config.delimiter, quotechar=config.quotechar)
|
||||||
for _ in range(config.skip):
|
for _ in range(config.skip):
|
||||||
next(reader)
|
next(reader)
|
||||||
transactions = [row_to_transaction(row, fields) for row in reader if row]
|
rows = [row for row in reader if row]
|
||||||
|
transactions = [row_to_transaction(i, row, fields)
|
||||||
|
for i, row in enumerate(reversed(rows))]
|
||||||
return transactions
|
return transactions
|
||||||
|
|
||||||
|
|
||||||
def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]):
|
def apply_mappings(transactions: list[Transaction], mappings: dict[str, Mapping]):
|
||||||
"""Apply mappings to transactions."""
|
"""Apply mappings to transactions."""
|
||||||
for t in transactions:
|
for t in transactions:
|
||||||
if t.row in mappings:
|
if t.key() in mappings:
|
||||||
mapping = mappings[t.row]
|
mapping = mappings[t.key()]
|
||||||
assert isinstance(mapping, Mapping)
|
assert isinstance(mapping, Mapping)
|
||||||
assert (
|
assert (
|
||||||
mapping.count > 0
|
mapping.count > 0
|
||||||
@@ -82,7 +84,7 @@ def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]
|
|||||||
assert mapping.count == 0, f"{mapping} was not used as often as expected!"
|
assert mapping.count == 0, f"{mapping} was not used as often as expected!"
|
||||||
|
|
||||||
|
|
||||||
def process_csv_files(config: Config) -> List[Transaction]:
|
def process_csv_files(config: Config) -> list[Transaction]:
|
||||||
csv_files = toldg.utils.get_csv_files(config.input_directory)
|
csv_files = toldg.utils.get_csv_files(config.input_directory)
|
||||||
transactions = []
|
transactions = []
|
||||||
for csv_file in csv_files:
|
for csv_file in csv_files:
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ def write_mappings(transactions: List[Transaction], mappings_file: Path):
|
|||||||
|
|
||||||
mappings = read_mappings(mappings_file)
|
mappings = read_mappings(mappings_file)
|
||||||
for t in transactions:
|
for t in transactions:
|
||||||
if t.row in mappings:
|
if t.key() in mappings:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
mapping = Mapping(
|
mapping = Mapping(
|
||||||
@@ -79,7 +79,7 @@ def write_mappings(transactions: List[Transaction], mappings_file: Path):
|
|||||||
"narration": t.description,
|
"narration": t.description,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
mappings[t.row] = mapping
|
mappings[t.key()] = mapping
|
||||||
|
|
||||||
mappings = {k: v.model_dump(exclude_none=True) for k, v in mappings.items()}
|
mappings = {k: v.model_dump(exclude_none=True) for k, v in mappings.items()}
|
||||||
with open(mappings_file, "w") as f:
|
with open(mappings_file, "w") as f:
|
||||||
|
|||||||
@@ -8,12 +8,14 @@ BEANCOUNT_TRANSACTION_TEMPLATE = """
|
|||||||
{t.date} * {description}{tags}
|
{t.date} * {description}{tags}
|
||||||
{account2:<40} {t.debit:<6} {t.currency}
|
{account2:<40} {t.debit:<6} {t.currency}
|
||||||
{account1:<40} {t.credit:<6} {t.currency}
|
{account1:<40} {t.credit:<6} {t.currency}
|
||||||
|
source_file: "{t.csv_file}"
|
||||||
|
source_index: {t.index}
|
||||||
|
source_row: "{t.row}"
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
def format(t):
|
def format(t):
|
||||||
t.date = t.date.replace("/", "-")
|
t.date = t.date.replace("/", "-")
|
||||||
|
|
||||||
tags = ""
|
tags = ""
|
||||||
description = None
|
description = None
|
||||||
if t.mapping:
|
if t.mapping:
|
||||||
@@ -56,3 +58,4 @@ def render_to_file(transactions: List[Transaction], config: Config):
|
|||||||
content = "".join(format(t) for t in transactions)
|
content = "".join(format(t) for t in transactions)
|
||||||
with open(config.output_file, "a") as f:
|
with open(config.output_file, "a") as f:
|
||||||
f.write(content)
|
f.write(content)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user