generated from felixm/defaultpy
Implement numpy-based prediction feature
This commit is contained in:
393
src/toldg/ml_predict.py
Normal file
393
src/toldg/ml_predict.py
Normal file
@@ -0,0 +1,393 @@
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import pickle
|
||||
import os
|
||||
from collections import Counter, defaultdict
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple, Set, Any
|
||||
|
||||
import numpy as np
|
||||
|
||||
from toldg.fzf import iterfzf
|
||||
from toldg.models import Transaction
|
||||
|
||||
|
||||
class Tokenizer:
    """Bag-of-words tokenizer for transaction descriptions.

    Splits text into alphanumeric tokens and maps every token that occurs
    at least ``min_count`` times in the training corpus to an integer index.
    """

    def __init__(self, min_count: int = 2, lowercase: bool = True):
        self.min_count = min_count
        self.lowercase = lowercase
        self.vocab: Dict[str, int] = {}          # word -> index
        self.inverse_vocab: Dict[int, str] = {}  # index -> word

    def fit(self, texts: List[str]) -> None:
        """Build the vocabulary from a corpus of texts."""
        frequencies = Counter()
        for text in texts:
            frequencies.update(self._tokenize(text))

        # Keep only words seen often enough; indices follow first-seen order.
        kept = (word for word, n in frequencies.items() if n >= self.min_count)
        self.vocab = {word: idx for idx, word in enumerate(kept)}
        self.inverse_vocab = {idx: word for word, idx in self.vocab.items()}

    def _tokenize(self, text: str) -> List[str]:
        """Split *text* into alphanumeric tokens (lowercased if configured)."""
        source = text.lower() if self.lowercase else text
        return re.findall(r'\b\w+\b', source)

    def transform(self, text: str) -> Dict[int, int]:
        """Convert *text* to a sparse count vector (word index -> count).

        Tokens outside the fitted vocabulary are dropped.
        """
        known = [self.vocab[tok] for tok in self._tokenize(text) if tok in self.vocab]
        return dict(Counter(known))

    def vocab_size(self) -> int:
        """Number of words in the fitted vocabulary."""
        return len(self.vocab)
