Compare commits

...

5 Commits

Author SHA1 Message Date
fbab1c9174 Make fzf prompt more legible 2026-02-05 14:37:27 -05:00
54871d04cd Update ledgerai to read existing transactions from beancount file 2025-12-20 15:46:34 -05:00
f56c559c84 Include ldg files instead of appending
Do not use dcontext for precision for now. It does not seem
to be necessary.
2025-12-19 14:57:52 -05:00
a190ddc524 Add source info and sort transactions 2025-12-09 21:27:48 -05:00
12408c33f4 Implement numpy based prediction feature 2025-03-16 09:58:21 -04:00
11 changed files with 1494 additions and 1210 deletions

2
.gitignore vendored
View File

@@ -1,4 +1,6 @@
CLAUDE.md
# ---> Python
uv.lock
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

2074
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -7,35 +7,28 @@ name = "toldg"
version = "0.1.0"
description = "Tool to generate ledger files from csv"
readme = "README.md"
requires-python = ">=3.12,<4.0"
requires-python = ">=3.13,<4.0"
license = {text = "MIT"}
authors = [
{name = "Felix Martin", email = "mail@felixm.de"}
]
dependencies = [
"fava (>=1.30.1,<2.0.0)",
"pydantic (>=2.10.6,<3.0.0)",
"beancount (>=3.1.0,<4.0.0)",
"rich (>=13.9.4,<14.0.0)",
"numpy (>=2.2.3,<3.0.0)"
"fava",
"pydantic",
"beancount",
"rich",
"numpy",
"ty",
"ruff",
]
[tool.poetry.group.dev.dependencies]
pre-commit = "^4.1.0"
black = "^25.1.0"
isort = "^6.0.1"
pytest = "^8.3.4"
[project.scripts]
toldg = "toldg.__main__:main"
[tool.setuptools]
package-dir = {"" = "src"}
[tool.black]
line-length = 88
target-version = ["py312"]
[tool.ruff]
target-version = "py313"
line-length = 100
[tool.isort]
profile = "black"
line_length = 88

View File

@@ -1,11 +1,15 @@
import logging
import sys
import beancount
import io
from rich.logging import RichHandler
from toldg.process import process_csv_files, process_ldg_files
from toldg.train import train
from toldg.utils import load_config, remove_if_exists, write_meta
from toldg.utils import load_config, remove_if_exists
from toldg.models import Config
from toldg.write import render_to_file
def init_logging():
@@ -17,16 +21,86 @@ def init_logging():
)
def get_new_transactions(transactions: list, csv_transactions: list) -> list:
    """Return the CSV transactions that are not yet in the ledger.

    Existing beancount transactions are identified by the
    (source_file, source_index) pair stored in their metadata.  When a CSV
    transaction matches an existing key, its raw row must also be identical;
    a mismatch means the ledger and the CSV files have diverged, which is a
    fatal consistency error.
    """
    existing_by_key = {}
    for entry in transactions:
        existing_by_key[(entry.meta["source_file"], entry.meta["source_index"])] = entry
    # A duplicate key would have silently overwritten an entry above.
    assert len(existing_by_key) == len(transactions), "Transaction keys must be unique"
    new_transactions = []
    for candidate in csv_transactions:
        match = existing_by_key.get((candidate.csv_file, candidate.index))
        if match is None:
            new_transactions.append(candidate)
        elif match.meta["source_row"] != candidate.row:
            msg = f"Consistency error: CSV transaction {candidate} is different to {match}"
            logging.error(msg)
            raise SystemExit(1)
    logging.info(f"Got {len(new_transactions)} new and {len(transactions)} existing transactions.")
    return new_transactions
