from datetime import datetime, timedelta, date
import os.path
import sys
import logging
sys.path.append('.')
# pip install beancount
from beancount.core import data
from beancount.loader import load_file
from beancount.core.amount import Amount
from beancount.ingest.importer import ImporterProtocol
# pip install ofxtools
from ofxtools.Parser import OFXTree
# Plain Python dictionaries mapping a transaction's narration to a category and to a payee
from importers.resolver import categories, payees
logging.basicConfig(level=logging.INFO)
class QFXImporter(ImporterProtocol):
    """Beancount importer for OFX/QFX statements of a single bank account.

    On construction the existing ledger is loaded and every transaction is
    fingerprinted, so that `extract` can skip statement lines that are
    already booked. Only statement lines posted within `current_year` are
    imported.
    """

    def __init__(self, account, account_id, beancount_file, current_year,
                 currency='CAD'):
        """Set up the importer and index the existing ledger.

        Args:
            account: Beancount account name that imported postings are
                booked against.
            account_id: Account identifier compared against the `acctid` of
                the first statement in a file to decide whether the file
                belongs to this importer.
            beancount_file: Path of the ledger used for duplicate detection.
            current_year: Year whose 1 January - 31 December range bounds
                the transactions that are imported.
            currency: Currency attached to every amount and balance.
                Defaults to 'CAD'.
        """
        self.account = account
        self.account_id = account_id
        self.currency = currency
        self.beancount_file = beancount_file
        self.period_date_start = date(current_year, 1, 1)
        self.period_date_end = date(current_year, 12, 31)
        # Statement period of the most recently processed file.
        self.date_start = None
        self.date_end = None
        self.parser = OFXTree()
        self.transactions_hash_set = self.index_beancount(self.beancount_file)
        super().__init__()

    def get_transaction_hash(self, t):
        """Return the fingerprint used to detect already-imported entries.

        The fingerprint combines the account and units of the transaction's
        first posting with its date and narration.

        Raises:
            IndexError: If the transaction has no postings.
        """
        return f"{t.postings[0].account}|{t.date.isoformat()}|{t.narration}|{t.postings[0].units}"

    def index_beancount(self, file_):
        """Load the ledger at `file_` and return the set of fingerprints of
        its transactions."""
        logging.info(f"Indexing: {file_}")
        entries, errors, _ = load_file(file_)
        if errors:
            # The loader reports problems instead of raising; surface them so
            # an incomplete index does not go unnoticed.
            logging.warning("%d error(s) reported while loading %s",
                            len(errors), file_)
        return {
            self.get_transaction_hash(entry)
            for entry in entries
            # Transactions without postings cannot be fingerprinted.
            if isinstance(entry, data.Transaction) and entry.postings
        }

    def parse(self, file_):
        """Parse the file named by `file_.name` and return the converted
        OFX object."""
        with open(file_.name, 'rb') as f:
            self.parser.parse(f)
        return self.parser.convert()

    def _load_statement(self, file_):
        """Return the first statement of `file_` if it belongs to this
        importer's account, otherwise None.

        Files that cannot be read or parsed are treated as "not ours".
        """
        try:
            statements = self.parse(file_).statements
        except Exception:
            # Deliberately broad: this is a probe that may be pointed at
            # arbitrary files, and the set of parser errors is not visible
            # from here. The traceback stays available at debug level.
            logging.debug("Could not parse %s as OFX", file_.name,
                          exc_info=True)
            return None
        if not statements:
            return None
        statement = statements[0]
        if getattr(statement, 'acctid', None) != self.account_id:
            return None
        return statement

    def _record_period(self, tranlist):
        """Remember the period covered by a statement's transaction list."""
        self.date_start = tranlist.dtstart.date()
        self.date_end = tranlist.dtend.date()

    def _available_balance(self, statement):
        """Return the statement's available balance as an Amount, or None
        when the statement does not carry one."""
        availbal = getattr(statement, 'availbal', None)
        balamt = getattr(availbal, 'balamt', None)
        if balamt is None:
            return None
        return Amount(balamt, self.currency)

    def identify(self, file_):
        """Return True if the first statement in `file_` is for this
        importer's account id."""
        return self._load_statement(file_) is not None

    def extract(self, file_):
        """Build beancount entries from the statement in `file_`.

        Returns:
            A list of new `data.Transaction` entries (statement lines
            outside the configured year or already present in the ledger
            are skipped), followed by a `data.Balance` entry dated the day
            after the statement's end date when an available balance is
            present. An empty list if the file is not for this account.
        """
        # Parse once; the same statement serves both the account check and
        # the extraction.
        statement = self._load_statement(file_)
        if statement is None:
            return []
        tranlist = getattr(statement, 'banktranlist', None)
        if tranlist is None:
            # No transaction list means no period and nothing to import.
            return []
        self._record_period(tranlist)

        entries = []
        for index, line in enumerate(tranlist):
            posted_date = line.dtposted.date()
            description = line.name
            category = categories.get(description) or "Unknown:TODO"
            payee = payees.get(description) or "Unknown"
            postings = [
                data.Posting(self.account, Amount(line.trnamt, self.currency),
                             None, None, None, None),
                # Amount left empty here, same as the original importer.
                data.Posting(category, None, None, None, None, None),
            ]
            new_transaction = data.Transaction(
                data.new_metadata(file_.name, index),
                posted_date,
                self.FLAG,
                payee,
                description,
                data.EMPTY_SET,
                data.EMPTY_SET,
                postings,
            )
            transaction_hash = self.get_transaction_hash(new_transaction)
            if not (self.period_date_start <= posted_date <= self.period_date_end):
                logging.warning(f"Skipping => transaction before or after relevant period: {transaction_hash}")
                continue
            if transaction_hash in self.transactions_hash_set:
                logging.warning(f"Skipping => {transaction_hash}")
                continue
            logging.info(f"New transaction: {transaction_hash}")
            entries.append(new_transaction)

        balance_amount = self._available_balance(statement)
        if balance_amount is not None:
            entries.append(
                data.Balance(
                    data.new_metadata(file_.name, 0),
                    # The assertion is dated the day after the statement ends.
                    self.date_end + timedelta(days=1),
                    self.account,
                    balance_amount,
                    None,
                    None,
                )
            )
        return entries

    def file_account(self, file_):
        """Return the account that files handled by this importer are
        filed under."""
        return self.account

    def file_date(self, file_):
        """Return the end date of the statement in `file_`.

        Returns None when the file is not for this account or has no
        transaction list, rather than a date left over from a previously
        processed file.
        """
        statement = self._load_statement(file_)
        tranlist = getattr(statement, 'banktranlist', None)
        if tranlist is None:
            return None
        self._record_period(tranlist)
        return self.date_end

    def file_name(self, file_):
        """Return the name to file the document under: the fixed stem 'XXX'
        plus the original file's extension."""
        _, extension = os.path.splitext(os.path.basename(file_.name))
        return f'XXX{extension}'