From 3ea2602b030ac73035ed44cbaa87e500c99cd71a Mon Sep 17 00:00:00 2001
From: Felix Martin
Date: Sun, 16 Mar 2025 09:47:11 -0400
Subject: [PATCH] Implement numpy based prediction feature

---
Review notes (not part of the commit message):
- Line structure was restored from a whitespace-collapsed copy of this patch.
  The position of blank context lines and the spacing inside string literals
  (write.py template, fzf prompt) are best-effort reconstructions; confirm
  with `git apply --check` before applying.
- predict.py: get_sort_categories() no longer overwrites its model_path
  argument with a hard-coded file name, so Config.model takes effect.
- train.py: output_path is derived from config.model and actually passed on.
- ml_predict.py: LogisticRegression.predict() returns a built-in int, as its
  annotation promises, instead of a numpy integer.
- NOTE(review): TransactionClassifier.load() unpickles the model file; only
  load model files from a trusted location.

 src/toldg/ml_predict.py | 393 ++++++++++++++++++++++++++++++++++++++++
 src/toldg/models.py     |   1 +
 src/toldg/predict.py    |  43 +++--
 src/toldg/process.py    |   2 +-
 src/toldg/train.py      |  15 +-
 src/toldg/write.py      |  12 +-
 6 files changed, 438 insertions(+), 28 deletions(-)
 create mode 100644 src/toldg/ml_predict.py

