generated from felixm/defaultpy
Compare commits
5 Commits
3ea2602b03
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| fbab1c9174 | |||
| 54871d04cd | |||
| f56c559c84 | |||
| a190ddc524 | |||
| 12408c33f4 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -1,4 +1,6 @@
|
||||
CLAUDE.md
|
||||
# ---> Python
|
||||
uv.lock
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
|
||||
2074
poetry.lock
generated
2074
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -7,35 +7,28 @@ name = "toldg"
|
||||
version = "0.1.0"
|
||||
description = "Tool to generate ledger files from csv"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.12,<4.0"
|
||||
requires-python = ">=3.13,<4.0"
|
||||
license = {text = "MIT"}
|
||||
authors = [
|
||||
{name = "Felix Martin", email = "mail@felixm.de"}
|
||||
]
|
||||
dependencies = [
|
||||
"fava (>=1.30.1,<2.0.0)",
|
||||
"pydantic (>=2.10.6,<3.0.0)",
|
||||
"beancount (>=3.1.0,<4.0.0)",
|
||||
"rich (>=13.9.4,<14.0.0)",
|
||||
"numpy (>=2.2.3,<3.0.0)"
|
||||
"fava",
|
||||
"pydantic",
|
||||
"beancount",
|
||||
"rich",
|
||||
"numpy",
|
||||
"ty",
|
||||
"ruff",
|
||||
]
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
pre-commit = "^4.1.0"
|
||||
black = "^25.1.0"
|
||||
isort = "^6.0.1"
|
||||
pytest = "^8.3.4"
|
||||
|
||||
[project.scripts]
|
||||
toldg = "toldg.__main__:main"
|
||||
|
||||
[tool.setuptools]
|
||||
package-dir = {"" = "src"}
|
||||
|
||||
[tool.black]
|
||||
line-length = 88
|
||||
target-version = ["py312"]
|
||||
[tool.ruff]
|
||||
target-version = "py313"
|
||||
line-length = 100
|
||||
|
||||
[tool.isort]
|
||||
profile = "black"
|
||||
line_length = 88
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
import logging
|
||||
import sys
|
||||
import beancount
|
||||
import io
|
||||
|
||||
from rich.logging import RichHandler
|
||||
|
||||
from toldg.process import process_csv_files, process_ldg_files
|
||||
from toldg.train import train
|
||||
from toldg.utils import load_config, remove_if_exists, write_meta
|
||||
from toldg.utils import load_config, remove_if_exists
|
||||
from toldg.models import Config
|
||||
from toldg.write import render_to_file
|
||||
|
||||
|
||||
def init_logging():
|
||||
@@ -17,16 +21,86 @@ def init_logging():
|
||||
)
|
||||
|
||||
|
||||
def get_new_transactions(transactions: list, csv_transactions: list) -> list:
    """Return the CSV transactions that are not already in the ledger.

    Existing ledger transactions are keyed by ``(source_file, source_index)``
    from their metadata; CSV transactions by ``(csv_file, index)``. When a CSV
    transaction's key already exists, its raw row must match the stored
    ``source_row`` — otherwise the CSV and the ledger have diverged and we
    abort rather than silently duplicate or drop data.

    Raises:
        SystemExit: if ledger keys are not unique, or an existing
            transaction's source row differs from the CSV row with the
            same key.
    """
    key_to_transaction = {
        (transaction.meta["source_file"], transaction.meta["source_index"]): transaction
        for transaction in transactions
    }
    # An `assert` would be stripped under `python -O`; fail loudly instead,
    # matching the SystemExit error style used elsewhere in this module.
    if len(transactions) != len(key_to_transaction):
        logging.error("Transaction keys must be unique")
        raise SystemExit(1)

    new_transactions = []
    for csv_transaction in csv_transactions:
        key = (csv_transaction.csv_file, csv_transaction.index)
        if key in key_to_transaction:
            existing_transaction = key_to_transaction[key]
            # Same key but different content means the CSV changed under us.
            if existing_transaction.meta["source_row"] != csv_transaction.row:
                msg = f"Consistency error: CSV transaction {csv_transaction} is different to {existing_transaction}"
                logging.error(msg)
                raise SystemExit(1)
        else:
            new_transactions.append(csv_transaction)
    logging.info(f"Got {len(new_transactions)} new and {len(transactions)} existing transactions.")
    return new_transactions
|
||||
|
||||
|
||||
def update_ledger(config: Config):
    """Round-trip the ledger output file and merge in new CSV transactions.

    Loads the existing output file via beancount, validates it, sorts its
    transactions by date, rewrites them, and finally renders any transactions
    that appeared in the CSVs but not yet in the ledger.

    Raises:
        SystemExit: if beancount reports loader errors.
    """

    def beancount_entry_to_string(entry) -> str:
        # Render a single entry through beancount's printer into a string.
        buf = io.StringIO()
        beancount.parser.printer.print_entry(entry, file=buf)
        return buf.getvalue().strip()

    def is_transaction(transaction) -> bool:
        return isinstance(transaction, beancount.core.data.Transaction)

    filename = config.output_file
    transactions, errors, options_map = beancount.loader.load_file(filename)

    if errors:
        # BUG FIX: the f-string had no placeholder ("errors in '(unknown)'");
        # include the file name so the log message is actionable.
        logging.error(f"errors in '{filename}'")
        for err in errors:
            logging.error(err)
        raise SystemExit(1)

    transactions.sort(key=lambda e: e.date)

    # Note(felixm): Only write back transactions from the main beancount file.
    # The issue is that `beancount.loader.load_file` does not allow for a full
    # round trip; some of the transactions get swallowed. Therefore, treat all
    # files that are not the main beancount file as input only files. This
    # means these input only files can only be edited by hand, but the user
    # can use them to set options for beancount and fava, and add other types
    # of transactions that would otherwise disappear after the round trip. I
    # have seen tickets on GitHub about changing this API so that everything
    # can be written back as is, but until then, this works well for my
    # use-case.
    transactions = [e for e in transactions if e.meta["filename"] == str(filename.absolute())]

    csv_transactions = process_csv_files(config)
    new_transactions = get_new_transactions(transactions, csv_transactions)
    remove_if_exists(config.output_file)
    process_ldg_files(config)

    with open(filename, "a") as f:
        prev_item_was_transaction = False
        for transaction in transactions:
            # Blank line after every transaction, and before the first
            # transaction that follows non-transaction entries.
            if prev_item_was_transaction:
                f.write("\n")
            elif is_transaction(transaction):  # prev_item_was_transaction is False here
                f.write("\n")
            f.write(beancount_entry_to_string(transaction))
            f.write("\n")
            prev_item_was_transaction = is_transaction(transaction)

    render_to_file(new_transactions, config)
    logging.info(f"Ledger file '{filename}' was written successfully.")
