From b48b3f602cae6cf01e81b4df53450632a05a7d08 Mon Sep 17 00:00:00 2001 From: Lorenz Stechauner Date: Thu, 23 Feb 2023 17:34:19 +0100 Subject: [PATCH] Migrate: members --- sql/v01/01.create.sql | 6 +- wgmaster/{wgexport.py => export.py} | 0 wgmaster/{wgimport.py => import.py} | 0 wgmaster/migrate.py | 286 ++++++++++++++++++++++++++++ wgmaster/wgclean.py | 12 -- 5 files changed, 289 insertions(+), 15 deletions(-) rename wgmaster/{wgexport.py => export.py} (100%) rename wgmaster/{wgimport.py => import.py} (100%) create mode 100755 wgmaster/migrate.py delete mode 100755 wgmaster/wgclean.py diff --git a/sql/v01/01.create.sql b/sql/v01/01.create.sql index e5a7c96..72633cb 100644 --- a/sql/v01/01.create.sql +++ b/sql/v01/01.create.sql @@ -257,15 +257,15 @@ CREATE TABLE member ( accounting_nr TEXT DEFAULT NULL, zwstid TEXT NOT NULL, - betriebsnr TEXT DEFAULT NULL, + lfbis_nr TEXT CHECK (lfbis_nr REGEXP '^[0-9]{7}$') DEFAULT NULL, ustid TEXT CHECK (ustid REGEXP '^[A-Z]{2}[A-Z0-9]{2,12}$') DEFAULT NULL, volllieferant INTEGER NOT NULL CHECK (volllieferant IN (FALSE, TRUE)) DEFAULT FALSE, buchführend INTEGER NOT NULL CHECK (buchführend IN (FALSE, TRUE)) DEFAULT FALSE, funktionär INTEGER NOT NULL CHECK (funktionär IN (FALSE, TRUE)) DEFAULT FALSE, active INTEGER NOT NULL CHECK (active IN (FALSE, TRUE)) DEFAULT TRUE, - iban TEXT CHECK (iban REGEXP '^[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}$') DEFAULT NULL, - bic TEXT CHECK (bic REGEXP '^[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}[A-Z0-9]{3}?$') DEFAULT NULL, + iban TEXT CHECK (iban REGEXP '^[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}$') DEFAULT NULL, + bic TEXT CHECK (bic REGEXP '^[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?$') DEFAULT NULL, country TEXT NOT NULL, postal_dest TEXT NOT NULL, diff --git a/wgmaster/wgexport.py b/wgmaster/export.py similarity index 100% rename from wgmaster/wgexport.py rename to wgmaster/export.py diff --git a/wgmaster/wgimport.py b/wgmaster/import.py similarity index 100% rename from wgmaster/wgimport.py rename to wgmaster/import.py 
#!/bin/env python3
"""Migrate exported member data into ``member.csv`` / ``member_billing_address.csv``.

Usage: ``migrate.py IN_DIR OUT_DIR``.  ``IN_DIR`` must contain
``TMitglieder.csv`` and ``TZweigstellen.csv``; the two result files are
written to ``OUT_DIR``.  Name clean-up decisions are printed to stdout,
validation results (colored) to stderr.
"""

from typing import Dict, Any, Tuple, Optional, Iterator
import argparse
import datetime
import os
import re
import sys


USTID_RE = re.compile('[A-Z]{2}[A-Z0-9]{2,12}')
BIC_RE = re.compile('[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
IBAN_RE = re.compile('[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')

# Raw strings: '\d' / '\s' inside a normal literal are invalid escape sequences.
_FLOAT_RE = re.compile(r'\d+\.\d+')
_WHITESPACE_RE = re.compile(r'\s+')
# Characters that mark a name as needing manual-style clean-up.
_NAME_SEPARATORS = ' .,'


def _parse_field(part: str) -> Any:
    """Convert one stripped CSV field to ``None``/``str``/``bool``/``int``/``float``/``date``.

    Raises:
        RuntimeError: if the field matches none of the known notations.
        ValueError: if the field looks like a date but is not a valid one.
    """
    if part == '':
        return None
    # Require both quotes to be distinct characters; a lone '"' is malformed.
    if len(part) >= 2 and part[0] == '"' and part[-1] == '"':
        return part[1:-1]
    if part == 'T':
        return True
    if part == 'F':
        return False
    # isdecimal() only accepts characters int() can actually convert.
    if part.isdecimal():
        return int(part)
    # fullmatch: '1.5x' must be rejected, not handed to float().
    if _FLOAT_RE.fullmatch(part):
        return float(part)
    if len(part) == 10 and part[4] == '-' and part[7] == '-':
        return datetime.datetime.strptime(part, '%Y-%m-%d').date()
    raise RuntimeError(part)


def parse_csv(filename: str) -> Iterator[Dict[str, Any]]:
    """Yield one dict per data row of a semicolon-separated file.

    The first line is the header; every following line is split on ``;``
    and each field is converted by :func:`_parse_field`.  Keys are the
    header names of the fields present in that line.

    NOTE(review): fields are split on every ``;``, so a quoted string that
    itself contains ``;`` ends in ``RuntimeError``.  The file is opened with
    the platform default encoding - confirm this matches the exporter.
    """
    with open(filename, 'r') as f:
        header: Optional[Tuple[str, ...]] = None
        for line in f:
            fields = [e.strip() for e in line.strip().split(';')]
            if header is None:
                header = tuple(fields)
                continue
            yield {header[i]: _parse_field(field) for i, field in enumerate(fields)}


def _format_field(arg: Any) -> str:
    """Render one value in the notation understood by :func:`parse_csv`."""
    if arg is None:
        return ''
    # bool before everything numeric: True/False are written as T/F.
    if isinstance(arg, bool):
        return 'T' if arg else 'F'
    if isinstance(arg, str):
        # NOTE(review): embedded '"' or ';' are not escaped.
        return f'"{arg}"'
    return str(arg)


def format_row(*args) -> str:
    """Return ``args`` as one ``;``-separated CSV line terminated by ``\\n``."""
    return ';'.join(_format_field(arg) for arg in args) + '\n'


def _report(color: int, mgnr: int, key: str, value: str) -> None:
    """Print a bold, ANSI-colored ``mgnr: key value`` line to stderr."""
    print(f'\x1B[1;{color}m{mgnr:>5}: {key} {value}\x1B[0m', file=sys.stderr)


def success(mgnr: int, key: str, value: str) -> None:
    """Report (green) a value that was derived successfully for member ``mgnr``."""
    _report(32, mgnr, key, value)


def warning(mgnr: int, key: str, value: str) -> None:
    """Report (yellow) a known placeholder value for member ``mgnr``."""
    _report(33, mgnr, key, value)


def invalid(mgnr: int, key: str, value: str) -> None:
    """Report (red) an invalid value for member ``mgnr``."""
    _report(31, mgnr, key, value)


def check_lfbis_nr(nr: str) -> bool:
    """Return whether ``nr`` is a 7-digit LFBIS number with a correct check digit.

    The first six digits are weighted 7, 6, ..., 2; the check digit is
    ``(11 - sum % 11) % 10``.
    """
    # https://statistik.at/fileadmin/shared/QM/Standarddokumentationen/RW/std_r_land-forstw_register.pdf#page=41
    # isascii(): str.isdigit() alone accepts characters int() cannot convert.
    if len(nr) != 7 or not (nr.isascii() and nr.isdigit()):
        return False
    s = sum(int(ch) * (7 - i) for i, ch in enumerate(nr[:-1]))
    v = (11 - (s % 11)) % 10
    return v == int(nr[-1])


def check_ustid_at(nr: str) -> bool:
    """Return whether ``nr`` is an ``ATU`` + 8 digits VAT ID with a correct check digit.

    Every second digit is doubled, the decimal digits of all products are
    summed, and the check digit is ``(96 - sum) % 10``.
    """
    # http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
    digits = nr[3:]
    if not nr.startswith('ATU') or len(nr) != 11 or not (digits.isascii() and digits.isdigit()):
        return False
    s = 0
    for i, ch in enumerate(nr[3:-1]):
        s += sum(map(int, str(int(ch) * (i % 2 + 1))))
    v = (96 - s) % 10
    return v == int(nr[-1])


def iban_checksum(iban: str) -> int:
    """Return ``98 - (n mod 97)`` for the rearranged, digit-expanded ``iban``.

    The result is 97 for a valid IBAN; for an IBAN whose check digits are
    ``00`` it is the value the check digits must have.

    Raises:
        RuntimeError: if ``iban`` does not have the general IBAN shape.
    """
    if not IBAN_RE.fullmatch(iban):
        raise RuntimeError(f'malformed IBAN: {iban!r}')
    rearranged = iban[4:] + iban[:4]
    # Letters become two-digit numbers: A=10 ... Z=35.
    numeric = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), rearranged)
    return 98 - (int(numeric) % 97)


