Compare commits

..

2 Commits

Author SHA1 Message Date
a190ddc524 Add source info and sort transactions 2025-12-09 21:27:48 -05:00
12408c33f4 Implement numpy based prediction feature 2025-03-16 09:58:21 -04:00
9 changed files with 1502 additions and 1331 deletions

2003
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,7 @@
import logging
import sys
import beancount
import io
from rich.logging import RichHandler
@@ -17,6 +19,38 @@ def init_logging():
)
def load_and_write_back(filename):
entries, errors, options_map = beancount.loader.load_file(filename)
def beancount_entry_to_string(entry) -> str:
buf = io.StringIO()
# beancount.parser.printer.print_entry(entry, dcontext=options_map['dcontext'], file=buf)
beancount.parser.printer.print_entry(entry, file=buf)
return buf.getvalue().strip()
def is_transaction(entry) -> bool:
return isinstance(entry, beancount.core.data.Transaction)
prev_entry_was_transaction = False
if errors:
print(f"errors in generated '{filename}'")
for err in errors:
print(err)
else:
entries.sort(key=lambda e: e.date)
with open(filename, "w") as f:
f.write('option "operating_currency" "USD"\n')
for entry in entries:
if prev_entry_was_transaction:
f.write("\n")
elif not prev_entry_was_transaction and is_transaction(entry):
f.write("\n")
f.write(beancount_entry_to_string(entry))
f.write("\n")
prev_entry_was_transaction = is_transaction(entry)
def main():
init_logging()
config = load_config()
@@ -27,6 +61,7 @@ def main():
write_meta(config)
process_ldg_files(config)
process_csv_files(config)
load_and_write_back(config.output_file)
if __name__ == "__main__":

View File