def update_ledger(config: Config):
    """Rebuild the ledger file: re-render the transactions that already live
    in the main beancount file and append the transactions that are new in
    the CSV inputs.

    Raises SystemExit(1) if the existing ledger does not load cleanly.
    """

    def beancount_entry_to_string(entry) -> str:
        # Render a single entry through beancount's own printer.
        buf = io.StringIO()
        beancount.parser.printer.print_entry(entry, file=buf)
        return buf.getvalue().strip()

    def is_transaction(transaction) -> bool:
        return isinstance(transaction, beancount.core.data.Transaction)

    filename = config.output_file
    # NOTE(review): this relies on `import beancount` making the `loader`,
    # `parser.printer` and `core.data` submodules reachable — confirm they
    # are imported as a side effect, otherwise import them explicitly.
    transactions, errors, options_map = beancount.loader.load_file(filename)
    if errors:
        # Fixed: both log messages below were f-strings with no placeholder;
        # report the actual file being processed.
        logging.error(f"errors in '{filename}'")
        for err in errors:
            logging.error(err)
        raise SystemExit(1)
    transactions.sort(key=lambda e: e.date)
    # Note(felixm): Only write back transactions from the main beancount file.
    # The issue is that `beancount.loader.load_file` does not allow for a full
    # round trip; some of the transactions get swallowed. Therefore, treat all files
    # that are not the main beancount file as input only files. This means
    # these input only files can only be edited by hand, but the user can use
    # them to set options for beancount and fava, and add other types of
    # transactions that would otherwise disappear after the round trip. I have seen
    # tickets on GitHub about changing this API so that everything can be
    # written back as is, but until then, this works well for my use-case.
    transactions = [e for e in transactions if e.meta["filename"] == str(filename.absolute())]
    csv_transactions = process_csv_files(config)
    new_transactions = get_new_transactions(transactions, csv_transactions)
    remove_if_exists(config.output_file)
    process_ldg_files(config)
    with open(filename, "a") as f:
        prev_item_was_transaction = False
        for transaction in transactions:
            # Blank line between entries: always after a transaction, and
            # before the first transaction following non-transaction entries.
            if prev_item_was_transaction:
                f.write("\n")
            elif not prev_item_was_transaction and is_transaction(transaction):
                f.write("\n")
            f.write(beancount_entry_to_string(transaction))
            f.write("\n")
            prev_item_was_transaction = is_transaction(transaction)
    render_to_file(new_transactions, config)
    logging.info(f"Ledger file '{filename}' was written successfully.")
def main():
    """Entry point: either train the classifier or update the ledger."""
    init_logging()
    config = load_config()
    # NOTE(review): the subcommand is read from argv[2], not argv[1] —
    # confirm this matches how the script is invoked.
    if len(sys.argv) > 2 and sys.argv[2] == "train":
        train(config)
    else:
        # update_ledger() already removes the old output file, writes the
        # ldg includes and processes the CSV files, so it is the only call
        # needed here.  The previous explicit calls (remove_if_exists,
        # write_meta, process_ldg_files, process_csv_files) duplicated that
        # work — and write_meta is no longer imported from toldg.utils.
        update_ledger(config)
if __name__ == "__main__":

View File

@@ -5,12 +5,12 @@ import sys
EXECUTABLE_NAME = "fzf.exe" if sys.platform == "win32" else "fzf"
def iterfzf(iterable, prompt="> "):
cmd = [EXECUTABLE_NAME, "--prompt=" + prompt]
def iterfzf(iterable, prompt="> ", header=None, height="50%"):
cmd = [EXECUTABLE_NAME, "--prompt=" + prompt, "--height=" + height, "--reverse"]
if header:
cmd.append("--header=" + header)
encoding = sys.getdefaultencoding()
proc = subprocess.Popen(
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=None
)
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=None)
if proc.stdin is None:
return None
try:
@@ -24,7 +24,10 @@ def iterfzf(iterable, prompt="> "):
return None
if proc.stdout is None:
return None
decode = lambda t: t.decode(encoding)
def decode(t):
return t.decode(encoding)
output = [decode(ln.strip(b"\r\n\0")) for ln in iter(proc.stdout.readline, b"")]
try:
return output[0]

View File

