Compare commits

...

1 Commits

Author SHA1 Message Date
3ea2602b03 Implement numpy based prediction feature 2025-03-16 09:47:11 -04:00
6 changed files with 438 additions and 28 deletions

393
src/toldg/ml_predict.py Normal file
View File

@@ -0,0 +1,393 @@
import json
import logging
import re
import pickle
import os
from collections import Counter, defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Set, Any
import numpy as np
from toldg.fzf import iterfzf
from toldg.models import Transaction
class Tokenizer:
    """Bag-of-words tokenizer for transaction descriptions.

    Builds an integer vocabulary from training texts, then maps new texts
    to sparse token-count vectors over that vocabulary.
    """

    def __init__(self, min_count: int = 2, lowercase: bool = True):
        self.min_count = min_count
        self.lowercase = lowercase
        self.vocab = {}  # word -> index
        self.inverse_vocab = {}  # index -> word

    def fit(self, texts: List[str]) -> None:
        """Build vocabulary from texts."""
        word_counts = Counter(
            token for text in texts for token in self._tokenize(text)
        )
        # Keep only sufficiently frequent words, indexed by first occurrence.
        self.vocab = {}
        for word, count in word_counts.items():
            if count >= self.min_count:
                self.vocab[word] = len(self.vocab)
        self.inverse_vocab = {idx: word for word, idx in self.vocab.items()}

    def _tokenize(self, text: str) -> List[str]:
        """Split text into alphanumeric tokens (lowercased when enabled)."""
        source = text.lower() if self.lowercase else text
        return re.findall(r'\b\w+\b', source)

    def transform(self, text: str) -> Dict[int, int]:
        """Convert text to sparse vector (word index -> count)."""
        known = (self.vocab[tok] for tok in self._tokenize(text) if tok in self.vocab)
        return dict(Counter(known))

    def vocab_size(self) -> int:
        """Return vocabulary size."""
        return len(self.vocab)
