generated from felixm/defaultpy
Compare commits
2 Commits
3ea2602b03
...
a190ddc524
| Author | SHA1 | Date | |
|---|---|---|---|
| a190ddc524 | |||
| 12408c33f4 |
2003
poetry.lock
generated
2003
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,5 +1,7 @@
|
||||
import logging
|
||||
import sys
|
||||
import beancount
|
||||
import io
|
||||
|
||||
from rich.logging import RichHandler
|
||||
|
||||
@@ -17,6 +19,38 @@ def init_logging():
|
||||
)
|
||||
|
||||
|
||||
|
||||
def load_and_write_back(filename):
|
||||
entries, errors, options_map = beancount.loader.load_file(filename)
|
||||
|
||||
def beancount_entry_to_string(entry) -> str:
|
||||
buf = io.StringIO()
|
||||
# beancount.parser.printer.print_entry(entry, dcontext=options_map['dcontext'], file=buf)
|
||||
beancount.parser.printer.print_entry(entry, file=buf)
|
||||
return buf.getvalue().strip()
|
||||
|
||||
def is_transaction(entry) -> bool:
|
||||
return isinstance(entry, beancount.core.data.Transaction)
|
||||
|
||||
prev_entry_was_transaction = False
|
||||
if errors:
|
||||
print(f"errors in generated '{filename}'")
|
||||
for err in errors:
|
||||
print(err)
|
||||
else:
|
||||
entries.sort(key=lambda e: e.date)
|
||||
with open(filename, "w") as f:
|
||||
f.write('option "operating_currency" "USD"\n')
|
||||
for entry in entries:
|
||||
if prev_entry_was_transaction:
|
||||
f.write("\n")
|
||||
elif not prev_entry_was_transaction and is_transaction(entry):
|
||||
f.write("\n")
|
||||
f.write(beancount_entry_to_string(entry))
|
||||
f.write("\n")
|
||||
prev_entry_was_transaction = is_transaction(entry)
|
||||
|
||||
|
||||
def main():
|
||||
init_logging()
|
||||
config = load_config()
|
||||
@@ -27,6 +61,7 @@ def main():
|
||||
write_meta(config)
|
||||
process_ldg_files(config)
|
||||
process_csv_files(config)
|
||||
load_and_write_back(config.output_file)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -50,6 +50,7 @@ class Config(BaseModel):
|
||||
output_file: Path = Path("output.ldg")
|
||||
csv_configs: List[CsvConfig]
|
||||
categories: List[str]
|
||||
model: Path = Path("transaction_classifier.pkl")
|
||||
|
||||
|
||||
class Mapping(BaseModel):
|
||||
@@ -80,4 +81,9 @@ class Transaction(BaseModel):
|
||||
description: str
|
||||
csv_file: str
|
||||
row: str
|
||||
index: int
|
||||
mapping: Optional[Mapping] = None
|
||||
|
||||
def key(self):
|
||||
return self.csv_file + ", " + self.row
|
||||
|
||||
|
||||
@@ -1,43 +1,406 @@
|
||||
from typing import List
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import pickle
|
||||
import re
|
||||
from collections import Counter, defaultdict
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
from toldg.fzf import iterfzf
|
||||
from toldg.models import UNKNOWN_CATEGORY, Transaction
|
||||
from toldg.models import Transaction
|
||||
|
||||
|
||||
def get_sort_categories():
|
||||
def sort_categories(row: str, categories: List[str]):
|
||||
if learn is None:
|
||||
return
|
||||
_, _, probs = learn.predict(row)
|
||||
cat_to_prob = dict(zip(learn.dls.vocab[1], probs.tolist()))
|
||||
categories.sort(
|
||||
key=lambda c: cat_to_prob[c] if c in cat_to_prob else 0.0, reverse=True
|
||||
class Tokenizer:
|
||||
"""Simple tokenizer for transaction descriptions."""
|
||||
|
||||
def __init__(self, min_count: int = 2, lowercase: bool = True):
|
||||
self.min_count = min_count
|
||||
self.lowercase = lowercase
|
||||
self.vocab = {} # word -> index
|
||||
self.inverse_vocab = {} # index -> word
|
||||
|
||||
def fit(self, texts: List[str]) -> None:
|
||||
"""Build vocabulary from texts."""
|
||||
word_counts = Counter()
|
||||
|
||||
for text in texts:
|
||||
tokens = self._tokenize(text)
|
||||
word_counts.update(tokens)
|
||||
|
||||
# Filter words by minimum count
|
||||
filtered_words = [
|
||||
word for word, count in word_counts.items() if count >= self.min_count
|
||||
]
|
||||
|
||||
# Build vocabulary
|
||||
self.vocab = {word: idx for idx, word in enumerate(filtered_words)}
|
||||
self.inverse_vocab = {idx: word for word, idx in self.vocab.items()}
|
||||
|
||||
def _tokenize(self, text: str) -> List[str]:
|
||||
"""Split text into tokens."""
|
||||
if self.lowercase:
|
||||
text = text.lower()
|
||||
|
||||
# Simple tokenization: alphanumeric sequences
|
||||
tokens = re.findall(r"\b\w+\b", text)
|
||||
return tokens
|
||||
|
||||
def transform(self, text: str) -> Dict[int, int]:
|
||||
"""Convert text to sparse vector (word index -> count)."""
|
||||
tokens = self._tokenize(text)
|
||||
counts = Counter()
|
||||
|
||||
for token in tokens:
|
||||
if token in self.vocab:
|
||||
counts[self.vocab[token]] += 1
|
||||
|
||||
return dict(counts)
|
||||
|
||||
def vocab_size(self) -> int:
|
||||
"""Return vocabulary size."""
|
||||
return len(self.vocab)
|
||||
|
||||
|
||||
class LogisticRegression:
|
||||
"""Multi-class logistic regression classifier."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
input_dim: int,
|
||||
output_dim: int,
|
||||
learning_rate: float = 0.01,
|
||||
reg_lambda: float = 0.01,
|
||||
max_iterations: int = 1000,
|
||||
):
|
||||
self.input_dim = input_dim
|
||||
self.output_dim = output_dim
|
||||
self.learning_rate = learning_rate
|
||||
self.reg_lambda = reg_lambda
|
||||
self.max_iterations = max_iterations
|
||||
|
||||
# Initialize weights and bias
|
||||
# weights shape: (output_dim, input_dim)
|
||||
self.weights = np.random.randn(output_dim, input_dim) * 0.01
|
||||
self.bias = np.zeros((output_dim, 1))
|
||||
|
||||
def softmax(self, z: np.ndarray) -> np.ndarray:
|
||||
"""Compute softmax function."""
|
||||
# Subtract max for numerical stability
|
||||
exp_z = np.exp(z - np.max(z, axis=0, keepdims=True))
|
||||
return exp_z / np.sum(exp_z, axis=0, keepdims=True)
|
||||
|
||||
def forward(self, x: np.ndarray) -> np.ndarray:
|
||||
"""Forward pass: compute probabilities."""
|
||||
# x shape: (input_dim, batch_size)
|
||||
# output shape: (output_dim, batch_size)
|
||||
z = np.dot(self.weights, x) + self.bias
|
||||
return self.softmax(z)
|
||||
|
||||
def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
|
||||
"""Compute cross-entropy loss with L2 regularization."""
|
||||
# y_pred shape: (output_dim, batch_size)
|
||||
# y_true shape: (output_dim, batch_size) - one-hot encoded
|
||||
m = y_true.shape[1]
|
||||
|
||||
# Cross-entropy loss
|
||||
ce_loss = -np.sum(y_true * np.log(y_pred + 1e-8)) / m
|
||||
|
||||
# L2 regularization
|
||||
reg_loss = (self.reg_lambda / (2 * m)) * np.sum(np.square(self.weights))
|
||||
|
||||
return ce_loss + reg_loss
|
||||
|
||||
def backward(
|
||||
self, x: np.ndarray, y_pred: np.ndarray, y_true: np.ndarray
|
||||
) -> Tuple[np.ndarray, np.ndarray]:
|
||||
"""Compute gradients for backpropagation."""
|
||||
# x shape: (input_dim, batch_size)
|
||||
# y_pred shape: (output_dim, batch_size)
|
||||
# y_true shape: (output_dim, batch_size) - one-hot encoded
|
||||
m = y_true.shape[1]
|
||||
|
||||
# Gradient of loss with respect to scores (dL/dz)
|
||||
dz = y_pred - y_true
|
||||
|
||||
# Gradient of loss with respect to weights (dL/dW)
|
||||
dw = (1 / m) * np.dot(dz, x.T) + (self.reg_lambda / m) * self.weights
|
||||
|
||||
# Gradient of loss with respect to bias (dL/db)
|
||||
db = (1 / m) * np.sum(dz, axis=1, keepdims=True)
|
||||
|
||||
return dw, db
|
||||
|
||||
def train(
|
||||
self, x_batch: List[Dict[int, int]], y_batch: List[int], verbose: bool = True
|
||||
) -> List[float]:
|
||||
"""Train model on batched data."""
|
||||
# Convert sparse vectors to dense matrix
|
||||
batch_size = len(x_batch)
|
||||
x_dense = np.zeros((self.input_dim, batch_size))
|
||||
|
||||
for i, x_sparse in enumerate(x_batch):
|
||||
for idx, val in x_sparse.items():
|
||||
x_dense[idx, i] = val
|
||||
|
||||
# Convert labels to one-hot encoding
|
||||
y_one_hot = np.zeros((self.output_dim, batch_size))
|
||||
for i, y in enumerate(y_batch):
|
||||
y_one_hot[y, i] = 1
|
||||
|
||||
losses = []
|
||||
|
||||
for iteration in range(self.max_iterations):
|
||||
# Forward pass
|
||||
y_pred = self.forward(x_dense)
|
||||
|
||||
# Compute loss
|
||||
loss = self.compute_loss(y_pred, y_one_hot)
|
||||
losses.append(loss)
|
||||
|
||||
if verbose and iteration % 100 == 0:
|
||||
print(f"Iteration {iteration}: loss = {loss:.4f}")
|
||||
|
||||
# Check convergence
|
||||
if iteration > 0 and abs(losses[-1] - losses[-2]) < 1e-5:
|
||||
if verbose:
|
||||
print(f"Converged at iteration {iteration}")
|
||||
break
|
||||
|
||||
# Backward pass
|
||||
dw, db = self.backward(x_dense, y_pred, y_one_hot)
|
||||
|
||||
# Update parameters
|
||||
self.weights -= self.learning_rate * dw
|
||||
self.bias -= self.learning_rate * db
|
||||
|
||||
return losses
|
||||
|
||||
def predict_proba(self, x_sparse: Dict[int, int]) -> np.ndarray:
|
||||
"""Predict class probabilities for a single sparse vector."""
|
||||
# Convert sparse vector to dense vector
|
||||
x_dense = np.zeros((self.input_dim, 1))
|
||||
for idx, val in x_sparse.items():
|
||||
x_dense[idx, 0] = val
|
||||
|
||||
# Forward pass
|
||||
return self.forward(x_dense).flatten()
|
||||
|
||||
def predict(self, x_sparse: Dict[int, int]) -> int:
|
||||
"""Predict class for a single sparse vector."""
|
||||
probs = self.predict_proba(x_sparse)
|
||||
return np.argmax(probs)
|
||||
|
||||
|
||||
class TransactionClassifier:
|
||||
"""Transaction classifier using Bag of Words and Logistic Regression."""
|
||||
|
||||
def __init__(self, model_path: Optional[Path] = None):
|
||||
self.tokenizer = None
|
||||
self.model = None
|
||||
self.categories = []
|
||||
self.category_to_idx = {}
|
||||
self.idx_to_category = {}
|
||||
|
||||
if model_path and os.path.exists(model_path):
|
||||
self.load(model_path)
|
||||
|
||||
def fit(
|
||||
self,
|
||||
transactions: List[Transaction],
|
||||
categories: List[str],
|
||||
verbose: bool = True,
|
||||
) -> None:
|
||||
"""Train classifier on transactions."""
|
||||
# Extract texts and labels
|
||||
texts = [t.row for t in transactions]
|
||||
labels = [t.account2 for t in transactions]
|
||||
|
||||
# Build category mapping
|
||||
self.categories = sorted(set(categories))
|
||||
self.category_to_idx = {cat: idx for idx, cat in enumerate(self.categories)}
|
||||
self.idx_to_category = {idx: cat for cat, idx in self.category_to_idx.items()}
|
||||
|
||||
# Map labels to indices
|
||||
label_indices = [self.category_to_idx.get(label, 0) for label in labels]
|
||||
|
||||
# Initialize and fit tokenizer
|
||||
self.tokenizer = Tokenizer(min_count=2)
|
||||
self.tokenizer.fit(texts)
|
||||
|
||||
if verbose:
|
||||
print(f"Vocabulary size: {self.tokenizer.vocab_size()}")
|
||||
|
||||
# Transform texts to feature vectors
|
||||
x_vectors = [self.tokenizer.transform(text) for text in texts]
|
||||
|
||||
# Initialize and train model
|
||||
self.model = LogisticRegression(
|
||||
input_dim=self.tokenizer.vocab_size(),
|
||||
output_dim=len(self.categories),
|
||||
learning_rate=0.05,
|
||||
reg_lambda=0.01,
|
||||
max_iterations=3000,
|
||||
)
|
||||
|
||||
learn = None
|
||||
try:
|
||||
from fastai.text.all import load_learner
|
||||
self.model.train(x_vectors, label_indices, verbose=verbose)
|
||||
|
||||
learn = load_learner("export.pkl")
|
||||
except ModuleNotFoundError:
|
||||
user_input = input("No fastai module. Type yes to continue anyway.")
|
||||
if user_input.strip().lower() != "yes":
|
||||
raise Exception("fastai module missing")
|
||||
def predict(self, text: str) -> Tuple[str, float, List[float]]:
|
||||
"""
|
||||
Predict category for a transaction text.
|
||||
|
||||
Returns:
|
||||
tuple: (predicted_category, confidence, all_probabilities)
|
||||
"""
|
||||
if not self.model or not self.tokenizer:
|
||||
raise ValueError("Model not trained yet")
|
||||
|
||||
# Transform text to feature vector
|
||||
x_vector = self.tokenizer.transform(text)
|
||||
|
||||
# Predict probabilities
|
||||
probs = self.model.predict_proba(x_vector)
|
||||
|
||||
# Get predicted class
|
||||
pred_idx = np.argmax(probs)
|
||||
pred_category = self.idx_to_category[pred_idx]
|
||||
confidence = probs[pred_idx]
|
||||
|
||||
return pred_category, confidence, probs
|
||||
|
||||
def sort_categories(self, text: str, categories: List[str]) -> None:
|
||||
"""Sort categories by prediction probability for given text."""
|
||||
if not self.model or not self.tokenizer:
|
||||
return
|
||||
|
||||
# Transform text to feature vector
|
||||
x_vector = self.tokenizer.transform(text)
|
||||
|
||||
# Predict probabilities
|
||||
probs = self.model.predict_proba(x_vector)
|
||||
|
||||
# Create mapping from category to probability
|
||||
cat_to_prob = {}
|
||||
for idx, prob in enumerate(probs):
|
||||
if idx in self.idx_to_category:
|
||||
cat = self.idx_to_category[idx]
|
||||
cat_to_prob[cat] = prob
|
||||
|
||||
# Sort categories by probability
|
||||
categories.sort(key=lambda c: cat_to_prob.get(c, 0.0), reverse=True)
|
||||
|
||||
def save(self, path: Path) -> None:
|
||||
"""Save model to file."""
|
||||
if not self.model or not self.tokenizer:
|
||||
raise ValueError("Model not trained yet")
|
||||
|
||||
model_data = {
|
||||
"tokenizer_vocab": self.tokenizer.vocab,
|
||||
"tokenizer_inverse_vocab": self.tokenizer.inverse_vocab,
|
||||
"tokenizer_min_count": self.tokenizer.min_count,
|
||||
"tokenizer_lowercase": self.tokenizer.lowercase,
|
||||
"model_weights": self.model.weights,
|
||||
"model_bias": self.model.bias,
|
||||
"categories": self.categories,
|
||||
"category_to_idx": self.category_to_idx,
|
||||
"idx_to_category": self.idx_to_category,
|
||||
}
|
||||
|
||||
with open(path, "wb") as f:
|
||||
pickle.dump(model_data, f)
|
||||
|
||||
def load(self, path: Path) -> None:
|
||||
"""Load model from file."""
|
||||
with open(path, "rb") as f:
|
||||
model_data = pickle.load(f)
|
||||
|
||||
# Restore tokenizer
|
||||
self.tokenizer = Tokenizer(
|
||||
min_count=model_data["tokenizer_min_count"],
|
||||
lowercase=model_data["tokenizer_lowercase"],
|
||||
)
|
||||
self.tokenizer.vocab = model_data["tokenizer_vocab"]
|
||||
self.tokenizer.inverse_vocab = model_data["tokenizer_inverse_vocab"]
|
||||
|
||||
# Restore categories
|
||||
self.categories = model_data["categories"]
|
||||
self.category_to_idx = model_data["category_to_idx"]
|
||||
self.idx_to_category = model_data["idx_to_category"]
|
||||
|
||||
# Restore model
|
||||
input_dim = len(self.tokenizer.vocab)
|
||||
output_dim = len(self.categories)
|
||||
self.model = LogisticRegression(input_dim, output_dim)
|
||||
self.model.weights = model_data["model_weights"]
|
||||
self.model.bias = model_data["model_bias"]
|
||||
|
||||
|
||||
def train_classifier(
|
||||
transactions: List[Transaction],
|
||||
categories: List[str],
|
||||
output_path: Path = Path("transaction_classifier.pkl"),
|
||||
):
|
||||
"""Train transaction classifier and save to file."""
|
||||
global _classifier
|
||||
|
||||
# Filter transactions with account2
|
||||
valid_transactions = [t for t in transactions if t.account2 in categories]
|
||||
|
||||
if len(valid_transactions) < 10:
|
||||
logging.warning("Not enough transactions for training. Need at least 10.")
|
||||
return
|
||||
|
||||
logging.info(f"Training classifier on {len(valid_transactions)} transactions")
|
||||
|
||||
# Initialize and train classifier
|
||||
_classifier = TransactionClassifier()
|
||||
_classifier.fit(valid_transactions, categories, verbose=True)
|
||||
|
||||
# Save classifier
|
||||
_classifier.save(output_path)
|
||||
logging.info(f"Classifier saved to {output_path}")
|
||||
|
||||
|
||||
def get_sort_categories(model_path: Path):
|
||||
"""Get function to sort categories by prediction probability."""
|
||||
_classifier = None
|
||||
|
||||
def sort_categories(row: str, categories: List[str]):
|
||||
if _classifier is None:
|
||||
return
|
||||
_classifier.sort_categories(row, categories)
|
||||
|
||||
try:
|
||||
|
||||
model_path = Path("transaction_classifier.pkl")
|
||||
_classifier = TransactionClassifier(model_path)
|
||||
if _classifier.model is None:
|
||||
logging.warning("No trained model found. Categories will not be sorted.")
|
||||
except Exception as e:
|
||||
logging.warning(f"Error loading classifier: {e}")
|
||||
logging.warning("Categories will not be sorted.")
|
||||
|
||||
return sort_categories
|
||||
|
||||
|
||||
def add_account2(transactions: List[Transaction], categories: List[str]):
|
||||
unmapped_transactions = list(filter(lambda t: t.mapping == None, transactions))
|
||||
def add_account2(
|
||||
model_path: Path, transactions: List[Transaction], categories: List[str]
|
||||
):
|
||||
"""Add account2 to unmapped transactions."""
|
||||
unmapped_transactions = list(filter(lambda t: t.mapping is None, transactions))
|
||||
if len(unmapped_transactions) == 0:
|
||||
return
|
||||
sort_categories = get_sort_categories()
|
||||
|
||||
sort_categories = get_sort_categories(model_path)
|
||||
for t in unmapped_transactions:
|
||||
sort_categories(t.row, categories)
|
||||
add_account2_interactive(t, categories)
|
||||
|
||||
|
||||
def add_account2_interactive(transaction: Transaction, categories: List[str]):
|
||||
"""Interactively add account2 to a transaction."""
|
||||
t = transaction
|
||||
account2 = None
|
||||
prompt = f"{t.account1} {t.date} {t.description} {t.debit} > "
|
||||
|
||||
@@ -3,7 +3,6 @@ import datetime
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import toldg.models
|
||||
import toldg.predict
|
||||
@@ -19,7 +18,7 @@ def process_ldg_files(config: Config):
|
||||
f_out.write(f_in.read())
|
||||
|
||||
|
||||
def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
|
||||
def get_csv_config(csv_file: str, csv_configs: list[CsvConfig]) -> CsvConfig:
|
||||
cs = [c for c in csv_configs if re.match(c.file_match_regex, csv_file)]
|
||||
if not cs:
|
||||
logging.critical(f"No CSV config for {csv_file}.")
|
||||
@@ -30,7 +29,7 @@ def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
|
||||
return cs[0]
|
||||
|
||||
|
||||
def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
|
||||
def get_transactions(csv_file: str, config: CsvConfig) -> list[Transaction]:
|
||||
def date_to_date(date: str) -> str:
|
||||
d = datetime.datetime.strptime(date, config.input_date_format)
|
||||
return d.strftime(config.output_date_format)
|
||||
@@ -38,7 +37,7 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
|
||||
def flip_sign(amount: str) -> str:
|
||||
return amount[1:] if amount.startswith("-") else "-" + amount
|
||||
|
||||
def row_to_transaction(row, fields):
|
||||
def row_to_transaction(idx, row, fields):
|
||||
"""The user can configure the mapping of CSV fields to the three
|
||||
required fields date, amount and description via the CsvConfig."""
|
||||
t = {field: row[index] for index, field in fields}
|
||||
@@ -52,7 +51,8 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
|
||||
account2=toldg.models.UNKNOWN_CATEGORY,
|
||||
description=t["description"],
|
||||
csv_file=csv_file,
|
||||
row=csv_file + ", " + ", ".join(row),
|
||||
row=", ".join(row),
|
||||
index=idx,
|
||||
)
|
||||
|
||||
fields = [(i, f) for i, f in enumerate(config.fields) if f]
|
||||
@@ -60,15 +60,17 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
|
||||
reader = csv.reader(f, delimiter=config.delimiter, quotechar=config.quotechar)
|
||||
for _ in range(config.skip):
|
||||
next(reader)
|
||||
transactions = [row_to_transaction(row, fields) for row in reader if row]
|
||||
rows = [row for row in reader if row]
|
||||
transactions = [row_to_transaction(i, row, fields)
|
||||
for i, row in enumerate(reversed(rows))]
|
||||
return transactions
|
||||
|
||||
|
||||
def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]):
|
||||
def apply_mappings(transactions: list[Transaction], mappings: dict[str, Mapping]):
|
||||
"""Apply mappings to transactions."""
|
||||
for t in transactions:
|
||||
if t.row in mappings:
|
||||
mapping = mappings[t.row]
|
||||
if t.key() in mappings:
|
||||
mapping = mappings[t.key()]
|
||||
assert isinstance(mapping, Mapping)
|
||||
assert (
|
||||
mapping.count > 0
|
||||
@@ -82,7 +84,7 @@ def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]
|
||||
assert mapping.count == 0, f"{mapping} was not used as often as expected!"
|
||||
|
||||
|
||||
def process_csv_files(config: Config) -> List[Transaction]:
|
||||
def process_csv_files(config: Config) -> list[Transaction]:
|
||||
csv_files = toldg.utils.get_csv_files(config.input_directory)
|
||||
transactions = []
|
||||
for csv_file in csv_files:
|
||||
@@ -92,7 +94,7 @@ def process_csv_files(config: Config) -> List[Transaction]:
|
||||
|
||||
mappings = toldg.utils.read_mappings(config.mappings_file)
|
||||
apply_mappings(transactions, mappings)
|
||||
toldg.predict.add_account2(transactions, config.categories)
|
||||
toldg.predict.add_account2(config.model, transactions, config.categories)
|
||||
toldg.utils.write_mappings(transactions, config.mappings_file)
|
||||
toldg.write.render_to_file(transactions, config)
|
||||
return transactions
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
from toldg.models import Config, CsvConfig, Mapping, Transaction
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from toldg.models import Config
|
||||
from toldg.predict import train_classifier
|
||||
from toldg.process import process_csv_files
|
||||
|
||||
|
||||
def train(config: Config):
|
||||
print("[train] start")
|
||||
"""Train a transaction classifier from csv files."""
|
||||
# Process transactions to get training data
|
||||
transactions = process_csv_files(config)
|
||||
for t in transactions:
|
||||
pass
|
||||
train_classifier(transactions, config.categories, config.model)
|
||||
logging.info("Training completed")
|
||||
|
||||
@@ -70,7 +70,7 @@ def write_mappings(transactions: List[Transaction], mappings_file: Path):
|
||||
|
||||
mappings = read_mappings(mappings_file)
|
||||
for t in transactions:
|
||||
if t.row in mappings:
|
||||
if t.key() in mappings:
|
||||
pass
|
||||
else:
|
||||
mapping = Mapping(
|
||||
@@ -79,7 +79,7 @@ def write_mappings(transactions: List[Transaction], mappings_file: Path):
|
||||
"narration": t.description,
|
||||
}
|
||||
)
|
||||
mappings[t.row] = mapping
|
||||
mappings[t.key()] = mapping
|
||||
|
||||
mappings = {k: v.model_dump(exclude_none=True) for k, v in mappings.items()}
|
||||
with open(mappings_file, "w") as f:
|
||||
|
||||
@@ -6,14 +6,16 @@ from toldg.utils import category_to_bean
|
||||
|
||||
BEANCOUNT_TRANSACTION_TEMPLATE = """
|
||||
{t.date} * {description}{tags}
|
||||
{t.account2:<40} {t.debit:<6} {t.currency}
|
||||
{t.account1:<40} {t.credit:<6} {t.currency}
|
||||
{account2:<40} {t.debit:<6} {t.currency}
|
||||
{account1:<40} {t.credit:<6} {t.currency}
|
||||
source_file: "{t.csv_file}"
|
||||
source_index: {t.index}
|
||||
source_row: "{t.row}"
|
||||
"""
|
||||
|
||||
|
||||
def format(t):
|
||||
t.date = t.date.replace("/", "-")
|
||||
|
||||
tags = ""
|
||||
description = None
|
||||
if t.mapping:
|
||||
@@ -40,13 +42,15 @@ def format(t):
|
||||
if not t.credit.startswith("-"):
|
||||
t.credit = " " + t.credit
|
||||
|
||||
t.account1 = category_to_bean(t.account1)
|
||||
t.account2 = category_to_bean(t.account2)
|
||||
if t.currency == "EUR":
|
||||
t.debit = t.debit.replace(".", "|").replace(",", ".").replace("|", ",")
|
||||
t.credit = t.credit.replace(".", "|").replace(",", ".").replace("|", ",")
|
||||
return BEANCOUNT_TRANSACTION_TEMPLATE.format(
|
||||
t=t, description=description, tags=tags
|
||||
t=t,
|
||||
description=description,
|
||||
tags=tags,
|
||||
account1=category_to_bean(t.account1),
|
||||
account2=category_to_bean(t.account2),
|
||||
)
|
||||
|
||||
|
||||
@@ -54,3 +58,4 @@ def render_to_file(transactions: List[Transaction], config: Config):
|
||||
content = "".join(format(t) for t in transactions)
|
||||
with open(config.output_file, "a") as f:
|
||||
f.write(content)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user