@@ -50,6 +50,7 @@ class Config(BaseModel):
output_file: Path = Path("output.ldg")
csv_configs: List[CsvConfig]
categories: List[str]
model: Path = Path("transaction_classifier.pkl")
class Mapping(BaseModel):
@@ -80,4 +81,8 @@ class Transaction(BaseModel):
description: str
csv_file: str
row: str
index: int
mapping: Optional[Mapping] = None
def key(self):
return self.csv_file + ", " + self.row

View File

@@ -1,47 +1,405 @@
from typing import List
import logging
import os
import pickle
import re
from collections import Counter
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import numpy as np
from toldg.fzf import iterfzf
from toldg.models import UNKNOWN_CATEGORY, Transaction
from toldg.models import Transaction
def get_sort_categories():
def sort_categories(row: str, categories: List[str]):
if learn is None:
return
_, _, probs = learn.predict(row)
cat_to_prob = dict(zip(learn.dls.vocab[1], probs.tolist()))
categories.sort(
key=lambda c: cat_to_prob[c] if c in cat_to_prob else 0.0, reverse=True
class Tokenizer:
    """Bag-of-words tokenizer for transaction descriptions.

    Builds a vocabulary of frequent words and converts texts into sparse
    count vectors keyed by word index.
    """

    def __init__(self, min_count: int = 2, lowercase: bool = True):
        self.min_count = min_count
        self.lowercase = lowercase
        self.vocab = {}  # word -> index
        self.inverse_vocab = {}  # index -> word

    def fit(self, texts: List[str]) -> None:
        """Build the vocabulary from the given texts."""
        counts = Counter()
        for text in texts:
            counts.update(self._tokenize(text))
        # Keep only words seen at least `min_count` times; first-occurrence
        # order (dict insertion order) determines the feature index.
        self.vocab = {}
        for word, count in counts.items():
            if count >= self.min_count:
                self.vocab[word] = len(self.vocab)
        self.inverse_vocab = {index: word for word, index in self.vocab.items()}

    def _tokenize(self, text: str) -> List[str]:
        """Split text into alphanumeric tokens, optionally lowercased."""
        source = text.lower() if self.lowercase else text
        return re.findall(r"\b\w+\b", source)

    def transform(self, text: str) -> Dict[int, int]:
        """Convert text to a sparse vector (word index -> count).

        Words that are not in the vocabulary are ignored.
        """
        result: Dict[int, int] = {}
        for token in self._tokenize(text):
            index = self.vocab.get(token)
            if index is not None:
                result[index] = result.get(index, 0) + 1
        return result

    def vocab_size(self) -> int:
        """Number of words in the vocabulary."""
        return len(self.vocab)
class LogisticRegression:
    """Multi-class logistic regression classifier.

    Data layout convention (used throughout forward/backward): feature
    matrices are (input_dim, batch_size) — one column per sample — and
    weights are (output_dim, input_dim).
    """

    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        learning_rate: float = 0.01,
        reg_lambda: float = 0.01,
        max_iterations: int = 1000,
    ):
        # input_dim: feature count (vocabulary size);
        # output_dim: number of classes.
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.learning_rate = learning_rate
        self.reg_lambda = reg_lambda  # L2 regularization strength
        self.max_iterations = max_iterations
        # Initialize weights and bias
        # weights shape: (output_dim, input_dim)
        self.weights = np.random.randn(output_dim, input_dim) * 0.01
        self.bias = np.zeros((output_dim, 1))

    def softmax(self, z: np.ndarray) -> np.ndarray:
        """Compute softmax function."""
        # Subtract max for numerical stability
        exp_z = np.exp(z - np.max(z, axis=0, keepdims=True))
        return exp_z / np.sum(exp_z, axis=0, keepdims=True)

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Forward pass: compute probabilities."""
        # x shape: (input_dim, batch_size)
        # output shape: (output_dim, batch_size)
        z = np.dot(self.weights, x) + self.bias
        return self.softmax(z)

    def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
        """Compute cross-entropy loss with L2 regularization."""
        # y_pred shape: (output_dim, batch_size)
        # y_true shape: (output_dim, batch_size) - one-hot encoded
        m = y_true.shape[1]
        # Cross-entropy loss
        ce_loss = -np.sum(y_true * np.log(y_pred + 1e-8)) / m  # 1e-8 avoids log(0)
        # L2 regularization
        reg_loss = (self.reg_lambda / (2 * m)) * np.sum(np.square(self.weights))
        return ce_loss + reg_loss

    def backward(
        self, x: np.ndarray, y_pred: np.ndarray, y_true: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Compute gradients for backpropagation.

        Returns:
            tuple: (dw, db) — gradients for weights and bias.
        """
        # x shape: (input_dim, batch_size)
        # y_pred shape: (output_dim, batch_size)
        # y_true shape: (output_dim, batch_size) - one-hot encoded
        m = y_true.shape[1]
        # Gradient of loss with respect to scores (dL/dz)
        dz = y_pred - y_true
        # Gradient of loss with respect to weights (dL/dW)
        dw = (1 / m) * np.dot(dz, x.T) + (self.reg_lambda / m) * self.weights
        # Gradient of loss with respect to bias (dL/db)
        db = (1 / m) * np.sum(dz, axis=1, keepdims=True)
        return dw, db

    def train(
        self, x_batch: List[Dict[int, int]], y_batch: List[int], verbose: bool = True
    ) -> List[float]:
        """Train model on batched data.

        Full-batch gradient descent: the whole batch is densified once and
        every iteration uses all samples.

        Args:
            x_batch: sparse feature vectors (word index -> count), one per sample.
            y_batch: class index per sample.
            verbose: print the loss every 100 iterations.

        Returns:
            The loss recorded at every iteration run.
        """
        # Convert sparse vectors to dense matrix
        batch_size = len(x_batch)
        x_dense = np.zeros((self.input_dim, batch_size))
        for i, x_sparse in enumerate(x_batch):
            for idx, val in x_sparse.items():
                x_dense[idx, i] = val
        # Convert labels to one-hot encoding
        y_one_hot = np.zeros((self.output_dim, batch_size))
        for i, y in enumerate(y_batch):
            y_one_hot[y, i] = 1
        losses = []
        for iteration in range(self.max_iterations):
            # Forward pass
            y_pred = self.forward(x_dense)
            # Compute loss
            loss = self.compute_loss(y_pred, y_one_hot)
            losses.append(loss)
            if verbose and iteration % 100 == 0:
                print(f"Iteration {iteration}: loss = {loss:.4f}")
            # Check convergence
            # (stop when the loss improvement drops below 1e-5)
            if iteration > 0 and abs(losses[-1] - losses[-2]) < 1e-5:
                if verbose:
                    print(f"Converged at iteration {iteration}")
                break
            # Backward pass
            dw, db = self.backward(x_dense, y_pred, y_one_hot)
            # Update parameters
            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db
        return losses

    def predict_proba(self, x_sparse: Dict[int, int]) -> np.ndarray:
        """Predict class probabilities for a single sparse vector."""
        # Convert sparse vector to dense vector
        x_dense = np.zeros((self.input_dim, 1))
        for idx, val in x_sparse.items():
            x_dense[idx, 0] = val
        # Forward pass
        return self.forward(x_dense).flatten()

    def predict(self, x_sparse: Dict[int, int]) -> int:
        """Predict class for a single sparse vector."""
        probs = self.predict_proba(x_sparse)
        # np.argmax returns a numpy integer; it behaves like an int for
        # indexing and comparison.
        return np.argmax(probs)
