Compare commits

...

5 Commits

Author SHA1 Message Date
fbab1c9174 Make fzf prompt more legible 2026-02-05 14:37:27 -05:00
54871d04cd Update ledgerai to read existing transactions from beancount file 2025-12-20 15:46:34 -05:00
f56c559c84 Include ldg files instead of appending
Do not use dcontext for precision for now. It does not seem
to be necessary.
2025-12-19 14:57:52 -05:00
a190ddc524 Add source info and sort transactions 2025-12-09 21:27:48 -05:00
12408c33f4 Implement numpy based prediction feature 2025-03-16 09:58:21 -04:00
11 changed files with 1494 additions and 1210 deletions

2
.gitignore vendored
View File

@@ -1,4 +1,6 @@
CLAUDE.md
# ---> Python # ---> Python
uv.lock
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]

2074
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -7,35 +7,28 @@ name = "toldg"
version = "0.1.0" version = "0.1.0"
description = "Tool to generate ledger files from csv" description = "Tool to generate ledger files from csv"
readme = "README.md" readme = "README.md"
requires-python = ">=3.12,<4.0" requires-python = ">=3.13,<4.0"
license = {text = "MIT"} license = {text = "MIT"}
authors = [ authors = [
{name = "Felix Martin", email = "mail@felixm.de"} {name = "Felix Martin", email = "mail@felixm.de"}
] ]
dependencies = [ dependencies = [
"fava (>=1.30.1,<2.0.0)", "fava",
"pydantic (>=2.10.6,<3.0.0)", "pydantic",
"beancount (>=3.1.0,<4.0.0)", "beancount",
"rich (>=13.9.4,<14.0.0)", "rich",
"numpy (>=2.2.3,<3.0.0)" "numpy",
"ty",
"ruff",
] ]
[tool.poetry.group.dev.dependencies]
pre-commit = "^4.1.0"
black = "^25.1.0"
isort = "^6.0.1"
pytest = "^8.3.4"
[project.scripts] [project.scripts]
toldg = "toldg.__main__:main" toldg = "toldg.__main__:main"
[tool.setuptools] [tool.setuptools]
package-dir = {"" = "src"} package-dir = {"" = "src"}
[tool.black] [tool.ruff]
line-length = 88 target-version = "py313"
target-version = ["py312"] line-length = 100
[tool.isort]
profile = "black"
line_length = 88

View File

@@ -1,11 +1,15 @@
import logging import logging
import sys import sys
import beancount
import io
from rich.logging import RichHandler from rich.logging import RichHandler
from toldg.process import process_csv_files, process_ldg_files from toldg.process import process_csv_files, process_ldg_files
from toldg.train import train from toldg.train import train
from toldg.utils import load_config, remove_if_exists, write_meta from toldg.utils import load_config, remove_if_exists
from toldg.models import Config
from toldg.write import render_to_file
def init_logging(): def init_logging():
@@ -17,16 +21,86 @@ def init_logging():
) )
def get_new_transactions(transactions: list, csv_transactions: list) -> list:
    """Return the CSV transactions that are not yet present in the ledger.

    Ledger entries are matched to CSV transactions through the
    ``source_file``/``source_index`` metadata pair. Entries without that
    metadata cannot correspond to a CSV row and are ignored for matching.

    Args:
        transactions: Entries already in the ledger; each exposes a ``meta`` mapping.
        csv_transactions: Transactions parsed from CSV; each exposes ``csv_file``,
            ``index`` and ``row``.

    Returns:
        The CSV transactions whose key is not found among the ledger entries,
        in their original order.

    Raises:
        SystemExit: If two ledger entries share a source key, or if a matched
            ledger entry's ``source_row`` differs from the CSV row.
    """
    key_to_transaction = {}
    for entry in transactions:
        meta = entry.meta or {}
        if "source_file" not in meta or "source_index" not in meta:
            # No source info recorded on this entry, so it can never match a CSV row.
            continue
        key = (meta["source_file"], meta["source_index"])
        if key in key_to_transaction:
            # Explicit check instead of `assert`, which is stripped under `python -O`.
            logging.error(f"Consistency error: transaction key {key} is not unique")
            raise SystemExit(1)
        key_to_transaction[key] = entry

    new_transactions = []
    for csv_transaction in csv_transactions:
        key = (csv_transaction.csv_file, csv_transaction.index)
        existing_transaction = key_to_transaction.get(key)
        if existing_transaction is None:
            new_transactions.append(csv_transaction)
            continue
        # A missing `source_row` is treated like a mismatching one.
        if existing_transaction.meta.get("source_row") != csv_transaction.row:
            msg = (
                f"Consistency error: CSV transaction {csv_transaction} "
                f"is different to {existing_transaction}"
            )
            logging.error(msg)
            raise SystemExit(1)

    logging.info(f"Got {len(new_transactions)} new and {len(transactions)} existing transactions.")
    return new_transactions