|
||||
|
||||
|
||||
class LogisticRegression:
    """Multi-class logistic regression classifier.

    Data layout is column-major throughout: inputs ``x`` have shape
    ``(input_dim, batch_size)`` and predictions ``(output_dim, batch_size)``.
    Training runs full-batch gradient descent on the cross-entropy loss
    with L2 weight regularization.
    """

    def __init__(self, input_dim: int, output_dim: int,
                 learning_rate: float = 0.01,
                 reg_lambda: float = 0.01,
                 max_iterations: int = 1000):
        """Initialize model hyperparameters and parameters.

        Args:
            input_dim: size of the feature (vocabulary) space.
            output_dim: number of classes.
            learning_rate: gradient-descent step size.
            reg_lambda: L2 regularization strength.
            max_iterations: upper bound on training iterations.
        """
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.learning_rate = learning_rate
        self.reg_lambda = reg_lambda
        self.max_iterations = max_iterations

        # Small random weights break symmetry; bias starts at zero.
        # weights shape: (output_dim, input_dim)
        self.weights = np.random.randn(output_dim, input_dim) * 0.01
        self.bias = np.zeros((output_dim, 1))

    def _to_dense(self, x_batch: List[Dict[int, int]]) -> np.ndarray:
        """Convert sparse count vectors to a dense (input_dim, batch) matrix.

        Shared by train() and predict_proba() so the densification logic
        lives in exactly one place.
        """
        dense = np.zeros((self.input_dim, len(x_batch)))
        for col, sparse in enumerate(x_batch):
            for idx, val in sparse.items():
                dense[idx, col] = val
        return dense

    def softmax(self, z: np.ndarray) -> np.ndarray:
        """Column-wise softmax; the max is subtracted for numerical stability."""
        exp_z = np.exp(z - np.max(z, axis=0, keepdims=True))
        return exp_z / np.sum(exp_z, axis=0, keepdims=True)

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Forward pass: class probabilities.

        Args:
            x: dense inputs, shape (input_dim, batch_size).

        Returns:
            Probabilities, shape (output_dim, batch_size).
        """
        z = np.dot(self.weights, x) + self.bias
        return self.softmax(z)

    def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
        """Cross-entropy loss with L2 regularization over a one-hot batch."""
        m = y_true.shape[1]

        # Epsilon guards against log(0) when a probability underflows.
        ce_loss = -np.sum(y_true * np.log(y_pred + 1e-8)) / m
        reg_loss = (self.reg_lambda / (2 * m)) * np.sum(np.square(self.weights))

        # Cast so the declared return type (float, not np.float64) holds.
        return float(ce_loss + reg_loss)

    def backward(self, x: np.ndarray, y_pred: np.ndarray, y_true: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Compute gradients of the regularized loss.

        Args:
            x: dense inputs, shape (input_dim, batch_size).
            y_pred: predicted probabilities, shape (output_dim, batch_size).
            y_true: one-hot targets, same shape as y_pred.

        Returns:
            (dw, db): gradients for weights and bias.
        """
        m = y_true.shape[1]

        # For softmax + cross-entropy, dL/dz simplifies to (y_pred - y_true).
        dz = y_pred - y_true

        dw = (1/m) * np.dot(dz, x.T) + (self.reg_lambda / m) * self.weights
        db = (1/m) * np.sum(dz, axis=1, keepdims=True)

        return dw, db

    def train(self, x_batch: List[Dict[int, int]], y_batch: List[int], verbose: bool = True) -> List[float]:
        """Train on a batch of sparse vectors; return per-iteration losses.

        Args:
            x_batch: sparse feature vectors (word index -> count).
            y_batch: class index per sample.
            verbose: print progress every 100 iterations.
        """
        x_dense = self._to_dense(x_batch)
        batch_size = len(x_batch)

        # One-hot encode the labels.
        y_one_hot = np.zeros((self.output_dim, batch_size))
        for i, y in enumerate(y_batch):
            y_one_hot[y, i] = 1

        losses: List[float] = []

        for iteration in range(self.max_iterations):
            y_pred = self.forward(x_dense)

            loss = self.compute_loss(y_pred, y_one_hot)
            losses.append(loss)

            if verbose and iteration % 100 == 0:
                print(f"Iteration {iteration}: loss = {loss:.4f}")

            # Stop early once the loss plateaus.
            if iteration > 0 and abs(losses[-1] - losses[-2]) < 1e-5:
                if verbose:
                    print(f"Converged at iteration {iteration}")
                break

            dw, db = self.backward(x_dense, y_pred, y_one_hot)

            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

        return losses

    def predict_proba(self, x_sparse: Dict[int, int]) -> np.ndarray:
        """Predict class probabilities for a single sparse vector."""
        return self.forward(self._to_dense([x_sparse])).flatten()

    def predict(self, x_sparse: Dict[int, int]) -> int:
        """Predict the most probable class index for a single sparse vector."""
        # int(...) so the declared return type (int, not np.intp) holds.
        return int(np.argmax(self.predict_proba(x_sparse)))