class TransactionClassifier:
    """Transaction classifier using Bag of Words and Logistic Regression."""

    def __init__(self, model_path: Optional[Path] = None):
        """Create a classifier; optionally load a trained model from disk.

        If `model_path` is None or does not exist, the classifier starts
        untrained (`self.model` is None).
        """
        self.tokenizer = None
        self.model = None
        self.categories = []
        self.category_to_idx = {}
        self.idx_to_category = {}
        if model_path and os.path.exists(model_path):
            self.load(model_path)

    def fit(
        self,
        transactions: List[Transaction],
        categories: List[str],
        verbose: bool = True,
    ) -> None:
        """Train classifier on transactions.

        Uses each transaction's raw CSV row as the text and its account2
        as the label.
        """
        # Extract texts and labels
        texts = [t.row for t in transactions]
        labels = [t.account2 for t in transactions]
        # Build category mapping
        self.categories = sorted(set(categories))
        self.category_to_idx = {cat: idx for idx, cat in enumerate(self.categories)}
        self.idx_to_category = {idx: cat for cat, idx in self.category_to_idx.items()}
        # Map labels to indices; labels outside `categories` fall back to 0.
        label_indices = [self.category_to_idx.get(label, 0) for label in labels]
        # Initialize and fit tokenizer
        self.tokenizer = Tokenizer(min_count=2)
        self.tokenizer.fit(texts)
        if verbose:
            print(f"Vocabulary size: {self.tokenizer.vocab_size()}")
        # Transform texts to feature vectors
        x_vectors = [self.tokenizer.transform(text) for text in texts]
        # Initialize and train model
        self.model = LogisticRegression(
            input_dim=self.tokenizer.vocab_size(),
            output_dim=len(self.categories),
            learning_rate=0.05,
            reg_lambda=0.01,
            max_iterations=3000,
        )
        # The dead fastai fallback (undefined `learn`, load_learner of
        # "export.pkl", ModuleNotFoundError prompt) left over from the old
        # implementation was removed; the numpy model is trained directly.
        self.model.train(x_vectors, label_indices, verbose=verbose)

    def predict(self, text: str) -> Tuple[str, float, List[float]]:
        """
        Predict category for a transaction text.

        Returns:
            tuple: (predicted_category, confidence, all_probabilities)

        Raises:
            ValueError: if no model has been trained or loaded.
        """
        if not self.model or not self.tokenizer:
            raise ValueError("Model not trained yet")
        # Transform text to feature vector
        x_vector = self.tokenizer.transform(text)
        # Predict probabilities
        probs = self.model.predict_proba(x_vector)
        # Get predicted class
        pred_idx = np.argmax(probs)
        pred_category = self.idx_to_category[pred_idx]
        confidence = probs[pred_idx]
        # NOTE: probs is a numpy array, not a plain list, despite the
        # annotation — callers index it, which works for both.
        return pred_category, confidence, probs

    def sort_categories(self, text: str, categories: List[str]) -> None:
        """Sort categories in place, most probable first, for given text.

        Silently does nothing when no model/tokenizer is available.
        """
        if not self.model or not self.tokenizer:
            return
        # Transform text to feature vector
        x_vector = self.tokenizer.transform(text)
        # Predict probabilities
        probs = self.model.predict_proba(x_vector)
        # Create mapping from category to probability
        cat_to_prob = {}
        for idx, prob in enumerate(probs):
            if idx in self.idx_to_category:
                cat = self.idx_to_category[idx]
                cat_to_prob[cat] = prob
        # Categories unknown to the model sort last (probability 0.0).
        categories.sort(key=lambda c: cat_to_prob.get(c, 0.0), reverse=True)

    def save(self, path: Path) -> None:
        """Save model to file (pickle of a plain dict of arrays and maps).

        Raises:
            ValueError: if no model has been trained yet.
        """
        if not self.model or not self.tokenizer:
            raise ValueError("Model not trained yet")
        model_data = {
            "tokenizer_vocab": self.tokenizer.vocab,
            "tokenizer_inverse_vocab": self.tokenizer.inverse_vocab,
            "tokenizer_min_count": self.tokenizer.min_count,
            "tokenizer_lowercase": self.tokenizer.lowercase,
            "model_weights": self.model.weights,
            "model_bias": self.model.bias,
            "categories": self.categories,
            "category_to_idx": self.category_to_idx,
            "idx_to_category": self.idx_to_category,
        }
        with open(path, "wb") as f:
            pickle.dump(model_data, f)

    def load(self, path: Path) -> None:
        """Load model from file.

        Security note: pickle.load can execute arbitrary code from the
        file — only load model files you created yourself.
        """
        with open(path, "rb") as f:
            model_data = pickle.load(f)
        # Restore tokenizer
        self.tokenizer = Tokenizer(
            min_count=model_data["tokenizer_min_count"],
            lowercase=model_data["tokenizer_lowercase"],
        )
        self.tokenizer.vocab = model_data["tokenizer_vocab"]
        self.tokenizer.inverse_vocab = model_data["tokenizer_inverse_vocab"]
        # Restore categories
        self.categories = model_data["categories"]
        self.category_to_idx = model_data["category_to_idx"]
        self.idx_to_category = model_data["idx_to_category"]
        # Restore model (dims are implied by the saved tokenizer/categories)
        input_dim = len(self.tokenizer.vocab)
        output_dim = len(self.categories)
        self.model = LogisticRegression(input_dim, output_dim)
        self.model.weights = model_data["model_weights"]
        self.model.bias = model_data["model_bias"]