def update_ledger(config: Config):
    """Rebuild the ledger file, keeping its existing entries and adding new CSV ones.

    The current output file is loaded with beancount (exiting with status 1 on
    load errors), the CSV files are processed, and the output file is then
    removed and rewritten: first whatever ``process_ldg_files`` emits, then the
    previously loaded entries of the main file, and finally the CSV
    transactions that were not in the ledger yet.
    """

    def render_entry(entry) -> str:
        # The beancount printer writes to a file object; capture it in memory.
        sink = io.StringIO()
        beancount.parser.printer.print_entry(entry, file=sink)
        return sink.getvalue().strip()

    ledger_path = config.output_file
    entries, load_errors, _options_map = beancount.loader.load_file(ledger_path)
    if load_errors:
        logging.error(f"errors in '{ledger_path}'")
        for load_error in load_errors:
            logging.error(load_error)
        raise SystemExit(1)
    entries.sort(key=lambda e: e.date)

    # Note(felixm): Only write back transactions from the main beancount file.
    # The issue is that `beancount.loader.load_file` does not allow for a full
    # round trip; some of the transactions get swallowed. Therefore, treat all files
    # that are not the main beancount file as input only files. This means
    # these input only files can only be edited by hand, but the user can use
    # them to set options for beancount and fava, and add other types of
    # transactions that would otherwise disappear after the round trip. I have seen
    # tickets on GitHub about changing this API so that everything can be
    # written back as is, but until then, this works well for my use-case.
    main_file = str(ledger_path.absolute())
    entries = [e for e in entries if e.meta["filename"] == main_file]

    csv_transactions = process_csv_files(config)
    new_transactions = get_new_transactions(entries, csv_transactions)

    # From here on the ledger is regenerated from scratch.
    remove_if_exists(config.output_file)
    process_ldg_files(config)

    with open(ledger_path, "a") as out:
        after_transaction = False
        for entry in entries:
            entry_is_transaction = isinstance(entry, beancount.core.data.Transaction)
            # A blank line separates every transaction from its neighbours, while
            # consecutive non-transaction entries stay on adjacent lines.
            if after_transaction or entry_is_transaction:
                out.write("\n")
            out.write(render_entry(entry))
            out.write("\n")
            after_transaction = entry_is_transaction

    render_to_file(new_transactions, config)
    logging.info(f"Ledger file '{ledger_path}' was written successfully.")
def main(): def main():
init_logging() init_logging()
config = load_config() config = load_config()
if len(sys.argv) > 2 and sys.argv[2] == "train": if len(sys.argv) > 2 and sys.argv[2] == "train":
train(config) train(config)
else: else:
remove_if_exists(config.output_file) update_ledger(config)
write_meta(config)
process_ldg_files(config)
process_csv_files(config)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -5,12 +5,12 @@ import sys
EXECUTABLE_NAME = "fzf.exe" if sys.platform == "win32" else "fzf" EXECUTABLE_NAME = "fzf.exe" if sys.platform == "win32" else "fzf"
def iterfzf(iterable, prompt="> "): def iterfzf(iterable, prompt="> ", header=None, height="50%"):
cmd = [EXECUTABLE_NAME, "--prompt=" + prompt] cmd = [EXECUTABLE_NAME, "--prompt=" + prompt, "--height=" + height, "--reverse"]
if header:
cmd.append("--header=" + header)
encoding = sys.getdefaultencoding() encoding = sys.getdefaultencoding()
proc = subprocess.Popen( proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=None)
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=None
)
if proc.stdin is None: if proc.stdin is None:
return None return None
try: try:
@@ -24,7 +24,10 @@ def iterfzf(iterable, prompt="> "):
return None return None
if proc.stdout is None: if proc.stdout is None:
return None return None
decode = lambda t: t.decode(encoding)
def decode(t):
return t.decode(encoding)
output = [decode(ln.strip(b"\r\n\0")) for ln in iter(proc.stdout.readline, b"")] output = [decode(ln.strip(b"\r\n\0")) for ln in iter(proc.stdout.readline, b"")]
try: try:
return output[0] return output[0]