|
||||
|
||||
|
||||
class TransactionClassifier:
    """Transaction classifier using Bag of Words and Logistic Regression."""

    def __init__(self, model_path: Optional[Path] = None):
        # All state stays empty until fit() or load() populates it.
        self.tokenizer = None
        self.model = None
        self.categories = []
        self.category_to_idx = {}
        self.idx_to_category = {}

        if model_path and os.path.exists(model_path):
            self.load(model_path)

    def fit(self, transactions: List[Transaction], categories: List[str], verbose: bool = True) -> None:
        """Train classifier on transactions."""
        corpus = [t.row for t in transactions]
        assigned = [t.account2 for t in transactions]

        # Stable (sorted) category <-> index mappings.
        self.categories = sorted(set(categories))
        self.category_to_idx = {cat: i for i, cat in enumerate(self.categories)}
        self.idx_to_category = {i: cat for i, cat in enumerate(self.categories)}

        # Labels outside the mapping silently fall back to index 0.
        targets = [self.category_to_idx.get(label, 0) for label in assigned]

        self.tokenizer = Tokenizer(min_count=2)
        self.tokenizer.fit(corpus)

        if verbose:
            print(f"Vocabulary size: {self.tokenizer.vocab_size()}")

        features = [self.tokenizer.transform(text) for text in corpus]

        self.model = LogisticRegression(
            input_dim=self.tokenizer.vocab_size(),
            output_dim=len(self.categories),
            learning_rate=0.05,
            reg_lambda=0.01,
            max_iterations=2000
        )
        self.model.train(features, targets, verbose=verbose)

    def predict(self, text: str) -> Tuple[str, float, List[float]]:
        """
        Predict category for a transaction text.

        Returns:
            tuple: (predicted_category, confidence, all_probabilities)
        """
        if not self.model or not self.tokenizer:
            raise ValueError("Model not trained yet")

        probs = self.model.predict_proba(self.tokenizer.transform(text))
        best = np.argmax(probs)
        return self.idx_to_category[best], probs[best], probs

    def sort_categories(self, text: str, categories: List[str]) -> None:
        """Sort *categories* in place, most probable first, for the given text."""
        # Silently a no-op when no trained model is available.
        if not self.model or not self.tokenizer:
            return

        probs = self.model.predict_proba(self.tokenizer.transform(text))

        # Category name -> predicted probability.
        scores = {
            self.idx_to_category[i]: p
            for i, p in enumerate(probs)
            if i in self.idx_to_category
        }

        categories.sort(key=lambda c: scores.get(c, 0.0), reverse=True)

    def save(self, path: Path) -> None:
        """Persist tokenizer, model weights, and category mappings via pickle."""
        if not self.model or not self.tokenizer:
            raise ValueError("Model not trained yet")

        state = {
            'tokenizer_vocab': self.tokenizer.vocab,
            'tokenizer_inverse_vocab': self.tokenizer.inverse_vocab,
            'tokenizer_min_count': self.tokenizer.min_count,
            'tokenizer_lowercase': self.tokenizer.lowercase,
            'model_weights': self.model.weights,
            'model_bias': self.model.bias,
            'categories': self.categories,
            'category_to_idx': self.category_to_idx,
            'idx_to_category': self.idx_to_category,
        }

        with open(path, 'wb') as f:
            pickle.dump(state, f)

    def load(self, path: Path) -> None:
        """Restore a classifier previously written by save().

        NOTE(review): pickle.load on an untrusted file can execute arbitrary
        code — only load model files produced by this application.
        """
        with open(path, 'rb') as f:
            state = pickle.load(f)

        self.tokenizer = Tokenizer(
            min_count=state['tokenizer_min_count'],
            lowercase=state['tokenizer_lowercase']
        )
        self.tokenizer.vocab = state['tokenizer_vocab']
        self.tokenizer.inverse_vocab = state['tokenizer_inverse_vocab']

        self.categories = state['categories']
        self.category_to_idx = state['category_to_idx']
        self.idx_to_category = state['idx_to_category']

        # Rebuild the model shell, then overwrite its parameters.
        self.model = LogisticRegression(len(self.tokenizer.vocab), len(self.categories))
        self.model.weights = state['model_weights']
        self.model.bias = state['model_bias']
|
||||
|
||||
|
||||
# Global classifier instance shared by get_sort_categories() and
# train_classifier(); None until a model is loaded or trained.
_classifier = None
|
||||
|
||||
|
||||
def get_sort_categories(model_path: Path = Path("transaction_classifier.pkl")):
    """Return a callable that sorts categories by prediction probability.

    Generalized to accept the model file location; the default preserves
    the previous hard-coded path, so existing callers are unaffected.
    The returned function is a no-op when no trained model could be loaded.

    Args:
        model_path: location of the pickled classifier.

    Returns:
        ``sort_categories(row, categories)`` which reorders *categories*
        in place, most probable first.
    """
    global _classifier

    def sort_categories(row: str, categories: List[str]):
        if _classifier is None:
            return
        _classifier.sort_categories(row, categories)

    try:
        _classifier = TransactionClassifier(model_path)
        if _classifier.model is None:
            logging.warning("No trained model found. Categories will not be sorted.")
    except Exception as e:
        # Loading is best-effort: warn and fall back to the no-op behavior.
        logging.warning(f"Error loading classifier: {e}")
        logging.warning("Categories will not be sorted.")

    return sort_categories