def train_classifier(
    transactions: List[Transaction],
    categories: List[str],
    output_path: Path = Path("transaction_classifier.pkl"),
):
    """Train transaction classifier and save to file.

    Keeps the trained classifier in the module-level `_classifier` so it
    can be reused in the same process.
    """
    global _classifier
    # Only transactions whose category is in the known set are usable labels.
    usable = [t for t in transactions if t.account2 in categories]
    if len(usable) < 10:
        logging.warning("Not enough transactions for training. Need at least 10.")
        return
    logging.info(f"Training classifier on {len(usable)} transactions")
    # Fit a fresh classifier and persist it.
    _classifier = TransactionClassifier()
    _classifier.fit(usable, categories, verbose=True)
    _classifier.save(output_path)
    logging.info(f"Classifier saved to {output_path}")
def get_sort_categories(model_path: Path):
    """Return a function that sorts categories in place by predicted
    probability for a given CSV row.

    If no trained model can be loaded the returned function is a no-op.
    """
    _classifier = None

    def sort_categories(row: str, categories: List[str]):
        if _classifier is None:
            return
        _classifier.sort_categories(row, categories)

    try:
        # Fixed: use the model_path parameter instead of the leftover
        # hard-coded Path("transaction_classifier.pkl") that shadowed it.
        _classifier = TransactionClassifier(model_path)
        if _classifier.model is None:
            logging.warning("No trained model found. Categories will not be sorted.")
    except Exception as e:
        # Best-effort: a broken model file must not break the interactive flow.
        logging.warning(f"Error loading classifier: {e}")
        logging.warning("Categories will not be sorted.")
    return sort_categories