View File

@@ -50,6 +50,7 @@ class Config(BaseModel):
output_file: Path = Path("output.ldg") output_file: Path = Path("output.ldg")
csv_configs: List[CsvConfig] csv_configs: List[CsvConfig]
categories: List[str] categories: List[str]
model: Path = Path("transaction_classifier.pkl")
class Mapping(BaseModel): class Mapping(BaseModel):
@@ -80,4 +81,8 @@ class Transaction(BaseModel):
description: str description: str
csv_file: str csv_file: str
row: str row: str
index: int
mapping: Optional[Mapping] = None mapping: Optional[Mapping] = None
def key(self):
    """Return the lookup key for this transaction: its CSV file and raw row, comma-separated."""
    return ", ".join((self.csv_file, self.row))

View File

@@ -1,47 +1,405 @@
from typing import List import logging
import os
import pickle
import re
from collections import Counter
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import numpy as np
from toldg.fzf import iterfzf from toldg.fzf import iterfzf
from toldg.models import UNKNOWN_CATEGORY, Transaction from toldg.models import Transaction
def get_sort_categories(): class Tokenizer:
def sort_categories(row: str, categories: List[str]): """Simple tokenizer for transaction descriptions."""
if learn is None:
return def __init__(self, min_count: int = 2, lowercase: bool = True):
_, _, probs = learn.predict(row) self.min_count = min_count
cat_to_prob = dict(zip(learn.dls.vocab[1], probs.tolist())) self.lowercase = lowercase
categories.sort( self.vocab = {} # word -> index
key=lambda c: cat_to_prob[c] if c in cat_to_prob else 0.0, reverse=True self.inverse_vocab = {} # index -> word
def fit(self, texts: List[str]) -> None:
    """Learn the vocabulary: every token seen at least ``min_count`` times gets an index."""
    frequencies = Counter()
    for text in texts:
        frequencies.update(self._tokenize(text))

    # Indices follow the order in which the surviving tokens were first seen.
    kept = [token for token, seen in frequencies.items() if seen >= self.min_count]
    self.vocab = {token: position for position, token in enumerate(kept)}
    self.inverse_vocab = dict(enumerate(kept))
def _tokenize(self, text: str) -> List[str]:
    """Return the runs of word characters in ``text``, lower-cased when ``lowercase`` is set."""
    normalized = text.lower() if self.lowercase else text
    return re.findall(r"\b\w+\b", normalized)
def transform(self, text: str) -> Dict[int, int]:
    """Map ``text`` to a sparse bag of words: vocabulary index -> occurrence count.

    Tokens that are not in the vocabulary are dropped.
    """
    known_indices = (self.vocab[tok] for tok in self._tokenize(text) if tok in self.vocab)
    return dict(Counter(known_indices))
def vocab_size(self) -> int:
    """Number of distinct tokens currently in the vocabulary."""
    known_tokens = self.vocab
    return len(known_tokens)
class LogisticRegression:
    """Multi-class (softmax) logistic regression trained with batch gradient descent.

    Samples are given as sparse ``{feature_index: value}`` dicts and expanded to
    dense column vectors internally, so all matrices are laid out as
    ``(dimension, batch_size)``.
    """

    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        learning_rate: float = 0.01,
        reg_lambda: float = 0.01,
        max_iterations: int = 1000,
    ):
        """Create a model with small random weights and a zero bias.

        Args:
            input_dim: Number of input features.
            output_dim: Number of classes.
            learning_rate: Step size of each gradient-descent update.
            reg_lambda: L2 regularization strength applied to the weights.
            max_iterations: Upper bound on the number of training iterations.
        """
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.learning_rate = learning_rate
        self.reg_lambda = reg_lambda
        self.max_iterations = max_iterations

        # weights shape: (output_dim, input_dim); drawn from NumPy's global RNG.
        self.weights = np.random.randn(output_dim, input_dim) * 0.01
        self.bias = np.zeros((output_dim, 1))

    def _to_dense(self, samples: List[Dict[int, int]]) -> np.ndarray:
        """Expand sparse samples into a dense (input_dim, len(samples)) matrix."""
        dense = np.zeros((self.input_dim, len(samples)))
        for column, sparse in enumerate(samples):
            for idx, val in sparse.items():
                dense[idx, column] = val
        return dense

    def softmax(self, z: np.ndarray) -> np.ndarray:
        """Compute the column-wise softmax of ``z``."""
        # Subtract the per-column max for numerical stability.
        exp_z = np.exp(z - np.max(z, axis=0, keepdims=True))
        return exp_z / np.sum(exp_z, axis=0, keepdims=True)

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Forward pass: class probabilities for ``x`` of shape (input_dim, batch_size)."""
        z = np.dot(self.weights, x) + self.bias
        return self.softmax(z)

    def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
        """Compute the mean cross-entropy loss plus the L2 penalty on the weights.

        Both arguments have shape (output_dim, batch_size); ``y_true`` is one-hot.
        """
        m = y_true.shape[1]
        # The small epsilon keeps log() finite when a probability underflows to 0.
        ce_loss = -np.sum(y_true * np.log(y_pred + 1e-8)) / m
        reg_loss = (self.reg_lambda / (2 * m)) * np.sum(np.square(self.weights))
        return ce_loss + reg_loss

    def backward(
        self, x: np.ndarray, y_pred: np.ndarray, y_true: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Return the gradients ``(dw, db)`` of the regularized loss.

        ``x`` has shape (input_dim, batch_size); ``y_pred`` and the one-hot
        ``y_true`` have shape (output_dim, batch_size).
        """
        m = y_true.shape[1]
        # Gradient of the loss with respect to the scores (dL/dz).
        dz = y_pred - y_true
        dw = (1 / m) * np.dot(dz, x.T) + (self.reg_lambda / m) * self.weights
        db = (1 / m) * np.sum(dz, axis=1, keepdims=True)
        return dw, db

    def train(
        self, x_batch: List[Dict[int, int]], y_batch: List[int], verbose: bool = True
    ) -> List[float]:
        """Fit the model on the whole batch and return the loss recorded per iteration.

        Training stops after ``max_iterations`` or as soon as the loss changes by
        less than 1e-5 between two consecutive iterations.

        Raises:
            ValueError: If the batch is empty or samples and labels differ in number.
        """
        batch_size = len(x_batch)
        if batch_size == 0:
            raise ValueError("Cannot train on an empty batch")
        if len(y_batch) != batch_size:
            # Without this check, missing labels would become all-zero target columns.
            raise ValueError(
                f"Got {batch_size} samples but {len(y_batch)} labels; they must match"
            )

        x_dense = self._to_dense(x_batch)

        # Convert labels to one-hot encoding.
        y_one_hot = np.zeros((self.output_dim, batch_size))
        for i, y in enumerate(y_batch):
            y_one_hot[y, i] = 1

        losses = []
        for iteration in range(self.max_iterations):
            y_pred = self.forward(x_dense)

            loss = self.compute_loss(y_pred, y_one_hot)
            losses.append(loss)

            if verbose and iteration % 100 == 0:
                print(f"Iteration {iteration}: loss = {loss:.4f}")

            # Convergence is checked before the update, so the weights in effect
            # are the ones that produced the last recorded loss.
            if iteration > 0 and abs(losses[-1] - losses[-2]) < 1e-5:
                if verbose:
                    print(f"Converged at iteration {iteration}")
                break

            dw, db = self.backward(x_dense, y_pred, y_one_hot)
            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

        return losses

    def predict_proba(self, x_sparse: Dict[int, int]) -> np.ndarray:
        """Return the 1-D array of class probabilities for a single sparse sample."""
        return self.forward(self._to_dense([x_sparse])).flatten()

    def predict(self, x_sparse: Dict[int, int]) -> int:
        """Return the index of the most probable class for a single sparse sample."""
        probs = self.predict_proba(x_sparse)
        # np.argmax yields a NumPy integer; convert to match the annotation.
        return int(np.argmax(probs))