class LogisticRegression:
    """Multi-class logistic regression trained by batch gradient descent.

    Inputs are sparse feature vectors (dict of feature index -> count) that
    are densified into column-major matrices internally.  Cross-entropy loss
    with L2 weight regularization; softmax output over ``output_dim`` classes.
    """

    def __init__(self, input_dim: int, output_dim: int,
                 learning_rate: float = 0.01,
                 reg_lambda: float = 0.01,
                 max_iterations: int = 1000):
        """Initialize model parameters.

        Args:
            input_dim: Number of input features (vocabulary size).
            output_dim: Number of output classes.
            learning_rate: Gradient descent step size.
            reg_lambda: L2 regularization strength.
            max_iterations: Maximum training iterations.
        """
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.learning_rate = learning_rate
        self.reg_lambda = reg_lambda
        self.max_iterations = max_iterations
        # Initialize weights and bias.
        # weights shape: (output_dim, input_dim); small random init breaks symmetry.
        self.weights = np.random.randn(output_dim, input_dim) * 0.01
        self.bias = np.zeros((output_dim, 1))

    def softmax(self, z: np.ndarray) -> np.ndarray:
        """Compute column-wise softmax of scores ``z``."""
        # Subtract per-column max for numerical stability.
        exp_z = np.exp(z - np.max(z, axis=0, keepdims=True))
        return exp_z / np.sum(exp_z, axis=0, keepdims=True)

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Forward pass: compute class probabilities.

        Args:
            x: Dense inputs of shape (input_dim, batch_size).

        Returns:
            Probabilities of shape (output_dim, batch_size).
        """
        z = np.dot(self.weights, x) + self.bias
        return self.softmax(z)

    def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
        """Compute mean cross-entropy loss plus L2 regularization.

        Args:
            y_pred: Predicted probabilities, shape (output_dim, batch_size).
            y_true: One-hot labels, shape (output_dim, batch_size).
        """
        m = y_true.shape[1]
        # Cross-entropy; epsilon guards against log(0).
        ce_loss = -np.sum(y_true * np.log(y_pred + 1e-8)) / m
        # L2 regularization on weights only (bias unregularized).
        reg_loss = (self.reg_lambda / (2 * m)) * np.sum(np.square(self.weights))
        return ce_loss + reg_loss

    def backward(self, x: np.ndarray, y_pred: np.ndarray, y_true: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Compute gradients of the loss w.r.t. weights and bias.

        Args:
            x: Dense inputs, shape (input_dim, batch_size).
            y_pred: Predicted probabilities, shape (output_dim, batch_size).
            y_true: One-hot labels, shape (output_dim, batch_size).

        Returns:
            tuple: (dW, db) matching the shapes of ``weights`` and ``bias``.
        """
        m = y_true.shape[1]
        # dL/dz for softmax + cross-entropy simplifies to (p - y).
        dz = y_pred - y_true
        # dL/dW, including the L2 term.
        dw = (1/m) * np.dot(dz, x.T) + (self.reg_lambda / m) * self.weights
        # dL/db.
        db = (1/m) * np.sum(dz, axis=1, keepdims=True)
        return dw, db

    def train(self, x_batch: List[Dict[int, int]], y_batch: List[int], verbose: bool = True) -> List[float]:
        """Train on the full batch with gradient descent.

        Args:
            x_batch: Sparse feature vectors, one per example.
            y_batch: Integer class labels, one per example.
            verbose: Print progress every 100 iterations.

        Returns:
            List of loss values, one per iteration run.
        """
        # Densify sparse vectors into an (input_dim, batch_size) matrix.
        batch_size = len(x_batch)
        x_dense = np.zeros((self.input_dim, batch_size))
        for i, x_sparse in enumerate(x_batch):
            for idx, val in x_sparse.items():
                x_dense[idx, i] = val
        # One-hot encode labels: (output_dim, batch_size).
        y_one_hot = np.zeros((self.output_dim, batch_size))
        for i, y in enumerate(y_batch):
            y_one_hot[y, i] = 1
        losses = []
        for iteration in range(self.max_iterations):
            # Forward pass.
            y_pred = self.forward(x_dense)
            loss = self.compute_loss(y_pred, y_one_hot)
            losses.append(loss)
            if verbose and iteration % 100 == 0:
                print(f"Iteration {iteration}: loss = {loss:.4f}")
            # Stop once the loss change falls below tolerance.
            if iteration > 0 and abs(losses[-1] - losses[-2]) < 1e-5:
                if verbose:
                    print(f"Converged at iteration {iteration}")
                break
            # Backward pass and parameter update.
            dw, db = self.backward(x_dense, y_pred, y_one_hot)
            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db
        return losses

    def predict_proba(self, x_sparse: Dict[int, int]) -> np.ndarray:
        """Predict class probabilities for a single sparse vector.

        Returns:
            1-D array of length ``output_dim`` summing to 1.
        """
        x_dense = np.zeros((self.input_dim, 1))
        for idx, val in x_sparse.items():
            x_dense[idx, 0] = val
        return self.forward(x_dense).flatten()

    def predict(self, x_sparse: Dict[int, int]) -> int:
        """Predict the most likely class for a single sparse vector."""
        probs = self.predict_proba(x_sparse)
        # np.argmax returns a numpy integer; convert to honor the -> int contract.
        return int(np.argmax(probs))
