#!/bin/env python3
"""Migrate member master data from CSV exports into a new CSV layout.

Reads ``TMitglieder.csv`` and ``TZweigstellen.csv`` from an input directory,
normalizes the member names, validates the identification and bank numbers of
every member and writes ``member.csv`` and ``member_billing_address.csv`` to
an output directory. Validation results are reported on stderr, name rewrites
are logged on stdout.
"""

from typing import Dict, Any, Tuple, Optional, Iterator
import argparse
import datetime
import os
import re
import sys


USTID_RE = re.compile('[A-Z]{2}[A-Z0-9]{2,12}')
BIC_RE = re.compile('[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
IBAN_RE = re.compile('[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')

_FLOAT_RE = re.compile(r'\d+\.\d+')
_WHITESPACE_RE = re.compile(r'\s+')


def _parse_value(part: str) -> Any:
    """Convert one stripped CSV cell into a Python value.

    Empty cells become ``None``, double-quoted cells become ``str`` (quotes
    removed), ``T``/``F`` become ``bool``, digit-only cells ``int``,
    ``<digits>.<digits>`` cells ``float`` and ``YYYY-MM-DD`` cells
    ``datetime.date``.

    Raises:
        RuntimeError: if the cell matches none of the supported formats.
    """
    if part == '':
        return None
    if part[0] == '"' and part[-1] == '"':
        return part[1:-1]
    if part == 'T':
        return True
    if part == 'F':
        return False
    if part.isdigit():
        return int(part)
    # The whole cell has to be a decimal number; a mere numeric prefix
    # (e.g. '1.5abc') is an unsupported value, not a float.
    if _FLOAT_RE.fullmatch(part):
        return float(part)
    if len(part) == 10 and part[4] == '-' and part[7] == '-':
        return datetime.datetime.strptime(part, '%Y-%m-%d').date()
    raise RuntimeError(part)


def parse_csv(filename: str) -> Iterator[Dict[str, Any]]:
    """Lazily yield one dict per data row of a semicolon-separated file.

    The first line is the header; its cells are used as the dict keys.
    Every other cell is converted by ``_parse_value``.

    NOTE: rows are split on every ``;``; semicolons inside quoted strings
    are not supported.

    Raises:
        RuntimeError: if a cell has an unsupported format.
        IndexError: if a row has more cells than the header.
    """
    with open(filename, 'r') as f:
        header: Optional[Tuple[str, ...]] = None
        for line in f:
            parts = [e.strip() for e in line.strip().split(';')]
            if header is None:
                header = tuple(parts)
                continue
            obj = {}
            for i, part in enumerate(parts):
                obj[header[i]] = _parse_value(part)
            yield obj


def format_row(*args) -> str:
    """Serialize the given values into one ``;``-separated output line.

    ``None`` becomes an empty cell, strings are wrapped in double quotes
    (embedded quotes are not escaped), booleans become ``T``/``F`` and
    everything else is written via ``str()``. The line ends with ``\\n``.
    """
    cells = []
    for arg in args:
        if arg is None:
            cells.append('')
        elif isinstance(arg, str):
            cells.append(f'"{arg}"')
        elif isinstance(arg, bool):
            # bool has to be handled before the generic fallback,
            # otherwise it would be written as 'True'/'False'.
            cells.append('T' if arg else 'F')
        else:
            cells.append(str(arg))
    return ';'.join(cells) + '\n'


def _report(color: int, mgnr: int, key: str, value: str) -> None:
    """Print a bold, ANSI-colored message for member ``mgnr`` to stderr."""
    print(f'\x1B[1;{color}m{mgnr:>5}: {key} {value}\x1B[0m', file=sys.stderr)


def success(mgnr: int, key: str, value: str) -> None:
    """Report a successfully derived value (green) on stderr."""
    _report(32, mgnr, key, value)


def warning(mgnr: int, key: str, value: str) -> None:
    """Report a suspicious value (yellow) on stderr."""
    _report(33, mgnr, key, value)


def invalid(mgnr: int, key: str, value: str) -> None:
    """Report an invalid value (red) on stderr."""
    _report(31, mgnr, key, value)


def check_lfbis_nr(nr: str) -> bool:
    """Return whether ``nr`` is a 7-digit number with a correct check digit.

    The first six digits are weighted 7..2; the last digit has to equal
    ``(11 - (sum % 11)) % 10``.
    """
    # https://statistik.at/fileadmin/shared/QM/Standarddokumentationen/RW/std_r_land-forstw_register.pdf#page=41
    if len(nr) != 7 or not nr.isdigit():
        return False
    s = sum(int(ch) * (7 - i) for i, ch in enumerate(nr[:-1]))
    v = (11 - (s % 11)) % 10
    return v == int(nr[-1])


def check_ustid_at(nr: str) -> bool:
    """Return whether ``nr`` is ``ATU`` + 8 digits with a correct check digit.

    The first seven digits are alternately weighted 1 and 2, the digit sums
    of the products are added up and the last digit has to equal
    ``(96 - sum) % 10``.
    """
    # http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
    if not nr.startswith('ATU') or len(nr) != 11 or not nr[3:].isdigit():
        return False
    s = 0
    for i, ch in enumerate(nr[3:-1]):
        s += sum(map(int, str(int(ch) * (i % 2 + 1))))
    v = (96 - s) % 10
    return v == int(nr[-1])


def iban_checksum(iban: str) -> int:
    """Return ``98 - (n % 97)`` for the rearranged, numeric form of ``iban``.

    The first four characters are moved to the end and every letter is
    replaced by its position in the alphabet plus 9 (A=10 ... Z=35).
    For an IBAN whose check digits are ``00`` the result is the pair of
    check digits to insert; for an IBAN with correct check digits it is 97.

    Raises:
        RuntimeError: if ``iban`` does not match ``IBAN_RE``.
    """
    if not IBAN_RE.fullmatch(iban):
        raise RuntimeError(f'invalid IBAN format: {iban}')
    s = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), iban[4:] + iban[:4])
    return 98 - (int(s) % 97)