class TransactionClassifier:
"""Transaction classifier using Bag of Words and Logistic Regression."""
def __init__(self, model_path: Optional[Path] = None):
    """Start untrained; restore a saved model when ``model_path`` points to an existing file."""
    self.tokenizer = None
    self.model = None
    self.categories = []
    self.category_to_idx = {}
    self.idx_to_category = {}

    if not model_path:
        return
    if os.path.exists(model_path):
        self.load(model_path)
def fit(
self,
transactions: List[Transaction],
categories: List[str],
verbose: bool = True,
) -> None:
"""Train classifier on transactions."""
# Extract texts and labels
texts = [t.row for t in transactions]
labels = [t.account2 for t in transactions]
# Build category mapping
self.categories = sorted(set(categories))
self.category_to_idx = {cat: idx for idx, cat in enumerate(self.categories)}
self.idx_to_category = {idx: cat for cat, idx in self.category_to_idx.items()}
# Map labels to indices
label_indices = [self.category_to_idx.get(label, 0) for label in labels]
# Initialize and fit tokenizer
self.tokenizer = Tokenizer(min_count=2)
self.tokenizer.fit(texts)
if verbose:
print(f"Vocabulary size: {self.tokenizer.vocab_size()}")
# Transform texts to feature vectors
x_vectors = [self.tokenizer.transform(text) for text in texts]
# Initialize and train model
self.model = LogisticRegression(
input_dim=self.tokenizer.vocab_size(),
output_dim=len(self.categories),
learning_rate=0.05,
reg_lambda=0.01,
max_iterations=3000,
) )
learn = None self.model.train(x_vectors, label_indices, verbose=verbose)
try:
from fastai.text.all import load_learner
learn = load_learner("export.pkl") def predict(self, text: str) -> Tuple[str, float, List[float]]:
except ModuleNotFoundError: """
user_input = input("No fastai module. Type yes to continue anyway.") Predict category for a transaction text.
if user_input.strip().lower() != "yes":
raise Exception("fastai module missing") Returns:
tuple: (predicted_category, confidence, all_probabilities)
"""
if not self.model or not self.tokenizer:
raise ValueError("Model not trained yet")
# Transform text to feature vector
x_vector = self.tokenizer.transform(text)
# Predict probabilities
probs = self.model.predict_proba(x_vector)
# Get predicted class
pred_idx = np.argmax(probs)
pred_category = self.idx_to_category[pred_idx]
confidence = probs[pred_idx]
return pred_category, confidence, probs
def sort_categories(self, text: str, categories: List[str]) -> None:
    """Reorder ``categories`` in place, most probable category for ``text`` first.

    Does nothing while no model or tokenizer is available. Categories the
    model does not know are ranked with probability 0.0.
    """
    if not self.model or not self.tokenizer:
        return

    features = self.tokenizer.transform(text)
    probabilities = self.model.predict_proba(features)

    # Translate class indices back to category names.
    likelihood = {
        self.idx_to_category[position]: probability
        for position, probability in enumerate(probabilities)
        if position in self.idx_to_category
    }

    def rank(category: str):
        return likelihood.get(category, 0.0)

    categories.sort(key=rank, reverse=True)