def check_iban(iban: str) -> bool:
    """Return whether ``iban`` is well-formed and passes the mod-97 check."""
    if not IBAN_RE.fullmatch(iban):
        return False
    return iban_checksum(iban) == 97


def generate_iban_at(blz: int, ktonr: str) -> str:
    """Build the Austrian IBAN for bank code ``blz`` and account number ``ktonr``.

    Raises:
        RuntimeError: if the zero-padded parts do not form a well-shaped IBAN.
    """
    bban = f'{blz:>05}{ktonr:>011}'
    # Compute the check digits over the IBAN with '00' as placeholder.
    check = iban_checksum(f'AT00{bban}')
    return f'AT{check:02}{bban}'


def parse_branches(in_dir: str) -> Dict[str, Any]:
    """Return the rows of ``TZweigstellen.csv`` keyed by their ``ZNR`` value."""
    return {b['ZNR']: b for b in parse_csv(f'{in_dir}/TZweigstellen.csv')}


def _print_name_result(prefix: Optional[str], given_name: str, middle_names: Optional[str],
                       family_name: str, suffix: Optional[str], billing_name: Optional[str]) -> None:
    """Print the outcome of a name clean-up to stdout."""
    print(f' -> {prefix or ""} / {given_name or ""} / {middle_names or ""} / {family_name or ""} / {suffix or ""}')
    if billing_name:
        print(f' -> {billing_name}')
    print('')