class TransactionClassifier:
    """Transaction classifier using Bag of Words and Logistic Regression.

    Wraps a ``Tokenizer`` (text -> sparse counts) and a ``LogisticRegression``
    model, with pickle-based persistence of both.
    """

    def __init__(self, model_path: Optional[Path] = None):
        """Create a classifier, optionally loading a saved model.

        Args:
            model_path: If given and the file exists, load the model from it.
        """
        self.tokenizer = None
        self.model = None
        self.categories = []
        self.category_to_idx = {}
        self.idx_to_category = {}
        if model_path and os.path.exists(model_path):
            self.load(model_path)

    def fit(self, transactions: List["Transaction"], categories: List[str], verbose: bool = True) -> None:
        """Train classifier on transactions.

        Args:
            transactions: Training examples; uses ``t.row`` as the text and
                ``t.account2`` as the label.
            categories: Allowed category labels.
            verbose: Print training progress.
        """
        # Extract texts and labels.
        texts = [t.row for t in transactions]
        labels = [t.account2 for t in transactions]
        # Build a stable (sorted) category <-> index mapping.
        self.categories = sorted(set(categories))
        self.category_to_idx = {cat: idx for idx, cat in enumerate(self.categories)}
        self.idx_to_category = {idx: cat for cat, idx in self.category_to_idx.items()}
        # Map labels to indices.
        # NOTE(review): labels outside `categories` silently fall back to class 0,
        # mislabeling those examples — callers are expected to pre-filter.
        label_indices = [self.category_to_idx.get(label, 0) for label in labels]
        # Initialize and fit tokenizer.
        self.tokenizer = Tokenizer(min_count=2)
        self.tokenizer.fit(texts)
        if verbose:
            print(f"Vocabulary size: {self.tokenizer.vocab_size()}")
        # Transform texts to sparse feature vectors.
        x_vectors = [self.tokenizer.transform(text) for text in texts]
        # Initialize and train the model.
        self.model = LogisticRegression(
            input_dim=self.tokenizer.vocab_size(),
            output_dim=len(self.categories),
            learning_rate=0.05,
            reg_lambda=0.01,
            max_iterations=2000
        )
        self.model.train(x_vectors, label_indices, verbose=verbose)

    def predict(self, text: str) -> Tuple[str, float, List[float]]:
        """
        Predict category for a transaction text.

        Returns:
            tuple: (predicted_category, confidence, all_probabilities)

        Raises:
            ValueError: If the model has not been trained or loaded.
        """
        if not self.model or not self.tokenizer:
            raise ValueError("Model not trained yet")
        # Transform text to feature vector.
        x_vector = self.tokenizer.transform(text)
        # Predict probabilities.
        probs = self.model.predict_proba(x_vector)
        # Convert numpy scalars so the return matches the annotated types.
        pred_idx = int(np.argmax(probs))
        pred_category = self.idx_to_category[pred_idx]
        confidence = float(probs[pred_idx])
        return pred_category, confidence, probs

    def sort_categories(self, text: str, categories: List[str]) -> None:
        """Sort ``categories`` in place by predicted probability for ``text``.

        No-op when no model is loaded; unknown categories sort last.
        """
        if not self.model or not self.tokenizer:
            return
        # Transform text to feature vector.
        x_vector = self.tokenizer.transform(text)
        # Predict probabilities.
        probs = self.model.predict_proba(x_vector)
        # Map each known category to its probability.
        cat_to_prob = {}
        for idx, prob in enumerate(probs):
            if idx in self.idx_to_category:
                cat = self.idx_to_category[idx]
                cat_to_prob[cat] = prob
        # Highest-probability categories first; unknown ones get 0.0.
        categories.sort(key=lambda c: cat_to_prob.get(c, 0.0), reverse=True)

    def save(self, path: Path) -> None:
        """Save model to file.

        Raises:
            ValueError: If the model has not been trained.
        """
        if not self.model or not self.tokenizer:
            raise ValueError("Model not trained yet")
        model_data = {
            'tokenizer_vocab': self.tokenizer.vocab,
            'tokenizer_inverse_vocab': self.tokenizer.inverse_vocab,
            'tokenizer_min_count': self.tokenizer.min_count,
            'tokenizer_lowercase': self.tokenizer.lowercase,
            'model_weights': self.model.weights,
            'model_bias': self.model.bias,
            'categories': self.categories,
            'category_to_idx': self.category_to_idx,
            'idx_to_category': self.idx_to_category
        }
        with open(path, 'wb') as f:
            pickle.dump(model_data, f)

    def load(self, path: Path) -> None:
        """Load model from file.

        SECURITY: pickle.load executes arbitrary code from the file — only
        load model files from trusted locations.
        """
        with open(path, 'rb') as f:
            model_data = pickle.load(f)
        # Restore tokenizer.
        self.tokenizer = Tokenizer(
            min_count=model_data['tokenizer_min_count'],
            lowercase=model_data['tokenizer_lowercase']
        )
        self.tokenizer.vocab = model_data['tokenizer_vocab']
        self.tokenizer.inverse_vocab = model_data['tokenizer_inverse_vocab']
        # Restore categories.
        self.categories = model_data['categories']
        self.category_to_idx = model_data['category_to_idx']
        self.idx_to_category = model_data['idx_to_category']
        # Restore model weights over a freshly-shaped instance.
        input_dim = len(self.tokenizer.vocab)
        output_dim = len(self.categories)
        self.model = LogisticRegression(input_dim, output_dim)
        self.model.weights = model_data['model_weights']
        self.model.bias = model_data['model_bias']