def save(self, path: Path) -> None:
    """Pickle the tokenizer settings, model parameters and category maps to ``path``.

    Raises:
        ValueError: If there is no trained model or tokenizer to save.
    """
    if not self.model or not self.tokenizer:
        raise ValueError("Model not trained yet")

    tokenizer, model = self.tokenizer, self.model
    snapshot = {
        "tokenizer_vocab": tokenizer.vocab,
        "tokenizer_inverse_vocab": tokenizer.inverse_vocab,
        "tokenizer_min_count": tokenizer.min_count,
        "tokenizer_lowercase": tokenizer.lowercase,
        "model_weights": model.weights,
        "model_bias": model.bias,
        "categories": self.categories,
        "category_to_idx": self.category_to_idx,
        "idx_to_category": self.idx_to_category,
    }

    with open(path, "wb") as sink:
        pickle.dump(snapshot, sink)
def load(self, path: Path) -> None:
    """Restore tokenizer, category maps and model parameters from a pickle at ``path``."""
    # NOTE(review): unpickling can execute arbitrary code; only load model files
    # from a trusted location.
    with open(path, "rb") as source:
        state = pickle.load(source)

    # Tokenizer: settings first, then the stored vocabulary in both directions.
    self.tokenizer = Tokenizer(
        min_count=state["tokenizer_min_count"],
        lowercase=state["tokenizer_lowercase"],
    )
    self.tokenizer.vocab = state["tokenizer_vocab"]
    self.tokenizer.inverse_vocab = state["tokenizer_inverse_vocab"]

    # Category bookkeeping.
    self.categories = state["categories"]
    self.category_to_idx = state["category_to_idx"]
    self.idx_to_category = state["idx_to_category"]

    # Model: dimensions come from the restored vocabulary and categories; the
    # freshly initialized parameters are replaced by the stored ones.
    n_features = len(self.tokenizer.vocab)
    n_classes = len(self.categories)
    self.model = LogisticRegression(n_features, n_classes)
    self.model.weights = state["model_weights"]
    self.model.bias = state["model_bias"]
