Compare commits

...

2 Commits

Author SHA1 Message Date
a190ddc524 Add source info and sort transactions 2025-12-09 21:27:48 -05:00
12408c33f4 Implement numpy based prediction feature 2025-03-16 09:58:21 -04:00
8 changed files with 1544 additions and 963 deletions

2003
poetry.lock generated

File diff suppressed because it is too large. Load Diff

View File

@@ -1,5 +1,7 @@
import logging import logging
import sys import sys
import beancount
import io
from rich.logging import RichHandler from rich.logging import RichHandler
@@ -17,6 +19,38 @@ def init_logging():
) )
def load_and_write_back(filename):
    """Round-trip the generated ledger file through beancount.

    Loads `filename` with the beancount loader; if it parses cleanly, the
    file is rewritten with entries sorted by date and normalized by the
    beancount printer. On parse errors the file is left untouched and the
    errors are printed instead.
    """
    entries, errors, options_map = beancount.loader.load_file(filename)

    def beancount_entry_to_string(entry) -> str:
        # Render a single entry via the beancount printer into a string.
        buf = io.StringIO()
        # beancount.parser.printer.print_entry(entry, dcontext=options_map['dcontext'], file=buf)
        beancount.parser.printer.print_entry(entry, file=buf)
        return buf.getvalue().strip()

    def is_transaction(entry) -> bool:
        return isinstance(entry, beancount.core.data.Transaction)

    if errors:
        # BUG FIX: this f-string had no placeholder and printed a fixed
        # literal instead of the name of the file that failed to parse.
        print(f"errors in generated '{filename}'")
        for err in errors:
            print(err)
        return

    entries.sort(key=lambda e: e.date)
    prev_entry_was_transaction = False
    with open(filename, "w") as f:
        f.write('option "operating_currency" "USD"\n')
        for entry in entries:
            # Separate transactions visually: a blank line after each
            # transaction and before each transaction. (Merges the original
            # `if prev / elif is_transaction` pair, which wrote "\n" in
            # exactly these two cases, and avoids a second is_transaction
            # call per entry.)
            current_is_transaction = is_transaction(entry)
            if prev_entry_was_transaction or current_is_transaction:
                f.write("\n")
            f.write(beancount_entry_to_string(entry))
            f.write("\n")
            prev_entry_was_transaction = current_is_transaction
def main(): def main():
init_logging() init_logging()
config = load_config() config = load_config()
@@ -27,6 +61,7 @@ def main():
write_meta(config) write_meta(config)
process_ldg_files(config) process_ldg_files(config)
process_csv_files(config) process_csv_files(config)
load_and_write_back(config.output_file)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -50,6 +50,7 @@ class Config(BaseModel):
output_file: Path = Path("output.ldg") output_file: Path = Path("output.ldg")
csv_configs: List[CsvConfig] csv_configs: List[CsvConfig]
categories: List[str] categories: List[str]
model: Path = Path("transaction_classifier.pkl")
class Mapping(BaseModel): class Mapping(BaseModel):
@@ -80,4 +81,9 @@ class Transaction(BaseModel):
description: str description: str
csv_file: str csv_file: str
row: str row: str
index: int
mapping: Optional[Mapping] = None mapping: Optional[Mapping] = None
    def key(self):
        # Stable mapping-lookup identifier for this transaction:
        # "<csv_file>, <row>" — used as the dict key in the mappings file.
        return self.csv_file + ", " + self.row

View File