diff --git a/src/toldg/ml_predict.py b/src/toldg/ml_predict.py
new file mode 100644
index 0000000..8204bec
--- /dev/null
+++ b/src/toldg/ml_predict.py
@@ -0,0 +1,393 @@
+import json
+import logging
+import re
+import pickle
+import os
+from collections import Counter, defaultdict
+from pathlib import Path
+from typing import Dict, List, Optional, Tuple, Set, Any
+
+import numpy as np
+
+from toldg.fzf import iterfzf
+from toldg.models import Transaction
+
+
+class Tokenizer:
+    """Simple tokenizer for transaction descriptions."""
+
+    def __init__(self, min_count: int = 2, lowercase: bool = True):
+        self.min_count = min_count
+        self.lowercase = lowercase
+        self.vocab = {}  # word -> index
+        self.inverse_vocab = {}  # index -> word
+
+    def fit(self, texts: List[str]) -> None:
+        """Build vocabulary from texts."""
+        word_counts = Counter()
+
+        for text in texts:
+            tokens = self._tokenize(text)
+            word_counts.update(tokens)
+
+        # Filter words by minimum count
+        filtered_words = [word for word, count in word_counts.items()
+                          if count >= self.min_count]
+
+        # Build vocabulary
+        self.vocab = {word: idx for idx, word in enumerate(filtered_words)}
+        self.inverse_vocab = {idx: word for word, idx in self.vocab.items()}
+
+    def _tokenize(self, text: str) -> List[str]:
+        """Split text into tokens."""
+        if self.lowercase:
+            text = text.lower()
+
+        # Simple tokenization: alphanumeric sequences
+        tokens = re.findall(r'\b\w+\b', text)
+        return tokens
+
+    def transform(self, text: str) -> Dict[int, int]:
+        """Convert text to sparse vector (word index -> count)."""
+        tokens = self._tokenize(text)
+        counts = Counter()
+
+        for token in tokens:
+            if token in self.vocab:
+                counts[self.vocab[token]] += 1
+
+        return dict(counts)
+
+    def vocab_size(self) -> int:
+        """Return vocabulary size."""
+        return len(self.vocab)
+
+
+class LogisticRegression:
+    """Multi-class logistic regression classifier."""
+
+    def __init__(self, input_dim: int, output_dim: int,
+                 learning_rate: float = 0.01,
+                 reg_lambda: float = 0.01,
+                 max_iterations: int = 1000):
+        self.input_dim = input_dim
+        self.output_dim = output_dim
+        self.learning_rate = learning_rate
+        self.reg_lambda = reg_lambda
+        self.max_iterations = max_iterations
+
+        # Initialize weights and bias
+        # weights shape: (output_dim, input_dim)
+        self.weights = np.random.randn(output_dim, input_dim) * 0.01
+        self.bias = np.zeros((output_dim, 1))
+
+    def softmax(self, z: np.ndarray) -> np.ndarray:
+        """Compute softmax function."""
+        # Subtract max for numerical stability
+        exp_z = np.exp(z - np.max(z, axis=0, keepdims=True))
+        return exp_z / np.sum(exp_z, axis=0, keepdims=True)
+
+    def forward(self, x: np.ndarray) -> np.ndarray:
+        """Forward pass: compute probabilities."""
+        # x shape: (input_dim, batch_size)
+        # output shape: (output_dim, batch_size)
+        z = np.dot(self.weights, x) + self.bias
+        return self.softmax(z)
+
+    def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
+        """Compute cross-entropy loss with L2 regularization."""
+        # y_pred shape: (output_dim, batch_size)
+        # y_true shape: (output_dim, batch_size) - one-hot encoded
+        m = y_true.shape[1]
+
+        # Cross-entropy loss
+        ce_loss = -np.sum(y_true * np.log(y_pred + 1e-8)) / m
+
+        # L2 regularization
+        reg_loss = (self.reg_lambda / (2 * m)) * np.sum(np.square(self.weights))
+
+        return ce_loss + reg_loss
+
+    def backward(self, x: np.ndarray, y_pred: np.ndarray, y_true: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
+        """Compute gradients for backpropagation."""
+        # x shape: (input_dim, batch_size)
+        # y_pred shape: (output_dim, batch_size)
+        # y_true shape: (output_dim, batch_size) - one-hot encoded
+        m = y_true.shape[1]
+
+        # Gradient of loss with respect to scores (dL/dz)
+        dz = y_pred - y_true
+
+        # Gradient of loss with respect to weights (dL/dW)
+        dw = (1/m) * np.dot(dz, x.T) + (self.reg_lambda / m) * self.weights
+
+        # Gradient of loss with respect to bias (dL/db)
+        db = (1/m) * np.sum(dz, axis=1, keepdims=True)
+
+        return dw, db
+
+    def train(self, x_batch: List[Dict[int, int]], y_batch: List[int], verbose: bool = True) -> List[float]:
+        """Train model on batched data."""
+        # Convert sparse vectors to dense matrix
+        batch_size = len(x_batch)
+        x_dense = np.zeros((self.input_dim, batch_size))
+
+        for i, x_sparse in enumerate(x_batch):
+            for idx, val in x_sparse.items():
+                x_dense[idx, i] = val
+
+        # Convert labels to one-hot encoding
+        y_one_hot = np.zeros((self.output_dim, batch_size))
+        for i, y in enumerate(y_batch):
+            y_one_hot[y, i] = 1
+
+        losses = []
+
+        for iteration in range(self.max_iterations):
+            # Forward pass
+            y_pred = self.forward(x_dense)
+
+            # Compute loss
+            loss = self.compute_loss(y_pred, y_one_hot)
+            losses.append(loss)
+
+            if verbose and iteration % 100 == 0:
+                print(f"Iteration {iteration}: loss = {loss:.4f}")
+
+            # Check convergence
+            if iteration > 0 and abs(losses[-1] - losses[-2]) < 1e-5:
+                if verbose:
+                    print(f"Converged at iteration {iteration}")
+                break
+
+            # Backward pass
+            dw, db = self.backward(x_dense, y_pred, y_one_hot)
+
+            # Update parameters
+            self.weights -= self.learning_rate * dw
+            self.bias -= self.learning_rate * db
+
+        return losses
+
+    def predict_proba(self, x_sparse: Dict[int, int]) -> np.ndarray:
+        """Predict class probabilities for a single sparse vector."""
+        # Convert sparse vector to dense vector
+        x_dense = np.zeros((self.input_dim, 1))
+        for idx, val in x_sparse.items():
+            x_dense[idx, 0] = val
+
+        # Forward pass
+        return self.forward(x_dense).flatten()
+
+    def predict(self, x_sparse: Dict[int, int]) -> int:
+        """Predict class for a single sparse vector."""
+        probs = self.predict_proba(x_sparse)
+        return int(np.argmax(probs))
+
+
+class TransactionClassifier:
+    """Transaction classifier using Bag of Words and Logistic Regression."""
+
+    def __init__(self, model_path: Optional[Path] = None):
+        self.tokenizer = None
+        self.model = None
+        self.categories = []
+        self.category_to_idx = {}
+        self.idx_to_category = {}
+
+        if model_path and os.path.exists(model_path):
+            self.load(model_path)
+
+    def fit(self, transactions: List[Transaction], categories: List[str], verbose: bool = True) -> None:
+        """Train classifier on transactions."""
+        # Extract texts and labels
+        texts = [t.row for t in transactions]
+        labels = [t.account2 for t in transactions]
+
+        # Build category mapping
+        self.categories = sorted(set(categories))
+        self.category_to_idx = {cat: idx for idx, cat in enumerate(self.categories)}
+        self.idx_to_category = {idx: cat for cat, idx in self.category_to_idx.items()}
+
+        # Map labels to indices
+        label_indices = [self.category_to_idx.get(label, 0) for label in labels]
+
+        # Initialize and fit tokenizer
+        self.tokenizer = Tokenizer(min_count=2)
+        self.tokenizer.fit(texts)
+
+        if verbose:
+            print(f"Vocabulary size: {self.tokenizer.vocab_size()}")
+
+        # Transform texts to feature vectors
+        x_vectors = [self.tokenizer.transform(text) for text in texts]
+
+        # Initialize and train model
+        self.model = LogisticRegression(
+            input_dim=self.tokenizer.vocab_size(),
+            output_dim=len(self.categories),
+            learning_rate=0.05,
+            reg_lambda=0.01,
+            max_iterations=2000
+        )
+
+        self.model.train(x_vectors, label_indices, verbose=verbose)
+
+    def predict(self, text: str) -> Tuple[str, float, List[float]]:
+        """
+        Predict category for a transaction text.
+
+        Returns:
+            tuple: (predicted_category, confidence, all_probabilities)
+        """
+        if not self.model or not self.tokenizer:
+            raise ValueError("Model not trained yet")
+
+        # Transform text to feature vector
+        x_vector = self.tokenizer.transform(text)
+
+        # Predict probabilities
+        probs = self.model.predict_proba(x_vector)
+
+        # Get predicted class
+        pred_idx = np.argmax(probs)
+        pred_category = self.idx_to_category[pred_idx]
+        confidence = probs[pred_idx]
+
+        return pred_category, confidence, probs
+
+    def sort_categories(self, text: str, categories: List[str]) -> None:
+        """Sort categories by prediction probability for given text."""
+        if not self.model or not self.tokenizer:
+            return
+
+        # Transform text to feature vector
+        x_vector = self.tokenizer.transform(text)
+
+        # Predict probabilities
+        probs = self.model.predict_proba(x_vector)
+
+        # Create mapping from category to probability
+        cat_to_prob = {}
+        for idx, prob in enumerate(probs):
+            if idx in self.idx_to_category:
+                cat = self.idx_to_category[idx]
+                cat_to_prob[cat] = prob
+
+        # Sort categories by probability
+        categories.sort(key=lambda c: cat_to_prob.get(c, 0.0), reverse=True)
+
+    def save(self, path: Path) -> None:
+        """Save model to file."""
+        if not self.model or not self.tokenizer:
+            raise ValueError("Model not trained yet")
+
+        model_data = {
+            'tokenizer_vocab': self.tokenizer.vocab,
+            'tokenizer_inverse_vocab': self.tokenizer.inverse_vocab,
+            'tokenizer_min_count': self.tokenizer.min_count,
+            'tokenizer_lowercase': self.tokenizer.lowercase,
+            'model_weights': self.model.weights,
+            'model_bias': self.model.bias,
+            'categories': self.categories,
+            'category_to_idx': self.category_to_idx,
+            'idx_to_category': self.idx_to_category
+        }
+
+        with open(path, 'wb') as f:
+            pickle.dump(model_data, f)
+
+    def load(self, path: Path) -> None:
+        """Load model from file."""
+        with open(path, 'rb') as f:
+            model_data = pickle.load(f)
+
+        # Restore tokenizer
+        self.tokenizer = Tokenizer(
+            min_count=model_data['tokenizer_min_count'],
+            lowercase=model_data['tokenizer_lowercase']
+        )
+        self.tokenizer.vocab = model_data['tokenizer_vocab']
+        self.tokenizer.inverse_vocab = model_data['tokenizer_inverse_vocab']
+
+        # Restore categories
+        self.categories = model_data['categories']
+        self.category_to_idx = model_data['category_to_idx']
+        self.idx_to_category = model_data['idx_to_category']
+
+        # Restore model
+        input_dim = len(self.tokenizer.vocab)
+        output_dim = len(self.categories)
+        self.model = LogisticRegression(input_dim, output_dim)
+        self.model.weights = model_data['model_weights']
+        self.model.bias = model_data['model_bias']
+
+
+# Global classifier instance
+_classifier = None
+
+
+def get_sort_categories():
+    """Get function to sort categories by prediction probability."""
+    global _classifier
+
+    def sort_categories(row: str, categories: List[str]):
+        if _classifier is None:
+            return
+        _classifier.sort_categories(row, categories)
+
+    try:
+        model_path = Path("transaction_classifier.pkl")
+        _classifier = TransactionClassifier(model_path)
+        if _classifier.model is None:
+            logging.warning("No trained model found. Categories will not be sorted.")
+    except Exception as e:
+        logging.warning(f"Error loading classifier: {e}")
+        logging.warning("Categories will not be sorted.")
+
+    return sort_categories
+
+
+def add_account2(transactions: List[Transaction], categories: List[str]):
+    """Add account2 to unmapped transactions."""
+    unmapped_transactions = list(filter(lambda t: t.mapping is None, transactions))
+    if len(unmapped_transactions) == 0:
+        return
+
+    sort_categories = get_sort_categories()
+    for t in unmapped_transactions:
+        sort_categories(t.row, categories)
+        add_account2_interactive(t, categories)
+
+
+def add_account2_interactive(transaction: Transaction, categories: List[str]):
+    """Interactively add account2 to a transaction."""
+    t = transaction
+    account2 = None
+    prompt = f"{t.account1} {t.date} {t.description} {t.debit} > "
+    while account2 is None:
+        account2 = iterfzf(categories, prompt=prompt)
+    transaction.account2 = account2
+    print(f"Assigned category '{account2}'.")
+
+
+def train_classifier(transactions: List[Transaction], categories: List[str], output_path: Path = Path("transaction_classifier.pkl")):
+    """Train transaction classifier and save to file."""
+    global _classifier
+
+    # Filter transactions with account2
+    valid_transactions = [t for t in transactions if t.account2 in categories]
+
+    if len(valid_transactions) < 10:
+        logging.warning("Not enough transactions for training. Need at least 10.")
+        return
+
+    logging.info(f"Training classifier on {len(valid_transactions)} transactions")
+
+    # Initialize and train classifier
+    _classifier = TransactionClassifier()
+    _classifier.fit(valid_transactions, categories, verbose=True)
+
+    # Save classifier
+    _classifier.save(output_path)
+    logging.info(f"Classifier saved to {output_path}")
diff --git a/src/toldg/models.py b/src/toldg/models.py
index 2558a1f..40cbe61 100644
--- a/src/toldg/models.py
+++ b/src/toldg/models.py
@@ -50,6 +50,7 @@ class Config(BaseModel):
     output_file: Path = Path("output.ldg")
     csv_configs: List[CsvConfig]
     categories: List[str]
+    model: Path = Path("transaction_classifier.pkl")
 
 
 class Mapping(BaseModel):
diff --git a/src/toldg/predict.py b/src/toldg/predict.py
index 59a56ec..57d9e9d 100644
--- a/src/toldg/predict.py
+++ b/src/toldg/predict.py
@@ -1,43 +1,50 @@
+import logging
+from pathlib import Path
 from typing import List
 
 from toldg.fzf import iterfzf
-from toldg.models import UNKNOWN_CATEGORY, Transaction
+from toldg.ml_predict import TransactionClassifier
+from toldg.models import Transaction
 
 
-def get_sort_categories():
+def get_sort_categories(model_path: Path):
+    """Get function to sort categories by prediction probability."""
+    _classifier = None
+
     def sort_categories(row: str, categories: List[str]):
-        if learn is None:
+        if _classifier is None:
             return
-        _, _, probs = learn.predict(row)
-        cat_to_prob = dict(zip(learn.dls.vocab[1], probs.tolist()))
-        categories.sort(
-            key=lambda c: cat_to_prob[c] if c in cat_to_prob else 0.0, reverse=True
-        )
+        _classifier.sort_categories(row, categories)
 
-    learn = None
     try:
-        from fastai.text.all import load_learner
 
-        learn = load_learner("export.pkl")
-    except ModuleNotFoundError:
-        user_input = input("No fastai module. Type yes to continue anyway.")
-        if user_input.strip().lower() != "yes":
-            raise Exception("fastai module missing")
+        model_path = Path(model_path)
+        _classifier = TransactionClassifier(model_path)
+        if _classifier.model is None:
+            logging.warning("No trained model found. Categories will not be sorted.")
+    except Exception as e:
+        logging.warning(f"Error loading classifier: {e}")
+        logging.warning("Categories will not be sorted.")
 
     return sort_categories
 
 
-def add_account2(transactions: List[Transaction], categories: List[str]):
-    unmapped_transactions = list(filter(lambda t: t.mapping == None, transactions))
+def add_account2(
+    model_path: Path, transactions: List[Transaction], categories: List[str]
+):
+    """Add account2 to unmapped transactions."""
+    unmapped_transactions = list(filter(lambda t: t.mapping is None, transactions))
     if len(unmapped_transactions) == 0:
         return
-    sort_categories = get_sort_categories()
+
+    sort_categories = get_sort_categories(model_path)
     for t in unmapped_transactions:
         sort_categories(t.row, categories)
         add_account2_interactive(t, categories)
 
 
 def add_account2_interactive(transaction: Transaction, categories: List[str]):
+    """Interactively add account2 to a transaction."""
     t = transaction
     account2 = None
     prompt = f"{t.account1} {t.date} {t.description} {t.debit} > "
diff --git a/src/toldg/process.py b/src/toldg/process.py
index 8e8b3be..4551113 100644
--- a/src/toldg/process.py
+++ b/src/toldg/process.py
@@ -92,7 +92,7 @@ def process_csv_files(config: Config) -> List[Transaction]:
 
     mappings = toldg.utils.read_mappings(config.mappings_file)
     apply_mappings(transactions, mappings)
-    toldg.predict.add_account2(transactions, config.categories)
+    toldg.predict.add_account2(config.model, transactions, config.categories)
     toldg.utils.write_mappings(transactions, config.mappings_file)
     toldg.write.render_to_file(transactions, config)
     return transactions
diff --git a/src/toldg/train.py b/src/toldg/train.py
index e829fde..175f111 100644
--- a/src/toldg/train.py
+++ b/src/toldg/train.py
@@ -1,9 +1,16 @@
-from toldg.models import Config, CsvConfig, Mapping, Transaction
+import logging
+from pathlib import Path
+
+from toldg.ml_predict import train_classifier
+from toldg.models import Config
 from toldg.process import process_csv_files
 
 
 def train(config: Config):
-    print("[train] start")
+    """Train a transaction classifier from csv files."""
+    logging.info("[train] Starting transaction classifier training")
+    # Process transactions to get training data
     transactions = process_csv_files(config)
-    for t in transactions:
-        pass
+    output_path = Path(config.model)
+    train_classifier(transactions, config.categories, output_path)
+    logging.info("[train] Training completed")
diff --git a/src/toldg/write.py b/src/toldg/write.py
index 1a8bf85..0b4f9e7 100644
--- a/src/toldg/write.py
+++ b/src/toldg/write.py
@@ -6,8 +6,8 @@ from toldg.utils import category_to_bean
 
 BEANCOUNT_TRANSACTION_TEMPLATE = """
 {t.date} * {description}{tags}
-  {t.account2:<40} {t.debit:<6} {t.currency}
-  {t.account1:<40} {t.credit:<6} {t.currency}
+  {account2:<40} {t.debit:<6} {t.currency}
+  {account1:<40} {t.credit:<6} {t.currency}
 """
 
 
@@ -40,13 +40,15 @@ def format(t):
     if not t.credit.startswith("-"):
         t.credit = " " + t.credit
 
-    t.account1 = category_to_bean(t.account1)
-    t.account2 = category_to_bean(t.account2)
     if t.currency == "EUR":
         t.debit = t.debit.replace(".", "|").replace(",", ".").replace("|", ",")
         t.credit = t.credit.replace(".", "|").replace(",", ".").replace("|", ",")
     return BEANCOUNT_TRANSACTION_TEMPLATE.format(
-        t=t, description=description, tags=tags
+        t=t,
+        description=description,
+        tags=tags,
+        account1=category_to_bean(t.account1),
+        account2=category_to_bean(t.account2),
     )
 
 