def train_classifier(
    transactions: List[Transaction],
    categories: List[str],
    output_path: Path = Path("transaction_classifier.pkl"),
):
    """Train a classifier on the transactions with a known category and save it.

    Only transactions whose ``account2`` is one of ``categories`` are used.
    With fewer than 10 of them, a warning is logged and nothing is trained.
    """
    # The trained classifier is also published as a module-level name.
    global _classifier

    def has_known_category(transaction) -> bool:
        return transaction.account2 in categories

    usable = list(filter(has_known_category, transactions))
    if len(usable) < 10:
        logging.warning("Not enough transactions for training. Need at least 10.")
        return

    logging.info(f"Training classifier on {len(usable)} transactions")

    _classifier = TransactionClassifier()
    _classifier.fit(usable, categories, verbose=True)

    _classifier.save(output_path)
    logging.info(f"Classifier saved to {output_path}")
def get_sort_categories(model_path: Path):
    """Return a function that sorts categories by predicted probability.

    The classifier is loaded from ``model_path``. Loading is best effort: if it
    fails, or no trained model is found, a warning is logged and the returned
    function leaves the categories untouched.

    Args:
        model_path: Location of the saved classifier.

    Returns:
        A callable ``sort_categories(row, categories)`` that reorders
        ``categories`` in place and returns ``None``.
    """
    classifier = None
    try:
        # Use the caller's path; it used to be overwritten by a hard-coded
        # default here, so the configured model location was ignored.
        classifier = TransactionClassifier(model_path)
        if classifier.model is None:
            logging.warning("No trained model found. Categories will not be sorted.")
    except Exception as e:
        # Deliberately broad: a broken model file must not stop categorization.
        logging.warning(f"Error loading classifier: {e}")
        logging.warning("Categories will not be sorted.")

    def sort_categories(row: str, categories: List[str]):
        if classifier is None:
            return
        classifier.sort_categories(row, categories)

    return sort_categories
def add_account2(transactions: List[Transaction], categories: List[str]): def add_account2(model_path: Path, transactions: List[Transaction], categories: List[str]):
unmapped_transactions = list(filter(lambda t: t.mapping == None, transactions)) """Add account2 to unmapped transactions."""
unmapped_transactions = list(filter(lambda t: t.mapping is None, transactions))
if len(unmapped_transactions) == 0: if len(unmapped_transactions) == 0:
return return
sort_categories = get_sort_categories()
sort_categories = get_sort_categories(model_path)
for t in unmapped_transactions: for t in unmapped_transactions:
sort_categories(t.row, categories) sort_categories(t.row, categories)
add_account2_interactive(t, categories) add_account2_interactive(t, categories)
def add_account2_interactive(transaction: Transaction, categories: List[str]): def add_account2_interactive(transaction: Transaction, categories: List[str]):
"""Interactively add account2 to a transaction."""
t = transaction t = transaction
account2 = None account2 = None
prompt = f"{t.account1} {t.date} {t.description} {t.debit} > " header = f"{t.account1} | {t.date} | {t.description} | {t.debit}"
logging.warning(f"No mapping for '{t}'.")
while account2 is None: while account2 is None:
account2 = iterfzf(categories, prompt=prompt) account2 = iterfzf(categories, header=header)
transaction.account2 = account2 transaction.account2 = account2
print(f"Assigned category '{account2}'.") print(f"Assigned category '{account2}'.")

View File