def check_iban(iban: str) -> bool:
    """Return whether ``iban`` matches ``IBAN_RE`` and has valid check digits."""
    if not IBAN_RE.fullmatch(iban):
        return False
    return iban_checksum(iban) == 97


def generate_iban_at(blz: int, ktonr: str) -> str:
    """Build an ``AT`` IBAN from a bank code and an account number.

    ``blz`` is zero-padded to 5 and ``ktonr`` to 11 characters.

    Raises:
        RuntimeError: if the padded result does not match ``IBAN_RE``.
    """
    bban = f'{blz:>05}{ktonr:>011}'
    s = iban_checksum(f'AT00{bban}')
    return f'AT{s:02}{bban}'


def parse_branches(in_dir: str) -> Dict[str, Any]:
    """Read ``TZweigstellen.csv`` from ``in_dir`` into a dict keyed by ``ZNR``."""
    return {b['ZNR']: b for b in parse_csv(f'{in_dir}/TZweigstellen.csv')}


def _print_name_result(prefix: Optional[str], given_name: str, middle_names: Optional[str],
                       family_name: str, suffix: Optional[str], billing_name: Optional[str]) -> None:
    """Log the outcome of a name rewrite on stdout."""
    print(f' -> {prefix or ""} / {given_name or ""} / {middle_names or ""} / {family_name or ""} / {suffix or ""}')
    if billing_name:
        print(f' -> {billing_name}')
    print('')


