Add count to specify how often a mapping is used.

This commit is contained in:
2025-03-02 13:44:43 -05:00
parent 078bf07d0f
commit 5d40838368
2 changed files with 8 additions and 30 deletions

View File

@@ -40,8 +40,6 @@ class Config(BaseModel):
categories (List[str]): A list of account2s. An account has to be defined here categories (List[str]): A list of account2s. An account has to be defined here
before it can be used in a mapping. Otherwise, ledger will complain. before it can be used in a mapping. Otherwise, ledger will complain.
commodities (List[str]): A list of commodities relevant to the data processing. commodities (List[str]): A list of commodities relevant to the data processing.
find_duplicates (bool): Flag to check and abort on duplicated transactions. Not
really useful.
""" """
class Config: class Config:
@@ -52,7 +50,6 @@ class Config(BaseModel):
output_file: Path = Path("output.ldg") output_file: Path = Path("output.ldg")
csv_configs: List[CsvConfig] csv_configs: List[CsvConfig]
categories: List[str] categories: List[str]
find_duplicates: bool = False
class Transaction(BaseModel): class Transaction(BaseModel):
@@ -81,5 +78,6 @@ class Mapping(BaseModel):
extra = "forbid" extra = "forbid"
account2: str account2: str
count: int = 1
narration: Optional[str] = None narration: Optional[str] = None
payee: Optional[str] = None payee: Optional[str] = None

View File

@@ -64,29 +64,16 @@ def get_transactions(csv_file: str, config: CsvConfig) -> List[Transaction]:
return transactions return transactions
def find_duplicates(transactions: List[Transaction]):
rows = set()
for t in transactions:
row = t.row
if row in rows:
logging.critical(f"'{row}' is duplicated.")
logging.critical("Exit because of duplicated transactions.")
sys.exit(1)
else:
rows.add(row)
def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]): def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]):
"""Apply mappings to transactions.""" """Apply mappings to transactions."""
unused_mappings = set(mappings.keys())
for t in transactions: for t in transactions:
if t.row in mappings: if t.row in mappings:
mapping = mappings[t.row] mapping = mappings[t.row]
assert isinstance(mapping, Mapping)
assert isinstance( assert (
mapping, Mapping mapping.count > 0
), "Only new mappings format is supported." ), f"{mapping} used by {t} but count is not greater than '0'."
mapping.count -= 1
t.account2 = mapping.account2 t.account2 = mapping.account2
if mapping.narration: if mapping.narration:
@@ -94,15 +81,11 @@ def apply_mappings(transactions: List[Transaction], mappings: Dict[str, Mapping]
if mapping.payee: if mapping.payee:
t.payee = mapping.payee t.payee = mapping.payee
unused_mappings.discard(t.row)
else: else:
logging.warning(f"No mapping for '{t}'.") logging.warning(f"No mapping for '{t}'.")
for row in unused_mappings: for mapping in mappings.values():
mapping_info = mappings[row] assert mapping.count == 0, f"{mapping} was not used as often as expected!"
account2 = mapping_info["account2"]
logging.warning(f"Unused mapping '{row}' -> {account2}")
def process_csv_files(config: Config): def process_csv_files(config: Config):
@@ -113,9 +96,6 @@ def process_csv_files(config: Config):
csv_config = get_csv_config(csv_file, config.csv_configs) csv_config = get_csv_config(csv_file, config.csv_configs)
transactions += get_transactions(csv_file, csv_config) transactions += get_transactions(csv_file, csv_config)
if config.find_duplicates:
find_duplicates(transactions)
mappings = toldg.utils.read_mappings(config.mappings_file) mappings = toldg.utils.read_mappings(config.mappings_file)
apply_mappings(transactions, mappings) apply_mappings(transactions, mappings)
toldg.predict.add_account2(transactions, config.categories) toldg.predict.add_account2(transactions, config.categories)