def add_account2(model_path: Path, transactions: List[Transaction], categories: List[str]):
    """Interactively assign account2 to every transaction without a mapping.

    The leftover duplicate definition (old signature without model_path,
    keyed lookup via `mapping == None`) was removed; the signature matches
    the caller in process.py, which passes config.model first.
    """
    unmapped_transactions = [t for t in transactions if t.mapping is None]
    if len(unmapped_transactions) == 0:
        return
    sort_categories = get_sort_categories(model_path)
    for t in unmapped_transactions:
        # Show the most likely categories first to speed up selection.
        sort_categories(t.row, categories)
        add_account2_interactive(t, categories)
def add_account2_interactive(transaction: Transaction, categories: List[str]):
    """Interactively ask the user to pick a category for one transaction.

    Shows the transaction summary as the fzf header and keeps prompting
    until a category is actually selected.
    """
    t = transaction
    account2 = None
    header = f"{t.account1} | {t.date} | {t.description} | {t.debit}"
    # Removed diff residue: the old `prompt = ...` / iterfzf(prompt=...)
    # variant, and the "No mapping" warning that apply_mappings already
    # emits with the identical message.
    while account2 is None:
        account2 = iterfzf(categories, header=header)
    transaction.account2 = account2
    print(f"Assigned category '{account2}'.")

