#!/usr/bin/env python3
"""Migrate a member export (semicolon-separated CSV files) into normalised CSV files.

Reads ``TMitglieder.csv`` and ``TZweigstellen.csv`` from the input directory,
cleans names, identifiers, bank details, addresses and contact data, and writes
``member.csv`` and ``member_billing_address.csv`` to the output directory.
Postal destinations and cadastral numbers are resolved through an SQLite
database (tables ``AT_plz``, ``AT_ort`` and ``wb_kg``).
"""

from typing import Dict, Any, Tuple, Optional, Iterator, List
import argparse
import datetime
import os
import re
import sys
import sqlite3


# Set by the ``__main__`` section before any lookup is performed.
DB_CNX: Optional[sqlite3.Connection] = None

# Command line arguments; replaced by the ``__main__`` section. The default keeps
# the logging helpers usable when this module is imported instead of executed.
args: argparse.Namespace = argparse.Namespace(quiet=False)

USTID_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}')
BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
IBAN_RE = re.compile(r'[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')
EMAIL_RE = re.compile(r'[^@\s]+@([a-z0-9_äöüß-]+\.)+[a-z]{2,}')

# Street names as found in the source data -> corrected spelling.
STREET_NAMES = {
    'Hans-Wagnerstraße': 'Hans-Wagner-Straße',
    'J.Seitzstraße': 'Josef-Seitz-Straße',
    'Kurhaus-Str.': 'Kurhausstraße',
    'Kurhaus-Straße': 'Kurhausstraße',
    'Pirawartherstraße': 'Pirawarther Straße',
    'Raggendorferstraße': 'Raggendorfer Straße',
    'Matznerstraße': 'Matzner Straße',
    'Stillfriederstraße': 'Stillfrieder Straße',
    'Harraserstraße': 'Harraser Straße',
    'Gänserndorferstraße': 'Gänserndorfer Straße',
    'Hofrat Döltlstraße': 'Hofrat-Döltl-Straße',
    'Sulzerstraße': 'Sulzer Straße',
    'Brünnerstraße': 'Brünner Straße',
    'Flustraße': 'Flurstraße',
    'Wienerstraße': 'Wiener Straße',
    'St.Laurentstraße': 'St.-Laurentstraße',
    'Angernerstraße': 'Angerner Straße',
    'Schweinbartherstraße': 'Schweinbarther Straße',
    'Hohenruppersdorferstraße': 'Hohenruppersdorfer Straße',
    'Gruberhauptstraße': 'Gruber Hauptstraße',
    'Josef Seitzstraße': 'Josef-Seitz-Straße',
    'Auersthalerstraße': 'Auersthaler Straße',
    'Ollersdorferstraße': 'Ollersdorfer Straße',
    'Ritter Zoppelstraße': 'Ritter-Zoppel-Straße',
    'Spannbergerstraße': 'Spannberger Straße',
    'Ritter Zoppel Straße': 'Ritter-Zoppel-Straße',
    'R. Virchow-Straße': 'Rudolf-Virchow-Straße',
    'Ebenthalerstraße': 'Ebenthaler Straße',
    'Bockfließerstraße': 'Bockfließer Straße',
    'Dörfleserstraße': 'Dörfleser Straße',
    'Dörflesserstraße': 'Dörfleser Straße',
    'Grubere Hauptstraße': 'Gruber Hauptstraße',
    'Groß Inzersdorf': 'Großinzersdorf',
}

# Substring replacements applied to a title-cased address, strictly in this order.
_ADDRESS_REPLACEMENTS: Tuple[Tuple[str, str], ...] = (
    ('strasse', 'straße'),
    ('strassse', 'straße'),
    ('Strasse', 'Straße'),
    ('Str.', 'Straße'),
    ('str.', 'straße'),
    ('ster.', 'straße'),
    ('g. ', 'gasse '),
    ('Gross', 'Groß'),
    ('Bockfliess', 'Bockfließ'),
    ('Weiss', 'Weiß'),
    ('Preussen', 'Preußen'),
    ('Schloss', 'Schloß'),
    ('luss', 'luß'),
    ('Haupstraße', 'Hauptstraße'),
)


