#!/usr/bin/env python3
"""Download recent OFX bank/credit-card transactions and merge them into CSV files."""

import csv
import datetime
import json
import logging
import os
import sys
from dataclasses import dataclass
from typing import List

import ofxtools
from ofxtools import OFXClient
from ofxtools.Client import CcStmtEndRq, CcStmtRq, StmtRq
from ofxtools.Parser import OFXTree


@dataclass
class ClientConfig:
    """Connection parameters for the OFX endpoint."""
    url: str
    userid: str
    org: str
    clientuid: str
    fid: str
    bankid: str
    version: int


@dataclass
class AccountConfig:
    """One account to download, plus how its rows map onto its CSV file."""
    name: str
    accttype: str
    acctid: str
    csv_file: str
    fields: List[str]


@dataclass
class Config:
    """
    Basic class for abstracting the configuration.
    """
    secret: str
    client: ClientConfig
    accounts: List[AccountConfig]


@dataclass
class Transaction:
    """A transaction reduced to the three fields exported to CSV."""
    date: str
    description: str
    amount: str


def get_transactions(client: OFXClient, secret: str,
                     account: AccountConfig) -> List[Transaction]:
    """Fetch the last 30 days of transactions for *account*, newest first.

    The idea is that I run this often enough so that the last 30 days of
    data are enough to never lose a transaction.  This would be the syntax
    to create a datetime object with an arbitrary date:
    datetime.datetime(2020, 1, 1, tzinfo=ofxtools.utils.UTC)
    """
    # datetime.utcnow() is deprecated since Python 3.12; build an aware
    # datetime directly instead of replace()-ing the tzinfo afterwards.
    dtend = datetime.datetime.now(ofxtools.utils.UTC)
    dtstart = dtend - datetime.timedelta(days=30)
    if account.accttype.upper() in ("CHECKING", "SAVINGS"):
        rq = StmtRq(acctid=account.acctid, accttype=account.accttype.upper(),
                    dtstart=dtstart, dtend=dtend)
    else:
        # Anything that is not a deposit account is treated as a credit card.
        rq = CcStmtRq(acctid=account.acctid, dtstart=dtstart, dtend=dtend)
    response = client.request_statements(secret, rq)
    parser = OFXTree()
    parser.parse(response)
    ofx = parser.convert()
    # Sort by date because my credit transactions come in random order.
    translist = sorted(ofx.statements[0].banktranlist, reverse=True,
                       key=lambda t: t.dtposted)
    return [Transaction(t.dtposted.strftime("%m/%d/%Y"),
                        t.name + " " + t.memo if t.memo else t.name,
                        str(t.trnamt))
            for t in translist]


def write_csv(account: AccountConfig, transactions: List[Transaction]) -> None:
    """Merge *transactions* into the account's CSV file (newest row first).

    Creates the file if missing; otherwise only the transactions newer than
    the newest row already present are prepended.  Logs one status line per
    account and rewrites the file only when something changed.
    """

    def transaction_to_csv_row(t: Transaction) -> List[str]:
        """
        This allows to user to specify how to order the fields in the CSV
        file.  I have implemented this feature because the columns in my
        checking account and in my credit card accounts are different.  If
        the field is one of 'date', 'description', or 'amount' we get that
        attribute from the transaction.  Otherwise, we use the field itself
        (usually an empty string in my case).
        """
        return [getattr(t, f) if hasattr(t, f) else f for f in account.fields]

    def merge_rows(csv_rows: List, ofx_rows: List) -> (List, int):
        """Prepend new transactions to the one from the CSV file.

        We assume that both the new transactions from OFX and the
        transactions in the CSV file are sorted in descending order, meaning
        the newest transaction comes first.  The idea is then to see if one
        of the new transactions matches the latest one in the CSV file.  If
        yes, we include all newer transactions from the CSV file.  If no, we
        include all transactions.
        """
        if not csv_rows:
            # The CSV file held only a header: every OFX row is new.
            return (list(ofx_rows), len(ofx_rows))
        # Work in ascending order so new rows can simply be appended.
        csv_rows.reverse()
        ofx_rows.reverse()
        newest_csv_row = csv_rows[-1]
        new_rows = 0
        row_found = False
        for row in ofx_rows:
            if not row_found and row == newest_csv_row:
                row_found = True
            elif row_found:
                csv_rows.append(row)
                new_rows += 1
        if not row_found:
            # No overlap detected: keep everything the server sent.
            csv_rows += ofx_rows
            new_rows = len(ofx_rows)
        csv_rows.reverse()
        return (csv_rows, new_rows)

    status = "no change"
    csv_file = account.csv_file
    ofx_rows = [transaction_to_csv_row(t) for t in transactions]
    if not os.path.isfile(csv_file):
        new_rows = len(ofx_rows)
        status = f"new ({new_rows})"
        rows = ofx_rows
        header = ["date", "description", "amount"]
    else:
        # newline='' is required by the csv module so embedded newlines and
        # platform line endings are handled correctly.
        with open(csv_file, 'r', newline='') as f:
            reader = csv.reader(f)
            # skip header; fall back to the default one if the file is empty
            header = next(reader, ["date", "description", "amount"])
            csv_rows = list(reader)
        rows, new_rows = merge_rows(csv_rows, ofx_rows)
        if new_rows > 0:
            status = f"update ({new_rows})"
    logging.warning(f"{account.name:30} -> {account.csv_file:30} | {status}")
    if new_rows == 0:
        return
    with open(account.csv_file, "w", newline='') as f:
        csv_writer = csv.writer(f)
        csv_writer.writerow(header)
        csv_writer.writerows(rows)


def get_client(c: ClientConfig) -> OFXClient:
    """Build an OFXClient from the connection configuration."""
    return OFXClient(c.url, userid=c.userid, org=c.org, fid=c.fid,
                     clientuid=c.clientuid, bankid=c.bankid,
                     version=c.version, prettyprint=True)


def parse_config(config_file: str) -> Config:
    """Load the JSON configuration file into typed dataclasses.

    We could use the dacite package to parse the configuration recursively
    with full type checking.  Probably not worth it at this point.
    """
    with open(config_file, 'r') as f:
        config = Config(**json.load(f))
    config.client = ClientConfig(**config.client)
    config.accounts = [AccountConfig(**a) for a in config.accounts]
    return config


def main(config: Config) -> None:
    """Download and persist the transactions of every configured account."""
    client = get_client(config.client)
    for account in config.accounts:
        transactions = get_transactions(client, config.secret, account)
        write_csv(account, transactions)


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING, format='%(message)s')
    try:
        config_file = sys.argv[1]
    except IndexError:
        # Default config path when none is given on the command line.
        config_file = "getofx.json"
    config = parse_config(config_file)
    main(config)