def _split_name(mgnr: int, family_name: Optional[str], given_name: Optional[str]) \
        -> Tuple[Optional[str], str, Optional[str], str, Optional[str], Optional[str]]:
    """Normalize the raw name columns of one member.

    Returns ``(prefix, given_name, middle_names, family_name, suffix,
    billing_name)``. Names containing spaces, dots or commas are treated as
    special cases (companies, couples, titles, jun./sen.) and logged on
    stdout; all other names are simply title-cased.
    """
    prefix: Optional[str] = None
    middle_names: Optional[str] = None
    suffix: Optional[str] = None
    billing_name: Optional[str] = None

    # Either column may be empty on its own; treat a missing part as ''
    # instead of failing inside the regex substitution.
    family_name = _WHITESPACE_RE.sub(' ', family_name or '').strip()
    given_name = _WHITESPACE_RE.sub(' ', given_name or '').strip().replace(', ', ',')
    given_lower = given_name.lower()

    if ' ' in family_name or '.' in family_name or ',' in family_name:
        print('')
        print(f'{mgnr}: {family_name} / {given_name}')
        if family_name.endswith(' KG'):
            family_name = family_name.split(' ')[0].title()
            billing_name = f'{family_name} KG'
        _print_name_result(prefix, given_name, middle_names, family_name, suffix, billing_name)
    elif (' ' in given_name or '.' in given_name or ',' in given_name) and given_name != 'EVA MARIA':
        print('')
        print(f'{mgnr}: {family_name} / {given_name}')
        if ' u. ' in given_lower or ' u ' in given_lower or ' und ' in given_lower:
            # two persons sharing one membership
            parts = given_name.split(' ')
            family_name = family_name.title()
            billing_name = f'{family_name} {parts[0].title()} und {parts[-1].title()}'
            given_name = parts[0].title()
        elif given_name.endswith(' GesBR'):
            family_name = family_name.title()
            given_name = given_name.split(' ')[0].title()
            billing_name = f'{family_name} {given_name} GesBR'
        elif given_name.endswith(' KeG.'):
            family_name = family_name.title()
            given_name = given_name.split(' ')[0].title()
            billing_name = f'{family_name} {given_name} KEG'
        elif given_lower.endswith((' jun', ' jun.', ' sen', ' sen.')):
            family_name = family_name.title()
            parts = given_name.split(' ')
            suffix = parts[-1].lower()
            if suffix[-1] != '.':
                suffix += '.'
            given_name = parts[0].title()
        elif ',' in given_name:
            # everything after the first comma is taken as the prefix
            family_name = family_name.title()
            parts = given_name.split(',')
            given_name = parts[0].title()
            prefix = ' '.join(p.title() for p in parts[1:])
        elif given_name.endswith(' DI'):
            family_name = family_name.title()
            given_name = given_name.split(' ')[0].title()
            prefix = 'Dipl.-Ing.'
        elif given_lower.endswith((' ing', ' ing.', ' dr', ' dr.')):
            family_name = family_name.title()
            parts = given_name.split(' ')
            given_name = parts[0].title()
            prefix = parts[-1].title()
            if prefix[-1] != '.':
                prefix += '.'
        if prefix:
            prefix = prefix.replace('Dipl.Ing.', 'Dipl.-Ing.')
        _print_name_result(prefix, given_name, middle_names, family_name, suffix, billing_name)
    else:
        family_name = family_name.title()
        given_name = given_name.title()

    return prefix, given_name, middle_names, family_name, suffix, billing_name


def _clean_lfbis_nr(mgnr: int, bnr: Optional[str]) -> Optional[str]:
    """Normalize and validate a ``Betriebsnummer``; return ``None`` if unusable."""
    if bnr is None or bnr == '-':
        return None
    bnr = bnr.replace('.', '')
    if len(bnr) == 10:
        bnr = bnr.removesuffix('000')
    elif len(bnr) == 6:
        bnr = '0' + bnr
    if check_lfbis_nr(bnr):
        return bnr
    # '1234567' is only reported as a warning, every other failure as invalid
    if bnr == '1234567':
        warning(mgnr, 'Betriebsnr.', bnr)
    else:
        invalid(mgnr, 'Betriebsnr.', bnr)
    return None


def _clean_ustid(mgnr: int, ustid: Optional[str]) -> Optional[str]:
    """Normalize and validate a ``UID``; return ``None`` if unusable."""
    if ustid is None:
        return None
    ustid = ustid.replace(' ', '')
    if len(ustid) == 8 and ustid.isdigit():
        # bare 8-digit numbers get the 'ATU' prefix
        ustid = 'ATU' + ustid
    elif not USTID_RE.fullmatch(ustid):
        invalid(mgnr, 'UID', ustid)
        return None
    if ustid and not check_ustid_at(ustid):
        # 'ATU11111111' is only reported as a warning
        if ustid == 'ATU11111111':
            warning(mgnr, 'UID', ustid)
        else:
            invalid(mgnr, 'UID', ustid)
        return None
    return ustid