def _normalize_name(mgnr: int, family_name: str, given_name: str) \
        -> Tuple[Optional[str], str, Optional[str], str, Optional[str], Optional[str]]:
    """Split raw name fields into their parts.

    Returns:
        ``(prefix, given_name, middle_names, family_name, suffix, billing_name)``.
        ``middle_names`` is currently always ``None``.
    """
    prefix: Optional[str] = None
    middle_names: Optional[str] = None
    suffix: Optional[str] = None
    billing_name: Optional[str] = None

    family_name = _WHITESPACE_RE.sub(' ', family_name).strip()
    given_name = _WHITESPACE_RE.sub(' ', given_name).strip().replace(', ', ',')
    given_lower = given_name.lower()

    if any(ch in family_name for ch in _NAME_SEPARATORS):
        print('')
        print(f'{mgnr}: {family_name} / {given_name}')

        if family_name.endswith(' KG'):
            family_name = family_name.split(' ')[0].title()
            billing_name = f'{family_name} KG'

        _print_name_result(prefix, given_name, middle_names, family_name, suffix, billing_name)
    elif any(ch in given_name for ch in _NAME_SEPARATORS) and given_name != 'EVA MARIA':
        print('')
        print(f'{mgnr}: {family_name} / {given_name}')

        if ' u. ' in given_lower or ' u ' in given_lower or ' und ' in given_lower:
            # Two persons: keep the first as member, both on the billing name.
            parts = given_name.split(' ')
            family_name = family_name.title()
            billing_name = f'{family_name} {parts[0].title()} und {parts[-1].title()}'
            given_name = parts[0].title()
        elif given_name.endswith(' GesBR'):
            family_name = family_name.title()
            given_name = given_name.split(' ')[0].title()
            billing_name = f'{family_name} {given_name} GesBR'
        elif given_name.endswith(' KeG.'):
            family_name = family_name.title()
            given_name = given_name.split(' ')[0].title()
            billing_name = f'{family_name} {given_name} KEG'
        elif given_lower.endswith((' jun', ' jun.', ' sen', ' sen.')):
            family_name = family_name.title()
            parts = given_name.split(' ')
            suffix = parts[-1].lower()
            if suffix[-1] != '.':
                suffix += '.'
            given_name = parts[0].title()
        elif ',' in given_name:
            # Everything after the first comma is treated as title/prefix.
            family_name = family_name.title()
            parts = given_name.split(',')
            given_name = parts[0].title()
            prefix = ' '.join([p.title() for p in parts[1:]])
        elif given_name.endswith(' DI'):
            family_name = family_name.title()
            given_name = given_name.split(' ')[0].title()
            prefix = 'Dipl.-Ing.'
        elif given_lower.endswith((' ing', ' ing.', ' dr', ' dr.')):
            family_name = family_name.title()
            parts = given_name.split(' ')
            given_name = parts[0].title()
            prefix = parts[-1].title()
            if prefix[-1] != '.':
                prefix += '.'

        if prefix:
            prefix = prefix.replace('Dipl.Ing.', 'Dipl.-Ing.')
        _print_name_result(prefix, given_name, middle_names, family_name, suffix, billing_name)
    else:
        family_name = family_name.title()
        given_name = given_name.title()

    return prefix, given_name, middle_names, family_name, suffix, billing_name


