#!/usr/bin/env python3
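"""ledgerpy/getofx.py

Download the last 30 days of OFX statements for each configured account
and merge any new transactions into that account's CSV file.

Usage: getofx.py [CONFIG_FILE]   (CONFIG_FILE defaults to getofx.json)
"""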
import os
import sys
import csv
import json
import logging
import datetime
from typing import List, Tuple
from dataclasses import dataclass
from ofxtools.Client import OFXClient, StmtRq, CcStmtRq
from ofxtools.Parser import OFXTree
from ofxtools.utils import UTC


@dataclass
class ClientConfig:
    url: str
    userid: str
    org: str
    clientuid: str
    fid: str
    bankid: str
    version: int


@dataclass
class AccountConfig:
    name: str
    accttype: str
    acctid: str
    csv_file: str
    fields: List[str]


@dataclass
class Config:
    """Basic class for abstracting the configuration."""
    secret: str
    client: ClientConfig
    accounts: List[AccountConfig]
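
# An illustrative getofx.json matching the dataclasses above. All values
# here are made up; your bank publishes the real url/org/fid/bankid
# parameters, and "version" is the OFX protocol version as an integer
# (e.g. 220 for OFX 2.2):
#
# {
#     "secret": "my-ofx-password",
#     "client": {
#         "url": "https://ofx.example-bank.com",
#         "userid": "jdoe",
#         "org": "EXAMPLEBANK",
#         "clientuid": "a1b2c3d4-...",
#         "fid": "12345",
#         "bankid": "021000021",
#         "version": 220
#     },
#     "accounts": [
#         {
#             "name": "Checking",
#             "accttype": "checking",
#             "acctid": "0123456789",
#             "csv_file": "checking.csv",
#             "fields": ["date", "description", "amount"]
#         }
#     ]
# }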

@dataclass
class Transaction:
    date: str
    description: str
    amount: str


def get_transactions(client: OFXClient, secret: str,
                     account: AccountConfig) -> List[Transaction]:
    """The idea is that I run this often enough that the last 30 days of
    data never miss a transaction. This would be the syntax to create a
    datetime object with an arbitrary date:
    datetime.datetime(2020, 1, 1, tzinfo=UTC)"""
    dtend = datetime.datetime.now(UTC)
    dtstart = dtend - datetime.timedelta(days=30)
    if account.accttype.upper() in ("CHECKING", "SAVINGS"):
        rq = StmtRq(acctid=account.acctid, accttype=account.accttype.upper(),
                    dtstart=dtstart, dtend=dtend)
    else:
        rq = CcStmtRq(acctid=account.acctid, dtstart=dtstart, dtend=dtend)
    response = client.request_statements(secret, rq)
    parser = OFXTree()
    parser.parse(response)
    ofx = parser.convert()
    # Sort by date because my credit transactions come in random order.
    translist = sorted(ofx.statements[0].banktranlist, reverse=True,
                       key=lambda t: t.dtposted)
    return [Transaction(t.dtposted.strftime("%m/%d/%Y"),
                        t.name + " " + t.memo if t.memo else t.name,
                        str(t.trnamt)) for t in translist]


def write_csv(account: AccountConfig, transactions: List[Transaction]):
    def transaction_to_csv_row(t: Transaction) -> List[str]:
        """This allows the user to specify how to order the fields in the
        CSV file. I have implemented this feature because the columns in my
        checking account and in my credit card accounts are different. If
        the field is one of 'date', 'description', or 'amount' we get that
        attribute from the transaction. Otherwise, we use the field itself
        (usually an empty string in my case)."""
        return [getattr(t, f) if hasattr(t, f) else f
                for f in account.fields]
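
    # For example (values illustrative): with
    #     account.fields == ["date", "description", "", "amount"]
    # the helper above maps a transaction to
    #     ["01/15/2024", "GROCERY STORE", "", "-42.17"]
    # i.e. unknown field names such as "" are passed through verbatim.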

    def merge_rows(csv_rows: List, ofx_rows: List) -> Tuple[List, int]:
        """Prepend new transactions to the ones from the CSV file. We assume
        that both the new transactions from OFX and the transactions in the
        CSV file are sorted in descending order, meaning the newest
        transaction comes first. The idea is then to see if one of the new
        transactions matches the latest one in the CSV file. If yes, we
        prepend only the OFX transactions that are newer than that match.
        If no, we prepend all OFX transactions."""
        if not csv_rows:  # CSV held only a header, so everything is new.
            return (list(ofx_rows), len(ofx_rows))
        csv_rows.reverse()
        ofx_rows.reverse()
        newest_csv_row = csv_rows[-1]
        new_rows = 0
        row_found = False
        for row in ofx_rows:
            if not row_found and row == newest_csv_row:
                row_found = True
            elif row_found:
                csv_rows.append(row)
                new_rows += 1
        if not row_found:
            csv_rows += ofx_rows
            new_rows = len(ofx_rows)
        csv_rows.reverse()
        return (csv_rows, new_rows)
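
    # Worked example for merge_rows (rows abbreviated): with the CSV holding
    #     [["01/02", "B"], ["01/01", "A"]]    (newest first)
    # and OFX returning
    #     [["01/03", "C"], ["01/02", "B"]]
    # row B matches the newest CSV row, so only C counts as new and the
    # result is ([["01/03", "C"], ["01/02", "B"], ["01/01", "A"]], 1).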

    status = "no change"
    csv_file = account.csv_file
    ofx_rows = [transaction_to_csv_row(t) for t in transactions]
    if not os.path.isfile(csv_file):
        new_rows = len(ofx_rows)
        status = f"new ({new_rows})"
        rows = ofx_rows
        header = ["date", "description", "amount"]
    else:
        with open(csv_file, 'r', newline='') as f:
            reader = csv.reader(f)
            header = next(reader)  # Keep the header for rewriting the file.
            csv_rows = list(reader)
        rows, new_rows = merge_rows(csv_rows, ofx_rows)
        if new_rows > 0:
            status = f"update ({new_rows})"
    logging.warning(f"{account.name:30} -> {account.csv_file:30} | {status}")
    if new_rows == 0:
        return
    with open(account.csv_file, "w", newline="") as f:
        csv_writer = csv.writer(f)
        csv_writer.writerow(header)
        for r in rows:
            csv_writer.writerow(r)


def get_client(c: ClientConfig) -> OFXClient:
    return OFXClient(c.url, userid=c.userid, org=c.org, fid=c.fid,
                     clientuid=c.clientuid, bankid=c.bankid,
                     version=c.version, prettyprint=True)


def parse_config(config_file: str) -> Config:
    with open(config_file, 'r') as f:
        # We could use the dacite package to parse the configuration
        # recursively with full type checking. Probably not worth it at
        # this point.
        config = Config(**json.load(f))
    config.client = ClientConfig(**config.client)
    config.accounts = [AccountConfig(**a) for a in config.accounts]
    return config


def main(config: Config):
    client = get_client(config.client)
    for account in config.accounts:
        transactions = get_transactions(client, config.secret, account)
        write_csv(account, transactions)


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING, format='%(message)s')
    try:
        config_file = sys.argv[1]
    except IndexError:
        config_file = "getofx.json"
    config = parse_config(config_file)
    main(config)
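
# A cron entry is one way to run this "often enough" that the 30-day window
# never misses a transaction (paths are illustrative):
#     0 8 * * * /home/user/ledgerpy/getofx.py /home/user/ledgerpy/getofx.json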