# Module-level classifier shared by the prediction helpers below.
_classifier = None


def get_sort_categories():
    """Get function to sort categories by prediction probability."""
    global _classifier

    def sort_categories(row: str, categories: List[str]):
        # Without a loaded model, leave the category order untouched.
        if _classifier is not None:
            _classifier.sort_categories(row, categories)

    try:
        _classifier = TransactionClassifier(Path("transaction_classifier.pkl"))
        if _classifier.model is None:
            logging.warning("No trained model found. Categories will not be sorted.")
    except Exception as e:
        logging.warning(f"Error loading classifier: {e}")
        logging.warning("Categories will not be sorted.")
    return sort_categories
def add_account2(transactions: List[Transaction], categories: List[str]):
    """Add account2 to unmapped transactions."""
    pending = [t for t in transactions if t.mapping is None]
    if not pending:
        return
    sorter = get_sort_categories()
    for transaction in pending:
        sorter(transaction.row, categories)
        add_account2_interactive(transaction, categories)
def add_account2_interactive(transaction: Transaction, categories: List[str]):
    """Interactively add account2 to a transaction."""
    t = transaction
    prompt = f"{t.account1} {t.date} {t.description} {t.debit} > "
    # Keep prompting until the user actually picks a category.
    selection = None
    while selection is None:
        selection = iterfzf(categories, prompt=prompt)
    transaction.account2 = selection
    print(f"Assigned category '{selection}'.")
def train_classifier(transactions: List[Transaction], categories: List[str], output_path: Path = Path("transaction_classifier.pkl")):
    """Train transaction classifier and save to file."""
    global _classifier
    # Only transactions whose label is a known category are usable.
    labeled = [t for t in transactions if t.account2 in categories]
    if len(labeled) < 10:
        logging.warning("Not enough transactions for training. Need at least 10.")
        return
    logging.info(f"Training classifier on {len(labeled)} transactions")
    # Train a fresh classifier and keep it as the module-level instance.
    _classifier = TransactionClassifier()
    _classifier.fit(labeled, categories, verbose=True)
    # Persist the trained model.
    _classifier.save(output_path)
    logging.info(f"Classifier saved to {output_path}")

View File

@@ -50,6 +50,7 @@ class Config(BaseModel):
output_file: Path = Path("output.ldg")
csv_configs: List[CsvConfig]
categories: List[str]
model: Path = Path("transaction_classifier.pkl")
class Mapping(BaseModel):

View File