@@ -1,43 +1,406 @@
from typing import List import json
import logging
import os
import pickle
import re
from collections import Counter, defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
import numpy as np
from toldg.fzf import iterfzf from toldg.fzf import iterfzf
from toldg.models import UNKNOWN_CATEGORY, Transaction from toldg.models import Transaction
class Tokenizer:
    """Bag-of-words tokenizer for transaction description strings."""

    def __init__(self, min_count: int = 2, lowercase: bool = True):
        # Words occurring fewer than `min_count` times across the fitted
        # corpus are excluded from the vocabulary.
        self.min_count = min_count
        self.lowercase = lowercase
        self.vocab: Dict[str, int] = {}  # word -> integer id
        self.inverse_vocab: Dict[int, str] = {}  # integer id -> word

    def fit(self, texts: List[str]) -> None:
        """Build the vocabulary from a corpus of texts."""
        frequencies = Counter()
        for text in texts:
            frequencies.update(self._tokenize(text))
        # Keep words that meet the frequency threshold, in first-seen
        # order, and assign each a contiguous integer id.
        kept = (word for word, n in frequencies.items() if n >= self.min_count)
        self.vocab = {word: i for i, word in enumerate(kept)}
        self.inverse_vocab = {i: word for word, i in self.vocab.items()}

    def _tokenize(self, text: str) -> List[str]:
        """Lowercase (optionally) and split into alphanumeric word tokens."""
        source = text.lower() if self.lowercase else text
        return re.findall(r"\b\w+\b", source)

    def transform(self, text: str) -> Dict[int, int]:
        """Encode text as a sparse count vector {word id: occurrence count}."""
        ids = [self.vocab[tok] for tok in self._tokenize(text) if tok in self.vocab]
        return dict(Counter(ids))

    def vocab_size(self) -> int:
        """Number of words in the fitted vocabulary."""
        return len(self.vocab)
class LogisticRegression:
    """Multi-class logistic regression classifier.

    Trained with full-batch gradient descent on softmax cross-entropy
    loss plus L2 weight regularization. Feature vectors arrive as sparse
    dicts (feature index -> count) and are densified internally.
    """

    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        learning_rate: float = 0.01,
        reg_lambda: float = 0.01,
        max_iterations: int = 1000,
    ):
        # input_dim: number of features (vocabulary size);
        # output_dim: number of classes (categories).
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.learning_rate = learning_rate
        self.reg_lambda = reg_lambda  # L2 regularization strength
        self.max_iterations = max_iterations
        # Initialize weights and bias
        # weights shape: (output_dim, input_dim)
        self.weights = np.random.randn(output_dim, input_dim) * 0.01
        self.bias = np.zeros((output_dim, 1))

    def softmax(self, z: np.ndarray) -> np.ndarray:
        """Compute softmax function."""
        # Subtract max for numerical stability
        exp_z = np.exp(z - np.max(z, axis=0, keepdims=True))
        return exp_z / np.sum(exp_z, axis=0, keepdims=True)

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Forward pass: compute probabilities."""
        # x shape: (input_dim, batch_size)
        # output shape: (output_dim, batch_size)
        z = np.dot(self.weights, x) + self.bias
        return self.softmax(z)

    def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
        """Compute cross-entropy loss with L2 regularization."""
        # y_pred shape: (output_dim, batch_size)
        # y_true shape: (output_dim, batch_size) - one-hot encoded
        m = y_true.shape[1]
        # Cross-entropy loss; the +1e-8 guards against log(0).
        ce_loss = -np.sum(y_true * np.log(y_pred + 1e-8)) / m
        # L2 regularization
        reg_loss = (self.reg_lambda / (2 * m)) * np.sum(np.square(self.weights))
        return ce_loss + reg_loss

    def backward(
        self, x: np.ndarray, y_pred: np.ndarray, y_true: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Compute gradients for backpropagation."""
        # x shape: (input_dim, batch_size)
        # y_pred shape: (output_dim, batch_size)
        # y_true shape: (output_dim, batch_size) - one-hot encoded
        m = y_true.shape[1]
        # Gradient of loss with respect to scores (dL/dz)
        dz = y_pred - y_true
        # Gradient of loss with respect to weights (dL/dW), incl. L2 term
        dw = (1 / m) * np.dot(dz, x.T) + (self.reg_lambda / m) * self.weights
        # Gradient of loss with respect to bias (dL/db)
        db = (1 / m) * np.sum(dz, axis=1, keepdims=True)
        return dw, db

    def train(
        self, x_batch: List[Dict[int, int]], y_batch: List[int], verbose: bool = True
    ) -> List[float]:
        """Train model on batched data.

        Runs full-batch gradient descent for at most `max_iterations`
        steps, stopping early when the loss change drops below 1e-5.
        Returns the per-iteration loss history.
        """
        # Convert sparse vectors to dense matrix
        batch_size = len(x_batch)
        x_dense = np.zeros((self.input_dim, batch_size))
        for i, x_sparse in enumerate(x_batch):
            for idx, val in x_sparse.items():
                x_dense[idx, i] = val
        # Convert labels to one-hot encoding
        y_one_hot = np.zeros((self.output_dim, batch_size))
        for i, y in enumerate(y_batch):
            y_one_hot[y, i] = 1
        losses = []
        for iteration in range(self.max_iterations):
            # Forward pass
            y_pred = self.forward(x_dense)
            # Compute loss
            loss = self.compute_loss(y_pred, y_one_hot)
            losses.append(loss)
            if verbose and iteration % 100 == 0:
                print(f"Iteration {iteration}: loss = {loss:.4f}")
            # Check convergence
            if iteration > 0 and abs(losses[-1] - losses[-2]) < 1e-5:
                if verbose:
                    print(f"Converged at iteration {iteration}")
                break
            # Backward pass
            dw, db = self.backward(x_dense, y_pred, y_one_hot)
            # Update parameters
            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db
        return losses

    def predict_proba(self, x_sparse: Dict[int, int]) -> np.ndarray:
        """Predict class probabilities for a single sparse vector."""
        # Convert sparse vector to dense vector
        x_dense = np.zeros((self.input_dim, 1))
        for idx, val in x_sparse.items():
            x_dense[idx, 0] = val
        # Forward pass
        return self.forward(x_dense).flatten()

    def predict(self, x_sparse: Dict[int, int]) -> int:
        """Predict class index for a single sparse vector."""
        probs = self.predict_proba(x_sparse)
        return np.argmax(probs)
