#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Convert exported CSV tables of a cooperative into migrated CSV files.

The script reads ``T*.csv`` files from an input directory, normalizes the
data (names, addresses, bank data, phone numbers, parcel numbers, delivery
numbers) and writes the result as new CSV files into an output directory.
Lookups for postal codes, localities and cadastral communities are done
against a SQLite database.
"""

from typing import Dict, Any, Tuple, Optional, List, Iterable
import argparse
import os
import re
import sys
import sqlite3
import requests
import datetime

import utils


# Global state, filled in by main() and by the migrate_*() functions.
DB_CNX: Optional[sqlite3.Connection] = None
QUIET: bool = False
WG: Optional[str] = None

USTID_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}')
BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
IBAN_RE = re.compile(r'[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')
EMAIL_RE = re.compile(r'[^@\s]+@([A-Za-z0-9_äöüß-]+\.)+[A-Za-z]{2,}')

# Lookup tables from old ids to new ids; each is populated by one migrate_*().
GRADATION_MAP: Optional[Dict[float, float]] = None
CULTIVATION_MAP: Optional[Dict[int, str]] = None
BRANCH_MAP: Optional[Dict[int, str]] = None
GEM_MAP: Optional[Dict[int, List[Tuple[int, int]]]] = None
REED_MAP: Optional[Dict[int, Tuple[int, int]]] = None
GROSSLAGE_MAP: Optional[Dict[int, int]] = None
MEMBER_MAP: Optional[Dict[int, Dict[str, Any]]] = None

QUAL_MAP: Dict[int, str] = {
    0: 'WEI',
    1: 'RSW',
    2: 'LDW',
    3: 'QUW',
    4: 'KAB',
    5: 'SPL',
}

# Street name spellings found in the input -> normalized spelling.
# TODO GWK streetnames
STREET_NAMES: Dict[str, str] = {
    'Hans-Wagnerstraße': 'Hans-Wagner-Straße',
    'J.Seitzstraße': 'Josef-Seitz-Straße',
    'Kurhaus-Str.': 'Kurhausstraße',
    'Kurhaus-Straße': 'Kurhausstraße',
    'Pirawartherstraße': 'Pirawarther Straße',
    'Raggendorferstraße': 'Raggendorfer Straße',
    'Matznerstraße': 'Matzner Straße',
    'Stillfriederstraße': 'Stillfrieder Straße',
    'Harraserstraße': 'Harraser Straße',
    'Gänserndorferstraße': 'Gänserndorfer Straße',
    'Hofrat Döltlstraße': 'Hofrat-Döltl-Straße',
    'Sulzerstraße': 'Sulzer Straße',
    'Brünnerstraße': 'Brünner Straße',
    'Flustraße': 'Flurstraße',
    'Wienerstraße': 'Wiener Straße',
    'St.Laurentstraße': 'St.-Laurentstraße',
    'Angernerstraße': 'Angerner Straße',
    'Schweinbartherstraße': 'Schweinbarther Straße',
    'Hohenruppersdorferstraße': 'Hohenruppersdorfer Straße',
    'Gruberhauptstraße': 'Gruber Hauptstraße',
    'Josef Seitzstraße': 'Josef-Seitz-Straße',
    'Auersthalerstraße': 'Auersthaler Straße',
    'Ollersdorferstraße': 'Ollersdorfer Straße',
    'Ritter Zoppelstraße': 'Ritter-Zoppel-Straße',
    'Spannbergerstraße': 'Spannberger Straße',
    'Ritter Zoppel Straße': 'Ritter-Zoppel-Straße',
    'R. Virchow-Straße': 'Rudolf-Virchow-Straße',
    'Ebenthalerstraße': 'Ebenthaler Straße',
    'Bockfließerstraße': 'Bockfließer Straße',
    'Dörfleserstraße': 'Dörfleser Straße',
    'Dörflesserstraße': 'Dörfleser Straße',
    'Grubere Hauptstraße': 'Gruber Hauptstraße',
    'Groß Inzersdorf': 'Großinzersdorf',
}


def success(mgnr: int, key: str, value) -> None:
    """Print a green per-member message to stderr unless QUIET is set."""
    if not QUIET:
        print(f'\x1B[1;32m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)


def warning(mgnr: int, key: str, value) -> None:
    """Print a yellow per-member warning to stderr."""
    print(f'\x1B[1;33m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)


def invalid(mgnr: int, key: str, value) -> None:
    """Print a red per-member "invalid value" message to stderr."""
    print(f'\x1B[1;31m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)


def renumber_delivery(lsnr_1: str, lsnr_2: str) -> None:
    """Log that a delivery number was changed, unless QUIET is set."""
    if not QUIET:
        # Written to stderr like every other log helper in this file.
        print(f'\x1B[1m{lsnr_1:<14} -> {lsnr_2:<14}\x1B[0m', file=sys.stderr)


def warning_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Print a yellow per-delivery warning to stderr."""
    print(f'\x1B[1;33m{lsnr:<13} ({mgnr:>6}): {key:<12} {value}\x1B[0m', file=sys.stderr)