def parse_csv(filename: str) -> Iterator[Dict[str, Any]]:
    """Yield one dict per data row of a semicolon-separated file.

    The first non-blank line is the header. Every field is converted by its shape:
    empty -> ``None``, ``"quoted"`` -> ``str``, ``T``/``F`` -> ``bool``,
    digits -> ``int``, ``digits.digits`` -> ``float``, ``YYYY-MM-DD`` -> ``date``.

    Raises:
        RuntimeError: if a field matches none of the known shapes.
    """
    with open(filename, 'r') as f:
        header: Optional[Tuple[str, ...]] = None
        for line in f:
            line = line.strip()
            if not line:
                # A blank line carries no fields; it must not become a bogus row.
                continue
            parts = [e.strip() for e in line.split(';')]
            if header is None:
                header = tuple(parts)
                continue

            obj: Dict[str, Any] = {}
            for i, part in enumerate(parts):
                value: Any
                if part == '':
                    value = None
                elif len(part) >= 2 and part[0] == '"' and part[-1] == '"':
                    value = part[1:-1]
                elif part == 'T':
                    value = True
                elif part == 'F':
                    value = False
                elif part.isdigit():
                    value = int(part)
                elif re.fullmatch(r'[0-9]+\.[0-9]+', part):
                    value = float(part)
                elif len(part) == 10 and part[4] == '-' and part[7] == '-':
                    value = datetime.datetime.strptime(part, '%Y-%m-%d').date()
                else:
                    raise RuntimeError(f'unable to parse CSV value {part!r}')
                obj[header[i]] = value
            yield obj


def format_row(*args) -> str:
    """Serialize values into one output line terminated by a newline.

    ``None`` becomes an empty field, strings are wrapped in double quotes, booleans
    become ``T``/``F`` and everything else is written via ``str()``.
    """
    fields = []
    for arg in args:
        if arg is None:
            fields.append('')
        elif isinstance(arg, str):
            fields.append(f'"{arg}"')
        elif isinstance(arg, bool):
            # Must be tested before the generic case: bool is a subclass of int.
            fields.append('T' if arg else 'F')
        else:
            fields.append(str(arg))
    return ';'.join(fields) + '\n'


def success(mgnr: int, key: str, value: str) -> None:
    """Print a green message to stderr unless ``--quiet`` is set."""
    if not args.quiet:
        print(f'\x1B[1;32m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)


def warning(mgnr: int, key: str, value: str) -> None:
    """Print a yellow message to stderr (always shown)."""
    print(f'\x1B[1;33m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)


def invalid(mgnr: int, key: str, value: str) -> None:
    """Print a red message to stderr (always shown)."""
    print(f'\x1B[1;31m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)


def convert(mgnr: int, key: str, old_value: str, new_value: str) -> None:
    """Print an ``old -> new`` conversion notice to stderr unless ``--quiet`` is set."""
    if not args.quiet:
        print(f'\x1B[1m{mgnr:>6}: {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr)


def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str],
                 billing: Optional[str] = None) -> None:
    """Print how a name was split into its parts to stderr unless ``--quiet`` is set.

    Args:
        mgnr: member number the message belongs to.
        old_name: the name parts as read from the source data.
        new_name: the name parts after conversion.
        billing: derived billing name, appended in parentheses when given.
    """
    if args.quiet:
        return
    old_str = ' / '.join([e or '' for e in old_name])
    new_str = ' / '.join([e or '' for e in new_name])
    billing_str = '(' + billing + ')' if billing else ''
    print(f'\x1B[1m{mgnr:>6}: {old_str} -> {new_str}{billing_str}\x1B[0m', file=sys.stderr)


def check_lfbis_nr(nr: str) -> bool:
    """Return whether ``nr`` is a 7-digit number with a matching check digit.

    The first six digits are weighted 7, 6, ..., 2; the last digit must equal
    ``(11 - weighted_sum % 11) % 10``.
    """
    # https://statistik.at/fileadmin/shared/QM/Standarddokumentationen/RW/std_r_land-forstw_register.pdf#page=41
    if len(nr) != 7 or not nr.isdigit():
        return False
    s = sum(int(ch) * (7 - i) for i, ch in enumerate(nr[:-1]))
    v = (11 - (s % 11)) % 10
    return v == int(nr[-1])


def check_ustid_at(nr: str) -> bool:
    """Return whether ``nr`` has the form ``ATU`` + 8 digits with a matching check digit."""
    # http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
    if not nr.startswith('ATU') or len(nr) != 11 or not nr[3:].isdigit():
        return False
    s = 0
    for i, ch in enumerate(nr[3:-1]):
        # Weights alternate 1, 2, 1, ...; the digits of each product are summed.
        s += sum(map(int, str(int(ch) * (i % 2 + 1))))
    v = (96 - s) % 10
    return v == int(nr[-1])


def iban_checksum(iban: str) -> int:
    """Return ``98 - (n mod 97)`` for the rearranged, letter-expanded IBAN.

    The first four characters are moved to the end and every letter is replaced by
    its position in the alphabet plus 9 (A=10, ..., Z=35). The result is 97 for a
    valid IBAN; for an IBAN with check digits ``00`` it is the check digit value.

    Raises:
        RuntimeError: if ``iban`` does not match ``IBAN_RE``.
    """
    if not IBAN_RE.fullmatch(iban):
        raise RuntimeError()
    s = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), (iban[4:] + iban[:4]))
    return 98 - (int(s) % 97)