@@ -1,43 +1,50 @@
import logging
from pathlib import Path
from typing import List
from toldg.fzf import iterfzf
from toldg.models import UNKNOWN_CATEGORY, Transaction
from toldg.ml_predict import TransactionClassifier
from toldg.models import Transaction
def get_sort_categories():
def get_sort_categories(model_path: Path):
"""Get function to sort categories by prediction probability."""
_classifier = None
def sort_categories(row: str, categories: List[str]):
if learn is None:
if _classifier is None:
return
_, _, probs = learn.predict(row)
cat_to_prob = dict(zip(learn.dls.vocab[1], probs.tolist()))
categories.sort(
key=lambda c: cat_to_prob[c] if c in cat_to_prob else 0.0, reverse=True
)
_classifier.sort_categories(row, categories)
learn = None
try:
from fastai.text.all import load_learner
learn = load_learner("export.pkl")
except ModuleNotFoundError:
user_input = input("No fastai module. Type yes to continue anyway.")
if user_input.strip().lower() != "yes":
raise Exception("fastai module missing")
model_path = Path("transaction_classifier.pkl")
_classifier = TransactionClassifier(model_path)
if _classifier.model is None:
logging.warning("No trained model found. Categories will not be sorted.")
except Exception as e:
logging.warning(f"Error loading classifier: {e}")
logging.warning("Categories will not be sorted.")
return sort_categories
def add_account2(transactions: List[Transaction], categories: List[str]):
unmapped_transactions = list(filter(lambda t: t.mapping == None, transactions))
def add_account2(
model_path: Path, transactions: List[Transaction], categories: List[str]
):
"""Add account2 to unmapped transactions."""
unmapped_transactions = list(filter(lambda t: t.mapping is None, transactions))
if len(unmapped_transactions) == 0:
return
sort_categories = get_sort_categories()
sort_categories = get_sort_categories(model_path)
for t in unmapped_transactions:
sort_categories(t.row, categories)
add_account2_interactive(t, categories)
def add_account2_interactive(transaction: Transaction, categories: List[str]):
"""Interactively add account2 to a transaction."""
t = transaction
account2 = None
prompt = f"{t.account1} {t.date} {t.description} {t.debit} > "

View File

@@ -92,7 +92,7 @@ def process_csv_files(config: Config) -> List[Transaction]:
mappings = toldg.utils.read_mappings(config.mappings_file)
apply_mappings(transactions, mappings)
toldg.predict.add_account2(transactions, config.categories)
toldg.predict.add_account2(config.model, transactions, config.categories)
toldg.utils.write_mappings(transactions, config.mappings_file)
toldg.write.render_to_file(transactions, config)
return transactions

View File

@@ -1,9 +1,16 @@
from toldg.models import Config, CsvConfig, Mapping, Transaction
import logging
from pathlib import Path
from toldg.ml_predict import train_classifier
from toldg.models import Config
from toldg.process import process_csv_files
def train(config: Config):
print("[train] start")
"""Train a transaction classifier from csv files."""
logging.info("[train] Starting transaction classifier training")
# Process transactions to get training data
transactions = process_csv_files(config)
for t in transactions:
pass
output_path = Path("transaction_classifier.pkl")
train_classifier(transactions, config.categories, config.model)
logging.info("[train] Training completed")

View File

@@ -6,8 +6,8 @@ from toldg.utils import category_to_bean
BEANCOUNT_TRANSACTION_TEMPLATE = """
{t.date} * {description}{tags}
{t.account2:<40} {t.debit:<6} {t.currency}
{t.account1:<40} {t.credit:<6} {t.currency}
{account2:<40} {t.debit:<6} {t.currency}
{account1:<40} {t.credit:<6} {t.currency}
"""
@@ -40,13 +40,15 @@ def format(t):
if not t.credit.startswith("-"):
t.credit = " " + t.credit
t.account1 = category_to_bean(t.account1)
t.account2 = category_to_bean(t.account2)
if t.currency == "EUR":
t.debit = t.debit.replace(".", "|").replace(",", ".").replace("|", ",")
t.credit = t.credit.replace(".", "|").replace(",", ".").replace("|", ",")
return BEANCOUNT_TRANSACTION_TEMPLATE.format(
t=t, description=description, tags=tags
t=t,
description=description,
tags=tags,
account1=category_to_bean(t.account1),
account2=category_to_bean(t.account2),
)