def _clean_lfbis_nr(mgnr: int, raw: Optional[str]) -> Optional[str]:
    """Normalise a ``Betriebsnummer`` to a valid LFBIS number or ``None``."""
    if raw is None or raw == '-':
        return None
    bnr = raw.replace('.', '')
    if len(bnr) == 10:
        bnr = bnr.removesuffix('000')
    elif len(bnr) == 6:
        # A leading zero was lost.
        bnr = '0' + bnr
    if check_lfbis_nr(bnr):
        return bnr
    if bnr == '1234567':
        warning(mgnr, 'Betriebsnr.', bnr)
    else:
        invalid(mgnr, 'Betriebsnr.', bnr)
    # NOTE(review): nesting was reconstructed from a whitespace-collapsed
    # patch; confirm the known placeholder is meant to be dropped as well.
    return None


def _clean_ustid(mgnr: int, raw: Optional[str]) -> Optional[str]:
    """Normalise a ``UID`` to a valid Austrian VAT ID or ``None``."""
    if raw is None:
        return None
    ustid = raw.replace(' ', '')
    if len(ustid) == 8 and ustid.isdigit():
        ustid = 'ATU' + ustid
    elif not USTID_RE.fullmatch(ustid):
        invalid(mgnr, 'UID', ustid)
        return None
    # NOTE(review): check_ustid_at() rejects every non-'ATU' ID, so foreign
    # VAT IDs are dropped here - confirm that is intended.
    if check_ustid_at(ustid):
        return ustid
    if ustid == 'ATU11111111':
        warning(mgnr, 'UID', ustid)
    else:
        invalid(mgnr, 'UID', ustid)
    # NOTE(review): nesting reconstructed, see _clean_lfbis_nr.
    return None


def _clean_iban(mgnr: int, raw: Optional[str], blz: Optional[int], ktonr: Optional[str]) -> Optional[str]:
    """Return the validated IBAN, or one generated from ``blz``/``ktonr``, or ``None``."""
    iban: Optional[str] = None
    if raw is not None:
        iban = raw.replace(' ', '')
        if not check_iban(iban):
            invalid(mgnr, 'IBAN', iban)
            iban = None

    if iban is None and blz and ktonr:
        iban = generate_iban_at(blz, re.sub('[. -]', '', ktonr))
        success(mgnr, 'IBAN', f'{iban} ({blz}, {ktonr})')
    return iban