|
||||
|
||||
|
||||
def main():
    """Entry point: either train the classifier or regenerate the ledger."""
    init_logging()
    config = load_config()

    # Training mode is requested via a "train" argument on the command line.
    wants_training = len(sys.argv) > 2 and sys.argv[2] == "train"
    if wants_training:
        train(config)
        return

    # Regenerate the ledger from scratch: clear the output, write the meta
    # header, process the hand-written ledger files, then the CSV inputs.
    remove_if_exists(config.output_file)
    write_meta(config)
    process_ldg_files(config)
    process_csv_files(config)
    update_ledger(config)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -5,12 +5,12 @@ import sys
|
||||
EXECUTABLE_NAME = "fzf.exe" if sys.platform == "win32" else "fzf"
|
||||
|
||||
|
||||
def iterfzf(iterable, prompt="> "):
|
||||
cmd = [EXECUTABLE_NAME, "--prompt=" + prompt]
|
||||
def iterfzf(iterable, prompt="> ", header=None, height="50%"):
|
||||
cmd = [EXECUTABLE_NAME, "--prompt=" + prompt, "--height=" + height, "--reverse"]
|
||||
if header:
|
||||
cmd.append("--header=" + header)
|
||||
encoding = sys.getdefaultencoding()
|
||||
proc = subprocess.Popen(
|
||||
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=None
|
||||
)
|
||||
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=None)
|
||||
if proc.stdin is None:
|
||||
return None
|
||||
try:
|
||||
@@ -24,7 +24,10 @@ def iterfzf(iterable, prompt="> "):
|
||||
return None
|
||||
if proc.stdout is None:
|
||||
return None
|
||||
decode = lambda t: t.decode(encoding)
|
||||
|
||||
def decode(t):
|
||||
return t.decode(encoding)
|
||||
|
||||
output = [decode(ln.strip(b"\r\n\0")) for ln in iter(proc.stdout.readline, b"")]
|
||||
try:
|
||||
return output[0]
|
||||
|
||||
@@ -1,393 +0,0 @@
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import pickle
|
||||
import os
|
||||
from collections import Counter, defaultdict
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple, Set, Any
|
||||
|
||||
import numpy as np
|
||||
|
||||
from toldg.fzf import iterfzf
|
||||
from toldg.models import Transaction
|
||||
|
||||
|
||||
class Tokenizer:
|
||||
"""Simple tokenizer for transaction descriptions."""
|
||||
|
||||
def __init__(self, min_count: int = 2, lowercase: bool = True):
|
||||
self.min_count = min_count
|
||||
self.lowercase = lowercase
|
||||
self.vocab = {} # word -> index
|
||||
self.inverse_vocab = {} # index -> word
|
||||
|
||||
def fit(self, texts: List[str]) -> None:
|
||||
"""Build vocabulary from texts."""
|
||||
word_counts = Counter()
|
||||
|
||||
for text in texts:
|
||||
tokens = self._tokenize(text)
|
||||
word_counts.update(tokens)
|
||||
|
||||
# Filter words by minimum count
|
||||
filtered_words = [word for word, count in word_counts.items()
|
||||
if count >= self.min_count]
|
||||
|
||||
# Build vocabulary
|
||||
self.vocab = {word: idx for idx, word in enumerate(filtered_words)}
|
||||
self.inverse_vocab = {idx: word for word, idx in self.vocab.items()}
|
||||
|
||||
def _tokenize(self, text: str) -> List[str]:
|
||||
"""Split text into tokens."""
|
||||
if self.lowercase:
|
||||
text = text.lower()
|
||||
|
||||
# Simple tokenization: alphanumeric sequences
|
||||
tokens = re.findall(r'\b\w+\b', text)
|
||||
return tokens
|
||||
|
||||
def transform(self, text: str) -> Dict[int, int]:
|
||||
"""Convert text to sparse vector (word index -> count)."""
|
||||
tokens = self._tokenize(text)
|
||||
counts = Counter()
|
||||
|
||||
for token in tokens:
|
||||
if token in self.vocab:
|
||||
counts[self.vocab[token]] += 1
|
||||
|
||||
return dict(counts)
|
||||
|
||||
def vocab_size(self) -> int:
|
||||
"""Return vocabulary size."""
|
||||
return len(self.vocab)
|
||||
|
||||
|
||||
class LogisticRegression:
|
||||
"""Multi-class logistic regression classifier."""
|
||||
|
||||
def __init__(self, input_dim: int, output_dim: int,
|
||||
learning_rate: float = 0.01,
|
||||
reg_lambda: float = 0.01,
|
||||
max_iterations: int = 1000):
|
||||
self.input_dim = input_dim
|
||||
self.output_dim = output_dim
|
||||
self.learning_rate = learning_rate
|
||||
self.reg_lambda = reg_lambda
|
||||
self.max_iterations = max_iterations
|
||||
|
||||
# Initialize weights and bias
|
||||
# weights shape: (output_dim, input_dim)
|
||||
self.weights = np.random.randn(output_dim, input_dim) * 0.01
|
||||
self.bias = np.zeros((output_dim, 1))
|
||||
|
||||
def softmax(self, z: np.ndarray) -> np.ndarray:
|
||||
"""Compute softmax function."""
|
||||
# Subtract max for numerical stability
|
||||
exp_z = np.exp(z - np.max(z, axis=0, keepdims=True))
|
||||
return exp_z / np.sum(exp_z, axis=0, keepdims=True)
|
||||
|
||||
def forward(self, x: np.ndarray) -> np.ndarray:
|
||||
"""Forward pass: compute probabilities."""
|
||||
# x shape: (input_dim, batch_size)
|
||||
# output shape: (output_dim, batch_size)
|
||||
z = np.dot(self.weights, x) + self.bias
|
||||
return self.softmax(z)
|
||||
|
||||
def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
|
||||
"""Compute cross-entropy loss with L2 regularization."""
|
||||
# y_pred shape: (output_dim, batch_size)
|
||||
# y_true shape: (output_dim, batch_size) - one-hot encoded
|
||||
m = y_true.shape[1]
|
||||
|
||||
# Cross-entropy loss
|
||||
ce_loss = -np.sum(y_true * np.log(y_pred + 1e-8)) / m
|
||||
|
||||
# L2 regularization
|
||||
reg_loss = (self.reg_lambda / (2 * m)) * np.sum(np.square(self.weights))
|
||||
|
||||
return ce_loss + reg_loss
|
||||
|
||||
def backward(self, x: np.ndarray, y_pred: np.ndarray, y_true: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
||||
"""Compute gradients for backpropagation."""
|
||||
# x shape: (input_dim, batch_size)
|
||||
# y_pred shape: (output_dim, batch_size)
|
||||
# y_true shape: (output_dim, batch_size) - one-hot encoded
|
||||
m = y_true.shape[1]
|
||||
|
||||
# Gradient of loss with respect to scores (dL/dz)
|
||||
dz = y_pred - y_true
|
||||
|
||||
# Gradient of loss with respect to weights (dL/dW)
|
||||
dw = (1/m) * np.dot(dz, x.T) + (self.reg_lambda / m) * self.weights
|
||||
|
||||
# Gradient of loss with respect to bias (dL/db)
|
||||
db = (1/m) * np.sum(dz, axis=1, keepdims=True)
|
||||
|
||||
return dw, db
|
||||
|
||||
def train(self, x_batch: List[Dict[int, int]], y_batch: List[int], verbose: bool = True) -> List[float]:
|
||||
"""Train model on batched data."""
|
||||
# Convert sparse vectors to dense matrix
|
||||
batch_size = len(x_batch)
|
||||
x_dense = np.zeros((self.input_dim, batch_size))
|
||||
|
||||
for i, x_sparse in enumerate(x_batch):
|
||||
for idx, val in x_sparse.items():
|
||||
x_dense[idx, i] = val
|
||||
|
||||
# Convert labels to one-hot encoding
|
||||
y_one_hot = np.zeros((self.output_dim, batch_size))
|
||||
for i, y in enumerate(y_batch):
|
||||
y_one_hot[y, i] = 1
|
||||
|
||||
losses = []
|
||||
|
||||
for iteration in range(self.max_iterations):
|
||||
# Forward pass
|
||||
y_pred = self.forward(x_dense)
|
||||
|
||||
# Compute loss
|
||||
loss = self.compute_loss(y_pred, y_one_hot)
|
||||
losses.append(loss)
|
||||
|
||||
if verbose and iteration % 100 == 0:
|
||||
print(f"Iteration {iteration}: loss = {loss:.4f}")
|
||||
|
||||
# Check convergence
|
||||
if iteration > 0 and abs(losses[-1] - losses[-2]) < 1e-5:
|
||||
if verbose:
|
||||
print(f"Converged at iteration {iteration}")
|
||||
break
|
||||
|
||||
# Backward pass
|
||||
dw, db = self.backward(x_dense, y_pred, y_one_hot)
|
||||
|
||||
# Update parameters
|
||||
self.weights -= self.learning_rate * dw
|
||||
self.bias -= self.learning_rate * db
|
||||
|
||||
return losses
|
||||
|
||||
def predict_proba(self, x_sparse: Dict[int, int]) -> np.ndarray:
|
||||
"""Predict class probabilities for a single sparse vector."""
|
||||
# Convert sparse vector to dense vector
|
||||
x_dense = np.zeros((self.input_dim, 1))
|
||||
for idx, val in x_sparse.items():
|
||||
x_dense[idx, 0] = val
|
||||
|
||||
# Forward pass
|
||||
return self.forward(x_dense).flatten()
|
||||
|
||||
def predict(self, x_sparse: Dict[int, int]) -> int:
|
||||
"""Predict class for a single sparse vector."""
|
||||
probs = self.predict_proba(x_sparse)
|
||||
return np.argmax(probs)
|
||||
|
||||
|
||||
class TransactionClassifier:
|
||||
"""Transaction classifier using Bag of Words and Logistic Regression."""
|
||||
|
||||
def __init__(self, model_path: Optional[Path] = None):
|
||||
self.tokenizer = None
|
||||
self.model = None
|
||||
self.categories = []
|
||||
self.category_to_idx = {}
|
||||
self.idx_to_category = {}
|
||||
|
||||
if model_path and os.path.exists(model_path):
|
||||
self.load(model_path)
|
||||
|
||||
def fit(self, transactions: List[Transaction], categories: List[str], verbose: bool = True) -> None:
|
||||
"""Train classifier on transactions."""
|
||||
# Extract texts and labels
|
||||
texts = [t.row for t in transactions]
|
||||
labels = [t.account2 for t in transactions]
|
||||
|
||||
# Build category mapping
|
||||
self.categories = sorted(set(categories))
|
||||
self.category_to_idx = {cat: idx for idx, cat in enumerate(self.categories)}
|
||||
self.idx_to_category = {idx: cat for cat, idx in self.category_to_idx.items()}
|
||||
|
||||
# Map labels to indices
|
||||
label_indices = [self.category_to_idx.get(label, 0) for label in labels]
|
||||
|
||||
# Initialize and fit tokenizer
|
||||
self.tokenizer = Tokenizer(min_count=2)
|
||||
self.tokenizer.fit(texts)
|
||||
|
||||
if verbose:
|
||||
print(f"Vocabulary size: {self.tokenizer.vocab_size()}")
|
||||
|
||||
# Transform texts to feature vectors
|
||||
x_vectors = [self.tokenizer.transform(text) for text in texts]
|
||||
|
||||
# Initialize and train model
|
||||
self.model = LogisticRegression(
|
||||
input_dim=self.tokenizer.vocab_size(),
|
||||
output_dim=len(self.categories),
|
||||
learning_rate=0.05,
|
||||
reg_lambda=0.01,
|
||||
max_iterations=2000
|
||||
)
|
||||
|
||||
self.model.train(x_vectors, label_indices, verbose=verbose)
|
||||
|
||||
def predict(self, text: str) -> Tuple[str, float, List[float]]:
|
||||
"""
|
||||
Predict category for a transaction text.
|
||||
|
||||
Returns:
|
||||
tuple: (predicted_category, confidence, all_probabilities)
|
||||
"""
|
||||
if not self.model or not self.tokenizer:
|
||||
raise ValueError("Model not trained yet")
|
||||
|
||||
# Transform text to feature vector
|
||||
x_vector = self.tokenizer.transform(text)
|
||||
|
||||
# Predict probabilities
|
||||
probs = self.model.predict_proba(x_vector)
|
||||
|
||||
# Get predicted class
|
||||
pred_idx = np.argmax(probs)
|
||||
pred_category = self.idx_to_category[pred_idx]
|
||||
confidence = probs[pred_idx]
|
||||
|
||||
return pred_category, confidence, probs
|
||||
|
||||
def sort_categories(self, text: str, categories: List[str]) -> None:
|
||||
"""Sort categories by prediction probability for given text."""
|
||||
if not self.model or not self.tokenizer:
|
||||
return
|
||||
|
||||
# Transform text to feature vector
|
||||
x_vector = self.tokenizer.transform(text)
|
||||
|
||||
# Predict probabilities
|
||||
probs = self.model.predict_proba(x_vector)
|
||||
|
||||
# Create mapping from category to probability
|
||||
cat_to_prob = {}
|
||||
for idx, prob in enumerate(probs):
|
||||
if idx in self.idx_to_category:
|
||||
cat = self.idx_to_category[idx]
|
||||
cat_to_prob[cat] = prob
|
||||
|
||||
# Sort categories by probability
|
||||
categories.sort(key=lambda c: cat_to_prob.get(c, 0.0), reverse=True)
|
||||
|
||||
def save(self, path: Path) -> None:
|
||||
"""Save model to file."""
|
||||
if not self.model or not self.tokenizer:
|
||||
raise ValueError("Model not trained yet")
|
||||
|
||||
model_data = {
|
||||
'tokenizer_vocab': self.tokenizer.vocab,
|
||||
'tokenizer_inverse_vocab': self.tokenizer.inverse_vocab,
|
||||
'tokenizer_min_count': self.tokenizer.min_count,
|
||||
'tokenizer_lowercase': self.tokenizer.lowercase,
|
||||
'model_weights': self.model.weights,
|
||||
'model_bias': self.model.bias,
|
||||
'categories': self.categories,
|
||||
'category_to_idx': self.category_to_idx,
|
||||
'idx_to_category': self.idx_to_category
|
||||
}
|
||||
|
||||
with open(path, 'wb') as f:
|
||||
pickle.dump(model_data, f)
|
||||
|
||||
def load(self, path: Path) -> None:
|
||||
"""Load model from file."""
|
||||
with open(path, 'rb') as f:
|
||||
model_data = pickle.load(f)
|
||||
|
||||
# Restore tokenizer
|
||||
self.tokenizer = Tokenizer(
|
||||
min_count=model_data['tokenizer_min_count'],
|
||||
lowercase=model_data['tokenizer_lowercase']
|
||||
)
|
||||
self.tokenizer.vocab = model_data['tokenizer_vocab']
|
||||
self.tokenizer.inverse_vocab = model_data['tokenizer_inverse_vocab']
|
||||
|
||||
# Restore categories
|
||||
self.categories = model_data['categories']
|
||||
self.category_to_idx = model_data['category_to_idx']
|
||||
self.idx_to_category = model_data['idx_to_category']
|
||||
|
||||
# Restore model
|
||||
input_dim = len(self.tokenizer.vocab)
|
||||
output_dim = len(self.categories)
|
||||
self.model = LogisticRegression(input_dim, output_dim)
|
||||
self.model.weights = model_data['model_weights']
|
||||
self.model.bias = model_data['model_bias']
|
||||
|
||||
|
||||
# Global classifier instance
|
||||
_classifier = None
|
||||
|
||||
|
||||
def get_sort_categories():
|
||||
"""Get function to sort categories by prediction probability."""
|
||||
global _classifier
|
||||
|
||||
def sort_categories(row: str, categories: List[str]):
|
||||
if _classifier is None:
|
||||
return
|
||||
_classifier.sort_categories(row, categories)
|
||||
|
||||
try:
|
||||
model_path = Path("transaction_classifier.pkl")
|
||||
_classifier = TransactionClassifier(model_path)
|
||||
if _classifier.model is None:
|
||||
logging.warning("No trained model found. Categories will not be sorted.")
|
||||
except Exception as e:
|
||||
logging.warning(f"Error loading classifier: {e}")
|
||||
logging.warning("Categories will not be sorted.")
|
||||
|
||||
return sort_categories
|
||||
|
||||
|
||||
def add_account2(transactions: List[Transaction], categories: List[str]):
|
||||
"""Add account2 to unmapped transactions."""
|
||||
unmapped_transactions = list(filter(lambda t: t.mapping is None, transactions))
|
||||
if len(unmapped_transactions) == 0:
|
||||
return
|
||||
|
||||
sort_categories = get_sort_categories()
|
||||
for t in unmapped_transactions:
|
||||
sort_categories(t.row, categories)
|
||||
add_account2_interactive(t, categories)
|
||||
|
||||
|
||||
def add_account2_interactive(transaction: Transaction, categories: List[str]):
|
||||
"""Interactively add account2 to a transaction."""
|
||||
t = transaction
|
||||
account2 = None
|
||||
prompt = f"{t.account1} {t.date} {t.description} {t.debit} > "
|
||||
while account2 is None:
|
||||
account2 = iterfzf(categories, prompt=prompt)
|
||||
transaction.account2 = account2
|
||||
print(f"Assigned category '{account2}'.")
|
||||
|
||||
|
||||
def train_classifier(transactions: List[Transaction], categories: List[str], output_path: Path = Path("transaction_classifier.pkl")):
|
||||
"""Train transaction classifier and save to file."""
|
||||
global _classifier
|
||||
|
||||
# Filter transactions with account2
|
||||
valid_transactions = [t for t in transactions if t.account2 in categories]
|
||||
|
||||
if len(valid_transactions) < 10:
|
||||
logging.warning("Not enough transactions for training. Need at least 10.")
|
||||
return
|
||||
|
||||
logging.info(f"Training classifier on {len(valid_transactions)} transactions")
|
||||
|
||||
# Initialize and train classifier
|
||||
_classifier = TransactionClassifier()
|
||||
_classifier.fit(valid_transactions, categories, verbose=True)
|
||||
|
||||
# Save classifier
|
||||
_classifier.save(output_path)
|
||||
logging.info(f"Classifier saved to {output_path}")
|
||||
@@ -81,4 +81,8 @@ class Transaction(BaseModel):
|
||||
description: str
|
||||
csv_file: str
|
||||
row: str
|
||||
index: int
|
||||
mapping: Optional[Mapping] = None
|
||||
|
||||
def key(self):
    """Return a unique identifier: the source CSV file plus the raw row."""
    return f"{self.csv_file}, {self.row}"
|
||||
|
||||
@@ -1,12 +1,365 @@
|
||||
import logging
|
||||
import os
|
||||
import pickle
|
||||
import re
|
||||
from collections import Counter
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
from toldg.fzf import iterfzf
|
||||
from toldg.ml_predict import TransactionClassifier
|
||||
from toldg.models import Transaction
|
||||
|
||||
|
||||
class Tokenizer:
    """Bag-of-words tokenizer for transaction description strings."""

    def __init__(self, min_count: int = 2, lowercase: bool = True):
        self.min_count = min_count
        self.lowercase = lowercase
        self.vocab = {}  # word -> index
        self.inverse_vocab = {}  # index -> word

    def fit(self, texts: List[str]) -> None:
        """Build the vocabulary from an iterable of texts."""
        counts = Counter()
        for text in texts:
            counts.update(self._tokenize(text))

        # Keep only words that occur at least `min_count` times.
        kept = [word for word, n in counts.items() if n >= self.min_count]

        self.vocab = {word: i for i, word in enumerate(kept)}
        self.inverse_vocab = {i: word for word, i in self.vocab.items()}

    def _tokenize(self, text: str) -> List[str]:
        """Lowercase (optionally) and split into alphanumeric word tokens."""
        source = text.lower() if self.lowercase else text
        return re.findall(r"\b\w+\b", source)

    def transform(self, text: str) -> Dict[int, int]:
        """Map text to a sparse count vector {word index: count}."""
        index_counts = Counter(
            self.vocab[token] for token in self._tokenize(text) if token in self.vocab
        )
        return dict(index_counts)

    def vocab_size(self) -> int:
        """Number of words in the vocabulary."""
        return len(self.vocab)
||||
|
||||
|
||||
class LogisticRegression:
    """Multi-class logistic regression trained by batch gradient descent."""

    def __init__(
        self,
        input_dim: int,
        output_dim: int,
        learning_rate: float = 0.01,
        reg_lambda: float = 0.01,
        max_iterations: int = 1000,
    ):
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.learning_rate = learning_rate
        self.reg_lambda = reg_lambda
        self.max_iterations = max_iterations

        # Small random weights, zero bias.
        # weights shape: (output_dim, input_dim); bias shape: (output_dim, 1)
        self.weights = np.random.randn(output_dim, input_dim) * 0.01
        self.bias = np.zeros((output_dim, 1))

    def softmax(self, z: np.ndarray) -> np.ndarray:
        """Column-wise softmax, shifted by the column max for stability."""
        shifted = np.exp(z - np.max(z, axis=0, keepdims=True))
        return shifted / np.sum(shifted, axis=0, keepdims=True)

    def forward(self, x: np.ndarray) -> np.ndarray:
        """Class probabilities for input columns.

        x shape: (input_dim, batch_size); output: (output_dim, batch_size).
        """
        return self.softmax(self.weights @ x + self.bias)

    def compute_loss(self, y_pred: np.ndarray, y_true: np.ndarray) -> float:
        """Cross-entropy loss plus L2 weight penalty.

        Both inputs are (output_dim, batch_size); y_true is one-hot.
        """
        m = y_true.shape[1]
        # 1e-8 guards against log(0).
        cross_entropy = -np.sum(y_true * np.log(y_pred + 1e-8)) / m
        penalty = (self.reg_lambda / (2 * m)) * np.sum(np.square(self.weights))
        return cross_entropy + penalty

    def backward(
        self, x: np.ndarray, y_pred: np.ndarray, y_true: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Gradients of the regularized loss w.r.t. weights and bias."""
        m = y_true.shape[1]
        # dL/dz for softmax + cross-entropy is simply (p - y).
        dz = y_pred - y_true
        dw = (1 / m) * (dz @ x.T) + (self.reg_lambda / m) * self.weights
        db = (1 / m) * np.sum(dz, axis=1, keepdims=True)
        return dw, db

    def train(
        self, x_batch: List[Dict[int, int]], y_batch: List[int], verbose: bool = True
    ) -> List[float]:
        """Run full-batch gradient descent; returns the loss history."""
        # Densify the sparse feature vectors into an (input_dim, batch) matrix.
        batch = len(x_batch)
        features = np.zeros((self.input_dim, batch))
        for col, sparse in enumerate(x_batch):
            for feature_idx, count in sparse.items():
                features[feature_idx, col] = count

        # One-hot encode the integer labels.
        targets = np.zeros((self.output_dim, batch))
        for col, label in enumerate(y_batch):
            targets[label, col] = 1

        losses = []

        for step in range(self.max_iterations):
            predictions = self.forward(features)

            loss = self.compute_loss(predictions, targets)
            losses.append(loss)

            if verbose and step % 100 == 0:
                print(f"Iteration {step}: loss = {loss:.4f}")

            # Stop early once the loss plateaus.
            if step > 0 and abs(losses[-1] - losses[-2]) < 1e-5:
                if verbose:
                    print(f"Converged at iteration {step}")
                break

            dw, db = self.backward(features, predictions, targets)
            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db

        return losses

    def predict_proba(self, x_sparse: Dict[int, int]) -> np.ndarray:
        """Class probabilities for a single sparse feature vector."""
        dense = np.zeros((self.input_dim, 1))
        for feature_idx, count in x_sparse.items():
            dense[feature_idx, 0] = count
        return self.forward(dense).flatten()

    def predict(self, x_sparse: Dict[int, int]) -> int:
        """Most probable class index for a single sparse feature vector."""
        return np.argmax(self.predict_proba(x_sparse))
|
||||
|
||||
|
||||
class TransactionClassifier:
    """Maps transaction text to an account category (BoW + logistic regression)."""

    def __init__(self, model_path: Optional[Path] = None):
        self.tokenizer = None
        self.model = None
        self.categories = []
        self.category_to_idx = {}
        self.idx_to_category = {}

        # Restore a persisted model eagerly when one exists on disk.
        if model_path and os.path.exists(model_path):
            self.load(model_path)

    def fit(
        self,
        transactions: List[Transaction],
        categories: List[str],
        verbose: bool = True,
    ) -> None:
        """Train tokenizer and model on labelled transactions."""
        texts = [t.row for t in transactions]
        labels = [t.account2 for t in transactions]

        # Stable category <-> index mapping.
        self.categories = sorted(set(categories))
        self.category_to_idx = {cat: i for i, cat in enumerate(self.categories)}
        self.idx_to_category = {i: cat for cat, i in self.category_to_idx.items()}

        # Unknown labels fall back to index 0.
        label_indices = [self.category_to_idx.get(label, 0) for label in labels]

        self.tokenizer = Tokenizer(min_count=2)
        self.tokenizer.fit(texts)

        if verbose:
            print(f"Vocabulary size: {self.tokenizer.vocab_size()}")

        feature_vectors = [self.tokenizer.transform(text) for text in texts]

        self.model = LogisticRegression(
            input_dim=self.tokenizer.vocab_size(),
            output_dim=len(self.categories),
            learning_rate=0.05,
            reg_lambda=0.01,
            max_iterations=3000,
        )
        self.model.train(feature_vectors, label_indices, verbose=verbose)

    def predict(self, text: str) -> Tuple[str, float, List[float]]:
        """
        Predict category for a transaction text.

        Returns:
            tuple: (predicted_category, confidence, all_probabilities)
        """
        if not self.model or not self.tokenizer:
            raise ValueError("Model not trained yet")

        features = self.tokenizer.transform(text)
        probabilities = self.model.predict_proba(features)

        best = np.argmax(probabilities)
        return self.idx_to_category[best], probabilities[best], probabilities

    def sort_categories(self, text: str, categories: List[str]) -> None:
        """Reorder `categories` in place, most probable first.

        Silently does nothing when no trained model is available.
        """
        if not self.model or not self.tokenizer:
            return

        probabilities = self.model.predict_proba(self.tokenizer.transform(text))

        # Probability per known category; unknown categories rank as 0.0.
        ranked = {
            self.idx_to_category[i]: prob
            for i, prob in enumerate(probabilities)
            if i in self.idx_to_category
        }
        categories.sort(key=lambda cat: ranked.get(cat, 0.0), reverse=True)

    def save(self, path: Path) -> None:
        """Pickle tokenizer state, model parameters, and category maps."""
        if not self.model or not self.tokenizer:
            raise ValueError("Model not trained yet")

        state = {
            "tokenizer_vocab": self.tokenizer.vocab,
            "tokenizer_inverse_vocab": self.tokenizer.inverse_vocab,
            "tokenizer_min_count": self.tokenizer.min_count,
            "tokenizer_lowercase": self.tokenizer.lowercase,
            "model_weights": self.model.weights,
            "model_bias": self.model.bias,
            "categories": self.categories,
            "category_to_idx": self.category_to_idx,
            "idx_to_category": self.idx_to_category,
        }

        with open(path, "wb") as handle:
            pickle.dump(state, handle)

    def load(self, path: Path) -> None:
        """Restore a classifier previously written by `save`."""
        # NOTE(review): pickle is only safe for files we wrote ourselves;
        # never load a model file from an untrusted source.
        with open(path, "rb") as handle:
            state = pickle.load(handle)

        self.tokenizer = Tokenizer(
            min_count=state["tokenizer_min_count"],
            lowercase=state["tokenizer_lowercase"],
        )
        self.tokenizer.vocab = state["tokenizer_vocab"]
        self.tokenizer.inverse_vocab = state["tokenizer_inverse_vocab"]

        self.categories = state["categories"]
        self.category_to_idx = state["category_to_idx"]
        self.idx_to_category = state["idx_to_category"]

        # Rebuild the model shell, then overwrite its parameters.
        self.model = LogisticRegression(len(self.tokenizer.vocab), len(self.categories))
        self.model.weights = state["model_weights"]
        self.model.bias = state["model_bias"]
|
||||
|
||||
|
||||
def train_classifier(
|
||||
transactions: List[Transaction],
|
||||
categories: List[str],
|
||||
output_path: Path = Path("transaction_classifier.pkl"),
|
||||
):
|
||||
"""Train transaction classifier and save to file."""
|
||||
global _classifier
|
||||
|
||||
# Filter transactions with account2
|
||||
valid_transactions = [t for t in transactions if t.account2 in categories]
|
||||
|
||||
if len(valid_transactions) < 10:
|
||||
logging.warning("Not enough transactions for training. Need at least 10.")
|
||||
return
|
||||
|
||||
logging.info(f"Training classifier on {len(valid_transactions)} transactions")
|
||||
|
||||
# Initialize and train classifier
|
||||
_classifier = TransactionClassifier()
|
||||
_classifier.fit(valid_transactions, categories, verbose=True)
|
||||
|
||||
# Save classifier
|
||||
_classifier.save(output_path)
|
||||
logging.info(f"Classifier saved to {output_path}")
|
||||
|
||||
|
||||
def get_sort_categories(model_path: Path):
|
||||
"""Get function to sort categories by prediction probability."""
|
||||
_classifier = None
|
||||
@@ -17,7 +370,6 @@ def get_sort_categories(model_path: Path):
|
||||
_classifier.sort_categories(row, categories)
|
||||
|
||||
try:
|
||||
|
||||
model_path = Path("transaction_classifier.pkl")
|
||||
_classifier = TransactionClassifier(model_path)
|
||||
if _classifier.model is None:
|
||||
@@ -29,9 +381,7 @@ def get_sort_categories(model_path: Path):
|
||||
return sort_categories
|
||||
|
||||
|
||||
def add_account2(
|
||||
model_path: Path, transactions: List[Transaction], categories: List[str]
|
||||
):
|
||||
def add_account2(model_path: Path, transactions: List[Transaction], categories: List[str]):
|
||||
"""Add account2 to unmapped transactions."""
|
||||
unmapped_transactions = list(filter(lambda t: t.mapping is None, transactions))
|
||||
if len(unmapped_transactions) == 0:
|
||||
@@ -47,8 +397,9 @@ def add_account2_interactive(transaction: Transaction, categories: List[str]):
|
||||
"""Interactively add account2 to a transaction."""
|
||||
t = transaction
|
||||
account2 = None
|
||||
prompt = f"{t.account1} {t.date} {t.description} {t.debit} > "
|
||||
header = f"{t.account1} | {t.date} | {t.description} | {t.debit}"
|
||||
logging.warning(f"No mapping for '{t}'.")
|
||||
while account2 is None:
|
||||
account2 = iterfzf(categories, prompt=prompt)
|
||||
account2 = iterfzf(categories, header=header)
|
||||
transaction.account2 = account2
|
||||
print(f"Assigned category '{account2}'.")
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import csv
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import toldg.models
|
||||
import toldg.predict
|
||||
@@ -13,13 +13,13 @@ from toldg.models import Config, CsvConfig, Mapping, Transaction
|
||||
|
||||
|
||||
def process_ldg_files(config: Config):
|
||||
for ldg_file in toldg.utils.get_ldg_files(config.input_directory):
|
||||
with open(ldg_file, "r") as f_in:
|
||||
with open(config.output_file, "a") as f_out:
|
||||
f_out.write(f_in.read())
|
||||
with open(config.output_file, "a") as f_out:
|
||||
for ldg_file in toldg.utils.get_ldg_files(config.input_directory):
|
||||
ldg_rel = os.path.relpath(ldg_file, os.path.dirname(config.output_file))
|
||||
f_out.write(f'include "{ldg_rel}"\n')
|
||||
|
||||
|
||||
def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
|
||||
def get_csv_config(csv_file: str, csv_configs: list[CsvConfig]) -> CsvConfig:
|
||||
cs = [c for c in csv_configs if re.match(c.file_match_regex, csv_file)]
|
||||
if not cs:
|
||||
logging.critical(f"No CSV config for {csv_file}.")
|
||||
@@ -30,7 +30,7 @@ def get_csv_config(csv_file: str, csv_configs: List[CsvConfig]) -> CsvConfig:
|
||||
return cs[0]
|
||||
|
||||
|
||||
def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
|
||||
def get_transactions(csv_file: str, config: CsvConfig) -> list[Transaction]:
|
||||
def date_to_date(date: str) -> str:
|
||||
d = datetime.datetime.strptime(date, config.input_date_format)
|
||||
return d.strftime(config.output_date_format)
|
||||
@@ -38,7 +38,7 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
|
||||
def flip_sign(amount: str) -> str:
|
||||
return amount[1:] if amount.startswith("-") else "-" + amount
|
||||
|
||||
def row_to_transaction(row, fields):
|
||||
def row_to_transaction(idx, row, fields):
|
||||
"""The user can configure the mapping of CSV fields to the three
|
||||
required fields date, amount and description via the CsvConfig."""
|
||||
t = {field: row[index] for index, field in fields}
|
||||
@@ -52,7 +52,8 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
|
||||
account2=toldg.models.UNKNOWN_CATEGORY,
|
||||
description=t["description"],
|
||||
csv_file=csv_file,
|
||||
row=csv_file + ", " + ", ".join(row),
|
||||
row=", ".join(row),
|
||||
index=idx,
|
||||
)
|
||||
|
||||
fields = [(i, f) for i, f in enumerate(config.fields) if f]
|
||||
@@ -60,29 +61,31 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
|
||||
reader = csv.reader(f, delimiter=config.delimiter, quotechar=config.quotechar)
|
||||
for _ in range(config.skip):
|
||||
next(reader)
|
||||
transactions = [row_to_transaction(row, fields) for row in reader if row]
|
||||
rows = [row for row in reader if row]
|
||||
transactions = [row_to_transaction(i, row, fields) for i, row in enumerate(reversed(rows))]
|
||||
return transactions
|
||||
|
||||
|
||||
def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]):
|
||||
def apply_mappings(transactions: list[Transaction], mappings: dict[str, Mapping]):
|
||||
"""Apply mappings to transactions."""
|
||||
unmapped_count = 0
|
||||
for t in transactions:
|
||||
if t.row in mappings:
|
||||
mapping = mappings[t.row]
|
||||
if t.key() in mappings:
|
||||
mapping = mappings[t.key()]
|
||||
assert isinstance(mapping, Mapping)
|
||||
assert (
|
||||
mapping.count > 0
|
||||
), f"{mapping} used by {t} but count is not greater than '0'."
|
||||
assert mapping.count > 0, f"{mapping} used by {t} but count is not greater than '0'."
|
||||
mapping.count -= 1
|
||||
t.mapping = mapping
|
||||
else:
|
||||
logging.warning(f"No mapping for '{t}'.")
|
||||
unmapped_count += 1
|
||||
if unmapped_count > 0:
|
||||
logging.info(f"{unmapped_count} transactions without mappings.")
|
||||
|
||||
for mapping in mappings.values():
|
||||
assert mapping.count == 0, f"{mapping} was not used as often as expected!"
|
||||
|
||||
|
||||
def process_csv_files(config: Config) -> List[Transaction]:
|
||||
def process_csv_files(config: Config) -> list[Transaction]:
|
||||
csv_files = toldg.utils.get_csv_files(config.input_directory)
|
||||
transactions = []
|
||||
for csv_file in csv_files:
|
||||
@@ -94,5 +97,4 @@ def process_csv_files(config: Config) -> List[Transaction]:
|
||||
apply_mappings(transactions, mappings)
|
||||
toldg.predict.add_account2(config.model, transactions, config.categories)
|
||||
toldg.utils.write_mappings(transactions, config.mappings_file)
|
||||
toldg.write.render_to_file(transactions, config)
|
||||
return transactions
|
||||
|
||||
@@ -1,16 +1,13 @@
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from toldg.ml_predict import train_classifier
|
||||
from toldg.models import Config
|
||||
from toldg.predict import train_classifier
|
||||
from toldg.process import process_csv_files
|
||||
|
||||
|
||||
def train(config: Config):
|
||||
"""Train a transaction classifier from csv files."""
|
||||
logging.info("[train] Starting transaction classifier training")
|
||||
# Process transactions to get training data
|
||||
transactions = process_csv_files(config)
|
||||
output_path = Path("transaction_classifier.pkl")
|
||||
train_classifier(transactions, config.categories, config.model)
|
||||
logging.info("[train] Training completed")
|
||||
logging.info("Training completed")
|
||||
|
||||
@@ -3,7 +3,7 @@ import logging
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Dict, List
|
||||
|
||||
from pydantic import ValidationError
|
||||
|
||||
@@ -57,29 +57,19 @@ def category_to_bean(c: str) -> str:
|
||||
return ":".join(new_sections)
|
||||
|
||||
|
||||
def write_meta(config: Config):
|
||||
with open(config.output_file, "a") as f:
|
||||
for category in config.categories:
|
||||
f.write(f"2017-01-01 open {category_to_bean(category)}\n")
|
||||
f.write("\n")
|
||||
f.write('option "operating_currency" "USD"\n\n')
|
||||
|
||||
|
||||
def write_mappings(transactions: List[Transaction], mappings_file: Path):
|
||||
"""Write transactions to the mappings file."""
|
||||
|
||||
mappings = read_mappings(mappings_file)
|
||||
for t in transactions:
|
||||
if t.row in mappings:
|
||||
if t.key() in mappings:
|
||||
pass
|
||||
else:
|
||||
mapping = Mapping(
|
||||
**{
|
||||
"account2": t.account2.strip(),
|
||||
"narration": t.description,
|
||||
}
|
||||
account2=t.account2.strip(),
|
||||
narration=t.description,
|
||||
)
|
||||
mappings[t.row] = mapping
|
||||
mappings[t.key()] = mapping
|
||||
|
||||
mappings = {k: v.model_dump(exclude_none=True) for k, v in mappings.items()}
|
||||
with open(mappings_file, "w") as f:
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from toldg.models import Config, Transaction
|
||||
@@ -6,6 +5,9 @@ from toldg.utils import category_to_bean
|
||||
|
||||
BEANCOUNT_TRANSACTION_TEMPLATE = """
|
||||
{t.date} * {description}{tags}
|
||||
source_file: "{t.csv_file}"
|
||||
source_index: {t.index}
|
||||
source_row: "{t.row}"
|
||||
{account2:<40} {t.debit:<6} {t.currency}
|
||||
{account1:<40} {t.credit:<6} {t.currency}
|
||||
"""
|
||||
@@ -13,7 +15,6 @@ BEANCOUNT_TRANSACTION_TEMPLATE = """
|
||||
|
||||
def format(t):
|
||||
t.date = t.date.replace("/", "-")
|
||||
|
||||
tags = ""
|
||||
description = None
|
||||
if t.mapping:
|
||||
|
||||
Reference in New Issue
Block a user