|
||||
|
||||
|
||||
def add_account2(transactions: List[Transaction], categories: List[str]):
    """Interactively assign account2 to every transaction without a mapping."""
    pending = [t for t in transactions if t.mapping is None]
    if not pending:
        return

    sort_categories = get_sort_categories()
    for transaction in pending:
        # Put the most likely categories first before prompting the user.
        sort_categories(transaction.row, categories)
        add_account2_interactive(transaction, categories)
|
||||
|
||||
|
||||
def add_account2_interactive(transaction: Transaction, categories: List[str]):
    """Prompt the user (via fzf) until a category is chosen for *transaction*."""
    t = transaction
    prompt = f"{t.account1} {t.date} {t.description} {t.debit} > "

    # iterfzf returns None when the selection is aborted; keep asking
    # until the user actually picks a category.
    account2 = None
    while account2 is None:
        account2 = iterfzf(categories, prompt=prompt)

    transaction.account2 = account2
    print(f"Assigned category '{account2}'.")
|
||||
|
||||
|
||||
def train_classifier(transactions: List[Transaction], categories: List[str], output_path: Path = Path("transaction_classifier.pkl")):
    """Train transaction classifier and save to file.

    Only transactions whose ``account2`` is one of *categories* are used;
    training is skipped (with a warning) when fewer than 10 remain. On
    success the trained classifier is stored in the module-level
    ``_classifier`` and pickled to *output_path*.

    Args:
        transactions: labeled transactions to learn from.
        categories: the set of valid target categories.
        output_path: where to save the pickled classifier.
    """
    global _classifier

    # Set membership is O(1) per transaction instead of scanning the
    # categories list for every transaction.
    known = set(categories)
    valid_transactions = [t for t in transactions if t.account2 in known]

    if len(valid_transactions) < 10:
        logging.warning("Not enough transactions for training. Need at least 10.")
        return

    logging.info(f"Training classifier on {len(valid_transactions)} transactions")

    _classifier = TransactionClassifier()
    _classifier.fit(valid_transactions, categories, verbose=True)

    _classifier.save(output_path)
    logging.info(f"Classifier saved to {output_path}")