def invalid_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Print a red per-delivery "invalid value" message to stderr."""
    print(f'\x1B[1;31m{lsnr:<13} ({mgnr:>6}): {key:<12} {value}\x1B[0m', file=sys.stderr)


def convert(mgnr: int, key: str, old_value: str, new_value) -> None:
    """Log a value conversion (old -> new) for a member unless QUIET is set."""
    if not QUIET:
        print(f'\x1B[1m{mgnr:>6}: {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr)


def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str],
                 billing: Optional[str] = None) -> None:
    """Log a name conversion (old parts -> new parts) unless QUIET is set."""
    if not QUIET:
        print(f'\x1B[1m{mgnr:>6}: '
              f'{" / ".join([e or "" for e in old_name])} -> '
              f'{" / ".join([e or "" for e in new_name])}'
              f'{"(" + billing + ")" if billing else ""}\x1B[0m', file=sys.stderr)


def check_lfbis_nr(nr: str) -> bool:
    """Return True if *nr* is a 7-digit number with a valid check digit."""
    # https://statistik.at/fileadmin/shared/QM/Standarddokumentationen/RW/std_r_land-forstw_register.pdf#page=41
    if len(nr) != 7 or not nr.isdigit():
        return False
    s = sum(int(ch) * (7 - i) for i, ch in enumerate(nr[:-1]))
    v = (11 - (s % 11)) % 10
    return v == int(nr[-1])


def check_ustid_at(nr: str) -> bool:
    """Return True if *nr* has the form ``ATU`` + 8 digits with a valid check digit."""
    # http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
    if not nr.startswith('ATU') or len(nr) != 11 or not nr[3:].isdigit():
        return False
    s = 0
    for i, ch in enumerate(nr[3:-1]):
        # Every second digit is doubled; the digits of the product are summed.
        s += sum(map(int, str(int(ch) * (i % 2 + 1))))
    v = (96 - s) % 10
    return v == int(nr[-1])


def modulo(a: str, b: int) -> int:
    """Return the decimal number given as digit string *a* modulo *b*."""
    s = 0
    for ch in a:
        s = (s * 10 + int(ch)) % b
    return s


def check_iban(iban: str) -> bool:
    """Return True if *iban* matches IBAN_RE and passes the mod-97 check."""
    if not IBAN_RE.fullmatch(iban):
        return False
    # Move the first four characters to the end and map A..Z to 10..35.
    s = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), (iban[4:] + iban[:4]))
    return modulo(s, 97) == 1


def normalize_phone_nr(nr: Optional[str]) -> Optional[str]:
    """Normalize a phone number to a ``+43 ...`` form where possible.

    Returns None for None. A number that is empty after stripping is
    returned as the empty string; numbers not starting with ``0`` or
    ``+43`` are returned stripped but otherwise unchanged.
    """
    if nr is None:
        return None
    nr = nr.replace('/', ' ').strip()
    if not nr:
        return nr
    if nr[0] == '0':
        nr = '+43 ' + nr[1:]
    if nr.startswith('+43'):
        # Slicing instead of indexing: numbers shorter than 5 characters
        # must not raise an IndexError.
        lead = nr[4:5]
        if lead == '6':
            nr = nr.replace(' ', '')
            nr = f'{nr[:3]} {nr[3:6]} {nr[6:]}'
        elif lead == '2':
            nr = nr.replace(' ', '')
            nr = f'{nr[:3]} {nr[3:7]} {nr[7:]}'
    return nr


def get_bev_gst_size(kgnr: int, gstnr: str) -> Optional[int]:
    """Query the BEV cadastre API and return the summed ``fl`` of all usages.

    Returns None if the response status is not 200.
    """
    # A timeout keeps the migration from hanging forever on a stalled request.
    r = requests.get(f'https://kataster.bev.gv.at/api/gst/{kgnr:05}/{gstnr}/', timeout=30)
    if r.status_code != 200:
        return None
    data = r.json()
    return sum(n['fl'] for n in data['properties']['nutzungen'])


def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]:
    """Read TFlaechenbindungen.csv and group its rows by MGNR, then FBNR."""
    members = {}
    for f in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
        members.setdefault(f['MGNR'], {})[f['FBNR']] = f
    return members


def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] = None) -> Optional[int]:
    """Resolve a postal code and locality name to ``plz * 100000 + okz``.

    Returns None if *plz* or *ort* is None.

    Raises:
        RuntimeError: if no unique locality can be determined.
    """
    if plz is None or ort is None:
        return None

    ort = ort.replace('0', 'O').replace('SZ', 'SS')
    ort_upper = ort.upper()
    # Known wrong postal code / locality combinations in the input data.
    if ort_upper == 'PILLICHSDORF' and plz == 2212:
        plz = 2211
    elif ort_upper == 'ENZERSFELD' and plz == 2203:
        plz = 2202
    elif ort_upper == 'GROSSEBERSDORF' and plz == 2212:
        ort = 'GROSSENGERSDORF'
    elif ort_upper == 'MÜNICHSTHAL' and plz == 2123:
        plz = 2122
    elif ort_upper == 'FRAUENDORF' and plz == 3710:
        plz = 3714
    elif ort_upper == 'MAISSAU' and plz == 3721:
        ort = 'UNTERDÜRNBACH'
    elif ort_upper in ('KLEINRIEDENTHAL', 'KLEINHÖFLEIN', 'KLEIN HÖFLEIN') and plz == 2074:
        plz = 2070
    elif ort_upper == 'DROSENDORF' and plz == 2095:
        ort = 'DROSENDORF ALTSTADT'
    elif ort_upper == 'KLEINWEIKERSDORF' and plz == 2033:
        plz = 2023

    cur = DB_CNX.cursor()
    cur.execute("SELECT o.okz, p.dest, o.name FROM AT_plz_dest p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?",
                (plz,))
    rows: List[Tuple[int, str, str]] = cur.fetchall()
    cur.close()

    def simplify(text: str) -> str:
        return text.lower().replace(' ', '').replace('-', '').replace('ß', 'ss')

    ort_m = re.sub(r'\d+', '', ort).lower().replace('gr.', 'groß').replace(' ', '').replace('-', '').replace('ß', 'ss')
    # First try a substring match, fall back to an exact match if ambiguous.
    rows_m = [r[0] for r in rows if ort_m in simplify(r[2])]
    if len(rows_m) > 1:
        rows_m = [r[0] for r in rows if ort_m == simplify(r[2])]
    if len(rows_m) == 1:
        return plz * 100000 + rows_m[0]

    if ort == 'VELM-GÖTZENDORF' and address:
        parts = address.split(' ')
        # The street must be a string to be comparable with the names below.
        street = ' '.join(parts[:-1])
        nr_match = re.match(r'[0-9]+', parts[-1])
        if nr_match is not None:
            nr = int(nr_match.group(0))
            if street == 'Landstraße' and nr <= 48 \
                    or street == 'Winterzeile' and (nr <= 49 or nr in (52, 54, 56)) \
                    or street == 'Hauptstraße' and (nr <= 106 or nr in (117, 115, 113, 111, 109, 107)):
                # Velm
                return plz * 100000 + 3572
            else:
                # Götzendorf
                return plz * 100000 + 3571

    raise RuntimeError(f'PLZ not found ({plz} {ort}, {address})')


def lookup_kgnr(okz: Optional[int]) -> Optional[int]:
    """Return the kgnr stored for *okz* in AT_ort, or None if not unique."""
    if okz is None:
        return None
    cur = DB_CNX.cursor()
    cur.execute("SELECT kgnr FROM AT_ort WHERE okz = ?", (okz,))
    rows: List[Tuple[int]] = cur.fetchall()
    cur.close()
    if len(rows) == 1:
        return rows[0][0]
    return None


def lookup_gem_name(name: str) -> List[Tuple[int, int]]:
    """Resolve a name from TGemeinden.csv to a list of ``(kgnr, gkz)`` tuples.

    Raises:
        NotImplementedError: if WG is neither 'MATZEN' nor 'GWK'.
        RuntimeError: if the name cannot be resolved to exactly one row.
    """
    gem_name, hkid = None, None
    if WG == 'MATZEN':
        hkid = "'WLWV'"
        key = name.lower()
        if key == 'dörfles':
            gem_name = 'Weikendorf'
        elif key == 'velm-götzendorf':
            return [(6027, 30859), (6007, 30859)]
        elif key == 'grub':
            name = 'Grub an der March'
    elif WG == 'GWK':
        hkid = "'WLWV', 'WIEN', 'WLWG', 'WLWA'"
        if name.endswith('*'):
            # TODO do something with *
            name = name[:-1].strip()
        key = name.lower()
        if key == 'kreuttal':
            return [(15206, 31627), (15221, 31627), (15226, 31627)]
        elif key == 'hochleithen':
            return [(15219, 31622), (15223, 31622), (15202, 31622)]
        elif key == 'wolfpassing':
            gem_name = 'Hochleithen'
        elif key == 'seebarn':
            gem_name = 'Harmannsdorf'
        elif key == 'königsbrunn':
            gem_name = 'Enzersfeld im Weinviertel'
        elif key == 'wien':
            return [(1616, 90001), (1617, 90001)]
        elif key in ('sitzendorf', 'roseldorf', 'frauendorf'):
            gem_name = 'Sitzendorf an der Schmida'
        elif key == 'dietersdorf':
            gem_name = 'Hollabrunn'
        elif key == 'altenmarkt':
            name = 'Altenmarkt im Thale'
        elif key == 'eitzerstal':
            name = 'Eitzersthal'
        elif key == 'gross':
            gem_name = 'Hollabrunn'
        elif key == 'auggenthal':
            name = 'Augenthal'
        elif key == 'karlsdorf':
            name = 'Pfaffendorf'
        elif key == 'kleinhaugsdorf':
            name = 'Augenthal'
        elif key == 'merkersdorf':
            gem_name = 'Hardegg'
        elif key == 'retz':
            name = 'Retz Altstadt'
        elif key == 'heldenberg':
            return [(9112, 31019), (9132, 31019), (9131, 31019), (9141, 31019), (9140, 31019)]
        elif key == 'retzbach':
            return [(18129, 31038), (18112, 31038), (18117, 31038)]
        elif key == 'dietmannsdorf':
            gem_name = 'Zellerndorf'
        elif key == 'sierndorf':
            gem_name = 'Sierndorf'
        elif key == 'waltersdorf':
            gem_name = 'Staatz'
        elif key == 'viendorf':
            name = 'Viendorf Weingebirge'
        elif key == 'stoitzendorf':
            return [(10137, 31105)]
        elif key in ('klein reinprechtsdorf', 'unter nalb', 'klein stelzendorf', 'klein kirchberg'):
            name = name.replace(' ', '')
        elif key == 'drosendorf':
            name = 'Drosendorf Stadt'
        elif key == 'etzmannsdorf':
            name = 'Etzmannsdorf bei Straning'
        elif key == 'roggendorf':
            gem_name = 'Röschitz'
        elif key == 'wilhelmsdorf':
            gem_name = 'Poysdorf'
    else:
        # Without a known WG there is no hkid list to filter by.
        raise NotImplementedError()

    cur = DB_CNX.cursor()
    # hkid is one of the constant lists above, never external input.
    cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name "
                "FROM AT_kg k "
                "JOIN AT_gem g ON g.gkz = k.gkz "
                "JOIN wb_gem wg ON wg.gkz = g.gkz "
                f"WHERE LOWER(k.name) LIKE (LOWER(?) || '%') AND wg.hkid IN ({hkid})",
                (name.replace('fliess', 'fließ').replace('ross', 'roß').replace('Gr.', 'Groß ')
                 .replace('Groß ', 'Groß').replace('-', ''),))
    rows: List[Tuple[int, str, int, str]] = cur.fetchall()
    cur.close()

    if gem_name:
        rows = [row for row in rows if row[3] == gem_name]
    if len(rows) == 1:
        return [(k, g) for k, _, g, _ in rows]

    print(name, rows)
    raise RuntimeError(f'Unable to resolve Gemeinde name {name!r} ({len(rows)} candidates)')


def lookup_kg_name(kgnr: int) -> str:
    """Return the name stored for *kgnr* in AT_kg."""
    cur = DB_CNX.cursor()
    cur.execute("SELECT name FROM AT_kg WHERE kgnr = ?", (kgnr,))
    rows = cur.fetchall()
    cur.close()
    return rows[0][0]


def lookup_hkid(kgnr: Optional[int], qualid: str) -> Optional[str]:
    """Determine the hkid for a delivery part from its KG and quality id.

    Returns 'OEST' for the quality ids 'WEI' and 'RSW'. Otherwise the hkid
    is looked up via *kgnr* (or defaults to 'WLWV' for the known WGs when
    *kgnr* is None); for 'LDW' it is widened to the ``..XX`` form.
    """
    if qualid in ('WEI', 'RSW'):
        return 'OEST'

    hkid = None
    if kgnr is None:
        if WG in ('MATZEN', 'GWK'):
            hkid = 'WLWV'
    else:
        cur = DB_CNX.cursor()
        cur.execute("SELECT wb.hkid FROM AT_kg kg JOIN AT_gem g ON g.gkz = kg.gkz JOIN wb_gem wb ON wb.gkz = g.gkz "
                    "WHERE kg.kgnr = ?", (kgnr,))
        rows = cur.fetchall()
        cur.close()
        hkid = rows[0][0]

    # hkid may still be None here (no kgnr and unknown WG).
    if qualid == 'LDW' and hkid is not None:
        if hkid == 'WIEN':
            hkid = 'WLXX'
        elif hkid[:2] in ('WL', 'BL', 'SL'):
            hkid = hkid[:2] + 'XX'
    return hkid


def migrate_gradation(in_dir: str, out_dir: str) -> None:
    """Fill GRADATION_MAP (Oechsle -> KW) from TUmrechnung.csv."""
    global GRADATION_MAP
    GRADATION_MAP = {}
    for g in utils.csv_parse_dict(f'{in_dir}/TUmrechnung.csv'):
        GRADATION_MAP[g['Oechsle']] = g['KW']


def migrate_branches(in_dir: str, out_dir: str) -> None:
    """Write branch.csv from TZweigstellen.csv and fill BRANCH_MAP."""
    global BRANCH_MAP
    BRANCH_MAP = {}

    with utils.csv_open(f'{out_dir}/branch.csv') as f:
        f.header('zwstid', 'name', 'country', 'postal_dest', 'address', 'phone_nr', 'fax_nr', 'mobile_nr')
        for b in utils.csv_parse_dict(f'{in_dir}/TZweigstellen.csv'):
            BRANCH_MAP[b['ZNR']] = b['Kennbst']
            address = b['Straße']
            postal_dest = lookup_plz(int(b['PLZ']) if b['PLZ'] else None, b['Ort'], address)
            tel, mob = normalize_phone_nr(b['Telefon']), None
            # Numbers whose fifth character is '6' are stored as mobile number.
            if tel and tel[4:5] == '6':
                mob, tel = tel, None
            f.row(b['Kennbst'], b['Name'].strip().title(), 'AT', postal_dest, address,
                  tel, normalize_phone_nr(b['Telefax']), mob)


def migrate_grosslagen(in_dir: str, out_dir: str) -> None:
    """Write wb_gl.csv from TGrosslagen.csv, renumbering from 1, and fill GROSSLAGE_MAP."""
    global GROSSLAGE_MAP
    GROSSLAGE_MAP = {}

    with utils.csv_open(f'{out_dir}/wb_gl.csv') as f:
        f.header('glnr', 'name')
        for glnr, gl in enumerate(utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv'), start=1):
            GROSSLAGE_MAP[gl['GLNR']] = glnr
            f.row(glnr, gl['Bezeichnung'])


def migrate_gemeinden(in_dir: str, out_dir: str) -> None:
    """Write wb_kg.csv from TGemeinden.csv and fill GEM_MAP (GNR -> [(kgnr, gkz)])."""
    global GEM_MAP
    GEM_MAP = {}
    inserted = set()

    with utils.csv_open(f'{out_dir}/wb_kg.csv') as f:
        f.header('kgnr', 'glnr')
        for g in utils.csv_parse_dict(f'{in_dir}/TGemeinden.csv'):
            gems = lookup_gem_name(g['Bezeichnung'])
            GEM_MAP[g['GNR']] = gems
            for kgnr, gkz in gems:
                # Every KG is written only once.
                if kgnr in inserted:
                    continue
                inserted.add(kgnr)
                f.row(kgnr, GROSSLAGE_MAP[g['GLNR']])


def migrate_reeds(in_dir: str, out_dir: str) -> None:
    """Write wb_rd.csv from TRiede.csv and fill REED_MAP (RNR -> (kgnr, rdnr))."""
    global REED_MAP
    REED_MAP = {}
    # Highest rdnr handed out so far per kgnr; rdnrs are numbered from 1.
    last_rdnr: Dict[int, int] = {}

    with utils.csv_open(f'{out_dir}/wb_rd.csv') as f:
        f.header('kgnr', 'rdnr', 'name')
        for r in utils.csv_parse_dict(f'{in_dir}/TRiede.csv'):
            name: str = r['Bezeichnung'].strip()
            if name.isupper():
                name = name.title()

            try:
                gem = GEM_MAP[r['GNR']]
            except KeyError:
                print(f'Invalid GNR {r["GNR"]} for reed {name}')
                continue
            # If a GNR maps to several KGs, the first one is used.
            kgnr = gem[0][0]
            if len(gem) != 1:
                print(gem, name, '->', gem[0])

            rdnr = last_rdnr.get(kgnr, 0) + 1
            last_rdnr[kgnr] = rdnr
            REED_MAP[r['RNR']] = (kgnr, rdnr)
            f.row(kgnr, rdnr, name)


def migrate_attributes(in_dir: str, out_dir: str) -> None:
    """Write wine_attribute.csv from TSortenAttribute.csv."""
    with utils.csv_open(f'{out_dir}/wine_attribute.csv') as f:
        f.header('attrid', 'name', 'kg_per_ha')
        for a in utils.csv_parse_dict(f'{in_dir}/TSortenAttribute.csv'):
            if a['SANR'] is None:
                continue
            f.row(a['SANR'], a['Attribut'], int(a['KgProHa']) if a['KgProHa'] is not None else None)
        if WG == 'MATZEN':
            f.row('M', 'Matzen', 10000)
            f.row('HU', 'Huber', 10000)


def migrate_cultivations(in_dir: str, out_dir: str) -> None:
    """Write wine_cultivation.csv from TBewirtschaftungsarten.csv and fill CULTIVATION_MAP."""
    global CULTIVATION_MAP
    CULTIVATION_MAP = {}

    with utils.csv_open(f'{out_dir}/wine_cultivation.csv') as f:
        f.header('cultid', 'name')
        for c in utils.csv_parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'):
            name: str = c['Bezeichnung']
            # Default id is the first letter; all-caps names are used as is.
            cultid = name[0].upper()
            if name.isupper():
                cultid = name
            elif 'biolog' in name.lower():
                cultid = 'BIO'
            CULTIVATION_MAP[c['BANR']] = cultid
            f.row(cultid, name)


def migrate_members(in_dir: str, out_dir: str) -> None:
    """Write member.csv and its satellite files from TMitglieder.csv.

    Besides member.csv this writes member_billing_address.csv and
    member_telephone_number.csv, appends newly needed KGs to wb_kg.csv and
    fills MEMBER_MAP with the default kgnr of every migrated member.

    Raises:
        RuntimeError: if a locality field contains a house number that
            contradicts the address field, or if an active member has no
            default kgnr.
    """
    global MEMBER_MAP
    MEMBER_MAP = {}

    members = list(utils.csv_parse_dict(f'{in_dir}/TMitglieder.csv'))
    mgnrs = {m['MGNR'] for m in members}
    # All KGs known so far; kept in sync with GEM_MAP inside the loop.
    known_kgnrs = {kg[0] for gem in GEM_MAP.values() for kg in gem}

    with utils.csv_open(f'{out_dir}/member.csv') as f_m, \
            utils.csv_open(f'{out_dir}/member_billing_address.csv') as f_mba, \
            utils.csv_open(f'{out_dir}/member_telephone_number.csv') as f_tel, \
            utils.csv_open(f'{out_dir}/wb_kg.csv', 'a') as f_kg:
        f_m.header(
            'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix',
            'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid',
            'lfbis_nr', 'ustid', 'volllieferant', 'buchführend', 'funktionär', 'active',
            'iban', 'bic', 'country', 'postal_dest', 'address', 'email', 'default_kgnr', 'comment')
        f_mba.header('mgnr', 'name', 'country', 'postal_dest', 'address')
        f_tel.header('mgnr', 'nr', 'type', 'number', 'comment')

        for m in members:
            # TODO handle * in GWK
            mgnr: int = m['MGNR']
            family_name: str = m['Nachname']
            given_name: str = m['Vorname']
            prefix: Optional[str] = None
            middle_names: Optional[str] = None
            suffix: Optional[str] = None
            billing_name: Optional[str] = None
            funktionaer = False

            if family_name is None and given_name is None:
                continue
            family_name = family_name or ''
            given_name = given_name or ''

            if WG == 'MATZEN' and given_name.startswith(' '):
                funktionaer = True

            family_name = re.sub(r'\s+', ' ', family_name).strip()
            given_name = re.sub(r'\s+', ' ', given_name).strip().replace(', ', ',')
            given_lower = given_name.lower()

            if ' ' in family_name or '.' in family_name or ',' in family_name:
                if family_name.endswith(' KG'):
                    parts = family_name.split(' ')
                    family_name = parts[0].title()
                    billing_name = f'{family_name} KG'
                convert_name(mgnr, (m['Nachname'], m['Vorname']),
                             (prefix, given_name, middle_names, family_name, suffix), billing_name)
            elif (' ' in given_name or '.' in given_name or ',' in given_name) and given_name != 'EVA MARIA':
                if ' u. ' in given_lower or ' u ' in given_lower or ' und ' in given_lower:
                    parts = given_name.split(' ')
                    family_name = family_name.title()
                    billing_name = f'{family_name} {parts[0].title()} und {parts[-1].title()}'
                    given_name = parts[0].title()
                elif given_lower.endswith(' gesbr'):
                    family_name = family_name.title()
                    given_name = given_name.split(' ')[0].title()
                    billing_name = f'{family_name} {given_name} GesbR'
                elif given_name.endswith(' KeG.'):
                    family_name = family_name.title()
                    given_name = given_name.split(' ')[0].title()
                    billing_name = f'{family_name} {given_name} KEG'
                elif given_lower.endswith((' jun', ' jun.', ' sen', ' sen.')):
                    family_name = family_name.title()
                    parts = given_name.split(' ')
                    suffix = parts[-1].lower()
                    if suffix[-1] != '.':
                        suffix += '.'
                    given_name = parts[0].title()
                elif ',' in given_name:
                    family_name = family_name.title()
                    parts = given_name.split(',')
                    given_name = parts[0].title()
                    prefix = ' '.join([p.title() for p in parts[1:]])
                elif given_name.endswith(' DI'):
                    family_name = family_name.title()
                    given_name = given_name.split(' ')[0].title()
                    prefix = 'Dipl.-Ing.'
                elif given_lower.endswith((' ing', ' ing.', ' dr', ' dr.')):
                    family_name = family_name.title()
                    parts = given_name.split(' ')
                    given_name = parts[0].title()
                    prefix = parts[-1].title()
                    if prefix[-1] != '.':
                        prefix += '.'
                convert_name(mgnr, (m['Nachname'], m['Vorname']),
                             (prefix, given_name, middle_names, family_name, suffix), billing_name)
            else:
                family_name = family_name.title()
                given_name = given_name.title()

            bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None
            if bnr is not None:
                bnr = bnr.replace('.', '')
                if len(bnr) == 10:
                    bnr = bnr.removesuffix('000')
                elif len(bnr) == 6:
                    bnr = '0' + bnr
                elif bnr.endswith(' inaktiv'):
                    bnr = bnr.split(' ')[0]
                if not check_lfbis_nr(bnr):
                    # Obvious placeholder values are only warned about.
                    if bnr in ('0', '1234567'):
                        warning(mgnr, 'BetriebsNr.', bnr)
                    else:
                        invalid(mgnr, 'BetriebsNr.', bnr)
                    bnr = None

            ustid: Optional[str] = m['UID']
            if ustid is not None:
                ustid = ustid.replace(' ', '')
                if len(ustid) == 8 and ustid.isdigit():
                    ustid = 'ATU' + ustid
                elif not USTID_RE.fullmatch(ustid):
                    invalid(mgnr, 'UID', ustid)
                    ustid = None
                if ustid and not check_ustid_at(ustid):
                    if ustid == 'ATU11111111':
                        warning(mgnr, 'UID', ustid)
                    else:
                        invalid(mgnr, 'UID', ustid)
                    ustid = None

            iban: Optional[str] = m['IBAN']
            bic: Optional[str] = m['BIC']
            if iban is not None:
                # Upper-cased like the BIC below; IBAN_RE only accepts capitals.
                iban = iban.replace(' ', '').upper()
                if not check_iban(iban):
                    invalid(mgnr, 'IBAN', iban)
                    iban = None
            if bic is not None:
                bic = bic.upper()
                if bic == 'RLNWATAUE':
                    bic = 'RLNWATWWAUE'
                elif bic == 'RLNWATWMIB':
                    bic = 'RLNWATWWMIB'
                if not BIC_RE.fullmatch(bic):
                    invalid(mgnr, 'BIC', bic)
                    bic = None
            if bic is not None and len(bic) == 11 and bic.endswith('XXX'):
                bic = bic[:-3]

            plz = int(m['PLZ']) if m['PLZ'] else None
            ort: Optional[str] = m['Ort']
            address: Optional[str] = m['Straße']

            # Some rows carry the house number at the end of the locality field.
            parts = ort.split(' ') if ort else ['']
            if parts[-1].isdigit() or (len(parts) > 1 and parts[-2].isdigit()):
                if len(parts) > 1 and parts[-2].isdigit():
                    ort = ' '.join(parts[:-2])
                    new_address = parts[-2] + parts[-1]
                else:
                    ort = ' '.join(parts[:-1])
                    new_address = parts[-1]
                if address is not None and address != ' ' and address != new_address:
                    print(address, new_address)
                    raise RuntimeError()
                # Use the complete house number, not only its last token.
                address = new_address

            if WG == 'GWK' and ort == 'JETZELDORF':
                ort = 'JETZELSDORF'

            if address is not None:
                address_old = address
                address = re.sub(r'([0-9]) ?([A-Z])\b', lambda a: a.group(1) + a.group(2).lower(),
                                 re.sub(r'\s+', ' ', address).strip().title())
                address = address.replace('strasse', 'straße').replace('strassse', 'straße')\
                    .replace('Strasse', 'Straße').replace('Str.', 'Straße')\
                    .replace('str.', 'straße').replace('ster.', 'straße').replace('g. ', 'gasse ')\
                    .replace('Gross', 'Groß').replace('Bockfliess', 'Bockfließ').replace('Weiss', 'Weiß')\
                    .replace('Preussen', 'Preußen').replace('Schloss', 'Schloß').replace('luss', 'luß')\
                    .replace('Haupstraße', 'Hauptstraße')
                address = re.sub('([a-z])([0-9])', lambda a: a.group(1) + ' ' + a.group(2), address)
                if address.startswith('Nr. ') or address.startswith('Nr ') or address.isdigit():
                    # Only a house number: use the locality name as street.
                    address = ort.title() + ' ' + address.split(' ')[-1]
                elif address.startswith('Ob. '):
                    address = address.replace('Ob. ', 'Obere ', 1)
                address = address.replace(' Nr. ', ' ')
                address = re.sub(r'([^0-9]+?)( [0-9])',
                                 lambda a: STREET_NAMES.get(a.group(1), a.group(1)) + a.group(2), address)
                if address_old != address:
                    convert(mgnr, 'Adresse', address_old, address)

            email: Optional[str] = m['EMail']
            if email is not None:
                if email.isupper():
                    email = email.lower()
                if not EMAIL_RE.fullmatch(email):
                    invalid(mgnr, 'E-Mail', m['EMail'])
                    email = None
                else:
                    parts = email.split('@')
                    email = f'{parts[0]}@{parts[1].lower()}'

            # Branch of the member; if unknown and only one branch exists,
            # that branch is used. Never a bool as with an and/or chain.
            zwstid = BRANCH_MAP[m['ZNR']] if m['ZNR'] else None
            if not zwstid and len(BRANCH_MAP) == 1:
                zwstid = next(iter(BRANCH_MAP.values()))
            zwstid = zwstid or None

            if WG == 'GWK' and plz == 1228:
                plz = 1020

            postal_dest = lookup_plz(plz, ort, address)
            okz = postal_dest % 100000 if postal_dest else None
            kgnr = lookup_kgnr(okz)
            active = m['Aktives Mitglied'] or False

            if kgnr is None:
                invalid(mgnr, 'KGNr.', ort)
            elif kgnr not in known_kgnrs:
                # KG is not part of any migrated Gemeinde yet: add it to the
                # first Großlage and remember it under the pseudo GNR 9999.
                glnr = list(GROSSLAGE_MAP.values())[0]
                print(f'New KG: {lookup_kg_name(kgnr)} ({kgnr}, GL {glnr})')
                f_kg.row(kgnr, glnr)
                GEM_MAP.setdefault(9999, []).append((kgnr, 0))
                known_kgnrs.add(kgnr)

            if postal_dest is None:
                invalid(mgnr, 'PLZ', None)
                continue

            if active and kgnr is None:
                print(m)
                raise RuntimeError('No default KgNr. set')

            pred = m['MGNR-Vorgänger'] if m['MGNR-Vorgänger'] in mgnrs else None
            f_m.row(
                mgnr, pred, prefix, given_name, middle_names, family_name, suffix,
                m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0,
                m['BHKontonummer'], zwstid, bnr, ustid,
                m['Volllieferant'] or False, m['Buchführend'] or False, funktionaer, active,
                iban, bic, 'AT', postal_dest, address or '-', email, kgnr, m['Anmerkung']
            )

            phone_1: Optional[str] = m['Telefon']
            phone_2: Optional[str] = m['Telefax']
            phone_3: Optional[str] = m['Mobiltelefon']
            numbers = []

            if phone_1:
                phone_1 = normalize_phone_nr(phone_1)
                if len(phone_1) <= 10 or phone_1[0] != '+':
                    invalid(mgnr, 'Tel.Nr.', m['Telefon'])
                else:
                    numbers.append(phone_1)
                    if phone_1[4] == '6':
                        f_tel.row(mgnr, len(numbers), 'mobile', phone_1, None)
                    else:
                        f_tel.row(mgnr, len(numbers), 'landline', phone_1, None)

            if phone_2:
                phone_2 = normalize_phone_nr(phone_2)
                if len(phone_2) <= 8 or phone_2[0] != '+':
                    invalid(mgnr, 'Fax.Nr.', m['Telefax'])
                else:
                    numbers.append(phone_2)
                    if phone_2[4] == '6':
                        f_tel.row(mgnr, len(numbers), 'mobile', phone_2, None)
                    else:
                        f_tel.row(mgnr, len(numbers), 'fax', phone_2, None)

            if phone_3:
                phone_3 = normalize_phone_nr(phone_3)
                if len(phone_3) <= 10 or phone_3[0] != '+':
                    invalid(mgnr, 'Tel.Nr.', m['Mobiltelefon'])
                elif phone_3 not in numbers:
                    numbers.append(phone_3)
                    if phone_3[4] == '6':
                        f_tel.row(mgnr, len(numbers), 'mobile', phone_3, None)
                    else:
                        f_tel.row(mgnr, len(numbers), 'landline', phone_3, None)

            MEMBER_MAP[mgnr] = {
                'default_kgnr': kgnr
            }

            if billing_name:
                f_mba.row(mgnr, billing_name, 'AT', postal_dest, address or '-')


def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
    """Write area_commitment.csv and area_commitment_attribute.csv from TFlaechenbindungen.csv."""

    def parse_gstnrs(nr_str: str, kgnr: int, mgnr: int) -> List[str]:
        """Split a free-text parcel number field into single parcel numbers.

        Returns an empty list (after reporting via invalid()) if the text
        cannot be interpreted.
        """
        if nr_str is None:
            return []
        elif nr_str.isdigit() and len(nr_str) <= 6:
            return [nr_str]
        elif nr_str.count('/') == 1:
            parts = nr_str.split('/')
            if len(parts[0]) == len(parts[1]) and parts[0].isdigit() and parts[1].isdigit() \
                    and len(parts[0]) >= 3:
                # Two equally long numbers: treated as two parcels.
                return [parts[0], parts[1]]
            elif parts[0].isdigit() and len(parts[0]) <= 6 and parts[1].isdigit() and len(parts[1]) <= 3:
                return [nr_str]

        if nr_str.count('/') > 1:
            parts = nr_str.split('/')
            if all(p.isdigit() for p in parts):
                if all(len(p) <= 1 for p in parts[1:]):
                    return [f'{parts[0]}/{p}' for p in parts[1:]]
                elif all(len(p) == len(parts[0]) for p in parts):
                    return parts

        if nr_str.startswith(f'{kgnr:05}'):
            return parse_gstnrs(nr_str[5:].strip(), kgnr, mgnr)
        if nr_str.endswith(' 2000'):
            return parse_gstnrs(nr_str[:-5].strip(), kgnr, mgnr)

        parts = re.split(r' *[,;+&] *', nr_str)
        if len(parts) == 1:
            parts = nr_str.split(' / ')
        # NOTE(review): as written the next two splits can never yield more
        # than one part (they split on a blank only if there is none) --
        # confirm the intended separators against the original data.
        if len(parts) == 1 and ' ' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) == 1 and ' ' not in nr_str and '-' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) > 1:
            return [gst for p in parts for gst in parse_gstnrs(p, kgnr, mgnr)]

        m = re.fullmatch(r'([0-9]+/)?([0-9]+) *- *([0-9]+)', nr_str)
        if m is not None:
            b = m.group(1)
            f = int(m.group(2))
            t = int(m.group(3))
            if t < f:
                # Abbreviated upper bound: take the leading digits from the lower one.
                t += f - (f % pow(10, len(m.group(3))))
            if t - f < 50:
                return [
                    gst
                    for counter in range(f, t + 1)
                    for p in [f'{b or ""}{counter}']
                    for gst in parse_gstnrs(p, kgnr, mgnr)
                ]

        invalid(mgnr, 'GstNr.', f'{kgnr:05}-{nr_str}')
        return []

    def replace_nrs(m: re.Match, sep: str) -> str:
        """Collapse runs of three or more consecutive numbers into ``a-b``."""
        end = m.group(0).endswith(sep)
        # A match ending with the separator yields an empty last piece,
        # which must not be passed to int().
        parts = [int(p) for p in m.group(0).split(sep) if p]
        text = ''
        last = None
        for i, p in enumerate(parts):
            if last is not None:
                if last + 1 == p:
                    last = p
                    continue
                else:
                    text += f'{last}{sep}'
                    last = None
            if len(parts) > i + 2 and p + 1 == parts[i + 1] and p + 2 == parts[i + 2]:
                last = p
                text += f'{p}-'
            else:
                text += f'{p}{sep}'
        if last is not None:
            text += str(last)
        return text.strip().strip(sep) + (sep if end else '')

    def format_gstnr(nrs: List[str]) -> Optional[str]:
        """Format parcel numbers as one sorted, compacted string (None if empty)."""
        if not nrs:
            return None
        # Sort numerically by zero-padding every number, then strip the padding.
        nrs = [re.sub(r'\b0+', '', nr)
               for nr in sorted([re.sub(r'[0-9]+', lambda m: m.group(0).rjust(6, '0'), nr) for nr in nrs])]
        last = None
        text = ''
        for nr in nrs:
            if last is None:
                text += nr
            elif '/' in last and last.split('/')[:-1] == nr.split('/')[:-1]:
                text += f'+{nr.split("/")[-1]}'
            else:
                text += f', {nr}'
            last = nr
        text = re.sub(r'[0-9]+\+[0-9]+(\+[0-9]+)+', lambda m: replace_nrs(m, '+'), text)
        text = re.sub(r'([0-9]+(, |$)){3,}', lambda m: replace_nrs(m, ', '), text)
        return text

    with utils.csv_open(f'{out_dir}/area_commitment.csv') as f_fb, \
            utils.csv_open(f'{out_dir}/area_commitment_attribute.csv') as f_attr:
        f_fb.header('fbnr', 'mgnr', 'sortid', 'cultid', 'area', 'kgnr', 'gstnr', 'rdnr',
                    'year_from', 'year_to', 'comment')
        f_attr.header('fbnr', 'attrid')

        for fb in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
            if (fb['Von'] is None and fb['Bis'] is None) or fb['GNR'] is None:
                continue
            parz: str = fb['Parzellennummer']
            fbnr: int = fb['FBNR']
            mgnr: int = fb['MGNR']
            gem = GEM_MAP[fb['GNR']]
            kgnr = gem[0][0]
            # Commitments of members that were not migrated are skipped.
            if mgnr not in MEMBER_MAP:
                continue

            area = int(fb['Flaeche'])
            if WG == 'MATZEN':
                gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR'])
            else:
                # TODO GstNrs GWK
                gstnrs = []
            comment, gstnr = None, None
            if parz is None or parz == '0000':
                invalid(mgnr, 'GstNr.', f'{kgnr or 0:05}-{parz}')
                gstnrs = []
                gstnr = '-'
            if not gstnrs:
                # Keep the original text as a comment if nothing could be parsed.
                comment = f'KG {kgnr:05}: {parz}'
            gstnr = format_gstnr(gstnrs) or gstnr or parz
            if parz != gstnr.replace('+', '/'):
                convert(mgnr, f'GstNr. ({fbnr})', parz, gstnr)

            rdnr = REED_MAP.get(fb['RNR'], (None, None))[1] if fb['RNR'] else None
            to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None
            f_fb.row(fbnr, mgnr, fb['SNR'], CULTIVATION_MAP[fb['BANR'] or 1], area,
                     kgnr, gstnr, rdnr, fb['Von'], to, comment)
            if fb['SANR']:
                f_attr.row(fbnr, fb['SANR'])


def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, List[int], datetime.date]]:
    """Group delivery rows by (corrected) delivery number.

    Rows without a delivery number and cancelled rows are ignored.

    Returns:
        A list of ``(lsnr, [LINR, ...], date)`` tuples sorted by lsnr, where
        numbers starting with '9' are sorted as if prefixed with '19'.
    """
    dates = {}
    fixed = {}
    last_dates = {}

    def add(lsnr: str, linr: int, date: datetime.date, unique: bool = False) -> None:
        if lsnr not in fixed:
            fixed[lsnr] = []
            dates[lsnr] = date
        elif unique:
            # The number is already taken: retry with a '/2' suffix.
            return add(lsnr + '/2', linr, date, unique)
        fixed[lsnr].append(linr)

    def get_lsnr(date: datetime.date, lsnr: str) -> str:
        if date.year < 2000:
            return date.strftime('%y%m%d00') + lsnr[8:]
        else:
            return date.strftime('%Y%m%d') + lsnr[8:]

    entries: List[Tuple[int, str, datetime.date, int, int]] = [
        (d['LINR'], d['Lieferscheinnummer'], d['Datum'], d['ZNR'], d['MGNR'])
        for d in deliveries
        if d['Lieferscheinnummer'] and not d['Storniert']
    ]
    lsnrs = {d[1] for d in entries}

    for lnr, lsnr, date, zwstnr, mgnr in entries:
        if len(lsnr) < 8:
            continue
        # Numbers starting with '9' carry a two-digit year of the 1900s.
        if not lsnr.startswith('9'):
            lsdate = datetime.date(int(lsnr[:4]), int(lsnr[4:6]), int(lsnr[6:8]))
        else:
            lsdate = datetime.date(1900 + int(lsnr[:2]), int(lsnr[2:4]), int(lsnr[4:6]))

        if len(lsnr) == 12:
            if date != lsdate:
                if date.year == lsdate.year:
                    # Same year: renumber to the delivery date if that number is free.
                    lsnr_n = get_lsnr(date, lsnr)
                    if lsnr_n not in lsnrs:
                        lsnr = lsnr_n
                    else:
                        warning_delivery(lsnr, mgnr, 'date', date)
                else:
                    date = datetime.date(lsdate.year, date.month, date.day)
            if zwstnr not in last_dates or not date < last_dates[zwstnr]:
                last_dates[zwstnr] = date
            add(lsnr, lnr, date, unique=True)
        else:
            add(lsnr[:12], lnr, date)

    return sorted([(f[0], f[1], dates[f[0]]) for f in fixed.items()],
                  key=lambda f: f[0] if not f[0].startswith('9') else '19' + f[0])


def migrate_deliveries(in_dir: str, out_dir: str) -> None:
    """Write delivery, delivery part, modifier and season CSV files.

    Reads TAbschlaege.csv, TLieferungen.csv and TLieferungAbschlag.csv.

    Raises:
        NotImplementedError: if modifiers exist and WG is neither 'MATZEN' nor 'GWK'.
    """
    modifiers = {m['ASNR']: m for m in utils.csv_parse_dict(f'{in_dir}/TAbschlaege.csv') if m['Bezeichnung']}
    delivery_map = {}
    seasons = {}
    branches = {}

    for mod in modifiers.values():
        name: str = mod['Bezeichnung']
        nr: int = mod['ASNR']
        if WG == 'MATZEN':
            mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'PZS'
        elif WG == 'GWK':
            mod['id'] = str(nr)
        else:
            raise NotImplementedError()

    deliveries = list(utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv'))
    delivery_dict = {d['LINR']: d for d in deliveries}
    fixed = fix_deliveries(deliveries)
    # GEM_MAP is not modified in this function, so this set can be built once.
    known_kgnrs = {kg[0] for gem in GEM_MAP.values() for kg in gem}

    with utils.csv_open(f'{out_dir}/delivery.csv') as f_delivery, \
            utils.csv_open(f'{out_dir}/delivery_part.csv') as f_part, \
            utils.csv_open(f'{out_dir}/delivery_part_attribute.csv') as f_attr:
        f_delivery.header('year', 'did', 'date', 'time', 'zwstid', 'lnr', 'lsnr', 'mgnr', 'comment')
        f_part.header('year', 'did', 'dpnr', 'sortid', 'weight', 'kmw', 'qualid', 'hkid', 'kgnr', 'rdnr',
                      'gerebelt', 'manual_weighing', 'spl_check', 'hand_picked', 'lesewagen',
                      'temperature', 'acid', 'scale_id', 'weighing_id', 'comment')
        f_attr.header('year', 'did', 'dpnr', 'attrid')

        for lsnr, linrs, date in fixed:
            if date.year not in seasons:
                seasons[date.year] = {
                    'currency': 'EUR' if date.year >= 2001 else 'ATS',
                    'precision': 4,
                    'start': date,
                    'end': date,
                    'nr': 0,
                }
            s = seasons[date.year]
            # Deliveries are ordered by number, not by date, so both ends
            # of the season have to be tracked.
            if date > s['end']:
                s['end'] = date
            if date < s['start']:
                s['start'] = date
            s['nr'] += 1
            snr = s['nr']

            # Running number of the delivery per branch and day.
            znr = delivery_dict[linrs[0]]['ZNR']
            per_day = branches.setdefault(znr, {})
            per_day[date] = per_day.get(date, 0) + 1
            lnr = per_day[date]

            comments = []
            for dpnr, linr in enumerate(linrs, start=1):
                d = delivery_dict[linr]
                delivery_map[linr] = (date.year, snr, dpnr)
                # Attributes belong to a single part and must not leak into
                # the following parts of the same delivery.
                attributes = set()
                if lsnr != d['Lieferscheinnummer']:
                    renumber_delivery(d['Lieferscheinnummer'], lsnr)

                oe = d['OechsleOriginal'] or d['Oechsle']
                kmw = GRADATION_MAP[oe]
                sortid = d['SNR'].upper()
                if d['SANR']:
                    attributes.add(d['SANR'])
                if len(sortid) != 2:
                    # Everything after the first two characters is an attribute.
                    attributes.add(sortid[2:])
                    sortid = sortid[:2]

                if WG == 'MATZEN':
                    if sortid == 'HU':
                        # Gr.Veltliner (Huber)
                        sortid = 'GV'
                        attributes.add('HU')
                    elif sortid == 'SV':
                        sortid = 'SW'
                    elif sortid == 'WC':
                        # WEIẞBURGUNDER/CHARDONNAY
                        sortid = 'SW'
                    if 'H' in attributes:
                        attributes.remove('H')
                        attributes.add('HK')
                    if 'W' in attributes:
                        attributes.remove('W')

                if d['SNR'] != sortid:
                    print(f'{d["SNR"]}/{d["SANR"]} -> {sortid}/{attributes}')

                qualid = QUAL_MAP[d['QSNR']]
                kgnr, rdnr = None, None
                if d['GNR']:
                    gem = GEM_MAP.get(d['GNR'], [])
                    if len(gem) == 1:
                        kgnr = gem[0][0]
                if d['RNR']:
                    kgnr, rdnr = REED_MAP[d['RNR']]
                if kgnr is None:
                    # Fall back to the default KG of the member.
                    m = MEMBER_MAP[d['MGNR']]
                    kgnr = m['default_kgnr']
                if kgnr is None:
                    warning_delivery(lsnr, d['MGNR'], 'KGNr.', None)
                elif kgnr not in known_kgnrs:
                    warning_delivery(lsnr, d['MGNR'], 'KGNr.', kgnr)
                    kgnr = None
                hkid = lookup_hkid(kgnr, qualid)

                waage = d['Waagentext']
                scale_id, weighing_id = None, None
                if waage:
                    waage = re.split(r' +', waage)
                    scale_id = waage[1]
                    weighing_id = waage[3]

                comment: Optional[str] = d['Anmerkung']
                acid = d['Säure']
                hand, lesemaschine = None, None
                if comment:
                    # Some comments carry structured data instead of text.
                    comment = comment.replace('Söure', 'Säure')
                    if comment.startswith('Säure'):
                        acid = float(comment.split(' ')[-1].replace(',', '.'))
                        comment = None
                    elif comment == 'Maschine':
                        hand = False
                        comment = None
                    elif comment == 'Hand':
                        hand = True
                        comment = None
                    if comment:
                        comments.append(comment)

                f_part.row(
                    date.year, snr, dpnr, sortid, int(d['Gewicht']), kmw, qualid, hkid, kgnr, rdnr,
                    d['Gerebelt'] or False, d['Handwiegung'] or False, d['Spaetlese-Ueberpruefung'] or False,
                    hand, lesemaschine, d['Temperatur'], acid, scale_id, weighing_id, comment
                )
                for attrid in attributes:
                    f_attr.row(date.year, snr, dpnr, attrid)

            # The delivery row takes its common fields from the last part.
            f_delivery.row(date.year, snr, date, d['Uhrzeit'], BRANCH_MAP[d['ZNR']], lnr, lsnr, d['MGNR'],
                           '; '.join(comments) or None)

    with utils.csv_open(f'{out_dir}/delivery_part_modifier.csv') as f_part_mod:
        f_part_mod.header('year', 'did', 'dpnr', 'modid')
        for m in utils.csv_parse_dict(f'{in_dir}/TLieferungAbschlag.csv'):
            if m['LINR'] not in delivery_map:
                continue
            nid = delivery_map[m['LINR']]
            f_part_mod.row(nid[0], nid[1], nid[2], modifiers[m['ASNR']]['id'])

    with utils.csv_open(f'{out_dir}/season.csv') as f_season, \
            utils.csv_open(f'{out_dir}/modifier.csv') as f_mod:
        f_season.header('year', 'currency', 'precision', 'start_date', 'end_date')
        f_mod.header('year', 'modid', 'name', 'abs', 'rel', 'standard', 'quick_select')
        for y, s in seasons.items():
            f_season.row(y, s['currency'], s['precision'], s['start'], s['end'])
            for m in modifiers.values():
                # round() instead of plain int(): scaling a float may end up
                # slightly below the intended integer.
                abs_v = int(round(m['AZAS'] * pow(10, s['precision']))) if m['AZAS'] is not None else None
                f_mod.row(y, m['id'], m['Bezeichnung'], abs_v, m['AZASProzent'],
                          m.get('Standard', None), m['Schnellauswahl'])


def migrate_payments(in_dir: str, out_dir: str) -> None:
    """Placeholder; payments are not migrated yet."""
    pass  # TODO migrate payments


def migrate_parameters(in_dir: str, out_dir: str) -> None:
    """Write client_parameter.csv from the values in TParameter.csv."""
    params: Dict[str, str] = {p['Bezeichnung']: p['Wert'] for p in utils.csv_parse_dict(f'{in_dir}/TParameter.csv')}
    name = params['MANDANTENNAME1'].title().replace('F.', 'für').replace('U.', 'und')
    shortened = name.replace(' für ', ' f. ').replace(' und ', ' u. ')
    suffix = params['MANDANTENNAME2'].lower().replace(' ', '').replace('.', '')
    # Normalized spellings of the known name suffixes.
    suffixes = {
        'reggenmbh': 'reg. Gen.m.b.H.'
    }

    new_params: Dict[str, Optional[str]] = {
        'NAME_TOKEN': None,
        'NAME_SHORT': None,
        'NAME_SHORTENED': shortened,
        'NAME': name,
        'NAME_SUFFIX': suffixes[suffix],
        'PLZ': params['MANDANTENPLZ'],
        'ORT': params['MANDANTENORT'],
        'ADDRESS': params['MANDANTENSTRASSE'],
        'DOCUMENT_SENDER': params.get('ABSENDERTEXT2', None),
        'IBAN': None,
        'BIC': None,
        'USTID': params['MANDANTENUID'].replace(' ', ''),
        'LFBISNR': params['MANDANTENBETRIEBSNUMMER'],
        'PHONE': params['MANDANTENTELEFON'],
        'FAX': params['MANDANTENTELEFAX'],
        'EMAIL': params['MANDANTENEMAIL'],
        'WEBSITE': params.get('MANDANTENHOMEPAGE', None),
    }

    with utils.csv_open(f'{out_dir}/client_parameter.csv') as f:
        f.header('param', 'value')
        for param, value in new_params.items():
            f.row(param, value)


def main() -> None:
    """Parse the command line, open the database and run the migration steps in order."""
    global DB_CNX, QUIET, WG

    parser = argparse.ArgumentParser()
    parser.add_argument('in_dir', type=str,
                        help='The input directory where the exported csv files are stored')
    parser.add_argument('out_dir', type=str,
                        help='The output directory where the migrated csv file should be stored')
    parser.add_argument('-q', '--quiet', action='store_true', default=False,
                        help='Be less verbose')
    parser.add_argument('-d', '--database', metavar='DB', required=True,
                        help='The sqlite database file to look up information')
    parser.add_argument('-g', '--genossenschaft', metavar='WG', required=False, type=str,
                        choices=('MATZEN', 'GWK'))
    args = parser.parse_args()

    os.makedirs(args.out_dir, exist_ok=True)

    QUIET = args.quiet
    WG = args.genossenschaft

    DB_CNX = sqlite3.connect(args.database)

    # The order matters: later steps use the maps filled by earlier ones.
    migrate_gradation(args.in_dir, args.out_dir)
    migrate_branches(args.in_dir, args.out_dir)
    migrate_grosslagen(args.in_dir, args.out_dir)
    migrate_gemeinden(args.in_dir, args.out_dir)
    migrate_reeds(args.in_dir, args.out_dir)
    migrate_attributes(args.in_dir, args.out_dir)
    migrate_cultivations(args.in_dir, args.out_dir)
    migrate_members(args.in_dir, args.out_dir)
    migrate_area_commitments(args.in_dir, args.out_dir)
    migrate_deliveries(args.in_dir, args.out_dir)
migrate_payments(args.in_dir, args.out_dir) migrate_parameters(args.in_dir, args.out_dir) DB_CNX.close() if __name__ == '__main__': main()