class TransactionClassifier:
    """Transaction classifier using Bag of Words and Logistic Regression."""

    def __init__(self, model_path: Optional[Path] = None):
        # Untrained until fit() or load() is called.
        self.tokenizer = None
        self.model = None
        self.categories = []
        self.category_to_idx = {}
        self.idx_to_category = {}
        # Eagerly load a previously saved model if one exists at model_path.
        if model_path and os.path.exists(model_path):
            self.load(model_path)

    def fit(
        self,
        transactions: List[Transaction],
        categories: List[str],
        verbose: bool = True,
    ) -> None:
        """Train classifier on transactions.

        Each transaction's raw CSV row is the input text and its account2
        is the target label.
        """
        # Extract texts and labels
        texts = [t.row for t in transactions]
        labels = [t.account2 for t in transactions]
        # Build category mapping (sorted for a stable class ordering)
        self.categories = sorted(set(categories))
        self.category_to_idx = {cat: idx for idx, cat in enumerate(self.categories)}
        self.idx_to_category = {idx: cat for cat, idx in self.category_to_idx.items()}
        # Map labels to indices; labels not in `categories` fall back to class 0.
        label_indices = [self.category_to_idx.get(label, 0) for label in labels]
        # Initialize and fit tokenizer
        self.tokenizer = Tokenizer(min_count=2)
        self.tokenizer.fit(texts)
        if verbose:
            print(f"Vocabulary size: {self.tokenizer.vocab_size()}")
        # Transform texts to feature vectors
        x_vectors = [self.tokenizer.transform(text) for text in texts]
        # Initialize and train model
        self.model = LogisticRegression(
            input_dim=self.tokenizer.vocab_size(),
            output_dim=len(self.categories),
            learning_rate=0.05,
            reg_lambda=0.01,
            max_iterations=3000,
        )
        self.model.train(x_vectors, label_indices, verbose=verbose)

    def predict(self, text: str) -> Tuple[str, float, List[float]]:
        """
        Predict category for a transaction text.

        Returns:
            tuple: (predicted_category, confidence, all_probabilities)

        Raises:
            ValueError: if the model has not been trained or loaded.
        """
        if not self.model or not self.tokenizer:
            raise ValueError("Model not trained yet")
        # Transform text to feature vector
        x_vector = self.tokenizer.transform(text)
        # Predict probabilities
        probs = self.model.predict_proba(x_vector)
        # Get predicted class
        pred_idx = np.argmax(probs)
        pred_category = self.idx_to_category[pred_idx]
        confidence = probs[pred_idx]
        return pred_category, confidence, probs

    def sort_categories(self, text: str, categories: List[str]) -> None:
        """Sort categories (in place) by prediction probability for given text.

        Silently does nothing when no model is loaded; categories the
        model does not know default to probability 0.0.
        """
        if not self.model or not self.tokenizer:
            return
        # Transform text to feature vector
        x_vector = self.tokenizer.transform(text)
        # Predict probabilities
        probs = self.model.predict_proba(x_vector)
        # Create mapping from category to probability
        cat_to_prob = {}
        for idx, prob in enumerate(probs):
            if idx in self.idx_to_category:
                cat = self.idx_to_category[idx]
                cat_to_prob[cat] = prob
        # Sort categories by probability
        categories.sort(key=lambda c: cat_to_prob.get(c, 0.0), reverse=True)

    def save(self, path: Path) -> None:
        """Save model to file as a pickled dict of tokenizer state, weights
        and the category mappings.

        Raises:
            ValueError: if the model has not been trained or loaded.
        """
        if not self.model or not self.tokenizer:
            raise ValueError("Model not trained yet")
        model_data = {
            "tokenizer_vocab": self.tokenizer.vocab,
            "tokenizer_inverse_vocab": self.tokenizer.inverse_vocab,
            "tokenizer_min_count": self.tokenizer.min_count,
            "tokenizer_lowercase": self.tokenizer.lowercase,
            "model_weights": self.model.weights,
            "model_bias": self.model.bias,
            "categories": self.categories,
            "category_to_idx": self.category_to_idx,
            "idx_to_category": self.idx_to_category,
        }
        with open(path, "wb") as f:
            pickle.dump(model_data, f)

    def load(self, path: Path) -> None:
        """Load model from file.

        NOTE(review): pickle.load executes arbitrary code from the file;
        only load model files from a trusted source.
        """
        with open(path, "rb") as f:
            model_data = pickle.load(f)
        # Restore tokenizer
        self.tokenizer = Tokenizer(
            min_count=model_data["tokenizer_min_count"],
            lowercase=model_data["tokenizer_lowercase"],
        )
        self.tokenizer.vocab = model_data["tokenizer_vocab"]
        self.tokenizer.inverse_vocab = model_data["tokenizer_inverse_vocab"]
        # Restore categories
        self.categories = model_data["categories"]
        self.category_to_idx = model_data["category_to_idx"]
        self.idx_to_category = model_data["idx_to_category"]
        # Restore model
        input_dim = len(self.tokenizer.vocab)
        output_dim = len(self.categories)
        self.model = LogisticRegression(input_dim, output_dim)
        self.model.weights = model_data["model_weights"]
        self.model.bias = model_data["model_bias"]