def check_iban(iban: str) -> bool:
    """Return whether ``iban`` is well-formed and its check digits are correct."""
    if not IBAN_RE.fullmatch(iban):
        return False
    return iban_checksum(iban) == 97


def generate_iban_at(blz: int, ktonr: str) -> str:
    """Build an Austrian IBAN from a bank code and an account number.

    Args:
        blz: bank code, at most five digits.
        ktonr: account number without separators, at most eleven characters.

    Raises:
        RuntimeError: if an argument is out of range or yields a malformed IBAN.
    """
    if blz < 0 or blz > 99999 or len(ktonr) > 11:
        raise RuntimeError()
    bban = f'{blz:05d}' + ktonr.rjust(11, '0')
    check = iban_checksum('AT00' + bban)
    return f'AT{check:02d}{bban}'


def normalize_phone_nr(nr: str) -> str:
    """Remove spaces, slashes and dashes and convert the number to ``+`` notation.

    A leading ``00`` is replaced by ``+``; any other leading ``0`` is treated as a
    national number and replaced by ``+43``. Returns an empty string when nothing
    but separators was given.
    """
    nr = re.sub('[ /-]', '', nr)
    if nr.startswith('00'):
        nr = '+' + nr[2:]
    elif nr.startswith('0'):
        nr = '+43' + nr[1:]
    return nr


def parse_branches(in_dir: str) -> Dict[str, Any]:
    """Read ``TZweigstellen.csv`` from ``in_dir`` and index its rows by ``ZNR``."""
    return {b['ZNR']: b for b in parse_csv(f'{in_dir}/TZweigstellen.csv')}


def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] = None) -> Optional[int]:
    """Resolve a postal code and place name to ``plz * 100000 + okz``.

    All places stored for ``plz`` are compared with ``ort`` (case, spaces, dashes and
    ``ß``/``ss`` are ignored, ``gr.`` is expanded to ``groß``). If exactly one place
    matches, its ``okz`` is used. For ``VELM-GÖTZENDORF`` the street and house number
    of ``address`` decide between the two places.

    Returns:
        The combined number, or ``None`` if ``plz`` or ``ort`` is ``None``.

    Raises:
        RuntimeError: if the place cannot be determined.
    """
    if plz is None or ort is None:
        return None

    cur = DB_CNX.cursor()
    try:
        cur.execute("SELECT o.okz, p.dest, o.name FROM AT_plz p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?", (plz,))
        rows: List[Tuple[int, str, str]] = cur.fetchall()
    finally:
        cur.close()

    def squash(name: str) -> str:
        return name.replace(' ', '').replace('-', '').replace('ß', 'ss')

    ort_m = squash(ort.lower().replace('gr.', 'groß'))
    rows_m = [r[0] for r in rows if ort_m in squash(r[2].lower())]
    if len(rows_m) == 1:
        return plz * 100000 + rows_m[0]

    if ort == 'VELM-GÖTZENDORF' and address is not None:
        parts = address.split(' ')
        # The street has to be a string to be comparable with the names below.
        street = ' '.join(parts[:-1])
        # Accept house numbers such as "12", "12-14" or "12a" by using the leading digits.
        nr_match = re.match('[0-9]+', parts[-1])
        if nr_match is not None:
            nr = int(nr_match.group(0))
            if (street == 'Landstraße' and nr <= 48) \
                    or (street == 'Winterzeile' and (nr <= 49 or nr in (52, 54, 56))) \
                    or (street == 'Hauptstraße' and (nr <= 106 or nr in (117, 115, 113, 111, 109, 107))):
                # Velm
                return plz * 100000 + 3572
            # Götzendorf
            return plz * 100000 + 3571

    print(ort, address)
    raise RuntimeError()