View File

@@ -1,9 +1,9 @@
import csv
import datetime
import logging
import os
import re
import sys
from typing import Any, Dict, List
import toldg.models
import toldg.predict
@@ -13,13 +13,13 @@ from toldg.models import Config, CsvConfig, Mapping, Transaction
def process_ldg_files(config: Config):
    """Append beancount `include` directives for all ldg input files.

    The ldg files are included rather than copied, so beancount reads them
    from their original location; paths are written relative to the output
    file's directory.  (The leftover copy-contents implementation from the
    previous version was removed.)
    """
    with open(config.output_file, "a") as f_out:
        for ldg_file in toldg.utils.get_ldg_files(config.input_directory):
            ldg_rel = os.path.relpath(ldg_file, os.path.dirname(config.output_file))
            f_out.write(f'include "{ldg_rel}"\n')
def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
def get_csv_config(csv_file: str, csv_configs: list[CsvConfig]) -> CsvConfig:
cs = [c for c in csv_configs if re.match(c.file_match_regex, csv_file)]
if not cs:
logging.critical(f"No CSV config for {csv_file}.")
@@ -30,7 +30,7 @@ def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
return cs[0]
def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
def get_transactions(csv_file: str, config: CsvConfig) -> list[Transaction]:
def date_to_date(date: str) -> str:
d = datetime.datetime.strptime(date, config.input_date_format)
return d.strftime(config.output_date_format)
@@ -38,7 +38,7 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
def flip_sign(amount: str) -> str:
return amount[1:] if amount.startswith("-") else "-" + amount
def row_to_transaction(row, fields):
def row_to_transaction(idx, row, fields):
"""The user can configure the mapping of CSV fields to the three
required fields date, amount and description via the CsvConfig."""
t = {field: row[index] for index, field in fields}
@@ -52,7 +52,8 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
account2=toldg.models.UNKNOWN_CATEGORY,
description=t["description"],
csv_file=csv_file,
row=csv_file + ", " + ", ".join(row),
row=", ".join(row),
index=idx,
)
fields = [(i, f) for i, f in enumerate(config.fields) if f]
@@ -60,29 +61,31 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
reader = csv.reader(f, delimiter=config.delimiter, quotechar=config.quotechar)
for _ in range(config.skip):
next(reader)
transactions = [row_to_transaction(row, fields) for row in reader if row]
rows = [row for row in reader if row]
transactions = [row_to_transaction(i, row, fields) for i, row in enumerate(reversed(rows))]
return transactions
def apply_mappings(transactions: list[Transaction], mappings: dict[str, Mapping]):
    """Apply mappings to transactions.

    Transactions are looked up by t.key() (csv file + row); the leftover
    duplicate definition keyed on t.row was removed.  Every mapping's count
    is decremented per use and must reach exactly zero.
    """
    unmapped_count = 0
    for t in transactions:
        if t.key() in mappings:
            mapping = mappings[t.key()]
            assert isinstance(mapping, Mapping)
            assert mapping.count > 0, f"{mapping} used by {t} but count is not greater than '0'."
            mapping.count -= 1
            t.mapping = mapping
        else:
            logging.warning(f"No mapping for '{t}'.")
            unmapped_count += 1
    if unmapped_count > 0:
        logging.info(f"{unmapped_count} transactions without mappings.")
    # Every stored mapping must have been consumed exactly `count` times.
    for mapping in mappings.values():
        assert mapping.count == 0, f"{mapping} was not used as often as expected!"
