Finish ofx script. It now automatically merges the latest data into the CSV file.

This commit is contained in:
2020-08-11 11:44:30 -04:00
parent b9adfc0960
commit 3138be8d17

View File

@@ -44,6 +44,7 @@ class Config:
client: ClientConfig
accounts: List[AccountConfig]
@dataclass
class Transaction:
date: str
@@ -52,8 +53,12 @@ class Transaction:
def get_transactions(client: OFXClient, secret: str, account: AccountConfig):
dtstart = datetime.datetime(2020, 1, 1, tzinfo=ofxtools.utils.UTC)
dtend = datetime.datetime(2020, 12, 31, tzinfo=ofxtools.utils.UTC)
""" The idea is that I run this often enough so that the last 30 days of
data are enough to never lose a transaction. This would be the syntax to
create a datetime object with an arbitrary date:
datetime.datetime(2020, 1, 1, tzinfo=ofxtools.utils.UTC)"""
dtend = datetime.datetime.utcnow().replace(tzinfo=ofxtools.utils.UTC)
dtstart = dtend - datetime.timedelta(days=30)
if account.accttype.upper() in ("CHECKING", "SAVINGS"):
rq = StmtRq(acctid=account.acctid, accttype=account.accttype.upper(),
@@ -64,10 +69,14 @@ def get_transactions(client: OFXClient, secret: str, account: AccountConfig):
parser = OFXTree()
parser.parse(response)
ofx = parser.convert()
# Sort by date because my credit transactions come in random order.
translist = sorted(ofx.statements[0].banktranlist, reverse=True,
key=lambda t: t.dtposted)
ts = [Transaction(t.dtposted.strftime("%m/%d/%Y"),
t.name + " " + t.memo if t.memo else t.name,
str(t.trnamt))
for t in ofx.statements[0].banktranlist]
t.name + " " + t.memo if t.memo else t.name,
str(t.trnamt)) for t in translist]
return ts
@@ -83,23 +92,56 @@ def write_csv(account: AccountConfig, transactions: List[Transaction]):
return [getattr(t, f) if hasattr(t, f) else f
for f in account.fields]
status = "no change"
def merge_rows(csv_rows: List, ofx_rows: List) -> (List, int):
""" Prepend new transactions to the one from the CSV file. We assume
that both the new transactions from OFX and the transactions in the CSV
file are sorted in descending order, meaning the newest transaction
comes first. The idea is then to see if one of the new transactions
matches the latest one in the CSV file. If yes, we include all newer
transactions from the CSV file. If no, we include all transactions. """
csv_rows.reverse()
ofx_rows.reverse()
newest_csv_row = csv_rows[-1]
new_rows = 0
row_found = False
for row in ofx_rows:
if row_found is False and row == newest_csv_row:
row_found = True
elif row_found is True:
csv_rows.append(row)
new_rows += 1
if not row_found:
csv_rows += ofx_rows
new_rows = len(ofx_rows)
csv_rows.reverse()
return (csv_rows, new_rows)
status = "no change"
csv_file = account.csv_file
ofx_rows = [transaction_to_csv_row(t) for t in transactions]
if not os.path.isfile(csv_file):
status = "new"
with open(account.csv_file, "w") as f:
csv_writer = csv.writer(f)
csv_writer.writerow(["date", "description", "amount"])
for t in transactions:
r = transaction_to_csv_row(t)
csv_writer.writerow(r)
new_rows = len(ofx_rows)
status = f"new ({new_rows})"
rows = ofx_rows
header = ["date", "description", "amount"]
else:
# TODO: diff rows and append only the new once.
pass
with open(csv_file, 'r') as f:
reader = csv.reader(f)
header = next(reader) # skip header
csv_rows = list(reader)
rows, new_rows = merge_rows(csv_rows, ofx_rows)
if new_rows > 0:
status = f"update ({new_rows})"
logging.warning(f"{account.name:30} -> {account.csv_file:30} | {status}")
if new_rows == 0:
return
with open(account.csv_file, "w") as f:
csv_writer = csv.writer(f)
csv_writer.writerow(header)
for r in rows:
csv_writer.writerow(r)
def get_client(c: ClientConfig) -> OFXClient:
@@ -110,9 +152,9 @@ def get_client(c: ClientConfig) -> OFXClient:
def parse_config(config_file: str) -> Config:
with open(config_file, 'r') as f:
# We could use the dacite package if the configureation
# gets more complex and for automatical type ckecking, but
# probably not worth it as this point.
# We could use the dacite package to parse the configuration
# recursively with full type checking. Probably not worth it at this
# point.
config = Config(**json.load(f))
config.client = ClientConfig(**config.client)
config.accounts = [AccountConfig(**a) for a in config.accounts]