def _clean_bic(mgnr: int, raw: Optional[str]) -> Optional[str]:
    """Return the upper-cased BIC if it is well-formed, else ``None``."""
    if raw is None:
        return None
    bic = raw.upper()
    if bic == 'RLNWATAUE':
        bic = 'RLNWATWWAUE'
    if not BIC_RE.fullmatch(bic):
        invalid(mgnr, 'BIC', bic)
        return None
    return bic


def migrate_members(in_dir: str, out_dir: str) -> None:
    """Convert ``TMitglieder.csv`` into ``member.csv`` and ``member_billing_address.csv``.

    Args:
        in_dir: directory containing ``TMitglieder.csv`` and ``TZweigstellen.csv``.
        out_dir: existing directory the two result files are written to.

    Rows without both family and given name are skipped.  Invalid LFBIS
    numbers, VAT IDs, IBANs and BICs are reported on stderr and written as
    empty fields.
    """
    members = parse_csv(f'{in_dir}/TMitglieder.csv')
    branches = parse_branches(in_dir)
    with open(f'{out_dir}/member.csv', 'w+') as f_m, open(f'{out_dir}/member_billing_address.csv', 'w+') as f_mba:
        # NOTE(review): the header names 29 columns, the rows below carry 22
        # (everything after 'country' is not written yet).
        f_m.write('mgnr;predecessor_mgnr;prefix;given_name;middle_names;family_name;suffix;'
                  'birthday;entry_date;exit_date;business_shares;accounting_nr;zwstid;'
                  'lfbis_nr;ustid;volllieferant;buchführend;funktionär;active;iban;bic;'
                  'country;postal_dest;address;email;phone_landline;phone_mobile;default_kgnr;comment\n')
        # First column is the member number; spelled 'mgnr' as in member.csv
        # (was 'mgr').
        f_mba.write('mgnr;name;country;postal_dest;address\n')
        for m in members:
            mgnr: int = m['MGNR']
            family_name: Optional[str] = m['Nachname']
            given_name: Optional[str] = m['Vorname']

            if family_name is None and given_name is None:
                continue

            # Either part may be missing on its own; treat it as empty instead
            # of crashing in the whitespace clean-up.
            prefix, given_name, middle_names, family_name, suffix, billing_name = \
                _normalize_name(mgnr, family_name or '', given_name or '')

            bnr = _clean_lfbis_nr(mgnr, m['Betriebsnummer'])
            ustid = _clean_ustid(mgnr, m['UID'])
            iban = _clean_iban(mgnr, m['IBAN'], m['BLZ'], m['KontoNr'])
            bic = _clean_bic(mgnr, m['BIC'])

            f_m.write(format_row(
                mgnr, m['MGNR-Vorgänger'], prefix, given_name, middle_names, family_name, suffix,
                m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'],
                m['BHKontonummer'], m['ZNR'] and branches[m['ZNR']]['Kennbst'], bnr, ustid,
                m['Volllieferant'] or False, m['Buchführend'] or False, False, m['Aktives Mitglied'] or False,
                iban, bic, 'AT',
            ))
            if billing_name:
                f_mba.write(format_row(mgnr, billing_name, 'AT', None, None))


def main() -> None:
    """Parse ``in_dir``/``out_dir`` from the command line and run the migration."""
    parser = argparse.ArgumentParser()
    parser.add_argument('in_dir')
    parser.add_argument('out_dir')
    args = parser.parse_args()

    os.makedirs(args.out_dir, exist_ok=True)

    migrate_members(args.in_dir, args.out_dir)


if __name__ == '__main__':
    main()

# NOTE: the patch that added this file also deleted the placeholder script
# wgmaster/wgclean.py (argument parsing followed by a bare TODO).