Add count to specify how often a mapping is used

This commit is contained in:
2025-03-02 13:44:43 -05:00
parent 078bf07d0f
commit 35e1c1039e
3 changed files with 22 additions and 42 deletions

View File

@@ -40,8 +40,6 @@ class Config(BaseModel):
categories (List[str]): A list of account2s. An account has to be defined here
before it can be used in a mapping. Otherwise, ledger will complain.
commodities (List[str]): A list of commodities relevant to the data processing.
find_duplicates (bool): Flag to check and abort on duplicated transactions. Not
really useful.
"""
class Config:
@@ -52,7 +50,6 @@ class Config(BaseModel):
output_file: Path = Path("output.ldg")
csv_configs: List[CsvConfig]
categories: List[str]
find_duplicates: bool = False
class Transaction(BaseModel):
@@ -81,5 +78,6 @@ class Mapping(BaseModel):
extra = "forbid"
account2: str
count: int = 1
narration: Optional[str] = None
payee: Optional[str] = None

View File

@@ -64,29 +64,16 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
return transactions
def find_duplicates(transactions: List[Transaction]):
rows = set()
for t in transactions:
row = t.row
if row in rows:
logging.critical(f"'{row}' is duplicated.")
logging.critical("Exit because of duplicated transactions.")
sys.exit(1)
else:
rows.add(row)
def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]):
"""Apply mappings to transactions."""
unused_mappings = set(mappings.keys())
for t in transactions:
if t.row in mappings:
mapping = mappings[t.row]
assert isinstance(
mapping, Mapping
), "Only new mappings format is supported."
assert isinstance(mapping, Mapping)
assert (
mapping.count > 0
), f"{mapping} used by {t} but count is not greater than '0'."
mapping.count -= 1
t.account2 = mapping.account2
if mapping.narration:
@@ -94,15 +81,11 @@ def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]
if mapping.payee:
t.payee = mapping.payee
unused_mappings.discard(t.row)
else:
logging.warning(f"No mapping for '{t}'.")
for row in unused_mappings:
mapping_info = mappings[row]
account2 = mapping_info["account2"]
logging.warning(f"Unused mapping '{row}' -> {account2}")
for mapping in mappings.values():
assert mapping.count == 0, f"{mapping} was not used as often as expected!"
def process_csv_files(config: Config):
@@ -113,9 +96,6 @@ def process_csv_files(config: Config):
csv_config = get_csv_config(csv_file, config.csv_configs)
transactions += get_transactions(csv_file, csv_config)
if config.find_duplicates:
find_duplicates(transactions)
mappings = toldg.utils.read_mappings(config.mappings_file)
apply_mappings(transactions, mappings)
toldg.predict.add_account2(transactions, config.categories)

View File

@@ -67,19 +67,21 @@ def write_meta(config: Config):
def write_mappings(transactions: List[Transaction], mappings_file: Path):
"""Write transactions to the mappings file."""
mappings = {}
for t in transactions:
mapping = Mapping(
**{
"account2": t.account2.strip(),
}
)
if t.narration:
mapping.narration = t.narration
if t.payee:
mapping.payee = t.payee
mappings[t.row] = mapping.dict()
mappings = read_mappings(mappings_file)
for t in transactions:
if t.row in mappings:
pass
else:
mapping = Mapping(
**{
"account2": t.account2.strip(),
"narration": t.description,
}
)
mappings[t.row] = mapping
mappings = {k: v.dict() for k, v in mappings.items()}
with open(mappings_file, "w") as f:
json.dump(mappings, f, indent=4)