@@ -1,9 +1,9 @@
import csv import csv
import datetime import datetime
import logging import logging
import os
import re import re
import sys import sys
from typing import Any, Dict, List
import toldg.models import toldg.models
import toldg.predict import toldg.predict
@@ -13,13 +13,13 @@ from toldg.models import Config, CsvConfig, Mapping, Transaction
def process_ldg_files(config: Config): def process_ldg_files(config: Config):
for ldg_file in toldg.utils.get_ldg_files(config.input_directory):
with open(ldg_file, "r") as f_in:
with open(config.output_file, "a") as f_out: with open(config.output_file, "a") as f_out:
f_out.write(f_in.read()) for ldg_file in toldg.utils.get_ldg_files(config.input_directory):
ldg_rel = os.path.relpath(ldg_file, os.path.dirname(config.output_file))
f_out.write(f'include "{ldg_rel}"\n')
def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig: def get_csv_config(csv_file: str, csv_configs: list[CsvConfig]) -> CsvConfig:
cs = [c for c in csv_configs if re.match(c.file_match_regex, csv_file)] cs = [c for c in csv_configs if re.match(c.file_match_regex, csv_file)]
if not cs: if not cs:
logging.critical(f"No CSV config for {csv_file}.") logging.critical(f"No CSV config for {csv_file}.")
@@ -30,7 +30,7 @@ def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
return cs[0] return cs[0]
def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]: def get_transactions(csv_file: str, config: CsvConfig) -> list[Transaction]:
def date_to_date(date: str) -> str: def date_to_date(date: str) -> str:
d = datetime.datetime.strptime(date, config.input_date_format) d = datetime.datetime.strptime(date, config.input_date_format)
return d.strftime(config.output_date_format) return d.strftime(config.output_date_format)
@@ -38,7 +38,7 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
def flip_sign(amount: str) -> str: def flip_sign(amount: str) -> str:
return amount[1:] if amount.startswith("-") else "-" + amount return amount[1:] if amount.startswith("-") else "-" + amount
def row_to_transaction(row, fields): def row_to_transaction(idx, row, fields):
"""The user can configure the mapping of CSV fields to the three """The user can configure the mapping of CSV fields to the three
required fields date, amount and description via the CsvConfig.""" required fields date, amount and description via the CsvConfig."""
t = {field: row[index] for index, field in fields} t = {field: row[index] for index, field in fields}
@@ -52,7 +52,8 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
account2=toldg.models.UNKNOWN_CATEGORY, account2=toldg.models.UNKNOWN_CATEGORY,
description=t["description"], description=t["description"],
csv_file=csv_file, csv_file=csv_file,
row=csv_file + ", " + ", ".join(row), row=", ".join(row),
index=idx,
) )
fields = [(i, f) for i, f in enumerate(config.fields) if f] fields = [(i, f) for i, f in enumerate(config.fields) if f]
@@ -60,29 +61,31 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
reader = csv.reader(f, delimiter=config.delimiter, quotechar=config.quotechar) reader = csv.reader(f, delimiter=config.delimiter, quotechar=config.quotechar)
for _ in range(config.skip): for _ in range(config.skip):
next(reader) next(reader)
transactions = [row_to_transaction(row, fields) for row in reader if row] rows = [row for row in reader if row]
transactions = [row_to_transaction(i, row, fields) for i, row in enumerate(reversed(rows))]
return transactions return transactions
def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]): def apply_mappings(transactions: list[Transaction], mappings: dict[str, Mapping]):
"""Apply mappings to transactions.""" """Apply mappings to transactions."""
unmapped_count = 0
for t in transactions: for t in transactions:
if t.row in mappings: if t.key() in mappings:
mapping = mappings[t.row] mapping = mappings[t.key()]
assert isinstance(mapping, Mapping) assert isinstance(mapping, Mapping)
assert ( assert mapping.count > 0, f"{mapping} used by {t} but count is not greater than '0'."
mapping.count > 0
), f"{mapping} used by {t} but count is not greater than '0'."
mapping.count -= 1 mapping.count -= 1
t.mapping = mapping t.mapping = mapping
else: else:
logging.warning(f"No mapping for '{t}'.") unmapped_count += 1
if unmapped_count > 0:
logging.info(f"{unmapped_count} transactions without mappings.")
for mapping in mappings.values(): for mapping in mappings.values():
assert mapping.count == 0, f"{mapping} was not used as often as expected!" assert mapping.count == 0, f"{mapping} was not used as often as expected!"
def process_csv_files(config: Config) -> List[Transaction]: def process_csv_files(config: Config) -> list[Transaction]:
csv_files = toldg.utils.get_csv_files(config.input_directory) csv_files = toldg.utils.get_csv_files(config.input_directory)
transactions = [] transactions = []
for csv_file in csv_files: for csv_file in csv_files:
@@ -92,7 +95,6 @@ def process_csv_files(config: Config) -> List[Transaction]:
mappings = toldg.utils.read_mappings(config.mappings_file) mappings = toldg.utils.read_mappings(config.mappings_file)
apply_mappings(transactions, mappings) apply_mappings(transactions, mappings)
toldg.predict.add_account2(transactions, config.categories) toldg.predict.add_account2(config.model, transactions, config.categories)
toldg.utils.write_mappings(transactions, config.mappings_file) toldg.utils.write_mappings(transactions, config.mappings_file)
toldg.write.render_to_file(transactions, config)
return transactions return transactions