def lookup_kgnr(okz: Optional[int]) -> Optional[int]:
    """Return the ``kgnr`` stored for the place ``okz`` if it is unambiguous, else ``None``."""
    if okz is None:
        return None

    cur = DB_CNX.cursor()
    try:
        cur.execute("SELECT k.kgnr FROM AT_ort o JOIN wb_kg k ON k.kgnr = o.kgnr WHERE okz = ?", (okz,))
        rows: List[Tuple[int]] = cur.fetchall()
    finally:
        cur.close()

    if len(rows) == 1:
        return rows[0][0]
    return None


def migrate_members(in_dir: str, out_dir: str) -> None:
    """Convert ``TMitglieder.csv`` into ``member.csv`` and ``member_billing_address.csv``.

    Every member row is cleaned field by field; values that fail validation are
    reported on stderr and written as empty fields. Rows having neither a family
    name nor a given name are skipped.

    Args:
        in_dir: directory containing ``TMitglieder.csv`` and ``TZweigstellen.csv``.
        out_dir: existing directory the output files are written to.
    """
    members = parse_csv(f'{in_dir}/TMitglieder.csv')
    branches = parse_branches(in_dir)

    with open(f'{out_dir}/member.csv', 'w+') as f_m, \
            open(f'{out_dir}/member_billing_address.csv', 'w+') as f_mba:
        f_m.write('mgnr;predecessor_mgnr;prefix;given_name;middle_names;family_name;suffix;'
                  'birthday;entry_date;exit_date;business_shares;accounting_nr;zwstid;'
                  'lfbis_nr;ustid;volllieferant;buchführend;funktionär;active;iban;bic;'
                  'country;postal_dest;address;'
                  'email;phone_landline;phone_mobile_1;phone_mobile_2;'
                  'default_kgnr;comment\n')
        # NOTE(review): 'mgr' looks like a typo for 'mgnr' (cf. the header above) - confirm
        # against the consumer of this file before changing it.
        f_mba.write('mgr;name;country;postal_dest;address\n')

        for m in members:
            mgnr: int = m['MGNR']
            family_name: str = m['Nachname']
            given_name: str = m['Vorname']
            prefix: Optional[str] = None
            middle_names: Optional[str] = None
            suffix: Optional[str] = None
            billing_name: Optional[str] = None

            if family_name is None and given_name is None:
                continue

            # --- name ---------------------------------------------------------------
            # Either part may be missing on its own; treat it as empty instead of failing.
            given_name = given_name or ''
            family_name = family_name or ''
            family_name = re.sub(r'\s+', ' ', family_name).strip()
            given_name = re.sub(r'\s+', ' ', given_name).strip().replace(', ', ',')
            given_lower = given_name.lower()

            if ' ' in family_name or '.' in family_name or ',' in family_name:
                if family_name.endswith(' KG'):
                    parts = family_name.split(' ')
                    family_name = parts[0].title()
                    billing_name = f'{family_name} KG'
                convert_name(mgnr, (m['Nachname'], m['Vorname']),
                             (prefix, given_name, middle_names, family_name, suffix), billing_name)
            elif (' ' in given_name or '.' in given_name or ',' in given_name) and given_name != 'EVA MARIA':
                if ' u. ' in given_lower or ' u ' in given_lower or ' und ' in given_lower:
                    # Two persons: the first one becomes the member, both go on the bill.
                    parts = given_name.split(' ')
                    family_name = family_name.title()
                    billing_name = f'{family_name} {parts[0].title()} und {parts[-1].title()}'
                    given_name = parts[0].title()
                elif given_name.endswith(' GesBR'):
                    family_name = family_name.title()
                    given_name = given_name.split(' ')[0].title()
                    billing_name = f'{family_name} {given_name} GesBR'
                elif given_name.endswith(' KeG.'):
                    family_name = family_name.title()
                    given_name = given_name.split(' ')[0].title()
                    billing_name = f'{family_name} {given_name} KEG'
                elif given_lower.endswith((' jun', ' jun.', ' sen', ' sen.')):
                    family_name = family_name.title()
                    parts = given_name.split(' ')
                    suffix = parts[-1].lower()
                    if suffix[-1] != '.':
                        suffix += '.'
                    given_name = parts[0].title()
                elif ',' in given_name:
                    # Everything after the first comma is taken as prefix.
                    family_name = family_name.title()
                    parts = given_name.split(',')
                    given_name = parts[0].title()
                    prefix = ' '.join([p.title() for p in parts[1:]])
                elif given_name.endswith(' DI'):
                    family_name = family_name.title()
                    given_name = given_name.split(' ')[0].title()
                    prefix = 'Dipl.-Ing.'
                elif given_lower.endswith((' ing', ' ing.', ' dr', ' dr.')):
                    family_name = family_name.title()
                    parts = given_name.split(' ')
                    given_name = parts[0].title()
                    prefix = parts[-1].title()
                    if prefix[-1] != '.':
                        prefix += '.'
                convert_name(mgnr, (m['Nachname'], m['Vorname']),
                             (prefix, given_name, middle_names, family_name, suffix), billing_name)
            else:
                family_name = family_name.title()
                given_name = given_name.title()

            # --- LFBIS number -------------------------------------------------------
            bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None
            if bnr is not None:
                bnr = bnr.replace('.', '')
                if len(bnr) == 10:
                    bnr = bnr.removesuffix('000')
                elif len(bnr) == 6:
                    bnr = '0' + bnr
                if not check_lfbis_nr(bnr):
                    # '1234567' is reported as a warning only, every other value as invalid.
                    if bnr == '1234567':
                        warning(mgnr, 'BetriebsNr.', bnr)
                    else:
                        invalid(mgnr, 'BetriebsNr.', bnr)
                    bnr = None

            # --- VAT id -------------------------------------------------------------
            ustid: Optional[str] = m['UID']
            if ustid is not None:
                ustid = ustid.replace(' ', '')
                if len(ustid) == 8 and ustid.isdigit():
                    ustid = 'ATU' + ustid
                elif not USTID_RE.fullmatch(ustid):
                    invalid(mgnr, 'UID', ustid)
                    ustid = None
                if ustid and not check_ustid_at(ustid):
                    # 'ATU11111111' is reported as a warning only, every other value as invalid.
                    if ustid == 'ATU11111111':
                        warning(mgnr, 'UID', ustid)
                    else:
                        invalid(mgnr, 'UID', ustid)
                    ustid = None

            # --- bank details -------------------------------------------------------
            iban: Optional[str] = m['IBAN']
            bic: Optional[str] = m['BIC']
            blz: Optional[int] = m['BLZ']
            ktonr: Optional[str] = m['KontoNr']

            if iban is not None:
                iban = iban.replace(' ', '')
                if not check_iban(iban):
                    invalid(mgnr, 'IBAN', iban)
                    iban = None
            if iban is None and blz and ktonr:
                # No usable IBAN: derive one from bank code and account number.
                iban = generate_iban_at(blz, re.sub('[. -]', '', ktonr))
                success(mgnr, 'IBAN', f'{iban} ({blz}, {ktonr})')

            if bic is not None:
                bic = bic.upper()
                if bic == 'RLNWATAUE':
                    bic = 'RLNWATWWAUE'
                if not BIC_RE.fullmatch(bic):
                    invalid(mgnr, 'BIC', bic)
                    bic = None
            if bic is not None and len(bic) == 11 and bic.endswith('XXX'):
                bic = bic[:-3]

            # --- address ------------------------------------------------------------
            ort: Optional[str] = m['Ort']
            address: Optional[str] = m['Straße']
            if address is not None:
                address_old = address
                address = re.sub(r'\s+', ' ', address).strip().title()
                # title() upper-cases a letter following a digit ("12A"); undo that.
                address = re.sub('([0-9])([A-Z])', lambda a: a.group(1) + a.group(2).lower(), address)
                for old, new in _ADDRESS_REPLACEMENTS:
                    address = address.replace(old, new)
                address = re.sub('([a-z])([0-9])', lambda a: a.group(1) + ' ' + a.group(2), address)
                if address.startswith('Nr. ') or address.startswith('Nr ') or address.isdigit():
                    # Only a house number is given: use the place name as street name.
                    if ort is not None:
                        address = ort.title() + ' ' + address.split(' ')[-1]
                elif address.startswith('Ob. '):
                    address = address.replace('Ob. ', 'Obere ', 1)
                address = address.replace(' Nr. ', ' ')
                address = re.sub(r'([^0-9]+?)( [0-9])',
                                 lambda a: STREET_NAMES.get(a.group(1), a.group(1)) + a.group(2), address)
                if address_old != address:
                    convert(mgnr, 'Adresse', address_old, address)

            # --- e-mail and phone numbers -------------------------------------------
            phone_1: Optional[str] = m['Telefon']
            phone_2: Optional[str] = m['Mobiltelefon']
            email: Optional[str] = m['EMail']
            phone_landline = None
            phone_mobile = []

            if email is not None:
                if email.isupper():
                    email = email.lower()
                if not EMAIL_RE.fullmatch(email):
                    invalid(mgnr, 'E-Mail', m['EMail'])
                    email = None

            if phone_1:
                phone_1 = normalize_phone_nr(phone_1)
                if len(phone_1) <= 8 or phone_1[0] != '+':
                    invalid(mgnr, 'Tel.Nr.', m['Telefon'])
                elif phone_1[3] == '6':
                    # A '6' right after the '+43' prefix is taken as a mobile number.
                    phone_mobile.append(phone_1)
                else:
                    phone_landline = phone_1
            if phone_2:
                phone_2 = normalize_phone_nr(phone_2)
                if len(phone_2) <= 8 or phone_2[0] != '+':
                    invalid(mgnr, 'Tel.Nr.', m['Mobiltelefon'])
                elif phone_2[3] == '6':
                    # Do not store the same mobile number twice.
                    if phone_2 not in phone_mobile:
                        phone_mobile.append(phone_2)
                elif phone_landline is None:
                    phone_landline = phone_2
                elif phone_landline != phone_2:
                    invalid(mgnr, 'Tel.Nr.', phone_2)

            # --- branch -------------------------------------------------------------
            # Use the member's branch; fall back to the only existing branch. Without
            # either, the field stays empty (None) instead of becoming the boolean False.
            znr = m['ZNR']
            if znr and branches[znr]['Kennbst']:
                zwstid = branches[znr]['Kennbst']
            elif len(branches) == 1:
                zwstid = next(iter(branches.values()))['Kennbst']
            else:
                zwstid = None

            # --- postal destination and cadastral number ----------------------------
            postal_dest = lookup_plz(int(m['PLZ']) if m['PLZ'] else None, m['Ort'], address)
            okz = postal_dest % 100000 if postal_dest else None
            kgnr = lookup_kgnr(okz)
            if kgnr is None:
                invalid(mgnr, 'KgNr', ort)

            f_m.write(format_row(
                mgnr, m['MGNR-Vorgänger'], prefix, given_name, middle_names, family_name, suffix,
                m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0,
                m['BHKontonummer'], zwstid, bnr, ustid,
                m['Volllieferant'] or False, m['Buchführend'] or False, False, m['Aktives Mitglied'] or False,
                iban, bic, 'AT', postal_dest, address, email, phone_landline,
                phone_mobile[0] if len(phone_mobile) > 0 else None,
                phone_mobile[1] if len(phone_mobile) > 1 else None,
                kgnr, m['Anmerkung']
            ))
            if billing_name:
                f_mba.write(format_row(mgnr, billing_name, 'AT', None, None))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('in_dir')
    parser.add_argument('out_dir')
    parser.add_argument('-q', '--quiet', action='store_true', default=False)
    parser.add_argument('-d', '--database', required=True)
    args = parser.parse_args()

    os.makedirs(args.out_dir, exist_ok=True)

    DB_CNX = sqlite3.connect(args.database)
    try:
        migrate_members(args.in_dir, args.out_dir)
    finally:
        # Close the connection even if the migration aborts.
        DB_CNX.close()