395 lines
16 KiB
Python
Executable File
395 lines
16 KiB
Python
Executable File
#!/bin/env python3
|
|
|
|
from typing import Dict, Any, Tuple, Optional, Iterator
|
|
import argparse
|
|
import datetime
|
|
import os
|
|
import re
|
|
import sys
|
|
|
|
|
|
USTID_RE = re.compile('[A-Z]{2}[A-Z0-9]{2,12}')
|
|
BIC_RE = re.compile('[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
|
|
IBAN_RE = re.compile('[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')
|
|
EMAIL_RE = re.compile('[^@\s]+@([a-z0-9_äöüß-]+\.)+[a-z]{2,}')
|
|
|
|
|
|
STREET_NAMES = {
|
|
'Hans-Wagnerstraße': 'Hans-Wagner-Straße',
|
|
'J.Seitzstraße': 'Josef-Seitz-Straße',
|
|
'Kurhaus-Str.': 'Kurhausstraße',
|
|
'Kurhaus-Straße': 'Kurhausstraße',
|
|
'Pirawartherstraße': 'Pirawarther Straße',
|
|
'Raggendorferstraße': 'Raggendorfer Straße',
|
|
'Matznerstraße': 'Matzner Straße',
|
|
'Stillfriederstraße': 'Stillfrieder Straße',
|
|
'Harraserstraße': 'Harraser Straße',
|
|
'Gänserndorferstraße': 'Gänserdorfer Straße',
|
|
'Hofrat Döltlstraße': 'Hofrat-Döltl-Straße',
|
|
'Sulzerstraße': 'Sulzer Straße',
|
|
'Brünnerstraße': 'Brünner Straße',
|
|
'Flustraße': 'Flurstraße',
|
|
'Wienerstraße': 'Wiener Straße',
|
|
'St.Laurentstraße': 'St.-Laurentstraße',
|
|
'Angernerstraße': 'Angerner Straße',
|
|
'Schweinbartherstraße': 'Schweinbarther Straße',
|
|
'Hohenruppersdorferstraße': 'Hohenruppersdorfer Straße',
|
|
'Gruberhauptstraße': 'Gruber Hauptstraße',
|
|
'Josef Seitzstraße': 'Josef-Seitz-Straße',
|
|
'Auersthalerstraße': 'Auerstahler Straße',
|
|
'Ollersdorferstraße': 'Ollersdorfer Straße',
|
|
'Ritter Zoppelstraße': 'Ritter-Zoppel-Straße',
|
|
'Spannbergerstraße': 'Spannberger Straße',
|
|
'Ritter Zoppel Straße': 'Ritter-Zoppel-Straße',
|
|
'R. Virchow-Straße': 'Rudolf-Virchow-Straße',
|
|
'Ebenthalerstraße': 'Ebenthaler Straße',
|
|
'Bockfließerstraße': 'Bockfließer Straße',
|
|
'Dörfleserstraße': 'Dörfleser Straße',
|
|
'Dörflesserstraße': 'Dörfleser Straße',
|
|
'Grubere Hauptstraße': 'Gruber Hauptstraße',
|
|
'Groß Inzersdorf': 'Großinzersdorf',
|
|
}
|
|
|
|
|
|
def parse_csv(filename: str) -> Iterator[Dict[str, Any]]:
|
|
with open(filename, 'r') as f:
|
|
header: Optional[Tuple[str]] = None
|
|
for line in f:
|
|
if header is None:
|
|
header = tuple([e.strip() for e in line.strip().split(';')])
|
|
continue
|
|
|
|
parts = [e.strip() for e in line.strip().split(';')]
|
|
obj = {}
|
|
for i, part in enumerate(parts):
|
|
if part == '':
|
|
part = None
|
|
elif part[0] == '"' and part[-1] == '"':
|
|
part = part[1:-1]
|
|
elif part == 'T':
|
|
part = True
|
|
elif part == 'F':
|
|
part = False
|
|
elif part.isdigit():
|
|
part = int(part)
|
|
elif re.match('[0-9]+\.[0-9]+', part):
|
|
part = float(part)
|
|
elif len(part) == 10 and part[4] == '-' and part[7] == '-':
|
|
part = datetime.datetime.strptime(part, '%Y-%m-%d').date()
|
|
else:
|
|
raise RuntimeError(part)
|
|
obj[header[i]] = part
|
|
yield obj
|
|
|
|
|
|
def format_row(*args) -> str:
|
|
row = ''
|
|
for arg in args:
|
|
if arg is None:
|
|
pass
|
|
elif type(arg) == str:
|
|
row += f'"{arg}"'
|
|
elif type(arg) == bool:
|
|
row += 'T' if arg else 'F'
|
|
else:
|
|
row += str(arg)
|
|
row += ';'
|
|
return f'{row[:-1]}\n'
|
|
|
|
|
|
def success(mgnr: int, key: str, value: str) -> None:
|
|
if not args.quiet:
|
|
print(f'\x1B[1;32m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def warning(mgnr: int, key: str, value: str) -> None:
|
|
print(f'\x1B[1;33m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def invalid(mgnr: int, key: str, value: str) -> None:
|
|
print(f'\x1B[1;31m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def convert(mgnr: int, key: str, old_value: str, new_value: str) -> None:
|
|
if not args.quiet:
|
|
print(f'\x1B[1m{mgnr:>6}: {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str], billing: Optional[str] = None) -> None:
|
|
if not args.quiet:
|
|
print(f'\x1B[1m{mgnr:>6}: '
|
|
f'{" / ".join([e or "" for e in old_name])} -> '
|
|
f'{" / ".join([e or "" for e in new_name])}'
|
|
f'{"(" + billing + ")" if billing else ""}\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def check_lfbis_nr(nr: str) -> bool:
|
|
# https://statistik.at/fileadmin/shared/QM/Standarddokumentationen/RW/std_r_land-forstw_register.pdf#page=41
|
|
if len(nr) != 7 or not nr.isdigit():
|
|
return False
|
|
s = 0
|
|
for i, ch in enumerate(nr[:-1]):
|
|
s += int(ch) * (7 - i)
|
|
v = (11 - (s % 11)) % 10
|
|
return v == int(nr[-1])
|
|
|
|
def check_ustid_at(nr: str) -> bool:
|
|
# http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
|
|
if not nr.startswith('ATU') or len(nr) != 11 or not nr[3:].isdigit():
|
|
return False
|
|
s = 0
|
|
for i, ch in enumerate(nr[3:-1]):
|
|
s += sum(map(int, str(int(ch) * (i % 2 + 1))))
|
|
v = (96 - s) % 10
|
|
return v == int(nr[-1])
|
|
|
|
|
|
def iban_checksum(iban: str) -> int:
|
|
if not IBAN_RE.fullmatch(iban):
|
|
raise RuntimeError()
|
|
s = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), (iban[4:] + iban[:4]))
|
|
v = 98 - (int(s) % 97)
|
|
return v
|
|
|
|
|
|
def check_iban(iban: str) -> bool:
|
|
if not IBAN_RE.fullmatch(iban):
|
|
return False
|
|
return iban_checksum(iban) == 97
|
|
|
|
|
|
def generate_iban_at(blz: int, ktonr: str) -> str:
|
|
if blz > 99999 or len(ktonr) > 11:
|
|
raise RuntimeError()
|
|
iban = f'AT00{blz:>05}{ktonr:>011}'
|
|
s = iban_checksum(iban)
|
|
return iban.replace('00', f'{s:02}', 1)
|
|
|
|
|
|
def normalize_phone_nr(nr: str) -> str:
|
|
nr = re.sub('[ /-]', '', nr)
|
|
if nr[0] == '0':
|
|
nr = '+43' + nr[1:]
|
|
return nr
|
|
|
|
|
|
def parse_branches(in_dir: str) -> Dict[str, Any]:
|
|
return {b['ZNR']: b for b in parse_csv(f'{in_dir}/TZweigstellen.csv')}
|
|
|
|
|
|
def migrate_members(in_dir: str, out_dir: str) -> None:
|
|
members = parse_csv(f'{in_dir}/TMitglieder.csv')
|
|
branches = parse_branches(in_dir)
|
|
with open(f'{out_dir}/member.csv', 'w+') as f_m, open(f'{out_dir}/member_billing_address.csv', 'w+') as f_mba:
|
|
f_m.write('mgnr;predecessor_mgnr;prefix;given_name;middle_names;family_name;suffix;'
|
|
'birthday;entry_date;exit_date;business_shares;accounting_nr;zwstid;'
|
|
'lfbis_nr;ustid;volllieferant;buchführend;funktionär;active;iban;bic;'
|
|
'country;postal_dest;address;'
|
|
'email;phone_landline;phone_mobile_1;phone_mobile_2;'
|
|
'default_kgnr;comment\n')
|
|
f_mba.write('mgr;name;country;postal_dest;address\n')
|
|
for m in members:
|
|
mgnr: int = m['MGNR']
|
|
family_name: str = m['Nachname']
|
|
given_name: str = m['Vorname']
|
|
prefix: Optional[str] = None
|
|
middle_names: Optional[str] = None
|
|
suffix: Optional[str] = None
|
|
billing_name: Optional[str] = None
|
|
|
|
if family_name is None and given_name is None:
|
|
continue
|
|
|
|
given_name = given_name or ''
|
|
family_name = re.sub('\s+', ' ', family_name).strip()
|
|
given_name = re.sub('\s+', ' ', given_name).strip().replace(', ', ',')
|
|
|
|
if ' ' in family_name or '.' in family_name or ',' in family_name:
|
|
if family_name.endswith(' KG'):
|
|
parts = family_name.split(' ')
|
|
family_name = parts[0].title()
|
|
billing_name = f'{family_name} KG'
|
|
|
|
convert_name(mgnr, (m['Nachname'], m['Vorname']), (prefix, given_name, middle_names, family_name, suffix), billing_name)
|
|
elif (' ' in given_name or '.' in given_name or ',' in given_name) and given_name != 'EVA MARIA':
|
|
if ' u. ' in given_name.lower() or ' u ' in given_name.lower() or ' und ' in given_name.lower():
|
|
parts = given_name.split(' ')
|
|
family_name = family_name.title()
|
|
billing_name = f'{family_name} {parts[0].title()} und {parts[-1].title()}'
|
|
given_name = parts[0].title()
|
|
elif given_name.endswith(' GesBR'):
|
|
family_name = family_name.title()
|
|
given_name = given_name.split(' ')[0].title()
|
|
billing_name = f'{family_name} {given_name} GesBR'
|
|
elif given_name.endswith(' KeG.'):
|
|
family_name = family_name.title()
|
|
given_name = given_name.split(' ')[0].title()
|
|
billing_name = f'{family_name} {given_name} KEG'
|
|
elif given_name.lower().endswith(' jun') or given_name.lower().endswith(' jun.') or given_name.lower().endswith(' sen') or given_name.lower().endswith(' sen.'):
|
|
family_name = family_name.title()
|
|
parts = given_name.split(' ')
|
|
suffix = parts[-1].lower()
|
|
if suffix[-1] != '.':
|
|
suffix += '.'
|
|
given_name = parts[0].title()
|
|
elif ',' in given_name:
|
|
family_name = family_name.title()
|
|
parts = given_name.split(',')
|
|
given_name = parts[0].title()
|
|
prefix = ' '.join([p.title() for p in parts[1:]])
|
|
elif given_name.endswith(' DI'):
|
|
family_name = family_name.title()
|
|
given_name = given_name.split(' ')[0].title()
|
|
prefix = 'Dipl.-Ing.'
|
|
elif given_name.lower().endswith(' ing') or given_name.lower().endswith(' ing.') or given_name.lower().endswith(' dr') or given_name.lower().endswith(' dr.'):
|
|
family_name = family_name.title()
|
|
parts = given_name.split(' ')
|
|
given_name = parts[0].title()
|
|
prefix = parts[-1].title()
|
|
if prefix[-1] != '.':
|
|
prefix += '.'
|
|
|
|
convert_name(mgnr, (m['Nachname'], m['Vorname']), (prefix, given_name, middle_names, family_name, suffix), billing_name)
|
|
else:
|
|
family_name = family_name.title()
|
|
given_name = given_name.title()
|
|
|
|
bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None
|
|
if bnr is not None:
|
|
bnr = bnr.replace('.', '')
|
|
if len(bnr) == 10:
|
|
bnr = bnr.removesuffix('000')
|
|
elif len(bnr) == 6:
|
|
bnr = '0' + bnr
|
|
if not check_lfbis_nr(bnr):
|
|
if bnr == '1234567':
|
|
warning(mgnr, 'BetriebsNr.', bnr)
|
|
else:
|
|
invalid(mgnr, 'BetriebsNr.', bnr)
|
|
bnr = None
|
|
|
|
|
|
ustid: Optional[str] = m['UID']
|
|
if ustid is not None:
|
|
ustid = ustid.replace(' ', '')
|
|
if len(ustid) == 8 and ustid.isdigit():
|
|
ustid = 'ATU' + ustid
|
|
elif not USTID_RE.fullmatch(ustid):
|
|
invalid(mgnr, 'UID', ustid)
|
|
ustid = None
|
|
if ustid and not check_ustid_at(ustid):
|
|
if ustid == 'ATU11111111':
|
|
warning(mgnr, 'UID', ustid)
|
|
else:
|
|
invalid(mgnr, 'UID', ustid)
|
|
ustid = None
|
|
|
|
iban: Optional[str] = m['IBAN']
|
|
bic: Optional[str] = m['BIC']
|
|
blz: Optional[int] = m['BLZ']
|
|
ktonr: Optional[str] = m['KontoNr']
|
|
|
|
if iban is None:
|
|
pass
|
|
|
|
if iban is not None:
|
|
iban = iban.replace(' ', '')
|
|
if not check_iban(iban):
|
|
invalid(mgnr, 'IBAN', iban)
|
|
iban = None
|
|
|
|
if iban is None and blz and ktonr:
|
|
iban = generate_iban_at(blz, re.sub('[. -]', '', ktonr))
|
|
success(mgnr, 'IBAN', f'{iban} ({blz}, {ktonr})')
|
|
|
|
if bic is not None:
|
|
bic = bic.upper()
|
|
if bic == 'RLNWATAUE':
|
|
bic = 'RLNWATWWAUE'
|
|
if not BIC_RE.fullmatch(bic):
|
|
invalid(mgnr, 'BIC', bic)
|
|
bic = None
|
|
if bic is not None:
|
|
if len(bic) == 11 and bic.endswith('XXX'):
|
|
bic = bic[:-3]
|
|
|
|
ort: Optional[str] = m['Ort']
|
|
address: Optional[str] = m['Straße']
|
|
if address is not None:
|
|
address_old = address
|
|
address = re.sub('([0-9])([A-Z])', lambda a: a.group(1) + a.group(2).lower(), re.sub('\s+', ' ', address).strip().title())
|
|
address = address.replace('strasse', 'straße').replace('strassse', 'straße')\
|
|
.replace('Strasse', 'Straße').replace('Str.', 'Straße')\
|
|
.replace('str.', 'straße').replace('ster.', 'straße').replace('g. ', 'gasse ')\
|
|
.replace('Gross', 'Groß').replace('Bockfliess', 'Bockfließ').replace('Weiss', 'Weiß')\
|
|
.replace('Preussen', 'Preußen').replace('Schloss', 'Schloß').replace('luss', 'luß')\
|
|
.replace('Haupstraße', 'Hauptstraße')
|
|
address = re.sub('([a-z])([0-9])', lambda a: a.group(1) + ' ' + a.group(2), address)
|
|
if address.startswith('Nr. ') or address.startswith('Nr ') or address.isdigit():
|
|
address = ort.title() + ' ' + address.split(' ')[-1]
|
|
elif address.startswith('Ob. '):
|
|
address = address.replace('Ob. ', 'Obere ', 1)
|
|
address = address.replace(' Nr. ', ' ')
|
|
address = re.sub(r'([^0-9]+?)( [0-9])', lambda a: STREET_NAMES.get(a.group(1), a.group(1)) + a.group(2), address)
|
|
if address_old != address:
|
|
convert(mgnr, 'Adresse', address_old, address)
|
|
|
|
phone_1: Optional[str] = m['Telefon']
|
|
phone_2: Optional[str] = m['Mobiltelefon']
|
|
email: Optional[str] = m['EMail']
|
|
phone_landline = None
|
|
phone_mobile = []
|
|
|
|
if email is not None:
|
|
if email.isupper():
|
|
email = email.lower()
|
|
if not EMAIL_RE.fullmatch(email):
|
|
invalid(mgnr, 'E-Mail', m['EMail'])
|
|
email = None
|
|
|
|
if phone_1:
|
|
phone_1 = normalize_phone_nr(phone_1)
|
|
if len(phone_1) <= 8 or phone_1[0] != '+':
|
|
invalid(mgnr, 'Tel.Nr.', m['Telefon'])
|
|
else:
|
|
if phone_1[3] == '6':
|
|
phone_mobile.append(phone_1)
|
|
else:
|
|
phone_landline = phone_1
|
|
if phone_2:
|
|
phone_2 = normalize_phone_nr(phone_2)
|
|
if len(phone_2) <= 8 or phone_2[0] != '+':
|
|
invalid(mgnr, 'Tel.Nr.', m['Mobiltelefon'])
|
|
else:
|
|
if phone_2[3] == '6':
|
|
phone_mobile.append(phone_2)
|
|
elif phone_landline is None:
|
|
phone_landline = phone_2
|
|
elif phone_landline != phone_2:
|
|
invalid(mgnr, 'Tel.Nr.', phone_2)
|
|
|
|
zwstid = m['ZNR'] and branches[m['ZNR']]['Kennbst'] or len(branches) == 1 and list(branches.values())[0]['Kennbst']
|
|
|
|
f_m.write(format_row(
|
|
mgnr, m['MGNR-Vorgänger'], prefix, given_name, middle_names, family_name, suffix,
|
|
m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'],
|
|
m['BHKontonummer'], zwstid, bnr, ustid,
|
|
m['Volllieferant'] or False, m['Buchführend'] or False, False, m['Aktives Mitglied'] or False,
|
|
iban, bic, 'AT', None, address, email, phone_landline,
|
|
phone_mobile[0] if len(phone_mobile) > 0 else None, phone_mobile[1] if len(phone_mobile) > 1 else None,
|
|
None, m['Anmerkung']
|
|
))
|
|
if billing_name:
|
|
f_mba.write(format_row(mgnr, billing_name, 'AT', None, None))
|
|
|
|
if __name__ == '__main__':
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('in_dir')
|
|
parser.add_argument('out_dir')
|
|
parser.add_argument('-q', '--quiet', action='store_true', default=False)
|
|
args = parser.parse_args()
|
|
|
|
os.makedirs(args.out_dir, exist_ok=True)
|
|
|
|
migrate_members(args.in_dir, args.out_dir)
|