def train_classifier(
    transactions: List[Transaction],
    categories: List[str],
    output_path: Path = Path("transaction_classifier.pkl"),
):
    """Fit a TransactionClassifier on labelled transactions and persist it.

    Skips training (with a warning) when fewer than 10 usable
    transactions are available.
    """
    global _classifier

    # Only transactions whose account2 is a known category carry a usable label.
    usable = [t for t in transactions if t.account2 in categories]
    if len(usable) < 10:
        logging.warning("Not enough transactions for training. Need at least 10.")
        return

    logging.info(f"Training classifier on {len(usable)} transactions")
    _classifier = TransactionClassifier()
    _classifier.fit(usable, categories, verbose=True)
    _classifier.save(output_path)
    logging.info(f"Classifier saved to {output_path}")
def get_sort_categories(model_path: Path):
    """Return a callable that sorts `categories` in place by the model's
    predicted probability for a given transaction row.

    If the classifier at `model_path` cannot be loaded (missing file,
    unpickling failure, ...), the returned callable is a no-op, so
    callers may invoke it unconditionally.
    """
    _classifier = None

    def sort_categories(row: str, categories: List[str]):
        # Late-bound closure: reads _classifier as set by the load attempt below.
        if _classifier is None:
            return
        _classifier.sort_categories(row, categories)

    try:
        # BUG FIX: previously the passed-in model_path was overwritten here
        # with a hard-coded Path("transaction_classifier.pkl"), so the
        # configured model location was silently ignored.
        _classifier = TransactionClassifier(model_path)
        if _classifier.model is None:
            logging.warning("No trained model found. Categories will not be sorted.")
    except Exception as e:
        logging.warning(f"Error loading classifier: {e}")
        logging.warning("Categories will not be sorted.")
    return sort_categories
def add_account2(
    model_path: Path, transactions: List[Transaction], categories: List[str]
):
    """Interactively assign account2 to every transaction without a mapping."""
    pending = [t for t in transactions if t.mapping is None]
    if not pending:
        return
    # Rank the category list by the classifier's prediction before each prompt.
    sort_categories = get_sort_categories(model_path)
    for txn in pending:
        sort_categories(txn.row, categories)
        add_account2_interactive(txn, categories)