def _clean_iban(mgnr: int, iban: Optional[str], blz: Optional[int], ktonr: Optional[str]) -> Optional[str]:
    """Validate ``iban``; fall back to generating one from ``blz``/``ktonr``."""
    if iban is not None:
        iban = iban.replace(' ', '')
        if not check_iban(iban):
            invalid(mgnr, 'IBAN', iban)
            iban = None
    if iban is None and blz and ktonr:
        iban = generate_iban_at(blz, re.sub('[. -]', '', ktonr))
        success(mgnr, 'IBAN', f'{iban} ({blz}, {ktonr})')
    return iban


def _clean_bic(mgnr: int, bic: Optional[str]) -> Optional[str]:
    """Upper-case and validate a ``BIC``; return ``None`` if unusable."""
    if bic is None:
        return None
    bic = bic.upper()
    if bic == 'RLNWATAUE':
        # known malformed value in the source data, mapped to its full form
        bic = 'RLNWATWWAUE'
    if not BIC_RE.fullmatch(bic):
        invalid(mgnr, 'BIC', bic)
        return None
    return bic


def migrate_members(in_dir: str, out_dir: str) -> None:
    """Convert ``TMitglieder.csv`` into ``member.csv`` and billing addresses.

    Reads ``{in_dir}/TMitglieder.csv`` and ``{in_dir}/TZweigstellen.csv`` and
    writes ``{out_dir}/member.csv`` and
    ``{out_dir}/member_billing_address.csv``. Rows without both family and
    given name are skipped. Values that fail validation are reported on
    stderr and written as empty cells.
    """
    members = parse_csv(f'{in_dir}/TMitglieder.csv')
    branches = parse_branches(in_dir)

    with open(f'{out_dir}/member.csv', 'w') as f_m, \
            open(f'{out_dir}/member_billing_address.csv', 'w') as f_mba:
        # NOTE(review): the header declares 29 columns, the rows written
        # below carry 22 (up to and including 'country').
        f_m.write('mgnr;predecessor_mgnr;prefix;given_name;middle_names;family_name;suffix;'
                  'birthday;entry_date;exit_date;business_shares;accounting_nr;zwstid;'
                  'lfbis_nr;ustid;volllieferant;buchführend;funktionär;active;iban;bic;'
                  'country;postal_dest;address;email;phone_landline;phone_mobile;default_kgnr;comment\n')
        # NOTE(review): first column is spelled 'mgr' here but 'mgnr' in
        # member.csv - possibly a typo; left as is since consumers of the
        # file may rely on it.
        f_mba.write('mgr;name;country;postal_dest;address\n')

        for m in members:
            mgnr: int = m['MGNR']
            if m['Nachname'] is None and m['Vorname'] is None:
                continue

            prefix, given_name, middle_names, family_name, suffix, billing_name = \
                _split_name(mgnr, m['Nachname'], m['Vorname'])

            bnr = _clean_lfbis_nr(mgnr, m['Betriebsnummer'])
            ustid = _clean_ustid(mgnr, m['UID'])
            iban = _clean_iban(mgnr, m['IBAN'], m['BLZ'], m['KontoNr'])
            bic = _clean_bic(mgnr, m['BIC'])

            # a falsy ZNR (empty cell or 0) is passed through unchanged
            znr = m['ZNR']
            zwstid = branches[znr]['Kennbst'] if znr else znr

            f_m.write(format_row(
                mgnr, m['MGNR-Vorgänger'], prefix, given_name, middle_names, family_name, suffix,
                m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'],
                m['BHKontonummer'], zwstid, bnr, ustid,
                m['Volllieferant'] or False, m['Buchführend'] or False, False,
                m['Aktives Mitglied'] or False,
                iban, bic, 'AT',
            ))
            if billing_name:
                f_mba.write(format_row(mgnr, billing_name, 'AT', None, None))


def main() -> None:
    """Parse the command line and run the migration."""
    parser = argparse.ArgumentParser()
    parser.add_argument('in_dir')
    parser.add_argument('out_dir')
    args = parser.parse_args()

    os.makedirs(args.out_dir, exist_ok=True)
    migrate_members(args.in_dir, args.out_dir)


if __name__ == '__main__':
    main()