mbank-to-mt940/mbank.py

156 lines
4.5 KiB
Python

import csv
import logging
import math
import os
import string
from datetime import datetime, timedelta

from data import Transaction
# Column titles the transaction table must start with, in file order. The
# reader compares the first CSV row (with empty cells dropped) against this
# list. In English: booking date, operation date, operation description,
# title, sender/recipient, account number, amount, balance after operation.
EXPECT_HEADERS = [
    "#Data księgowania",
    "#Data operacji",
    "#Opis operacji",
    "#Tytuł",
    "#Nadawca/Odbiorca",
    "#Numer konta",
    "#Kwota",
    "#Saldo po operacji",
]

# Leading phrases stripped from the counterparty name of a transaction.
NAME_REMOVE_PREFIXES = ["Payment from ", "To "]

# strptime patterns. The combined pattern deliberately has no separator
# between the date and the time part: the reader appends a fixed time string
# directly to the date text before parsing it.
DATE_FORMAT = "%Y-%m-%d"
TIME_FORMAT = "%H:%M:%S"
DATETIME_FORMAT = f"{DATE_FORMAT}{TIME_FORMAT}"

# Values used when building a separate fee transaction.
# NOTE(review): the "Revolut" name looks like a leftover from another bank's
# reader - confirm before relying on it.
FEE_NAME = "Revolut"
FEE_IBAN = ""
FEE_DESCRIPTION_FORMAT = "Bank transaction fee {}"
# Offset added to the parent transaction's timestamp for its fee transaction.
FEE_DATETIME_DELTA = timedelta(seconds=1)
def parse_float(s: str) -> float:
    """Convert a decimal-comma number such as ``"1 234,56"`` to a float.

    Every whitespace character is dropped before conversion, so thousands
    separators written as a plain space or as a non-breaking space are both
    accepted. The decimal comma is then replaced by a dot.

    Args:
        s: Textual number using a comma as the decimal mark.

    Returns:
        The parsed value.

    Raises:
        ValueError: If the cleaned-up text is not a valid float literal.
    """
    # str.split() with no argument splits on any Unicode whitespace, which
    # also covers the non-breaking space (U+00A0).
    compact = "".join(s.split())
    return float(compact.replace(",", "."))