def add_account2_interactive(transaction: Transaction, categories: List[str]): def add_account2_interactive(transaction: Transaction, categories: List[str]):
"""Interactively add account2 to a transaction."""
t = transaction t = transaction
account2 = None account2 = None
prompt = f"{t.account1} {t.date} {t.description} {t.debit} > " prompt = f"{t.account1} {t.date} {t.description} {t.debit} > "

View File

@@ -3,7 +3,6 @@ import datetime
import logging import logging
import re import re
import sys import sys
from typing import Any, Dict, List
import toldg.models import toldg.models
import toldg.predict import toldg.predict
@@ -19,7 +18,7 @@ def process_ldg_files(config: Config):
f_out.write(f_in.read()) f_out.write(f_in.read())
def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig: def get_csv_config(csv_file: str, csv_configs: list[CsvConfig]) -> CsvConfig:
cs = [c for c in csv_configs if re.match(c.file_match_regex, csv_file)] cs = [c for c in csv_configs if re.match(c.file_match_regex, csv_file)]
if not cs: if not cs:
logging.critical(f"No CSV config for {csv_file}.") logging.critical(f"No CSV config for {csv_file}.")
@@ -30,7 +29,7 @@ def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
return cs[0] return cs[0]
def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]: def get_transactions(csv_file: str, config: CsvConfig) -> list[Transaction]:
def date_to_date(date: str) -> str: def date_to_date(date: str) -> str:
d = datetime.datetime.strptime(date, config.input_date_format) d = datetime.datetime.strptime(date, config.input_date_format)
return d.strftime(config.output_date_format) return d.strftime(config.output_date_format)
@@ -38,7 +37,7 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
def flip_sign(amount: str) -> str: def flip_sign(amount: str) -> str:
return amount[1:] if amount.startswith("-") else "-" + amount return amount[1:] if amount.startswith("-") else "-" + amount
def row_to_transaction(row, fields): def row_to_transaction(idx, row, fields):
"""The user can configure the mapping of CSV fields to the three """The user can configure the mapping of CSV fields to the three
required fields date, amount and description via the CsvConfig.""" required fields date, amount and description via the CsvConfig."""
t = {field: row[index] for index, field in fields} t = {field: row[index] for index, field in fields}
@@ -52,7 +51,8 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
account2=toldg.models.UNKNOWN_CATEGORY, account2=toldg.models.UNKNOWN_CATEGORY,
description=t["description"], description=t["description"],
csv_file=csv_file, csv_file=csv_file,
row=csv_file + ", " + ", ".join(row), row=", ".join(row),
index=idx,
) )
fields = [(i, f) for i, f in enumerate(config.fields) if f] fields = [(i, f) for i, f in enumerate(config.fields) if f]
@@ -60,15 +60,17 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
reader = csv.reader(f, delimiter=config.delimiter, quotechar=config.quotechar) reader = csv.reader(f, delimiter=config.delimiter, quotechar=config.quotechar)
for _ in range(config.skip): for _ in range(config.skip):
next(reader) next(reader)
transactions = [row_to_transaction(row, fields) for row in reader if row] rows = [row for row in reader if row]
transactions = [row_to_transaction(i, row, fields)
for i, row in enumerate(reversed(rows))]
return transactions return transactions
def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]): def apply_mappings(transactions: list[Transaction], mappings: dict[str, Mapping]):
"""Apply mappings to transactions.""" """Apply mappings to transactions."""
for t in transactions: for t in transactions:
if t.row in mappings: if t.key() in mappings:
mapping = mappings[t.row] mapping = mappings[t.key()]
assert isinstance(mapping, Mapping) assert isinstance(mapping, Mapping)
assert ( assert (
mapping.count > 0 mapping.count > 0
@@ -82,7 +84,7 @@ def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]
assert mapping.count == 0, f"{mapping} was not used as often as expected!" assert mapping.count == 0, f"{mapping} was not used as often as expected!"
def process_csv_files(config: Config) -> List[Transaction]: def process_csv_files(config: Config) -> list[Transaction]:
csv_files = toldg.utils.get_csv_files(config.input_directory) csv_files = toldg.utils.get_csv_files(config.input_directory)
transactions = [] transactions = []
for csv_file in csv_files: for csv_file in csv_files:
@@ -92,7 +94,7 @@ def process_csv_files(config: Config) -> List[Transaction]:
mappings = toldg.utils.read_mappings(config.mappings_file) mappings = toldg.utils.read_mappings(config.mappings_file)
apply_mappings(transactions, mappings) apply_mappings(transactions, mappings)
toldg.predict.add_account2(transactions, config.categories) toldg.predict.add_account2(config.model, transactions, config.categories)
toldg.utils.write_mappings(transactions, config.mappings_file) toldg.utils.write_mappings(transactions, config.mappings_file)
toldg.write.render_to_file(transactions, config) toldg.write.render_to_file(transactions, config)
return transactions return transactions

View File

@@ -1,9 +1,14 @@
from toldg.models import Config, CsvConfig, Mapping, Transaction import logging
from pathlib import Path
from toldg.models import Config
from toldg.predict import train_classifier
from toldg.process import process_csv_files from toldg.process import process_csv_files
def train(config: Config):
    """Build training data from the configured CSV files and fit the classifier."""
    # process_csv_files also applies the saved mappings, so account2 labels
    # are populated where known before training.
    txns = process_csv_files(config)
    train_classifier(txns, config.categories, config.model)
    logging.info("Training completed")

View File

@@ -70,7 +70,7 @@ def write_mappings(transactions: List[Transaction], mappings_file: Path):
mappings = read_mappings(mappings_file) mappings = read_mappings(mappings_file)
for t in transactions: for t in transactions:
if t.row in mappings: if t.key() in mappings:
pass pass
else: else:
mapping = Mapping( mapping = Mapping(
@@ -79,7 +79,7 @@ def write_mappings(transactions: List[Transaction], mappings_file: Path):
"narration": t.description, "narration": t.description,
} }
) )
mappings[t.row] = mapping mappings[t.key()] = mapping
mappings = {k: v.model_dump(exclude_none=True) for k, v in mappings.items()} mappings = {k: v.model_dump(exclude_none=True) for k, v in mappings.items()}
with open(mappings_file, "w") as f: with open(mappings_file, "w") as f:

View File

@@ -6,14 +6,16 @@ from toldg.utils import category_to_bean
BEANCOUNT_TRANSACTION_TEMPLATE = """ BEANCOUNT_TRANSACTION_TEMPLATE = """
{t.date} * {description}{tags} {t.date} * {description}{tags}
{t.account2:<40} {t.debit:<6} {t.currency} {account2:<40} {t.debit:<6} {t.currency}
{t.account1:<40} {t.credit:<6} {t.currency} {account1:<40} {t.credit:<6} {t.currency}
source_file: "{t.csv_file}"
source_index: {t.index}
source_row: "{t.row}"
""" """
def format(t): def format(t):
t.date = t.date.replace("/", "-") t.date = t.date.replace("/", "-")
tags = "" tags = ""
description = None description = None
if t.mapping: if t.mapping:
@@ -40,13 +42,15 @@ def format(t):
if not t.credit.startswith("-"): if not t.credit.startswith("-"):
t.credit = " " + t.credit t.credit = " " + t.credit
t.account1 = category_to_bean(t.account1)
t.account2 = category_to_bean(t.account2)
if t.currency == "EUR": if t.currency == "EUR":
t.debit = t.debit.replace(".", "|").replace(",", ".").replace("|", ",") t.debit = t.debit.replace(".", "|").replace(",", ".").replace("|", ",")
t.credit = t.credit.replace(".", "|").replace(",", ".").replace("|", ",") t.credit = t.credit.replace(".", "|").replace(",", ".").replace("|", ",")
return BEANCOUNT_TRANSACTION_TEMPLATE.format( return BEANCOUNT_TRANSACTION_TEMPLATE.format(
t=t, description=description, tags=tags t=t,
description=description,
tags=tags,
account1=category_to_bean(t.account1),
account2=category_to_bean(t.account2),
) )
@@ -54,3 +58,4 @@ def render_to_file(transactions: List[Transaction], config: Config):
content = "".join(format(t) for t in transactions) content = "".join(format(t) for t in transactions)
with open(config.output_file, "a") as f: with open(config.output_file, "a") as f:
f.write(content) f.write(content)