|
||||
@@ -50,6 +50,7 @@ class Config(BaseModel):
|
||||
output_file: Path = Path("output.ldg")
|
||||
csv_configs: List[CsvConfig]
|
||||
categories: List[str]
|
||||
model: Path = Path("transaction_classifier.pkl")
|
||||
|
||||
|
||||
class Mapping(BaseModel):
|
||||
|
||||
@@ -1,43 +1,50 @@
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from toldg.fzf import iterfzf
|
||||
from toldg.models import UNKNOWN_CATEGORY, Transaction
|
||||
from toldg.ml_predict import TransactionClassifier
|
||||
from toldg.models import Transaction
|
||||
|
||||
|
||||
def get_sort_categories():
|
||||
def get_sort_categories(model_path: Path):
|
||||
"""Get function to sort categories by prediction probability."""
|
||||
_classifier = None
|
||||
|
||||
def sort_categories(row: str, categories: List[str]):
|
||||
if learn is None:
|
||||
if _classifier is None:
|
||||
return
|
||||
_, _, probs = learn.predict(row)
|
||||
cat_to_prob = dict(zip(learn.dls.vocab[1], probs.tolist()))
|
||||
categories.sort(
|
||||
key=lambda c: cat_to_prob[c] if c in cat_to_prob else 0.0, reverse=True
|
||||
)
|
||||
_classifier.sort_categories(row, categories)
|
||||
|
||||
learn = None
|
||||
try:
|
||||
from fastai.text.all import load_learner
|
||||
|
||||
learn = load_learner("export.pkl")
|
||||
except ModuleNotFoundError:
|
||||
user_input = input("No fastai module. Type yes to continue anyway.")
|
||||
if user_input.strip().lower() != "yes":
|
||||
raise Exception("fastai module missing")
|
||||
model_path = Path("transaction_classifier.pkl")
|
||||
_classifier = TransactionClassifier(model_path)
|
||||
if _classifier.model is None:
|
||||
logging.warning("No trained model found. Categories will not be sorted.")
|
||||
except Exception as e:
|
||||
logging.warning(f"Error loading classifier: {e}")
|
||||
logging.warning("Categories will not be sorted.")
|
||||
|
||||
return sort_categories
|
||||
|
||||
|
||||
def add_account2(transactions: List[Transaction], categories: List[str]):
|
||||
unmapped_transactions = list(filter(lambda t: t.mapping == None, transactions))
|
||||
def add_account2(
|
||||
model_path: Path, transactions: List[Transaction], categories: List[str]
|
||||
):
|
||||
"""Add account2 to unmapped transactions."""
|
||||
unmapped_transactions = list(filter(lambda t: t.mapping is None, transactions))
|
||||
if len(unmapped_transactions) == 0:
|
||||
return
|
||||
sort_categories = get_sort_categories()
|
||||
|
||||
sort_categories = get_sort_categories(model_path)
|
||||
for t in unmapped_transactions:
|
||||
sort_categories(t.row, categories)
|
||||
add_account2_interactive(t, categories)
|
||||
|
||||
|
||||
def add_account2_interactive(transaction: Transaction, categories: List[str]):
|
||||
"""Interactively add account2 to a transaction."""
|
||||
t = transaction
|
||||
account2 = None
|
||||
prompt = f"{t.account1} {t.date} {t.description} {t.debit} > "
|
||||
|
||||
@@ -92,7 +92,7 @@ def process_csv_files(config: Config) -> List[Transaction]:
|
||||
|
||||
mappings = toldg.utils.read_mappings(config.mappings_file)
|
||||
apply_mappings(transactions, mappings)
|
||||
toldg.predict.add_account2(transactions, config.categories)
|
||||
toldg.predict.add_account2(config.model, transactions, config.categories)
|
||||
toldg.utils.write_mappings(transactions, config.mappings_file)
|
||||
toldg.write.render_to_file(transactions, config)
|
||||
return transactions
|
||||
|
||||
@@ -1,9 +1,16 @@
|
||||
from toldg.models import Config, CsvConfig, Mapping, Transaction
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from toldg.ml_predict import train_classifier
|
||||
from toldg.models import Config
|
||||
from toldg.process import process_csv_files
|
||||
|
||||
|
||||
def train(config: Config):
|
||||
print("[train] start")
|
||||
"""Train a transaction classifier from csv files."""
|
||||
logging.info("[train] Starting transaction classifier training")
|
||||
# Process transactions to get training data
|
||||
transactions = process_csv_files(config)
|
||||
for t in transactions:
|
||||
pass
|
||||
output_path = Path("transaction_classifier.pkl")
|
||||
train_classifier(transactions, config.categories, config.model)
|
||||
logging.info("[train] Training completed")
|
||||
|
||||
@@ -6,8 +6,8 @@ from toldg.utils import category_to_bean
|
||||
|
||||
BEANCOUNT_TRANSACTION_TEMPLATE = """
|
||||
{t.date} * {description}{tags}
|
||||
{t.account2:<40} {t.debit:<6} {t.currency}
|
||||
{t.account1:<40} {t.credit:<6} {t.currency}
|
||||
{account2:<40} {t.debit:<6} {t.currency}
|
||||
{account1:<40} {t.credit:<6} {t.currency}
|
||||
"""
|
||||
|
||||
|
||||
@@ -40,13 +40,15 @@ def format(t):
|
||||
if not t.credit.startswith("-"):
|
||||
t.credit = " " + t.credit
|
||||
|
||||
t.account1 = category_to_bean(t.account1)
|
||||
t.account2 = category_to_bean(t.account2)
|
||||
if t.currency == "EUR":
|
||||
t.debit = t.debit.replace(".", "|").replace(",", ".").replace("|", ",")
|
||||
t.credit = t.credit.replace(".", "|").replace(",", ".").replace("|", ",")
|
||||
return BEANCOUNT_TRANSACTION_TEMPLATE.format(
|
||||
t=t, description=description, tags=tags
|
||||
t=t,
|
||||
description=description,
|
||||
tags=tags,
|
||||
account1=category_to_bean(t.account1),
|
||||
account2=category_to_bean(t.account2),
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user