class MbankCsvReader:
    """Reader for an mBank CSV account-statement export.

    The export starts with a block of metadata lines followed by a
    semicolon-separated transaction table. On construction the metadata is
    read into attributes, the table is copied (re-encoded as UTF-8) to
    ``<file_path>.tmp``, and a ``csv.reader`` is opened on that copy.

    Attributes:
        range: ``"<start>-<end>"`` built from the date strings in the file.
        date_start: First day of the statement period.
        date_end: Last day of the statement period.
        iban: Account number with spaces removed and ``"PL"`` prepended.
        starting_balance: Balance parsed from the metadata block.
        filename: Path of the temporary UTF-8 copy of the table.
        file: Open handle on ``filename``; released by ``close()``.
        reader: ``csv.reader`` positioned just after the header row.

    The instance can be used as a context manager to close the file
    deterministically.
    """

    # Zero-based positions, within the exported file, of the lines this
    # reader depends on.
    _DATE_RANGE_LINE = 14
    _IBAN_LINE = 20
    _STARTING_BALANCE_LINE = 35
    _TABLE_START_LINE = 37

    # Day-first date layout of the statement period ("DD.MM.YYYY").
    _PERIOD_DATE_FORMAT = "%d.%m.%Y"

    # Each table row ends with ";", so the eight columns named in the header
    # are followed by one empty trailing field.
    _ROW_FIELD_COUNT = 9

    _log = logging.getLogger(__name__)

    def __init__(self, file_path):
        """Load the statement metadata and open the transaction table.

        Args:
            file_path: Path to the CSV export (cp1250 encoded).

        Raises:
            ValueError: If the file does not exist, is too short to contain
                the metadata block, or its header row does not match
                ``EXPECT_HEADERS``.
        """
        if not os.path.isfile(file_path):
            raise ValueError("File does not exist: {}".format(file_path))

        with open(file_path, encoding="cp1250") as source:
            lines = source.readlines()

        if len(lines) <= self._TABLE_START_LINE:
            raise ValueError(
                "File is too short to be an mBank CSV export: {}".format(
                    file_path
                )
            )

        # The period line holds exactly three ';'-separated fields; only the
        # first two (start and end date) are used.
        date_start, date_end, _rest = lines[self._DATE_RANGE_LINE].split(";")
        account_number = lines[self._IBAN_LINE].split(";")[0]
        # The balance cell holds the number, a space, then further text
        # (presumably the currency); only the number is kept.
        balance_cell = lines[self._STARTING_BALANCE_LINE].split(";")[1]

        self.range = date_start + "-" + date_end
        self.date_start = datetime.strptime(
            date_start, self._PERIOD_DATE_FORMAT
        )
        self.date_end = datetime.strptime(date_end, self._PERIOD_DATE_FORMAT)
        self.iban = "PL" + account_number.replace(" ", "")
        self.starting_balance = parse_float(balance_cell.split(" ")[0])

        # Strip the metadata added by the mBank export: keep only the table,
        # dropping the last line of the file as well.
        temp_file_path = os.fspath(file_path) + ".tmp"
        with open(temp_file_path, "w", encoding="utf-8") as trimmed:
            trimmed.writelines(lines[self._TABLE_START_LINE:-1])

        self.filename = temp_file_path
        self.file = open(self.filename, "r", encoding="utf-8")
        self.reader = csv.reader(self.file, delimiter=";")
        try:
            self._validate()
        except Exception:
            # Do not leave the handle open when the file is rejected.
            self.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        self.close()

    def close(self):
        """Close the underlying table file; safe to call more than once."""
        # `file` is missing when __init__ raised before opening it.
        handle = getattr(self, "file", None)
        if handle is not None and not handle.closed:
            handle.close()

    def _validate(self):
        """Consume the header row and check it against ``EXPECT_HEADERS``.

        Raises:
            ValueError: If the table is empty or the headers differ.
        """
        # An empty table yields no row at all; treat it as "no headers".
        headers = [cell for cell in next(self.reader, []) if cell != ""]
        self._log.debug("CSV headers: %s", headers)
        self._log.debug("expected headers: %s", EXPECT_HEADERS)
        if headers != EXPECT_HEADERS:
            raise ValueError("Headers do not match expected mBank CSV format.")

    def get_all_transactions(self):
        """Parse every remaining row into ``Transaction`` objects.

        Returns:
            A list of transactions in reverse file order: rows that appear
            later in the file come first. Transactions produced from a single
            row keep their relative order.
        """
        batches = [self._parse_transaction(row) for row in self.reader]
        return [
            transaction
            for batch in reversed(batches)
            for transaction in batch
        ]

    @staticmethod
    def _strip_name_prefixes(name):
        """Remove each known leading phrase from a counterparty name."""
        for prefix in NAME_REMOVE_PREFIXES:
            if name.startswith(prefix):
                name = name[len(prefix):]
        return name

    @staticmethod
    def _parse_datetime(date_str):
        """Turn a date string into a datetime fixed at 07:00:00 that day."""
        # The time text is appended without a separator, matching the
        # separator-less DATETIME_FORMAT.
        return datetime.strptime(date_str + "7:00:00", DATETIME_FORMAT)

    def _parse_transaction(self, row):
        """Convert one CSV row into a list of transactions.

        Args:
            row: List of cell strings as produced by ``csv.reader``.

        Returns:
            ``[]`` for rows that are skipped (fewer than three cells, an
            empty first cell, or an "OTWARCIE RACHUNKU" operation);
            otherwise a one-element list holding the parsed transaction.

        Raises:
            ValueError: If a non-skipped row does not have exactly nine
                cells, or a date/amount cell cannot be parsed.
        """
        self._log.debug("row: %s", row)
        if len(row) < 3 or row[0] == "" or row[2] == "OTWARCIE RACHUNKU":
            self._log.debug("skipping row")
            return []
        if len(row) != self._ROW_FIELD_COUNT:
            raise ValueError(
                "Expected {} fields in transaction row, got {}: {}".format(
                    self._ROW_FIELD_COUNT, len(row), row
                )
            )

        (
            _booking_date,
            operation_date_str,
            _operation,
            description,
            name,
            iban,
            amount_str,
            balance_str,
            _trailing,
        ) = row
        completed_datetime = self._parse_datetime(operation_date_str)
        amount = parse_float(amount_str)
        balance = parse_float(balance_str)
        self._log.debug("amount=%s balance=%s", amount, balance)

        transaction = Transaction(
            amount=amount,
            name=self._strip_name_prefixes(name),
            iban=iban,
            description=description,
            datetime=completed_datetime,
            # The file reports the balance after the operation.
            before_balance=balance - amount,
            after_balance=balance,
        )
        # Transaction fees are not supported here, so every row yields a
        # single transaction; _make_fee_transaction is kept but not called.
        return [transaction]

    def _make_fee_transaction(self, completed_datetime, balance, fee):
        """Build a separate transaction representing a bank fee.

        Args:
            completed_datetime: Timestamp of the transaction the fee belongs
                to.
            balance: Balance after the fee was applied.
            fee: Fee amount, used as the transaction amount.

        Returns:
            A ``Transaction`` dated ``FEE_DATETIME_DELTA`` after
            ``completed_datetime``.
        """
        return Transaction(
            amount=fee,
            name=FEE_NAME,
            iban=FEE_IBAN,
            # include timestamp of transaction to make sure that SnelStart
            # does not detect similar transactions as the same one
            description=FEE_DESCRIPTION_FORMAT.format(
                int(completed_datetime.timestamp())
            ),
            datetime=completed_datetime + FEE_DATETIME_DELTA,
            before_balance=balance - fee,
            after_balance=balance,
        )