@@ -1,393 +0,0 @@
import json
import logging
import re
import pickle
import os
from collections import Counter, defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Set, Any
import numpy as np
from toldg.fzf import iterfzf
from toldg.models import Transaction
class Tokenizer:
"""Simple tokenizer for transaction descriptions."""
def __init__(self, min_count: int = 2, lowercase: bool = True):
self.min_count = min_count
self.lowercase = lowercase
self.vocab = {} # word -> index
self.inverse_vocab = {} # index -> word
def fit(self, texts: List[str]) -> None:
"""Build vocabulary from texts."""
word_counts = Counter()
for text in texts:
tokens = self._tokenize(text)
word_counts.update(tokens)
# Filter words by minimum count
filtered_words = [word for word, count in word_counts.items()
if count >= self.min_count]
# Build vocabulary
self.vocab = {word: idx for idx, word in enumerate(filtered_words)}
self.inverse_vocab = {idx: word for word, idx in self.vocab.items()}
def _tokenize(self, text: str) -> List[str]:
"""Split text into tokens."""
if self.lowercase:
text = text.lower()
# Simple tokenization: alphanumeric sequences
tokens = re.findall(r'\b\w+\b', text)
return tokens
def transform(self, text: str) -> Dict[int, int]:
"""Convert text to sparse vector (word index -> count)."""
tokens = self._tokenize(text)
counts = Counter()
for token in tokens:
if token in self.vocab:
counts[self.vocab[token]] += 1
return dict(counts)
def vocab_size(self) -> int:
"""Return vocabulary size."""
return len(self.vocab)
class LogisticRegression:
"""Multi-class logistic regression classifier."""
def __init__(self, input_dim: int, output_dim: int,
learning_rate: float = 0.01,
reg_lambda: float = 0.01,
max_iterations: int = 1000):
self.input_dim = input_dim
self.output_dim = output_dim
self.learning_rate = learning_rate
self.reg_lambda = reg_lambda
self.max_iterations = max_iterations
# Initialize weights and bias
# weights shape: (output_dim, input_dim)
self.weights = np.random.randn(output_dim, input_dim) * 0.01
self.bias = np.zeros((output_dim, 1))
def softmax(self, z: np.ndarray) -> np.ndarray:
"""Compute softmax function."""
# Subtract max for numerical stability
exp_z = np.exp(z - np.max(z, axis=0, keepdims=True))
return exp_z / np.sum(exp_z, axis=0, keepdims=True)
def forward(self, x: np.ndarray) -> np.ndarray:
"""Forward pass: compute probabilities."""
# x shape: (input_dim, batch_size)
# output shape: (output_dim, batch_size)
z = np.dot(self.weights, x) + self.bias
return self.softmax(z)
def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
"""Compute cross-entropy loss with L2 regularization."""
# y_pred shape: (output_dim, batch_size)
# y_true shape: (output_dim, batch_size) - one-hot encoded
m = y_true.shape[1]
# Cross-entropy loss
ce_loss = -np.sum(y_true * np.log(y_pred + 1e-8)) / m
# L2 regularization
reg_loss = (self.reg_lambda / (2 * m)) * np.sum(np.square(self.weights))
return ce_loss + reg_loss
def backward(self, x: np.ndarray, y_pred: np.ndarray, y_true: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Compute gradients for backpropagation."""
# x shape: (input_dim, batch_size)
# y_pred shape: (output_dim, batch_size)
# y_true shape: (output_dim, batch_size) - one-hot encoded
m = y_true.shape[1]
# Gradient of loss with respect to scores (dL/dz)
dz = y_pred - y_true
# Gradient of loss with respect to weights (dL/dW)
dw = (1/m) * np.dot(dz, x.T) + (self.reg_lambda / m) * self.weights
# Gradient of loss with respect to bias (dL/db)
db = (1/m) * np.sum(dz, axis=1, keepdims=True)
return dw, db
def train(self, x_batch: List[Dict[int, int]], y_batch: List[int], verbose: bool = True) -> List[float]:
"""Train model on batched data."""
# Convert sparse vectors to dense matrix
batch_size = len(x_batch)
x_dense = np.zeros((self.input_dim, batch_size))
for i, x_sparse in enumerate(x_batch):
for idx, val in x_sparse.items():
x_dense[idx, i] = val
# Convert labels to one-hot encoding
y_one_hot = np.zeros((self.output_dim, batch_size))
for i, y in enumerate(y_batch):
y_one_hot[y, i] = 1
losses = []
for iteration in range(self.max_iterations):
# Forward pass
y_pred = self.forward(x_dense)
# Compute loss
loss = self.compute_loss(y_pred, y_one_hot)
losses.append(loss)
if verbose and iteration % 100 == 0:
print(f"Iteration {iteration}: loss = {loss:.4f}")
# Check convergence
if iteration > 0 and abs(losses[-1] - losses[-2]) < 1e-5:
if verbose:
print(f"Converged at iteration {iteration}")
break
# Backward pass
dw, db = self.backward(x_dense, y_pred, y_one_hot)
# Update parameters
self.weights -= self.learning_rate * dw
self.bias -= self.learning_rate * db
return losses
def predict_proba(self, x_sparse: Dict[int, int]) -> np.ndarray:
"""Predict class probabilities for a single sparse vector."""
# Convert sparse vector to dense vector
x_dense = np.zeros((self.input_dim, 1))
for idx, val in x_sparse.items():
x_dense[idx, 0] = val
# Forward pass
return self.forward(x_dense).flatten()
def predict(self, x_sparse: Dict[int, int]) -> int:
"""Predict class for a single sparse vector."""
probs = self.predict_proba(x_sparse)
return np.argmax(probs)
class TransactionClassifier:
"""Transaction classifier using Bag of Words and Logistic Regression."""
def __init__(self, model_path: Optional[Path] = None):
self.tokenizer = None
self.model = None
self.categories = []
self.category_to_idx = {}
self.idx_to_category = {}
if model_path and os.path.exists(model_path):
self.load(model_path)
def fit(self, transactions: List[Transaction], categories: List[str], verbose: bool = True) -> None:
"""Train classifier on transactions."""
# Extract texts and labels
texts = [t.row for t in transactions]
labels = [t.account2 for t in transactions]
# Build category mapping
self.categories = sorted(set(categories))
self.category_to_idx = {cat: idx for idx, cat in enumerate(self.categories)}
self.idx_to_category = {idx: cat for cat, idx in self.category_to_idx.items()}
# Map labels to indices
label_indices = [self.category_to_idx.get(label, 0) for label in labels]
# Initialize and fit tokenizer
self.tokenizer = Tokenizer(min_count=2)
self.tokenizer.fit(texts)
if verbose:
print(f"Vocabulary size: {self.tokenizer.vocab_size()}")
# Transform texts to feature vectors
x_vectors = [self.tokenizer.transform(text) for text in texts]
# Initialize and train model
self.model = LogisticRegression(
input_dim=self.tokenizer.vocab_size(),
output_dim=len(self.categories),
learning_rate=0.05,
reg_lambda=0.01,
max_iterations=2000
)
self.model.train(x_vectors, label_indices, verbose=verbose)
def predict(self, text: str) -> Tuple[str, float, List[float]]:
"""
Predict category for a transaction text.
Returns:
tuple: (predicted_category, confidence, all_probabilities)
"""
if not self.model or not self.tokenizer:
raise ValueError("Model not trained yet")
# Transform text to feature vector
x_vector = self.tokenizer.transform(text)
# Predict probabilities
probs = self.model.predict_proba(x_vector)
# Get predicted class
pred_idx = np.argmax(probs)
pred_category = self.idx_to_category[pred_idx]
confidence = probs[pred_idx]
return pred_category, confidence, probs
def sort_categories(self, text: str, categories: List[str]) -> None:
"""Sort categories by prediction probability for given text."""
if not self.model or not self.tokenizer:
return
# Transform text to feature vector
x_vector = self.tokenizer.transform(text)
# Predict probabilities
probs = self.model.predict_proba(x_vector)
# Create mapping from category to probability
cat_to_prob = {}
for idx, prob in enumerate(probs):
if idx in self.idx_to_category:
cat = self.idx_to_category[idx]
cat_to_prob[cat] = prob
# Sort categories by probability
categories.sort(key=lambda c: cat_to_prob.get(c, 0.0), reverse=True)
def save(self, path: Path) -> None:
"""Save model to file."""
if not self.model or not self.tokenizer:
raise ValueError("Model not trained yet")
model_data = {
'tokenizer_vocab': self.tokenizer.vocab,
'tokenizer_inverse_vocab': self.tokenizer.inverse_vocab,
'tokenizer_min_count': self.tokenizer.min_count,
'tokenizer_lowercase': self.tokenizer.lowercase,
'model_weights': self.model.weights,
'model_bias': self.model.bias,
'categories': self.categories,
'category_to_idx': self.category_to_idx,
'idx_to_category': self.idx_to_category
}
with open(path, 'wb') as f:
pickle.dump(model_data, f)
def load(self, path: Path) -> None:
"""Load model from file."""
with open(path, 'rb') as f:
model_data = pickle.load(f)
# Restore tokenizer
self.tokenizer = Tokenizer(
min_count=model_data['tokenizer_min_count'],
lowercase=model_data['tokenizer_lowercase']
)
self.tokenizer.vocab = model_data['tokenizer_vocab']
self.tokenizer.inverse_vocab = model_data['tokenizer_inverse_vocab']
# Restore categories
self.categories = model_data['categories']
self.category_to_idx = model_data['category_to_idx']
self.idx_to_category = model_data['idx_to_category']
# Restore model
input_dim = len(self.tokenizer.vocab)
output_dim = len(self.categories)
self.model = LogisticRegression(input_dim, output_dim)
self.model.weights = model_data['model_weights']
self.model.bias = model_data['model_bias']
# Global classifier instance
_classifier = None
def get_sort_categories():
"""Get function to sort categories by prediction probability."""
global _classifier
def sort_categories(row: str, categories: List[str]):
if _classifier is None:
return
_classifier.sort_categories(row, categories)
try:
model_path = Path("transaction_classifier.pkl")
_classifier = TransactionClassifier(model_path)
if _classifier.model is None:
logging.warning("No trained model found. Categories will not be sorted.")
except Exception as e:
logging.warning(f"Error loading classifier: {e}")
logging.warning("Categories will not be sorted.")
return sort_categories
def add_account2(transactions: List[Transaction], categories: List[str]):
"""Add account2 to unmapped transactions."""
unmapped_transactions = list(filter(lambda t: t.mapping is None, transactions))
if len(unmapped_transactions) == 0:
return
sort_categories = get_sort_categories()
for t in unmapped_transactions:
sort_categories(t.row, categories)
add_account2_interactive(t, categories)
def add_account2_interactive(transaction: Transaction, categories: List[str]):
"""Interactively add account2 to a transaction."""
t = transaction
account2 = None
prompt = f"{t.account1} {t.date} {t.description} {t.debit} > "
while account2 is None:
account2 = iterfzf(categories, prompt=prompt)
transaction.account2 = account2
print(f"Assigned category '{account2}'.")
def train_classifier(transactions: List[Transaction], categories: List[str], output_path: Path = Path("transaction_classifier.pkl")):
"""Train transaction classifier and save to file."""
global _classifier
# Filter transactions with account2
valid_transactions = [t for t in transactions if t.account2 in categories]
if len(valid_transactions) < 10:
logging.warning("Not enough transactions for training. Need at least 10.")
return
logging.info(f"Training classifier on {len(valid_transactions)} transactions")
# Initialize and train classifier
_classifier = TransactionClassifier()
_classifier.fit(valid_transactions, categories, verbose=True)
# Save classifier
_classifier.save(output_path)
logging.info(f"Classifier saved to {output_path}")

View File

@@ -81,4 +81,9 @@ class Transaction(BaseModel):
description: str
csv_file: str
row: str
index: int
mapping: Optional[Mapping] = None
def key(self):
return self.csv_file + ", " + self.row

View File

@@ -1,12 +1,368 @@
import json
import logging
import os
import pickle
import re
from collections import Counter, defaultdict
from pathlib import Path
from typing import List
from typing import Any, Dict, List, Optional, Set, Tuple
import numpy as np
from toldg.fzf import iterfzf
from toldg.ml_predict import TransactionClassifier
from toldg.models import Transaction
class Tokenizer:
"""Simple tokenizer for transaction descriptions."""
def __init__(self, min_count: int = 2, lowercase: bool = True):
self.min_count = min_count
self.lowercase = lowercase
self.vocab = {} # word -> index
self.inverse_vocab = {} # index -> word
def fit(self, texts: List[str]) -> None:
"""Build vocabulary from texts."""
word_counts = Counter()
for text in texts:
tokens = self._tokenize(text)
word_counts.update(tokens)
# Filter words by minimum count
filtered_words = [
word for word, count in word_counts.items() if count >= self.min_count
]
# Build vocabulary
self.vocab = {word: idx for idx, word in enumerate(filtered_words)}
self.inverse_vocab = {idx: word for word, idx in self.vocab.items()}
def _tokenize(self, text: str) -> List[str]:
"""Split text into tokens."""
if self.lowercase:
text = text.lower()
# Simple tokenization: alphanumeric sequences
tokens = re.findall(r"\b\w+\b", text)
return tokens
def transform(self, text: str) -> Dict[int, int]:
"""Convert text to sparse vector (word index -> count)."""
tokens = self._tokenize(text)
counts = Counter()
for token in tokens:
if token in self.vocab:
counts[self.vocab[token]] += 1
return dict(counts)
def vocab_size(self) -> int:
"""Return vocabulary size."""
return len(self.vocab)
class LogisticRegression:
"""Multi-class logistic regression classifier."""
def __init__(
self,
input_dim: int,
output_dim: int,
learning_rate: float = 0.01,
reg_lambda: float = 0.01,
max_iterations: int = 1000,
):
self.input_dim = input_dim
self.output_dim = output_dim
self.learning_rate = learning_rate
self.reg_lambda = reg_lambda
self.max_iterations = max_iterations
# Initialize weights and bias
# weights shape: (output_dim, input_dim)
self.weights = np.random.randn(output_dim, input_dim) * 0.01
self.bias = np.zeros((output_dim, 1))
def softmax(self, z: np.ndarray) -> np.ndarray:
"""Compute softmax function."""
# Subtract max for numerical stability
exp_z = np.exp(z - np.max(z, axis=0, keepdims=True))
return exp_z / np.sum(exp_z, axis=0, keepdims=True)
def forward(self, x: np.ndarray) -> np.ndarray:
"""Forward pass: compute probabilities."""
# x shape: (input_dim, batch_size)
# output shape: (output_dim, batch_size)
z = np.dot(self.weights, x) + self.bias
return self.softmax(z)
def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
"""Compute cross-entropy loss with L2 regularization."""
# y_pred shape: (output_dim, batch_size)
# y_true shape: (output_dim, batch_size) - one-hot encoded
m = y_true.shape[1]
# Cross-entropy loss
ce_loss = -np.sum(y_true * np.log(y_pred + 1e-8)) / m
# L2 regularization
reg_loss = (self.reg_lambda / (2 * m)) * np.sum(np.square(self.weights))
return ce_loss + reg_loss
def backward(
self, x: np.ndarray, y_pred: np.ndarray, y_true: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""Compute gradients for backpropagation."""
# x shape: (input_dim, batch_size)
# y_pred shape: (output_dim, batch_size)
# y_true shape: (output_dim, batch_size) - one-hot encoded
m = y_true.shape[1]
# Gradient of loss with respect to scores (dL/dz)
dz = y_pred - y_true
# Gradient of loss with respect to weights (dL/dW)
dw = (1 / m) * np.dot(dz, x.T) + (self.reg_lambda / m) * self.weights
# Gradient of loss with respect to bias (dL/db)
db = (1 / m) * np.sum(dz, axis=1, keepdims=True)
return dw, db
def train(
self, x_batch: List[Dict[int, int]], y_batch: List[int], verbose: bool = True
) -> List[float]:
"""Train model on batched data."""
# Convert sparse vectors to dense matrix
batch_size = len(x_batch)
x_dense = np.zeros((self.input_dim, batch_size))
for i, x_sparse in enumerate(x_batch):
for idx, val in x_sparse.items():
x_dense[idx, i] = val
# Convert labels to one-hot encoding
y_one_hot = np.zeros((self.output_dim, batch_size))
for i, y in enumerate(y_batch):
y_one_hot[y, i] = 1
losses = []
for iteration in range(self.max_iterations):
# Forward pass
y_pred = self.forward(x_dense)
# Compute loss
loss = self.compute_loss(y_pred, y_one_hot)
losses.append(loss)
if verbose and iteration % 100 == 0:
print(f"Iteration {iteration}: loss = {loss:.4f}")
# Check convergence
if iteration > 0 and abs(losses[-1] - losses[-2]) < 1e-5:
if verbose:
print(f"Converged at iteration {iteration}")
break
# Backward pass
dw, db = self.backward(x_dense, y_pred, y_one_hot)
# Update parameters
self.weights -= self.learning_rate * dw
self.bias -= self.learning_rate * db
return losses
def predict_proba(self, x_sparse: Dict[int, int]) -> np.ndarray:
"""Predict class probabilities for a single sparse vector."""
# Convert sparse vector to dense vector
x_dense = np.zeros((self.input_dim, 1))
for idx, val in x_sparse.items():
x_dense[idx, 0] = val
# Forward pass
return self.forward(x_dense).flatten()
def predict(self, x_sparse: Dict[int, int]) -> int:
"""Predict class for a single sparse vector."""
probs = self.predict_proba(x_sparse)
return np.argmax(probs)
class TransactionClassifier:
"""Transaction classifier using Bag of Words and Logistic Regression."""
def __init__(self, model_path: Optional[Path] = None):
self.tokenizer = None
self.model = None
self.categories = []
self.category_to_idx = {}
self.idx_to_category = {}
if model_path and os.path.exists(model_path):
self.load(model_path)
def fit(
self,
transactions: List[Transaction],
categories: List[str],
verbose: bool = True,
) -> None:
"""Train classifier on transactions."""
# Extract texts and labels
texts = [t.row for t in transactions]
labels = [t.account2 for t in transactions]
# Build category mapping
self.categories = sorted(set(categories))
self.category_to_idx = {cat: idx for idx, cat in enumerate(self.categories)}
self.idx_to_category = {idx: cat for cat, idx in self.category_to_idx.items()}
# Map labels to indices
label_indices = [self.category_to_idx.get(label, 0) for label in labels]
# Initialize and fit tokenizer
self.tokenizer = Tokenizer(min_count=2)
self.tokenizer.fit(texts)
if verbose:
print(f"Vocabulary size: {self.tokenizer.vocab_size()}")
# Transform texts to feature vectors
x_vectors = [self.tokenizer.transform(text) for text in texts]
# Initialize and train model
self.model = LogisticRegression(
input_dim=self.tokenizer.vocab_size(),
output_dim=len(self.categories),
learning_rate=0.05,
reg_lambda=0.01,
max_iterations=3000,
)
self.model.train(x_vectors, label_indices, verbose=verbose)
def predict(self, text: str) -> Tuple[str, float, List[float]]:
"""
Predict category for a transaction text.
Returns:
tuple: (predicted_category, confidence, all_probabilities)
"""
if not self.model or not self.tokenizer:
raise ValueError("Model not trained yet")
# Transform text to feature vector
x_vector = self.tokenizer.transform(text)
# Predict probabilities
probs = self.model.predict_proba(x_vector)
# Get predicted class
pred_idx = np.argmax(probs)
pred_category = self.idx_to_category[pred_idx]
confidence = probs[pred_idx]
return pred_category, confidence, probs
def sort_categories(self, text: str, categories: List[str]) -> None:
"""Sort categories by prediction probability for given text."""
if not self.model or not self.tokenizer:
return
# Transform text to feature vector
x_vector = self.tokenizer.transform(text)
# Predict probabilities
probs = self.model.predict_proba(x_vector)
# Create mapping from category to probability
cat_to_prob = {}
for idx, prob in enumerate(probs):
if idx in self.idx_to_category:
cat = self.idx_to_category[idx]
cat_to_prob[cat] = prob
# Sort categories by probability
categories.sort(key=lambda c: cat_to_prob.get(c, 0.0), reverse=True)
def save(self, path: Path) -> None:
"""Save model to file."""
if not self.model or not self.tokenizer:
raise ValueError("Model not trained yet")
model_data = {
"tokenizer_vocab": self.tokenizer.vocab,
"tokenizer_inverse_vocab": self.tokenizer.inverse_vocab,
"tokenizer_min_count": self.tokenizer.min_count,
"tokenizer_lowercase": self.tokenizer.lowercase,
"model_weights": self.model.weights,
"model_bias": self.model.bias,
"categories": self.categories,
"category_to_idx": self.category_to_idx,
"idx_to_category": self.idx_to_category,
}
with open(path, "wb") as f:
pickle.dump(model_data, f)
def load(self, path: Path) -> None:
"""Load model from file."""
with open(path, "rb") as f:
model_data = pickle.load(f)
# Restore tokenizer
self.tokenizer = Tokenizer(
min_count=model_data["tokenizer_min_count"],
lowercase=model_data["tokenizer_lowercase"],
)
self.tokenizer.vocab = model_data["tokenizer_vocab"]
self.tokenizer.inverse_vocab = model_data["tokenizer_inverse_vocab"]
# Restore categories
self.categories = model_data["categories"]
self.category_to_idx = model_data["category_to_idx"]
self.idx_to_category = model_data["idx_to_category"]
# Restore model
input_dim = len(self.tokenizer.vocab)
output_dim = len(self.categories)
self.model = LogisticRegression(input_dim, output_dim)
self.model.weights = model_data["model_weights"]
self.model.bias = model_data["model_bias"]
def train_classifier(
transactions: List[Transaction],
categories: List[str],
output_path: Path = Path("transaction_classifier.pkl"),
):
"""Train transaction classifier and save to file."""
global _classifier
# Filter transactions with account2
valid_transactions = [t for t in transactions if t.account2 in categories]
if len(valid_transactions) < 10:
logging.warning("Not enough transactions for training. Need at least 10.")
return
logging.info(f"Training classifier on {len(valid_transactions)} transactions")
# Initialize and train classifier
_classifier = TransactionClassifier()
_classifier.fit(valid_transactions, categories, verbose=True)
# Save classifier
_classifier.save(output_path)
logging.info(f"Classifier saved to {output_path}")
def get_sort_categories(model_path: Path):
"""Get function to sort categories by prediction probability."""
_classifier = None

View File

@@ -3,7 +3,6 @@ import datetime
import logging
import re
import sys
from typing import Any, Dict, List
import toldg.models
import toldg.predict
@@ -19,7 +18,7 @@ def process_ldg_files(config: Config):
f_out.write(f_in.read())
def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
def get_csv_config(csv_file: str, csv_configs: list[CsvConfig]) -> CsvConfig:
cs = [c for c in csv_configs if re.match(c.file_match_regex, csv_file)]
if not cs:
logging.critical(f"No CSV config for {csv_file}.")
@@ -30,7 +29,7 @@ def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
return cs[0]
def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
def get_transactions(csv_file: str, config: CsvConfig) -> list[Transaction]:
def date_to_date(date: str) -> str:
d = datetime.datetime.strptime(date, config.input_date_format)
return d.strftime(config.output_date_format)
@@ -38,7 +37,7 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
def flip_sign(amount: str) -> str:
return amount[1:] if amount.startswith("-") else "-" + amount
def row_to_transaction(row, fields):
def row_to_transaction(idx, row, fields):
"""The user can configure the mapping of CSV fields to the three
required fields date, amount and description via the CsvConfig."""
t = {field: row[index] for index, field in fields}
@@ -52,7 +51,8 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
account2=toldg.models.UNKNOWN_CATEGORY,
description=t["description"],
csv_file=csv_file,
row=csv_file + ", " + ", ".join(row),
row=", ".join(row),
index=idx,
)
fields = [(i, f) for i, f in enumerate(config.fields) if f]
@@ -60,15 +60,17 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
reader = csv.reader(f, delimiter=config.delimiter, quotechar=config.quotechar)
for _ in range(config.skip):
next(reader)
transactions = [row_to_transaction(row, fields) for row in reader if row]
rows = [row for row in reader if row]
transactions = [row_to_transaction(i, row, fields)
for i, row in enumerate(reversed(rows))]
return transactions
def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]):
def apply_mappings(transactions: list[Transaction], mappings: dict[str, Mapping]):
"""Apply mappings to transactions."""
for t in transactions:
if t.row in mappings:
mapping = mappings[t.row]
if t.key() in mappings:
mapping = mappings[t.key()]
assert isinstance(mapping, Mapping)
assert (
mapping.count > 0
@@ -82,7 +84,7 @@ def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]
assert mapping.count == 0, f"{mapping} was not used as often as expected!"
def process_csv_files(config: Config) -> List[Transaction]:
def process_csv_files(config: Config) -> list[Transaction]:
csv_files = toldg.utils.get_csv_files(config.input_directory)
transactions = []
for csv_file in csv_files:

View File

@@ -1,16 +1,14 @@
import logging
from pathlib import Path
from toldg.ml_predict import train_classifier
from toldg.models import Config
from toldg.predict import train_classifier
from toldg.process import process_csv_files
def train(config: Config):
"""Train a transaction classifier from csv files."""
logging.info("[train] Starting transaction classifier training")
# Process transactions to get training data
transactions = process_csv_files(config)
output_path = Path("transaction_classifier.pkl")
train_classifier(transactions, config.categories, config.model)
logging.info("[train] Training completed")
logging.info("Training completed")

View File

@@ -70,7 +70,7 @@ def write_mappings(transactions: List[Transaction], mappings_file: Path):
mappings = read_mappings(mappings_file)
for t in transactions:
if t.row in mappings:
if t.key() in mappings:
pass
else:
mapping = Mapping(
@@ -79,7 +79,7 @@ def write_mappings(transactions: List[Transaction], mappings_file: Path):
"narration": t.description,
}
)
mappings[t.row] = mapping
mappings[t.key()] = mapping
mappings = {k: v.model_dump(exclude_none=True) for k, v in mappings.items()}
with open(mappings_file, "w") as f:

View File

@@ -8,12 +8,14 @@ BEANCOUNT_TRANSACTION_TEMPLATE = """
{t.date} * {description}{tags}
{account2:<40} {t.debit:<6} {t.currency}
{account1:<40} {t.credit:<6} {t.currency}
source_file: "{t.csv_file}"
source_index: {t.index}
source_row: "{t.row}"
"""
def format(t):
t.date = t.date.replace("/", "-")
tags = ""
description = None
if t.mapping:
@@ -56,3 +58,4 @@ def render_to_file(transactions: List[Transaction], config: Config):
content = "".join(format(t) for t in transactions)
with open(config.output_file, "a") as f:
f.write(content)