From 3138be8d17ecd2a84fdd4ba6070d0fb69706d588 Mon Sep 17 00:00:00 2001 From: Felix Martin Date: Tue, 11 Aug 2020 11:44:30 -0400 Subject: [PATCH] Finish ofx script. It now automatically merges the latest data into the CSV file. --- getofx.py | 78 ++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 60 insertions(+), 18 deletions(-) diff --git a/getofx.py b/getofx.py index 85fa569..dc0c112 100644 --- a/getofx.py +++ b/getofx.py @@ -44,6 +44,7 @@ class Config: client: ClientConfig accounts: List[AccountConfig] + @dataclass class Transaction: date: str @@ -52,8 +53,12 @@ class Transaction: def get_transactions(client: OFXClient, secret: str, account: AccountConfig): - dtstart = datetime.datetime(2020, 1, 1, tzinfo=ofxtools.utils.UTC) - dtend = datetime.datetime(2020, 12, 31, tzinfo=ofxtools.utils.UTC) + """ The idea is that I run this often enough so that the last 30 days of + data are enough to never lose a transaction. This would be the syntax to + create a datetime object with an arbitrary date: + datetime.datetime(2020, 1, 1, tzinfo=ofxtools.utils.UTC)""" + dtend = datetime.datetime.utcnow().replace(tzinfo=ofxtools.utils.UTC) + dtstart = dtend - datetime.timedelta(days=30) if account.accttype.upper() in ("CHECKING", "SAVINGS"): rq = StmtRq(acctid=account.acctid, accttype=account.accttype.upper(), @@ -64,10 +69,14 @@ def get_transactions(client: OFXClient, secret: str, account: AccountConfig): parser = OFXTree() parser.parse(response) ofx = parser.convert() + + # Sort by date because my credit transactions come in random order. 
+ translist = sorted(ofx.statements[0].banktranlist, reverse=True, + key=lambda t: t.dtposted) + ts = [Transaction(t.dtposted.strftime("%m/%d/%Y"), - t.name + " " + t.memo if t.memo else t.name, - str(t.trnamt)) - for t in ofx.statements[0].banktranlist] + t.name + " " + t.memo if t.memo else t.name, + str(t.trnamt)) for t in translist] return ts @@ -83,23 +92,56 @@ def write_csv(account: AccountConfig, transactions: List[Transaction]): return [getattr(t, f) if hasattr(t, f) else f for f in account.fields] - status = "no change" + def merge_rows(csv_rows: List, ofx_rows: List) -> (List, int): + """ Prepend new transactions to the ones from the CSV file. We assume + that both the new transactions from OFX and the transactions in the CSV + file are sorted in descending order, meaning the newest transaction + comes first. The idea is then to see if one of the new transactions + matches the latest one in the CSV file. If yes, we include all newer + transactions from OFX. If no, we include all OFX transactions. """ + csv_rows.reverse() + ofx_rows.reverse() + newest_csv_row = csv_rows[-1] + new_rows = 0 + row_found = False + for row in ofx_rows: + if row_found is False and row == newest_csv_row: + row_found = True + elif row_found is True: + csv_rows.append(row) + new_rows += 1 + if not row_found: + csv_rows += ofx_rows + new_rows = len(ofx_rows) + csv_rows.reverse() + return (csv_rows, new_rows) + status = "no change" csv_file = account.csv_file + ofx_rows = [transaction_to_csv_row(t) for t in transactions] if not os.path.isfile(csv_file): - status = "new" - with open(account.csv_file, "w") as f: - csv_writer = csv.writer(f) - csv_writer.writerow(["date", "description", "amount"]) - for t in transactions: - r = transaction_to_csv_row(t) - csv_writer.writerow(r) + new_rows = len(ofx_rows) + status = f"new ({new_rows})" + rows = ofx_rows + header = ["date", "description", "amount"] else: - # TODO: diff rows and append only the new once. 
- pass + with open(csv_file, 'r') as f: + reader = csv.reader(f) + header = next(reader) # skip header + csv_rows = list(reader) + rows, new_rows = merge_rows(csv_rows, ofx_rows) + if new_rows > 0: + status = f"update ({new_rows})" logging.warning(f"{account.name:30} -> {account.csv_file:30} | {status}") + if new_rows == 0: + return + with open(account.csv_file, "w") as f: + csv_writer = csv.writer(f) + csv_writer.writerow(header) + for r in rows: + csv_writer.writerow(r) def get_client(c: ClientConfig) -> OFXClient: @@ -110,9 +152,9 @@ def get_client(c: ClientConfig) -> OFXClient: def parse_config(config_file: str) -> Config: with open(config_file, 'r') as f: - # We could use the dacite package if the configureation - # gets more complex and for automatical type ckecking, but - # probably not worth it as this point. + # We could use the dacite package to parse the configuration + # recursively with full type checking. Probably not worth it at this + # point. config = Config(**json.load(f)) config.client = ClientConfig(**config.client) config.accounts = [AccountConfig(**a) for a in config.accounts]