View File

@@ -1,9 +1,13 @@
from toldg.models import Config, CsvConfig, Mapping, Transaction import logging
from toldg.models import Config
from toldg.predict import train_classifier
from toldg.process import process_csv_files from toldg.process import process_csv_files
def train(config: Config): def train(config: Config):
print("[train] start") """Train a transaction classifier from csv files."""
# Process transactions to get training data
transactions = process_csv_files(config) transactions = process_csv_files(config)
for t in transactions: train_classifier(transactions, config.categories, config.model)
pass logging.info("Training completed")

View File

@@ -3,7 +3,7 @@ import logging
import os import os
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional from typing import Dict, List
from pydantic import ValidationError from pydantic import ValidationError
@@ -57,29 +57,19 @@ def category_to_bean(c: str) -> str:
return ":".join(new_sections) return ":".join(new_sections)
def write_meta(config: Config):
with open(config.output_file, "a") as f:
for category in config.categories:
f.write(f"2017-01-01 open {category_to_bean(category)}\n")
f.write("\n")
f.write('option "operating_currency" "USD"\n\n')
def write_mappings(transactions: List[Transaction], mappings_file: Path): def write_mappings(transactions: List[Transaction], mappings_file: Path):
"""Write transactions to the mappings file.""" """Write transactions to the mappings file."""
mappings = read_mappings(mappings_file) mappings = read_mappings(mappings_file)
for t in transactions: for t in transactions:
if t.row in mappings: if t.key() in mappings:
pass pass
else: else:
mapping = Mapping( mapping = Mapping(
**{ account2=t.account2.strip(),
"account2": t.account2.strip(), narration=t.description,
"narration": t.description,
}
) )
mappings[t.row] = mapping mappings[t.key()] = mapping
mappings = {k: v.model_dump(exclude_none=True) for k, v in mappings.items()} mappings = {k: v.model_dump(exclude_none=True) for k, v in mappings.items()}
with open(mappings_file, "w") as f: with open(mappings_file, "w") as f:

View File

@@ -1,4 +1,3 @@
from pathlib import Path
from typing import List from typing import List
from toldg.models import Config, Transaction from toldg.models import Config, Transaction
@@ -6,14 +5,16 @@ from toldg.utils import category_to_bean
BEANCOUNT_TRANSACTION_TEMPLATE = """ BEANCOUNT_TRANSACTION_TEMPLATE = """
{t.date} * {description}{tags} {t.date} * {description}{tags}
{t.account2:<40} {t.debit:<6} {t.currency} source_file: "{t.csv_file}"
{t.account1:<40} {t.credit:<6} {t.currency} source_index: {t.index}
source_row: "{t.row}"
{account2:<40} {t.debit:<6} {t.currency}
{account1:<40} {t.credit:<6} {t.currency}
""" """
def format(t): def format(t):
t.date = t.date.replace("/", "-") t.date = t.date.replace("/", "-")
tags = "" tags = ""
description = None description = None
if t.mapping: if t.mapping:
@@ -40,13 +41,15 @@ def format(t):
if not t.credit.startswith("-"): if not t.credit.startswith("-"):
t.credit = " " + t.credit t.credit = " " + t.credit
t.account1 = category_to_bean(t.account1)
t.account2 = category_to_bean(t.account2)
if t.currency == "EUR": if t.currency == "EUR":
t.debit = t.debit.replace(".", "|").replace(",", ".").replace("|", ",") t.debit = t.debit.replace(".", "|").replace(",", ".").replace("|", ",")
t.credit = t.credit.replace(".", "|").replace(",", ".").replace("|", ",") t.credit = t.credit.replace(".", "|").replace(",", ".").replace("|", ",")
return BEANCOUNT_TRANSACTION_TEMPLATE.format( return BEANCOUNT_TRANSACTION_TEMPLATE.format(
t=t, description=description, tags=tags t=t,
description=description,
tags=tags,
account1=category_to_bean(t.account1),
account2=category_to_bean(t.account2),
) )