def process_csv_files(config: Config) -> List[Transaction]:
def process_csv_files(config: Config) -> list[Transaction]:
csv_files = toldg.utils.get_csv_files(config.input_directory)
transactions = []
for csv_file in csv_files:
@@ -92,7 +95,6 @@ def process_csv_files(config: Config) -> List[Transaction]:
mappings = toldg.utils.read_mappings(config.mappings_file)
apply_mappings(transactions, mappings)
toldg.predict.add_account2(transactions, config.categories)
toldg.predict.add_account2(config.model, transactions, config.categories)
toldg.utils.write_mappings(transactions, config.mappings_file)
toldg.write.render_to_file(transactions, config)
return transactions

View File

@@ -1,9 +1,13 @@
from toldg.models import Config, CsvConfig, Mapping, Transaction
import logging
from toldg.models import Config
from toldg.predict import train_classifier
from toldg.process import process_csv_files
def train(config: Config):
    """Train a transaction classifier from csv files."""
    # Process transactions to get training data
    transactions = process_csv_files(config)
    # Removed dead code: a stray debug print and an empty
    # `for t in transactions: pass` loop that did nothing.
    train_classifier(transactions, config.categories, config.model)
    logging.info("Training completed")

View File

@@ -3,7 +3,7 @@ import logging
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Dict, List
from pydantic import ValidationError
@@ -57,29 +57,19 @@ def category_to_bean(c: str) -> str:
return ":".join(new_sections)
def write_meta(config: Config):
    # Append an `open` directive for every configured category, then the
    # operating-currency option, to the output ledger file.
    # NOTE(review): the updated import in __main__ no longer pulls
    # write_meta (only load_config and remove_if_exists), so this function
    # may be unused now — verify before relying on it.
    with open(config.output_file, "a") as f:
        for category in config.categories:
            f.write(f"2017-01-01 open {category_to_bean(category)}\n")
        # assumes the blank line and option are written once, after the
        # loop — indentation was ambiguous in the reviewed diff; confirm.
        f.write("\n")
        f.write('option "operating_currency" "USD"\n\n')
def write_mappings(transactions: List[Transaction], mappings_file: Path):
"""Write transactions to the mappings file."""
mappings = read_mappings(mappings_file)
for t in transactions:
if t.row in mappings:
if t.key() in mappings:
pass
else:
mapping = Mapping(
**{
"account2": t.account2.strip(),
"narration": t.description,
}
account2=t.account2.strip(),
narration=t.description,
)
mappings[t.row] = mapping
mappings[t.key()] = mapping
mappings = {k: v.model_dump(exclude_none=True) for k, v in mappings.items()}
with open(mappings_file, "w") as f:

View File

@@ -1,4 +1,3 @@
from pathlib import Path
from typing import List
from toldg.models import Config, Transaction
@@ -6,14 +5,16 @@ from toldg.utils import category_to_bean
BEANCOUNT_TRANSACTION_TEMPLATE = """
{t.date} * {description}{tags}
{t.account2:<40} {t.debit:<6} {t.currency}
{t.account1:<40} {t.credit:<6} {t.currency}
source_file: "{t.csv_file}"
source_index: {t.index}
source_row: "{t.row}"
{account2:<40} {t.debit:<6} {t.currency}
{account1:<40} {t.credit:<6} {t.currency}
"""
def format(t):
t.date = t.date.replace("/", "-")
tags = ""
description = None
if t.mapping:
@@ -40,13 +41,15 @@ def format(t):
if not t.credit.startswith("-"):
t.credit = " " + t.credit
t.account1 = category_to_bean(t.account1)
t.account2 = category_to_bean(t.account2)
if t.currency == "EUR":
t.debit = t.debit.replace(".", "|").replace(",", ".").replace("|", ",")
t.credit = t.credit.replace(".", "|").replace(",", ".").replace("|", ",")
return BEANCOUNT_TRANSACTION_TEMPLATE.format(
t=t, description=description, tags=tags
t=t,
description=description,
tags=tags,
account1=category_to_bean(t.account1),
account2=category_to_bean(t.account2),
)