#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Dict, Any, Tuple, Optional, List, Iterable
from enum import Enum
import argparse
import os
import re
import sys
import sqlite3
import requests
import datetime
import json
import string

import utils


class WG(Enum):
    """Clients for which client-specific migration rules exist in this module."""

    MATZEN = 1
    WINZERKELLER = 2
    WEINLAND = 3
    BADEN = 4

    @classmethod
    def from_str(cls, name: str):
        """Return the member whose name equals *name*; raise KeyError if there is none."""
        return cls({wg.name: wg.value for wg in WG}[name])


# Module-wide state. Not assigned anywhere in the part of the file visible here;
# presumably set by the command-line entry point -- TODO confirm.
DB_CNX: Optional[sqlite3.Connection] = None
QUIET: bool = False
CLIENT: Optional[WG] = None

# Syntactic patterns only; check digits are verified by the check_* functions.
USTID_NR_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}')
BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
IBAN_RE = re.compile(r'[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')
EMAIL_RE = re.compile(r'[^@\s]+@([A-Za-z0-9_äöüß-]+\.)+[A-Za-z]{2,}')

# Lookup tables from legacy keys to new keys; filled by the migrate_* functions.
GRADATION_MAP: Optional[Dict[float, float]] = None
CULTIVATION_MAP: Optional[Dict[int, str]] = None
BRANCH_MAP: Optional[Dict[int, str]] = None
GEM_MAP: Optional[Dict[int, List[Tuple[int, int]]]] = None
REED_MAP: Optional[Dict[int, Tuple[int, int, str]]] = None
GROSSLAGE_MAP: Optional[Dict[int, int]] = None
MEMBER_MAP: Optional[Dict[int, Dict[str, Any]]] = None
GROSSLAGE_KG_MAP: Optional[Dict[int, int]] = None
DELIVERY_MAP: Optional[Dict[int, Tuple[int, int, int]]] = None
MODIFIER_MAP: Optional[Dict[str, Dict]] = None
SORT_MAP: Optional[Dict[str, str]] = None
ATTRIBUTE_MAP: Optional[Dict[str, str]] = None
PARAMETERS: Optional[Dict[str, str]] = None

# Value written to the 'country' column of exported rows.
AUSTRIA = 40
WGMASTER_PRECISION = 4

QUAL_MAP: Dict[int, str] = {
    0: 'WEI',
    1: 'RSW',
    2: 'LDW',
    3: 'QUW',
    4: 'KAB',
    5: 'SPL',
}

# Place names used by fix_street_name(). The value is the adjective form to use
# for streets named after the place; None means "<place name>er".
ORT_NAMES: Dict[str, Optional[str]] = {
    'Pirawarth': None, 'Raggendorf': None, 'Matzen': 'Matzner', 'Matzn': None,
    'Stillfried': None, 'Harras': None, 'Gänserndorf': None, 'Sulz': None,
    'Brünn': None, 'Wien': None, 'Angern': None, 'Schweinbarth': None,
    'Hohenruppersdorf': None, 'Grub': None, 'Auersthal': None, 'Ollersdorf': None,
    'Spannberg': None, 'Ebenthal': None, 'Bockfließ': None, 'Dörfless': 'Dörfleser',
    'Dörfles': None, 'Ableiding': None, 'Absberg': None, 'Eibesbrunn': None,
    'Engersdorf': None, 'Enzersfeld': None, 'Großebersdorf': None, 'Hollabrunn': None,
    'Korneuburg': None, 'Königsbrunn': None, 'Laa': None, 'Leopoldau': None,
    'Manhartsbrunn': None, 'Mannhartsbrunn': 'Manhartsbrunner', 'Münichsthal': None, 'Pernau': None,
    'Pillichsdorf': None, 'Retz': None, 'Russbach': None, 'Schleinbach': None,
    'Seefeld': None, 'Seyring': None, 'Stammersdorf': None, 'Stelzendorf': None,
    'Traunfeld': None, 'Tresdorf': None, 'Trumau': None, 'Wolkersdorf': None,
    'Znaim': None, 'Obersdorf': None, 'Sechshaus': None, 'Rußbach': None,
    'Pfaffstätten': 'Pfaffstättner', 'Berndorf': None, 'Teesdorf': None, 'Baden': 'Badner',
    'Dornau': None, 'Pottendorf': None, 'Möllersdorf': None, 'Wienersdorf': None,
    'Münchendorf': None, 'Hernstein': None, 'Großau': None, 'Oberwaltersdorf': None,
    'Vöslau': None, 'Tribuswinkel': 'Tribuswinkler', 'Sollenau': None, 'Gutenbrunn': None,
    'Kottingbrunn': None, 'Siebenhaus': None, 'Mariazell': None,
}

# Exact-match corrections applied to street names by fix_street_name().
STREET_NAMES: Dict[str, str] = {
    'Hans-Wagnerstraße': 'Hans-Wagner-Straße',
    'J.Seitzstraße': 'Josef-Seitz-Straße',
    'Kurhaus-Str.': 'Kurhausstraße',
    'Kurhaus-Straße': 'Kurhausstraße',
    'Hofrat Döltlstraße': 'Hofrat-Döltl-Straße',
    'Flustraße': 'Flurstraße',
    'St.Laurentstraße': 'St.-Laurentstraße',
    'Josef Seitzstraße': 'Josef-Seitz-Straße',
    'Ritter Zoppelstraße': 'Ritter-Zoppel-Straße',
    'Ritter Zoppel Straße': 'Ritter-Zoppel-Straße',
    'R. Virchow-Straße': 'Rudolf-Virchow-Straße',
    'Grubere Hauptstraße': 'Gruber Hauptstraße',
    'Groß Inzersdorf': 'Großinzersdorf',
    'Erdpress': 'Erdpreß',
    'Hochleitengasse': 'Hochleithengasse',
    'Bei Der Gösselmühle': 'Bei der Gösslmühle',
    'Dr. Peschlstraße': 'Dr.-Peschl-Straße',
    'Dr.Peschlstraße': 'Dr.-Peschl-Straße',
    'Dr. Salzbornstraße': 'Dr.-Salzborn-Straße',
    'Elsa Brandström-Straße': 'Elsa-Brandström-Straße',
    'Franz Ecker Siedlung': 'Franz-Ecker-Siedlung',
    'Franz-Ecker Siedlung': 'Franz-Ecker-Siedlung',
    'Franz Gillygasse': 'Franz-Gilly-Gasse',
    'Franz V. Zülowstraße': 'Franz-von-Zülow-Straße',
    'Gr. Nondorf': 'Großnondorf',
    'In Der Trift': 'In der Trift',
    'Johann Degengasse': 'Johann-Degen-Gasse',
    'Josef Fürnkranz Siedlung': 'Josef-Fürnkranz-Siedlung',
    'Kaiser Franz Josef Platz': 'Kaiser-Franz-Josef-Platz',
    'Klein Haugsdorf': 'Kleinhaugsdorf',
    'Leopold Leuthnerstraße': 'Leopold-Leuthner-Straße',
    'Lh.-Mayer-Platz': 'Landeshauptmann-Mayer-Platz',
    'Manhartsbr.Straße': 'Manhartsbrunner Straße',
    'Maria Lourd Weg': 'Maria-Lourd-Weg',
    'U. Weißgasse Straße': 'Untere Weißgerberstraße',
    'Dr. Josef-Levit-Straße': 'Dr.-Josef-Levit-Straße',
    'Karl Katschthaler-Straße': 'Karl-Katschthaler-Straße',
    'Pfaffstättnerstraße': 'Paffstättner Straße',
    'Badnerstraße': 'Badner Straße',
    'Anton Krennstraße': 'Anton-Krenn-Straße',
    'Fr.Jonasstraße': 'Franz-Jonas-Straße',
    'Wr.Neustädterstraße': 'Wiener Neustädter Straße',
    'Wr. Neustädterstraße': 'Wiener Neustädter Straße',
    'Wr. Neustäderstraße': 'Wiener Neustädter Straße',
    'Ob.Ödlitzerstraße': 'Obere Ödlitzer Straße',
    'Obere Ödlitzerstraße': 'Obere Ödlitzer Straße',
    'Triesterstraße': 'Triester Straße',
    'Dr. Dolpstraße': 'Dr.-Dolp-Straße',
    'Wienersd.Hauptstr.': 'Wienersdorfer Hauptstraße',
    'Wienersd.Hauptstraße': 'Wienersdorfer Hauptstraße',
    'Tr.Bundesstr.': 'Triester Bundesstraße',
    'Tr.Bundesstraße': 'Triester Bundesstraße',
    'J.Brunastraße': 'Josef-Bruna-Straße',
    'J. Brunastraße': 'Josef-Bruna-Straße',
    'Ferdinand Pichlergasse': 'Ferdinand-Pichler-Gasse',
    'Dr. Figlstraße': 'Dr.-Figl-Straße',
    'Franz Broschekplatz': 'Franz-Borschek-Platz',
    'Tribuswinklerstraße': 'Tribuswinkler Straße',
    'Rudolf Kaspargasse': 'Rudolf-Kaspar-Gasse',
    'Traiskirchnerstraße': 'Traiskirchner Straße',
    'Dr. Theodor Körnerstraße': 'Dr.-Theodor-Körner-Straße',
    'Richard Klingerstraße': 'Richard-Klinger-Straße',
    'Karl Langegasse': 'Karl-Lange-Gasse',
    'Leopold Hörbingerstraße': 'Leopold-Hörbiger-Straße',
    'Leopold Hörbinger Straße': 'Leopold-Hörbiger-Straße',
    'Rudolf Zöllnergasse': 'Rudolf-Zöllner-Gasse',
    'Anton Rauchstraße': 'Anton-Rauch-Straße',
    'Isabellestraße': 'Erzherzogin-Isabelle-Straße',
    'Erzherzogin Isabelle Straße': 'Erzherzogin-Isabelle-Straße',
    'E. Penzig Franz Straße': 'Edgar-Penzing-Franz-Straße',
    'Hernsteinerstr Straße': 'Hernsteiner Straße',
}


def new(t: str, ids: Any, name: str, comment: Optional[str] = None) -> None:
    """Print a bold-green "New ..." line to stderr; always printed, QUIET is not consulted."""
    print(f'\x1B[1;32mNew {t:>6}: {str(ids):>10} ({name}{", " + comment if comment else ""})\x1B[0m', file=sys.stderr)


def success(mgnr: int, key: str, value) -> None:
    """Print a bold-green key/value line for member *mgnr* to stderr unless QUIET is set."""
    if not QUIET:
        print(f'\x1B[1;32m{mgnr:>6} : {key:<12} {value}\x1B[0m', file=sys.stderr)


def warning(mgnr: int, key: str, value, active: Optional[bool]) -> None:
    """Print a bold-yellow warning line for member *mgnr* to stderr.

    The marker in parentheses is 'A' for a truthy *active*, '?' when *active*
    is None and a blank otherwise.
    """
    act = 'A' if active else '?' if active is None else ' '
    print(f'\x1B[1;33m{mgnr:>6} ({act}): {key:<12} {value}\x1B[0m', file=sys.stderr)


def invalid(mgnr: int, key: str, value, active: Optional[bool]) -> None:
    """Print a bold-red "invalid value" line for member *mgnr* to stderr.

    Uses the same active marker as warning(): 'A', '?' (None) or blank.
    """
    act = 'A' if active else '?' if active is None else ' '
    print(f'\x1B[1;31m{mgnr:>6} ({act}): {key:<12} {value}\x1B[0m', file=sys.stderr)
def renumber_delivery(lsnr_1: str, lsnr_2: str) -> None:
    """Log that delivery note *lsnr_1* was renumbered to *lsnr_2* (silent when QUIET)."""
    if not QUIET:
        print(f'\x1B[1m{lsnr_1:<15} -> {lsnr_2:<15}\x1B[0m', file=sys.stderr)


def warning_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Print a bold-yellow warning about delivery *lsnr* of member *mgnr* to stderr."""
    print(f'\x1B[1;33m{lsnr:<15} ({mgnr:>6}): {key:<12} {value}\x1B[0m', file=sys.stderr)


def invalid_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Print a bold-red "invalid value" line about delivery *lsnr* of member *mgnr* to stderr."""
    print(f'\x1B[1;31m{lsnr:<15} ({mgnr:>6}): {key:<12} {value}\x1B[0m', file=sys.stderr)


def convert(mgnr: int, key: str, old_value: str, new_value) -> None:
    """Log that field *key* of member *mgnr* was rewritten (silent when QUIET)."""
    if not QUIET:
        print(f'\x1B[1m{mgnr:>6} : {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr)


def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str],
                 billing: Optional[str] = None) -> None:
    """Log a name conversion for member *mgnr* (silent when QUIET).

    Name parts that are None are printed as empty strings; *billing*, when
    given, is appended in parentheses.
    """
    if not QUIET:
        print(f'\x1B[1m{mgnr:>6} : '
              f'{" / ".join(e or "" for e in old_name)} -> '
              f'{" / ".join(e or "" for e in new_name)}'
              f'{"(" + billing + ")" if billing else ""}\x1B[0m', file=sys.stderr)


def check_lfbis_nr(nr: str) -> bool:
    """Return True if *nr* is a 7-digit LFBIS number with a correct check digit."""
    # https://statistik.at/fileadmin/shared/QM/Standarddokumentationen/RW/std_r_land-forstw_register.pdf#page=41
    if len(nr) != 7 or not nr.isdigit():
        return False
    # The first six digits are weighted 7, 6, ..., 2.
    s = sum(int(ch) * (7 - i) for i, ch in enumerate(nr[:-1]))
    v = (11 - (s % 11)) % 10
    return v == int(nr[-1])


def check_ustid_nr_at(nr: str) -> bool:
    """Return True if *nr* is an Austrian VAT id ('ATU' + 8 digits) with a correct check digit."""
    # http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
    if not nr.startswith('ATU') or len(nr) != 11 or not nr[3:].isdigit():
        return False
    s = 0
    for i, ch in enumerate(nr[3:-1]):
        # Weights alternate 1, 2, 1, ...; the digits of each product are summed.
        s += sum(map(int, str(int(ch) * (i % 2 + 1))))
    v = (96 - s) % 10
    return v == int(nr[-1])


def modulo(a: str, b: int) -> int:
    """Return the decimal number written in string *a* modulo *b*, digit by digit."""
    s = 0
    for ch in a:
        s = (s * 10 + int(ch)) % b
    return s


def check_iban(iban: str) -> bool:
    """Return True if *iban* matches IBAN_RE and passes the mod-97 check."""
    if not IBAN_RE.fullmatch(iban):
        return False
    # Move the first four characters to the end and map letters A..Z to 10..35.
    s = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), (iban[4:] + iban[:4]))
    return modulo(s, 97) == 1


def normalize_phone_nr(nr: Optional[str], ort: Optional[str] = None) -> Optional[str]:
    """Normalize a telephone number towards the form '+43 <prefix> <number>'.

    :param nr: raw number; None or a blank string yields None.
    :param ort: place name, only used for client WINZERKELLER to choose an area
        code for numbers that carry neither a leading '0' nor '43'.
    :return: the normalized number, or None if there is nothing to normalize.
    :raises RuntimeError: for WINZERKELLER when *ort* has no known area code.
    """
    if nr is None:
        return None
    nr = nr.replace('/', ' ').strip()
    if nr.count('-') > 1 or len(nr.split('-')[-1]) > 3:
        # Dashes are only kept when a single one introduces a short extension.
        nr = nr.replace('-', '')
    if not nr:
        # Nothing left to normalize; indexing below would fail on an empty string.
        return None

    if nr[0] == '0':
        nr = '+43 ' + nr[1:]
    elif nr.startswith('43'):
        nr = '+' + nr
    elif CLIENT == WG.WINZERKELLER and ort:
        ort = ort.upper().strip()
        if ort in ('PILLICHSDORF', 'OBERSDORF', 'WOLKERSDORF', 'WOLFPASSING', 'PUTZING',
                   'GROSSENGERSDORF', 'EIBESBRUNN'):
            nr = f'+43 2245 {nr}'
        elif ort in ('ALBERNDORF', 'HAUGSDORF', 'AUGGENTHAL'):
            nr = f'+43 2944 {nr}'
        elif ort == 'HADRES':
            # Was `ort in ('HADRES')`, i.e. a substring test against a plain string.
            nr = f'+43 2943 {nr}'
        else:
            raise RuntimeError(f'Unable to find telephone number of "{ort}" ({nr})')

    if nr.startswith('+43'):
        # Look at the first digit after the country code whether or not a space
        # follows '+43'; a fixed index is wrong for input such as '4312345678'.
        national = nr[3:].replace(' ', '')
        if national.startswith('6'):
            # Mobile networks: three-digit prefix.
            nr = f'+43 {national[:3]} {national[3:]}'
        elif national.startswith('1'):
            # Vienna: one-digit prefix (the prefix digit must not be repeated).
            nr = f'+43 1 {national[1:]}'
        elif national.startswith('2'):
            # Four-digit area codes starting with 2.
            nr = f'+43 {national[:4]} {national[4:]}'
    return nr.strip()


def check_phone_nr(nr: str, mgnr: int, active: Optional[bool]) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Normalize and validate a telephone number that may carry a trailing word.

    A trailing alphabetic word separated by a space is split off as comment
    ('Fi' is expanded to 'Firma').

    :return: ``(number, type, comment)`` where type is 'mobile' or 'landline'.
        For an invalid number the problem is reported through invalid() and
        ``(normalized number or None, None, None)`` is returned.
    """
    m = re.fullmatch(r'(.*?) ([A-Za-zäöüÄÖÜßẞ]+)$', nr)
    if m is not None:
        nr = m.group(1)
        comment = m.group(2).strip()
        if comment == 'Fi':
            comment = 'Firma'
    else:
        comment = None
    nnr = normalize_phone_nr(nr)
    if nnr is None or len(nnr) <= 10 or nnr[0] != '+' or re.fullmatch(r'[+0-9 \-]+', nnr) is None:
        invalid(mgnr, 'Tel.Nr.', nr, active)
        return nnr, None, None
    return nnr, 'mobile' if nnr[4] == '6' else 'landline', comment
def fix_street_name(name: str) -> str:
    """Return the corrected spelling of street name *name*.

    Exact matches in STREET_NAMES win. Otherwise a name ending in 'straße' or
    'platz' that starts with exactly one "<place>er" from ORT_NAMES is split
    into "<place adjective> <Rest>". Any other name is returned unchanged.
    """
    if name in STREET_NAMES:
        return STREET_NAMES[name]
    orte = [(k, v) for k, v in ORT_NAMES.items() if name.startswith(k + 'er')]
    if name.endswith(('straße', 'platz')) and len(orte) == 1:
        ort, adjective = orte[0]
        rest = name[len(ort) + 2:].title()
        # `rest` keeps a leading blank when the input already had one, so the
        # resulting double blank is collapsed (the former replace was a no-op).
        return f'{adjective or ort + "er"} {rest}'.replace('  ', ' ')
    return name


def get_bev_gst_size(kgnr: int, gstnr: str, timeout: float = 30.0) -> Optional[int]:
    """Return the summed 'fl' values of all 'nutzungen' of a parcel from the BEV cadastre API.

    :param kgnr: cadastral community number, zero-padded to five digits in the URL.
    :param gstnr: parcel number.
    :param timeout: seconds to wait for the server; without a timeout the
        request could block forever.
    :return: the sum, or None if the server does not answer with status 200.
    """
    r = requests.get(f'https://kataster.bev.gv.at/api/gst/{kgnr:05}/{gstnr}/', timeout=timeout)
    if r.status_code != 200:
        return None
    data = r.json()
    return sum(n['fl'] for n in data['properties']['nutzungen'])


def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]:
    """Read ``TFlaechenbindungen.csv`` from *in_dir* and index its rows as ``{MGNR: {FBNR: row}}``."""
    members: Dict[int, Dict[int, Dict[str, Any]]] = {}
    for f in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
        members.setdefault(f['MGNR'], {})[f['FBNR']] = f
    return members


def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] = None) -> Optional[int]:
    """Resolve a postal code and place name to ``plz * 100000 + okz``.

    Known wrong PLZ/place combinations of the legacy data are corrected first,
    then the place is matched against the AT_plz_dest/AT_ort tables of DB_CNX.

    :param plz: postal code; None yields None.
    :param ort: place name; None yields None.
    :param address: street address, only used to disambiguate places that
        cannot be resolved by name.
    :raises RuntimeError: if no unique place can be determined.
    """
    if plz is None or ort is None:
        return None

    def squash(s: str) -> str:
        # Comparison form: lower case, without blanks and dashes, 'ß' as 'ss'.
        return s.lower().replace(' ', '').replace('-', '').replace('ß', 'ss')

    ort = ort.replace('0', 'O').replace('SZ', 'SS')
    ort_u = ort.upper()  # only one branch below runs, so this may be computed once
    if ort_u == 'PILLICHSDORF' and plz == 2212:
        plz = 2211
    elif ort_u == 'ENZERSFELD' and plz == 2203:
        plz = 2202
    elif ort_u == 'GROSSEBERSDORF' and plz == 2212:
        ort = 'GROSSENGERSDORF'
    elif ort_u == 'MÜNICHSTHAL' and plz == 2123:
        plz = 2122
    elif ort_u == 'FRAUENDORF' and plz == 3710:
        plz = 3714
    elif ort_u == 'MAISSAU' and plz == 3721:
        ort = 'UNTERDÜRNBACH'
    elif ort_u in ('KLEINRIEDENTHAL', 'KLEINHÖFLEIN', 'KLEIN HÖFLEIN') and plz == 2074:
        plz = 2070
    elif ort_u == 'DROSENDORF' and plz == 2095:
        ort = 'DROSENDORF ALTSTADT'
    elif ort_u == 'KLEINWEIKERSDORF' and plz == 2033:
        plz = 2023
    elif ort_u == 'NIEDERSCHLEINZ' and plz == 3721:
        plz = 3714
    elif ort_u == 'OEYNHAUSEN' and plz == 2500:
        plz = 2512
    elif ort_u == 'MÖLLERSDORF' and plz == 2513:
        plz = 2514
    elif ort_u == 'SOOSS' and plz == 2500:
        ort = 'SOOẞ'
        plz = 2504
    elif ort_u == 'ÖDLITZ' and plz == 2560:
        ort = 'BERNDORF'
    elif ort_u == 'ST.VEIT' and plz == 2562:
        ort = 'BERNDORF'
        plz = 2560
    elif ort_u == 'SCHÖNAU/TRIESTING' and plz == 2525:
        ort = 'SCHÖNAU AN DER TRIESTING'
    elif ort_u == 'BAD FISCHAU - BRUNN' and plz == 2721:
        ort = 'BAD FISCHAU-BRUNN'
    elif ort_u == 'NEUSIEDL/ZAYA':
        ort = 'NEUSIEDL AN DER ZAYA'
    elif ort_u == 'SIERNDORF/MARCH':
        ort = 'SIERNDORF AN DER MARCH'

    cur = DB_CNX.cursor()
    try:
        cur.execute("SELECT o.okz, p.dest, o.name FROM AT_plz_dest p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?", (plz,))
        rows: List[Tuple[int, str, str]] = cur.fetchall()
    finally:
        cur.close()

    ort_m = re.sub(r'\d+', '', ort).lower().replace('gr.', 'groß').replace(' ', '').replace('-', '').replace('ß', 'ss')
    # Substring match first; if that is ambiguous, insist on an exact match.
    rows_m = [r[0] for r in rows if ort_m in squash(r[2])]
    if len(rows_m) > 1:
        rows_m = [r[0] for r in rows if ort_m == squash(r[2])]
    if len(rows_m) == 1:
        return plz * 100000 + rows_m[0]

    # Places that can only be told apart by the address. Without an address
    # the function falls through to the RuntimeError below instead of failing
    # on a None attribute access.
    if ort == 'VELM-GÖTZENDORF' and address:
        parts = address.split(' ')
        street = ' '.join(parts[:-1])
        nr = int(parts[-1].split('-')[0])
        if street == 'Landstraße' and nr <= 48 \
                or street == 'Winterzeile' and (nr <= 49 or nr in (52, 54, 56)) \
                or street == 'Hauptstraße' and (nr <= 106 or nr in (117, 115, 113, 111, 109, 107)):
            # Velm
            return plz * 100000 + 3572
        else:
            # Götzendorf
            return plz * 100000 + 3571
    elif ort == 'BAD FISCHAU-BRUNN' and address:
        if 'viaduktstraße' in address.lower():
            return plz * 100000 + 6560
        elif 'teichplatz' in address.lower():
            return plz * 100000 + 6560

    raise RuntimeError(f'PLZ not found ({plz} {ort}, {address})')
def lookup_kgnr(okz: Optional[int]) -> Optional[int]:
    """Return the kgnr stored in AT_ort for *okz*, or None unless exactly one row matches."""
    if okz is None:
        return None
    cur = DB_CNX.cursor()
    cur.execute("SELECT kgnr FROM AT_ort WHERE okz = ?", (okz,))
    rows: List[Tuple[int]] = cur.fetchall()
    cur.close()
    if len(rows) == 1:
        return rows[0][0]
    return None


def lookup_gem_name(name: str) -> List[Tuple[int, int]]:
    """Resolve a legacy Gemeinde name to a list of ``(kgnr, gkz)`` pairs.

    Client-specific overrides are applied first: they either return a fixed
    list directly, replace the name that is searched, or restrict the result
    to one Gemeinde name (``gem_name``). Everything else is looked up by name
    prefix in AT_kg, limited to the hkid values configured for the client.

    :raises NotImplementedError: if CLIENT is none of the handled clients.
    :raises RuntimeError: if the database lookup does not yield exactly one row.
    """
    gem_name, hkid = None, None
    if CLIENT == WG.MATZEN:
        hkid = "'WLWV'"
        if name.lower() == 'dörfles':
            gem_name = 'Weikendorf'
        elif name.lower() == 'velm-götzendorf':
            return [(6027, 30859), (6007, 30859)]
        elif name.lower() == 'grub':
            name = 'Grub an der March'
    elif CLIENT == WG.WINZERKELLER:
        hkid = "'WLWV', 'WIEN', 'WLWG'"
        # A trailing '*' is dropped before the name is compared.
        if name.endswith('*'):
            name = name[:-1].strip()
        if name.lower() == 'joching':
            return [(12185, 31351)]
        elif name.lower() == 'kreuttal':
            return [(15206, 31627), (15221, 31627), (15226, 31627)]
        elif name.lower() == 'hochleithen':
            return [(15219, 31622), (15223, 31622), (15202, 31622)]
        elif name.lower() == 'wolfpassing':
            gem_name = 'Hochleithen'
        elif name.lower() == 'seebarn':
            gem_name = 'Harmannsdorf'
        elif name.lower() == 'königsbrunn':
            gem_name = 'Enzersfeld im Weinviertel'
        elif name.lower() == 'wien':
            return [(1616, 90001), (1617, 90001)]
        elif name.lower() in ('sitzendorf', 'roseldorf', 'frauendorf'):
            gem_name = 'Sitzendorf an der Schmida'
        elif name.lower() == 'dietersdorf':
            gem_name = 'Hollabrunn'
        elif name.lower() == 'altenmarkt':
            name = 'Altenmarkt im Thale'
        elif name.lower() == 'eitzerstal':
            name = 'Eitzersthal'
        elif name.lower() == 'gross':
            gem_name = 'Hollabrunn'
        elif name.lower() == 'auggenthal':
            name = 'Augenthal'
        elif name.lower() == 'karlsdorf':
            name = 'Pfaffendorf'
        elif name.lower() == 'kleinhaugsdorf':
            name = 'Augenthal'
        elif name.lower() == 'merkersdorf':
            gem_name = 'Hardegg'
        elif name.lower() == 'retz':
            name = 'Retz Altstadt'
        elif name.lower() == 'heldenberg':
            return [(9112, 31019), (9132, 31019), (9131, 31019), (9141, 31019), (9140, 31019)]
        elif name.lower() == 'retzbach':
            return [(18129, 31038), (18112, 31038), (18117, 31038)]
        elif name.lower() == 'dietmannsdorf':
            gem_name = 'Zellerndorf'
        elif name.lower() == 'sierndorf':
            gem_name = 'Sierndorf'
        elif name.lower() == 'waltersdorf':
            gem_name = 'Staatz'
        elif name.lower() == 'viendorf':
            name = 'Viendorf Weingebirge'
        elif name.lower() == 'stoitzendorf':
            return [(10137, 31105)]
        elif name.lower() in ('klein reinprechtsdorf', 'unter nalb', 'klein stelzendorf', 'klein kirchberg'):
            name = name.replace(' ', '')
        elif name.lower() == 'drosendorf':
            name = 'Drosendorf Stadt'
        elif name.lower() == 'etzmannsdorf':
            name = 'Etzmannsdorf bei Straning'
        elif name.lower() == 'roggendorf':
            gem_name = 'Röschitz'
        elif name.lower() == 'wilhelmsdorf':
            gem_name = 'Poysdorf'
        elif name.lower() == 'nappersdorf-kammersdorf':
            return [(9008, 31028), (9026, 31028), (9032, 31028), (9037, 31028), (9051, 31028), (9067, 31028)]
    elif CLIENT == WG.WEINLAND:
        hkid = "'WLWV'"
        if name.lower() == 'neusiedl/zaya':
            name = 'Neusiedl an der Zaya'
        elif name.lower() == 'bad pirawarth':
            name = 'pirawarth'
        elif name.lower() == 'sierndorf':
            gem_name = 'Jedenspeigen'
        elif name.lower() == 'velm-götzendorf':
            return [(6027, 30859), (6007, 30859)]
    elif CLIENT == WG.BADEN:
        hkid = "'WLTH'"
        if name.lower() == 'baden':
            gem_name = 'Baden'
        elif name.lower() in ('bad fischau-brunn', 'bad fischau - brunn'):
            return [(23402, 32301), (23401, 32301)]
        elif name.lower() == 'bad vöslau':
            return [(4005, 30603), (4009, 30603), (4035, 30603)]
        elif name.lower() == 'berndorf':
            return [(4303, 30605), (4304, 30605), (4032, 30605), (4305, 30605)]
        elif name.lower() in ('berndorf-ödlitz', 'ödlitz'):
            return [(4304, 30605)]
        elif name.lower() == 'eggendorf':
            return [(23437, 32305), (23426, 32305)]
        elif name.lower() == 'purkersdorf':
            return []
        elif name.lower() == 'schönau':
            gem_name = 'Schönau an der Triesting'
        elif name.lower() == 'siegersdorf':
            gem_name = 'Pottendorf'
        elif name.lower() == 'sooss':
            name = 'Sooß'
        elif name.lower() == 'st.veit':
            return [(4303, 30605)]
        elif name.lower() == 'wien':
            return []
        elif name.lower() == 'gramatneusiedl':
            return []
    else:
        raise NotImplementedError(f'Gemeinde lookup for {CLIENT} not yet implemented')

    cur = DB_CNX.cursor()
    # `hkid` is interpolated into the SQL text, but it only ever holds one of
    # the literals assigned above; the searched name is passed as a parameter.
    cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name "
                "FROM AT_kg k "
                "JOIN AT_gem g ON g.gkz = k.gkz "
                "JOIN wb_gem wg ON wg.gkz = g.gkz "
                f"WHERE LOWER(k.name) LIKE (LOWER(?) || '%') AND wg.hkid IN ({hkid})",
                (name.replace('fliess', 'fließ').replace('ross', 'roß').replace('Gr.', 'Groß ')
                 .replace('Groß ', 'Groß').replace('-', ''),))
    rows: List[Tuple[int, str, int, str]] = cur.fetchall()
    cur.close()
    if gem_name:
        # Keep only the rows that belong to the requested Gemeinde.
        rows = [row for row in rows if row[3] == gem_name]
    if len(rows) == 1:
        return [(k, g) for k, _, g, _ in rows]
    raise RuntimeError(f'Unable to find Gemeinde "{name}" ({rows})')


def lookup_kg_name(kgnr: int) -> Optional[str]:
    """Return the name stored in AT_kg for *kgnr*, or None if there is no such row."""
    cur = DB_CNX.cursor()
    cur.execute("SELECT name FROM AT_kg WHERE kgnr = ?", (kgnr,))
    rows = cur.fetchall()
    cur.close()
    return rows[0][0] if len(rows) > 0 else None


def lookup_rnr_name(rnr: int) -> str:
    """Return the name (third tuple element) stored in REED_MAP for legacy number *rnr*."""
    return REED_MAP[rnr][2]


def lookup_hkid(kgnr: Optional[int], qualid: str) -> str:
    """Determine the hkid for a delivery from its *kgnr* and quality id *qualid*.

    'WEI' and 'RSW' always yield 'OEST'. Otherwise the hkid comes from the
    database (or is 'WLNO' when *kgnr* is None) and is then adjusted:
    for 'LDW' to the '..XX' form of its two-letter prefix, for all other
    qualities from any non-'XX' 'WL..' value to 'WLNO'.

    :raises NotImplementedError: if *kgnr* is None and CLIENT is not handled.
    """
    hkid = None
    if qualid in ('WEI', 'RSW'):
        return 'OEST'
    elif kgnr is None:
        if CLIENT in (WG.MATZEN, WG.WINZERKELLER, WG.BADEN, WG.WEINLAND):
            hkid = 'WLNO'
        else:
            raise NotImplementedError(f'Default hkid for {CLIENT} not implemented yet')
    else:
        cur = DB_CNX.cursor()
        cur.execute("SELECT wb.hkid FROM AT_kg kg JOIN AT_gem g ON g.gkz = kg.gkz JOIN wb_gem wb ON wb.gkz = g.gkz "
                    "WHERE kg.kgnr = ?", (kgnr,))
        rows = cur.fetchall()
        cur.close()
        # Raises IndexError when the KG is not assigned to any hkid.
        hkid = rows[0][0]
    # NOTE(review): the adjustment below is assumed to apply to both the default
    # and the database value -- confirm against the original indentation.
    if qualid == 'LDW':
        if hkid == 'WIEN':
            hkid = 'WLXX'
        elif hkid[:2] in ('WL', 'BL', 'SL'):
            hkid = hkid[:2] + 'XX'
    else:
        if hkid.startswith('WL') and not hkid.endswith('XX'):
            hkid = 'WLNO'
    return hkid


def guess_glnr(kgnr: int) -> Optional[int]:
    """Guess the glnr of *kgnr* from the entries of GROSSLAGE_KG_MAP for related KGs.

    First all KGs whose ``gkz / 100`` equals that of *kgnr* are considered; if
    their known glnr values are not unique, only the KGs with the same gkz
    are used. Rows with ``gkz / 100 = 900`` are always excluded.
    """
    cur = DB_CNX.cursor()
    cur.execute("SELECT kgnr FROM AT_kg "
                "WHERE gkz / 100 != 900 AND gkz / 100 = (SELECT gkz / 100 FROM AT_kg WHERE kgnr = ?)", (kgnr,))
    rows0 = cur.fetchall()
    cur.execute("SELECT kgnr FROM AT_kg "
                "WHERE gkz / 100 != 900 AND gkz = (SELECT gkz FROM AT_kg WHERE kgnr = ?)", (kgnr,))
    rows1 = cur.fetchall()
    cur.close()

    glnrs = list(set([GROSSLAGE_KG_MAP[k] for k, in rows0 if k in GROSSLAGE_KG_MAP]))
    if len(glnrs) == 0:
        return None
    elif len(glnrs) == 1:
        return glnrs[0]

    # Ambiguous on the wider level: fall back to the narrower one. With several
    # candidates left, the first element of an unordered set is returned.
    glnrs = list(set([GROSSLAGE_KG_MAP[k] for k, in rows1 if k in GROSSLAGE_KG_MAP]))
    return glnrs[0] if len(glnrs) > 0 else None
def migrate_gradation(in_dir: str, out_dir: str) -> None:
    """Fill GRADATION_MAP with the Oechsle -> KW pairs of ``TUmrechnung.csv``.

    *out_dir* is accepted for a uniform signature but not used.
    """
    global GRADATION_MAP
    GRADATION_MAP = table = {}
    for entry in utils.csv_parse_dict(f'{in_dir}/TUmrechnung.csv'):
        table[entry['Oechsle']] = entry['KW']


def migrate_branches(in_dir: str, out_dir: str) -> None:
    """Write ``branch.csv`` from ``TZweigstellen.csv`` and fill BRANCH_MAP (ZNR -> zwstid)."""
    global BRANCH_MAP
    BRANCH_MAP = {}
    with utils.csv_open(f'{out_dir}/branch.csv') as out:
        out.header('zwstid', 'name', 'country', 'postal_dest', 'address', 'phone_nr', 'fax_nr', 'mobile_nr')
        for branch in utils.csv_parse_dict(f'{in_dir}/TZweigstellen.csv'):
            zwstid = branch['Kennbst']
            if not zwstid:
                # Only WEINLAND gets a default id for branches without one.
                zwstid = 'G' if CLIENT == WG.WEINLAND else None
            BRANCH_MAP[branch['ZNR']] = zwstid

            street = branch['Straße']
            dest = lookup_plz(int(branch['PLZ']) if branch['PLZ'] else None, branch['Ort'], street)
            if CLIENT == WG.MATZEN:
                # Fixed address for this client; the lookup above still runs first.
                street, dest = 'Schloßstraße 6', 224303541

            phone = normalize_phone_nr(branch['Telefon'])
            mobile = None
            if phone and phone[4] == '6':
                # A '6' right after '+43 ' marks a mobile number.
                phone, mobile = None, phone

            out.row(zwstid, branch['Name'].strip().title(), AUSTRIA, dest, street,
                    phone, normalize_phone_nr(branch['Telefax']), mobile)


def migrate_grosslagen(in_dir: str, out_dir: str) -> None:
    """Write ``wb_gl.csv`` from ``TGrosslagen.csv`` and fill GROSSLAGE_MAP (old GLNR -> new glnr)."""
    global GROSSLAGE_MAP
    GROSSLAGE_MAP = {}
    with utils.csv_open(f'{out_dir}/wb_gl.csv') as out:
        out.header('glnr', 'name')
        # New numbers follow the input order, starting at 1; a skipped row
        # still consumes its number.
        for new_nr, lage in enumerate(utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv'), start=1):
            if CLIENT == WG.WINZERKELLER and lage['GLNR'] == 8:
                GROSSLAGE_MAP[8] = 6
                continue
            GROSSLAGE_MAP[lage['GLNR']] = new_nr
            out.row(new_nr, lage['Bezeichnung'])


def migrate_gemeinden(in_dir: str, out_dir: str) -> None:
    """Write ``wb_kg.csv`` from ``TGemeinden.csv``; fill GEM_MAP and GROSSLAGE_KG_MAP.

    GEM_MAP maps the legacy GNR to the ``(kgnr, gkz)`` list from
    lookup_gem_name(); every kgnr is written once, with the glnr of the first
    Gemeinde that referenced it.
    """
    global GEM_MAP, GROSSLAGE_KG_MAP
    GEM_MAP = {}
    GROSSLAGE_KG_MAP = {}
    seen = set()
    with utils.csv_open(f'{out_dir}/wb_kg.csv') as out:
        out.header('kgnr', 'glnr')
        for gemeinde in utils.csv_parse_dict(f'{in_dir}/TGemeinden.csv'):
            pairs = lookup_gem_name(gemeinde['Bezeichnung'])
            GEM_MAP[gemeinde['GNR']] = pairs
            for kgnr, _gkz in pairs:
                if kgnr not in seen:
                    seen.add(kgnr)
                    new_glnr = GROSSLAGE_MAP[gemeinde['GLNR']]
                    GROSSLAGE_KG_MAP[kgnr] = new_glnr
                    out.row(kgnr, new_glnr)


def migrate_reeds(in_dir: str, out_dir: str) -> None:
    """Write ``wb_rd.csv`` from ``TRiede.csv`` and fill REED_MAP (RNR -> (kgnr, rdnr, name)).

    A reed is attached to the first KG of its Gemeinde and numbered
    consecutively per KG. Reeds with an unknown GNR are reported and skipped.
    """
    global REED_MAP
    REED_MAP = {}
    with utils.csv_open(f'{out_dir}/wb_rd.csv') as out:
        out.header('kgnr', 'rdnr', 'name')
        for reed in utils.csv_parse_dict(f'{in_dir}/TRiede.csv'):
            name: str = reed['Bezeichnung'].strip()
            # Only single-case names are re-cased; mixed case is kept as is.
            if name.isupper() or name.islower():
                name = name.title()

            try:
                pairs = GEM_MAP[reed['GNR']]
            except KeyError:
                print(f'Invalid GNR {reed["GNR"]} for reed {name}')
                continue
            kgnr = pairs[0][0]
            if len(pairs) != 1:
                print(pairs, name, '->', pairs[0])

            rdnr = max((n for k, n, _ in REED_MAP.values() if k == kgnr), default=0) + 1
            REED_MAP[reed['RNR']] = (kgnr, rdnr, name)
            out.row(kgnr, rdnr, name)


def migrate_attributes(in_dir: str, out_dir: str) -> None:
    """Write ``wine_attribute.csv`` from ``TSortenAttribute.csv`` and fill ATTRIBUTE_MAP.

    'BIO' is mapped to 'B' and not written as a row; a limit of 10000 kg/ha is
    stored as "no limit". Client-specific extra attributes are appended.
    """
    global ATTRIBUTE_MAP
    ATTRIBUTE_MAP = {}
    # (attrid, name, max_kg_per_ha) rows appended per client.
    extras = {
        WG.MATZEN: [('M', 'Matzen', None), ('HU', 'Huber', None)],
        WG.WINZERKELLER: [('F', 'Fixpreis', None)],
        WG.BADEN: [('D', 'DAC', 7500), ('K', 'Kabinett', None)],
    }
    with utils.csv_open(f'{out_dir}/wine_attribute.csv') as out:
        out.header('attrid', 'name', 'active', 'max_kg_per_ha', 'strict', 'fill_lower')
        for attr in utils.csv_parse_dict(f'{in_dir}/TSortenAttribute.csv'):
            old_id = attr['SANR']
            if old_id is None:
                continue
            limit = None if attr['KgProHa'] is None else int(attr['KgProHa'])
            if limit == 10_000:
                limit = None
            new_id = 'B' if old_id == 'BIO' else old_id
            ATTRIBUTE_MAP[old_id] = new_id
            if new_id != 'B':
                out.row(new_id, attr['Attribut'], True, limit, False, 0)

        client_extras = extras.get(CLIENT, [])
        for attrid, label, limit in client_extras:
            out.row(attrid, label, False, limit, False, 0)
        for attrid, _label, _limit in client_extras:
            ATTRIBUTE_MAP[attrid] = attrid


def migrate_cultivations(in_dir: str, out_dir: str) -> None:
    """Write ``wine_cultivation.csv`` from ``TBewirtschaftungsarten.csv`` and fill CULTIVATION_MAP.

    The id is the whole name if it is upper case, None for 'Normal', 'B' (and
    the name 'Bio') if the name contains 'biolog', else the upper-cased first
    letter. Entries with id None are mapped but not written.
    """
    global CULTIVATION_MAP
    CULTIVATION_MAP = {}
    with utils.csv_open(f'{out_dir}/wine_cultivation.csv') as out:
        out.header('cultid', 'name', 'description')
        for cult in utils.csv_parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'):
            label: str = cult['Bezeichnung']
            initial = label[0].upper()
            if label.isupper():
                cultid = label
            elif label == 'Normal':
                cultid = None
            elif 'biolog' in label.lower():
                cultid, label = 'B', 'Bio'
            else:
                cultid = initial
            CULTIVATION_MAP[cult['BANR']] = cultid
            if cultid is not None:
                out.row(cultid, label, None)
def migrate_area_commitment_types(in_dir: str, out_dir: str) -> None:
    """Write ``area_commitment_type.csv`` from ``TLiefermengen.csv``.

    The header is always written; if the input file does not exist, nothing
    else is. Rows without SNR and rows with SNR 'SV' are skipped. For client
    MATZEN an extra type 'BM' is appended.
    """
    with utils.csv_open(f'{out_dir}/area_commitment_type.csv') as f:
        f.header('vtrgid', 'sortid', 'attrid', 'disc',
                 'min_kg_per_ha', 'penalty_per_kg', 'penalty_amount', 'penalty_none')
        if not os.path.exists(f'{in_dir}/TLiefermengen.csv'):
            return

        for t in utils.csv_parse_dict(f'{in_dir}/TLiefermengen.csv'):
            sortid: str = t['SNR']
            if not sortid or sortid == 'SV':
                continue
            menge = int(t['ErwarteteLiefermengeProHa'])
            attrid = ATTRIBUTE_MAP[t['SANR']] if t['SANR'] else None
            # Without an explicit attribute, anything after the two-letter
            # variety id in SNR is taken as the attribute id.
            f.row(sortid + (attrid or ''), sortid[:2], attrid or sortid[2:] or None,
                  None, menge, None, None, None)

        if CLIENT == WG.MATZEN:
            # Eight values to match the eight header columns (there was one None too many).
            f.row('BM', 'BM', None, None, None, None, None, None)


def normalize_name(family_name: str, given_name: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str]]:
    """Split a legacy family/given name pair into structured name parts.

    :return: ``(prefix, given_name, middle_names, family_name, suffix, billing_name)``;
        parts that do not apply are None. ``billing_name`` is set for
        organisations, companies, wineries and couples.
    """
    letters = string.ascii_letters + 'äöüßÄÖÜẞ-'
    double_names = ['eva maria', 'maria theresia']
    company_forms = ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr', 'og', 'gmbh')
    # Spelling of the company form in the billing name; anything else is 'GesbR'.
    company_spelling = {'kg': 'KG', 'keg.': 'KEG', 'og': 'OG', 'gmbh': 'GmbH'}

    def is_alpha(s: str) -> bool:
        return all(c in letters for c in s) if s.lower() not in double_names else True

    # Client-specific organisations that only get a billing name.
    if CLIENT == WG.WINZERKELLER:
        if 'BEZIRKSBAUERNKAMMER' == family_name:
            return None, None, None, None, None, 'Bezirksbauernkammer Mistelbach'
        elif (given_name, family_name) == ('LANDW.', 'FACHSCHULE'):
            return None, None, None, None, None, 'Landwirtschaftliche Fachschule Mistelbach'
        elif 'lagerhaus' in family_name.lower() and given_name == 'HOLLABRUNN-HORN':
            return None, None, None, None, None, 'Raiffeisen-Lagerhaus Hollabrunn-Horn eGen'
    elif CLIENT == WG.BADEN:
        if family_name in ('Marktgemeinde', 'Weinbauverein'):
            return None, None, None, None, None, f'{family_name} {given_name}'

    # Fast path: plain "Given Family" without titles or company forms.
    if given_name.lower() not in ('kg', 'gesbr', 'gesnbr') and \
            len(family_name) > 0 and len(given_name) > 0 and is_alpha(family_name) and is_alpha(given_name):
        return None, given_name.title(), None, family_name.title(), None, None

    prefix: Optional[str] = None
    middle_names: Optional[str] = None
    suffix: Optional[str] = None
    billing_name: Optional[str] = None

    if given_name.startswith('z.H. '):
        # "z.H. <given> <family>": the family name field holds the company.
        billing_name = family_name.replace('AGRAR', 'Agrar').replace('GESBR', 'GesbR')
        parts = given_name.split(' ')
        given_name = parts[1]
        family_name = parts[2]

    # Bring every way of listing several persons into the form "A + B".
    given_name = given_name.replace('+', ' + ').replace('JOS ', 'JOS. ')
    given_name = re.sub(r' ?\((.+?)(, ?(.*?))?\)',
                        lambda m: m.group(0) if m.group(1) == 'FH' else f' + {m.group(1)}{" + " + m.group(3) if m.group(2) else ""}',
                        given_name)
    given_name = re.sub(r' u\. ?| und ', ' + ', given_name, flags=re.IGNORECASE)

    titles = ''
    suffix_by_token = {'jun': 'jun.', 'sen': 'sen.'}
    title_by_token = {
        'dr': 'Dr. ', 'mag': 'Mag. ', 'ing': 'Ing. ', 'di(fh)': 'DI (FH) ',
        'dipling': 'DI ', 'dipli': 'DI ', 'di': 'DI ', 'dkfm': 'Dipl.-Kfm. ',
        'ökrat': 'ÖkR ', 'lkr': 'ÖkR ',
    }

    def repl_title(m: re.Match) -> str:
        nonlocal titles, suffix
        # The dash is removed as well: the pattern accepts "Dipl.-Ing", which
        # otherwise matches no entry and would be dropped without a trace.
        t = m.group(1).lower().replace(' ', '').replace('.', '').replace('-', '')
        if t in suffix_by_token:
            suffix = suffix_by_token[t]
        else:
            titles += title_by_token.get(t, '')
        return ' '

    title_re = re.compile(r',?((di ?\(fh\))|\b(dr|ing|mag|jun|sen|dkfm|dipl\. ?-?i(ng)?|di|ök\.rat|lkr)\b)\.?', re.IGNORECASE)
    given_name = utils.remove_spaces(re.sub(title_re, repl_title, given_name))
    family_name = utils.remove_spaces(re.sub(title_re, repl_title, family_name))
    if titles:
        prefix = titles.strip()

    # Wineries: "Weingut"/"Weinbau" may sit in either field.
    if given_name.lower() in ('weingut', 'weinbau'):
        parts = family_name.split(' ')
        return prefix, None, middle_names, ' '.join(parts[:-1]), suffix, given_name + ' ' + ' '.join(parts)
    elif given_name.lower().startswith(('weinbau ', 'weingut ')):
        # Both prefixes are compared case-insensitively, like the checks around it.
        parts = given_name.split(' ')
        return prefix, None, middle_names, family_name, suffix, ' '.join(parts[:-1]) + ' ' + family_name + ' ' + parts[-1]
    elif family_name.lower() in ('weingut', 'weinbau'):
        parts = given_name.split(' ')
        return prefix, None, middle_names, ' '.join(parts[:-1]), suffix, family_name + ' ' + ' '.join(parts)

    # Company form at the end of the family name.
    family_parts = family_name.split(' ')
    last = family_parts[-1].lower()
    if last in company_forms:
        family_name = ' '.join(family_parts[:-1])
        if ' ' not in family_name and len(family_name) > 4:
            family_name = family_name.title()
        if family_name.startswith('Gem.'):
            family_name = 'GeM ' + family_name[5:]
        billing_name = family_name + ' ' + company_spelling.get(last, 'GesbR')
        if given_name.count(' ') == 1:
            # Two words in the given name field are a contact "Given Family".
            parts = given_name.split(' ')
            return None, parts[0], None, parts[1], None, billing_name
        elif is_alpha(given_name):
            return prefix, given_name.title(), middle_names, family_name, suffix, billing_name

    # Company form at the end of the given name.
    given_parts = given_name.split(' ')
    last = given_parts[-1].lower()
    if last in company_forms:
        given_name = ' '.join(given_parts[:-1]).title()
        family_name = family_name.title()
        billing_name = family_name + ' ' + company_spelling.get(last, 'GesbR')
        return prefix, given_name, middle_names, family_name, suffix, billing_name

    if ' ' in family_name or '.' in family_name:
        lowered = family_name.lower()
        if lowered.startswith(('weingut', 'weinbau')):
            billing_name = family_name.title()
            family_name = ' '.join(family_name.split(' ')[1:]).title()
        elif lowered.endswith(('veltlinerhof', 'biohof')):
            billing_name = ' '.join(family_name.split(' ')[::-1]).title()
            family_name = ' '.join(family_name.split(' ')[:-1]).title()
        elif 'u.' in family_name:
            billing_name = utils.remove_spaces(family_name.title().replace('U.', ' und '))
            family_name = family_name.split(' ')[0].title()
        else:
            billing_name = family_name
            family_name = family_name.split(' ')[-1].title()

    if ' + ' in given_name:
        # Several persons: "A, B und C <family>" becomes the billing name and
        # the first person is kept as the member.
        parts = given_name.split(' + ')
        family_name = family_name.title()
        # Reversing the string turns the *last* comma into " und".
        billing_name = (', '.join(parts).title()[::-1].replace(',', ' und'[::-1], 1)[::-1]
                        + f' {billing_name or family_name}')
        given_name = parts[0].title()
    else:
        family_name = family_name.title()
        given_name = given_name.title()

    return prefix, given_name, middle_names, family_name, suffix, billing_name
def migrate_members(in_dir: str, out_dir: str) -> None:
    """Migrate the member table and everything attached to a member.

    Reads ``TMitglieder.csv`` from *in_dir* and writes ``member.csv``,
    ``member_billing_address.csv``, ``member_telephone_number.csv`` and
    ``member_email_address.csv`` to *out_dir*; newly discovered KGs are
    appended to ``wb_kg.csv``.  As a side effect ``MEMBER_MAP`` is rebuilt and
    maps every migrated MGNR to its default KG number.

    Members without any name, sample members ('Musterbetrieb') and members
    whose postal destination cannot be resolved are skipped.
    """
    global MEMBER_MAP
    MEMBER_MAP = {}

    members = [m for m in utils.csv_parse_dict(f'{in_dir}/TMitglieder.csv')]
    # Only used for membership tests (predecessor lookup), hence a set.
    mgnrs = {m['MGNR'] for m in members}
    # NOTE(review): result is only referenced by the commented-out code below;
    # the call is kept because its side effects cannot be judged from here.
    fbs = parse_flaechenbindungen(in_dir)

    # Legacy BIC values that are corrected before validation.
    bic_fixes = {
        'RLNWATAUE': 'RLNWATWWAUE',
        'RLNWATWMIB': 'RLNWATWWMIB',
        'VBÖEATWW': 'VBOEATWW',
        'RLNWATBAD': 'RLNWATWWBAD',
        'SPBDATXXX': 'SPBDAT21',
        'IBAATWWXXX': 'GIBAATWW',
    }

    with (utils.csv_open(f'{out_dir}/member.csv') as f_m,
          utils.csv_open(f'{out_dir}/member_billing_address.csv') as f_mba,
          utils.csv_open(f'{out_dir}/member_telephone_number.csv') as f_tel,
          utils.csv_open(f'{out_dir}/member_email_address.csv') as f_email,
          utils.csv_open(f'{out_dir}/wb_kg.csv', 'a') as f_kg):
        f_m.header(
            'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix',
            'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid',
            'lfbis_nr', 'ustid_nr', 'volllieferant', 'buchführend', 'organic', 'funktionär', 'active', 'deceased',
            'iban', 'bic', 'country', 'postal_dest', 'address', 'default_kgnr', 'comment')
        f_mba.header('mgnr', 'name', 'country', 'postal_dest', 'address')
        f_tel.header('mgnr', 'nr', 'type', 'number', 'comment')
        f_email.header('mgnr', 'nr', 'address', 'comment')

        for m in members:
            mgnr: int = m['MGNR']
            family_name: str = m['Nachname']
            given_name: str = m['Vorname']
            funktionaer, deceased = False, False
            # Must be known before the first invalid()/warning() call below.
            active = m['Aktives Mitglied'] or False

            if family_name is None and given_name is None:
                continue
            elif m['Anmerkung'] == 'Musterbetrieb':
                continue
            elif CLIENT == WG.BADEN and family_name == 'Winzergenoss.':
                continue

            # --- name -------------------------------------------------------
            given_name = given_name or ''
            if CLIENT == WG.MATZEN and given_name.startswith(' '):
                funktionaer = True
            if CLIENT == WG.WINZERKELLER and ('*' in family_name or '*' in given_name or
                                              '(+)' in family_name or '(+)' in given_name):
                deceased = True
                family_name = family_name.replace('*', '').replace('(+)', '')
                given_name = given_name.replace('*', '').replace('(+)', '')
            family_name = utils.remove_spaces(family_name)
            given_name = utils.remove_spaces(given_name).replace(', ', ',')

            ret = normalize_name(family_name, given_name)
            prefix, given_name, middle_names, family_name, suffix, billing_name = ret
            n1 = utils.remove_spaces(' '.join(r or '' for r in ret))
            n2 = utils.remove_spaces((m['Vorname'] or '') + ' ' + (m['Nachname'] or ''))
            if billing_name or n1.lower() != n2.lower():
                convert_name(mgnr, (m['Nachname'], m['Vorname']),
                             (prefix, given_name, middle_names, family_name, suffix), billing_name)
            if not given_name or not family_name:
                given_name = given_name or ''
                family_name = family_name or ''
                invalid(mgnr, 'Name', n1, active)

            # --- LFBIS number (Betriebsnummer) ------------------------------
            bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None
            if bnr is not None:
                bnr = bnr.replace('.', '')
                if len(bnr) == 10:
                    bnr = bnr.removesuffix('000')
                elif len(bnr) == 6:
                    bnr = '0' + bnr
                elif bnr.endswith(' inaktiv'):
                    bnr = bnr.split(' ')[0]
                if not check_lfbis_nr(bnr):
                    # Known placeholder values only produce a warning.
                    if bnr in ('0', '1234567'):
                        warning(mgnr, 'BetriebsNr.', bnr, active)
                    else:
                        invalid(mgnr, 'BetriebsNr.', bnr, active)
                    bnr = None

            # --- VAT id (UID) -----------------------------------------------
            ustid_nr: Optional[str] = m['UID']
            if ustid_nr is not None:
                ustid_nr = ustid_nr.replace(' ', '')
                if len(ustid_nr) == 8 and ustid_nr.isdigit():
                    ustid_nr = 'ATU' + ustid_nr
                elif not USTID_NR_RE.fullmatch(ustid_nr):
                    invalid(mgnr, 'UID', ustid_nr, active)
                    ustid_nr = None
                if ustid_nr and not check_ustid_nr_at(ustid_nr):
                    if ustid_nr == 'ATU11111111':
                        warning(mgnr, 'UID', ustid_nr, active)
                    else:
                        invalid(mgnr, 'UID', ustid_nr, active)
                    ustid_nr = None

            # --- bank account -----------------------------------------------
            iban: Optional[str] = m['IBAN']
            bic: Optional[str] = m['BIC']
            if iban is not None:
                iban = iban.replace(' ', '')
                if not check_iban(iban):
                    invalid(mgnr, 'IBAN', iban, active)
                    iban = None
            if bic is not None:
                bic = bic.upper()
                bic = bic_fixes.get(bic, bic)
                if not BIC_RE.fullmatch(bic):
                    invalid(mgnr, 'BIC', bic, active)
                    bic = None
            if bic is not None:
                # Drop the optional default branch code.
                if len(bic) == 11 and bic.endswith('XXX'):
                    bic = bic[:-3]

            # --- postal address ---------------------------------------------
            plz = int(m['PLZ']) if m['PLZ'] else None
            ort: Optional[str] = m['Ort']
            address: Optional[str] = m['Straße']

            # A house number at the end of the town field is moved to the address.
            parts = ort.split(' ') if ort else ['']
            if parts[-1].isdigit() or (len(parts) > 1 and parts[-2].isdigit()):
                if len(parts) > 1 and parts[-2].isdigit():
                    ort = ' '.join(parts[:-2])
                    new_address = parts[-2] + parts[-1]
                else:
                    ort = ' '.join(parts[:-1])
                    new_address = parts[-1]
                if address is not None and address != ' ' and address != new_address:
                    raise RuntimeError(f'Unable to rewrite address: "{address}" -> "{new_address}"')
                # Use the complete house number (e.g. number plus letter),
                # not just the last token of the town field.
                address = new_address

            if CLIENT == WG.WINZERKELLER and ort == 'JETZELDORF':
                ort = 'JETZELSDORF'

            if ort:
                ort = ort.upper().strip()
            if address is not None:
                address_old = address
                address = re.sub(r'([0-9]) ?([A-Z])\b', lambda a: a.group(1) + a.group(2).lower(),
                                 utils.remove_spaces(address).title())
                # An address consisting only of a house number gets the town as street name.
                if address.startswith(('Haus Nr.', 'Nr. ', 'Nr ')) or \
                        (len(address) > 0 and address[0].isdigit()):
                    address = ort.title() + ' ' + address.split(' ')[-1]
                address = address.replace('Pelz.', 'Pelzg.').replace('strasse', 'straße').replace('strassse', 'straße')\
                    .replace('Strasse', 'Straße').replace('Str.', 'Straße').replace('stasse', 'straße').replace('st.', 'straße ')\
                    .replace('str.', 'straße').replace('ster.', 'straße').replace('g.', 'gasse ').replace('pl.', 'platz ')\
                    .replace('Gross', 'Groß').replace('Bockfliess', 'Bockfließ').replace('Weiss', 'Weiß')\
                    .replace('Preussen', 'Preußen').replace('Schloss', 'Schloß').replace('luss', 'luß')\
                    .replace('Haupstraße', 'Hauptstraße').replace('Russ', 'Ruß').replace('Ross', 'Roß')
                address = re.sub('([a-z])([0-9])', lambda a: a.group(1) + ' ' + a.group(2), address)
                if address.startswith('Ob. '):
                    address = address.replace('Ob. ', 'Obere ', 1)
                address = address.replace(' Nr. ', ' ')
                address = re.sub(r'([^0-9]+?)( +[0-9])', lambda a: fix_street_name(a.group(1)) + a.group(2), address)
                address = utils.remove_spaces(address)
                if address_old != address:
                    convert(mgnr, 'Adresse', address_old, address)

            # --- email addresses --------------------------------------------
            email: Optional[str] = m['EMail']
            emails = []
            if email is not None:
                for email in (email.split(' ') if CLIENT == WG.BADEN else email.split(' + ')):
                    if email.isupper():
                        email = email.lower()
                    if not EMAIL_RE.fullmatch(email):
                        invalid(mgnr, 'E-Mail', m['EMail'], active)
                    else:
                        parts = email.split('@')
                        emails.append(f'{parts[0]}@{parts[1].lower()}')

            # Branch of the member; falls back to the only branch if there is exactly one.
            zwstid = m['ZNR'] and BRANCH_MAP[m['ZNR']] or len(BRANCH_MAP) == 1 and list(BRANCH_MAP.values())[0]

            if CLIENT == WG.WINZERKELLER and plz == 1228:
                plz = 1020
            postal_dest = lookup_plz(plz, ort, address)

            #if mgnr in fbs:
            #    gems = {v['GNR'] for k, v in fbs[mgnr].items() if v['Bis'] and int(v['Bis']) >= 2020}
            #    if len(gems) == 1:
            #        print(GEM_MAP[list(gems)[0]])

            # --- default KG -------------------------------------------------
            okz = postal_dest % 100000 if postal_dest else None
            kgnr = lookup_kgnr(okz)
            if kgnr is None:
                invalid(mgnr, 'KGNr.', ort, active)
            elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]:
                glnr = guess_glnr(kgnr)
                if glnr:
                    new('KG', kgnr, lookup_kg_name(kgnr), f'GL {glnr}')
                    f_kg.row(kgnr, glnr)
                    # Newly created KGs are collected under the pseudo GNR 9999.
                    if 9999 not in GEM_MAP:
                        GEM_MAP[9999] = []
                    GEM_MAP[9999].append((kgnr, 0))
                else:
                    kgnr = None

            if postal_dest is None:
                invalid(mgnr, 'PLZ', None, active)
                continue

            pred = m['MGNR-Vorgänger'] if m['MGNR-Vorgänger'] in mgnrs else None
            f_m.row(
                mgnr, pred, prefix, given_name, middle_names, family_name, suffix,
                m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0,
                m['BHKontonummer'], zwstid, bnr, ustid_nr,
                m['Volllieferant'] or False, m['Buchführend'] or False, False, funktionaer, active, deceased,
                iban, bic, AUSTRIA, postal_dest, address or '-', kgnr, m['Anmerkung']
            )

            # --- telephone numbers ------------------------------------------
            phone_1: Optional[str] = m['Telefon']
            phone_2: Optional[str] = m['Telefax']
            phone_3: Optional[str] = m['Mobiltelefon']
            # Some records carry a phone number in the email column.
            phone_4: Optional[str] = m['EMail'] if m['EMail'] and '@' not in m['EMail'] else None
            numbers = []

            if CLIENT == WG.WINZERKELLER:
                # Telefax (phone_2) not used
                numbers = {}

                def add_number(nr: str, fax: bool = False, comment: str = None, fax_only: bool = False) -> None:
                    """Register *nr*; a later call with the same number replaces the earlier entry."""
                    mob = nr[4] == '6'
                    numbers[nr] = {'mobile': mob, 'landline': not mob and not fax_only, 'fax': fax, 'comment': comment}

                if phone_1:
                    # Normalise the free-text separators to ' u. ' (and) and ' od. ' (or).
                    phone_1 = phone_1.lower().replace('und', 'u.').replace('auch', 'u.').replace('u.', ' u. ')\
                        .replace('oder', 'od.').replace(';', 'od.').replace('od.', ' od. ')
                    phone_1 = utils.remove_spaces(phone_1)
                    fax = False
                    if phone_1.endswith(' u. fax'):
                        fax = True
                        phone_1 = ' '.join(phone_1.split(' ')[:-2])
                    if phone_1.replace(' ', '').replace('/', '').replace('-', '').isdigit() and len(phone_1) <= 20:
                        if phone_1[0] != '0' and '/' in phone_1:
                            for nr in phone_1.split('/'):
                                add_number(normalize_phone_nr(nr, ort), fax)
                        else:
                            add_number(normalize_phone_nr(phone_1, ort), fax)
                    elif re.fullmatch(r'0[0-9/ -]+ od\. 0[0-9/ -]+', phone_1):
                        parts = phone_1.split(' od. ')
                        add_number(normalize_phone_nr(parts[0], ort), False)
                        add_number(normalize_phone_nr(parts[1], ort), fax)
                    elif re.fullmatch(r'0[0-9/ -]+ od\. [1-9][0-9/ -]+', phone_1):
                        parts = phone_1.split(' od. ')
                        add_number(normalize_phone_nr(parts[0], ort), False)
                        if parts[0][1] == '6':
                            add_number(normalize_phone_nr(parts[1], ort), fax)
                        else:
                            # Second number shares the first five characters of the first one.
                            add_number(normalize_phone_nr(parts[0][:5] + parts[1], ort), fax)
                    elif re.fullmatch(r'0[0-9/ -]+ fax 0[0-9/ -]+', phone_1):
                        parts = phone_1.split(' fax ')
                        add_number(normalize_phone_nr(parts[0], ort), False)
                        add_number(normalize_phone_nr(parts[1], ort), True, fax_only=True)
                    elif re.fullmatch(r'0[0-9/ -]+ fax [1-9][0-9/ -]+', phone_1):
                        parts = phone_1.split(' fax ')
                        add_number(normalize_phone_nr(parts[0], ort), False)
                        add_number(normalize_phone_nr(parts[0][:5] + parts[1], ort), True, fax_only=True)
                    elif '-' in phone_1 and phone_1.endswith('fax'):
                        nr = re.sub(r'-+ ', '-', phone_1)
                        nr = ' '.join(nr.split(' ')[:-1])
                        add_number(normalize_phone_nr(nr.split('-')[0], ort), False)
                        add_number(normalize_phone_nr(nr, ort), True, fax_only=True)
                    elif 'fax -' in phone_1:
                        parts = phone_1.split('fax')
                        add_number(normalize_phone_nr(parts[0], ort), False)
                        add_number(normalize_phone_nr(parts[0].strip() + parts[1].strip(), ort), True, fax_only=True)
                    elif phone_1.endswith('fax'):
                        nr = phone_1[:-3].strip()
                        # NOTE(review): both calls use the same number, so the second
                        # entry replaces the first - confirm this is intended.
                        add_number(normalize_phone_nr(nr, ort), False)
                        add_number(normalize_phone_nr(nr, ort), True, fax_only=True)
                    elif re.fullmatch(r'0[0-9/ -]+ u\. fax (od\. |u\. )?[0-9/ -]+', phone_1):
                        parts = phone_1.split(' ')
                        add_number(normalize_phone_nr(parts[0], ort), True)
                        nr = parts[-1]
                        if nr[0] == '0':
                            add_number(normalize_phone_nr(nr, ort))
                        else:
                            add_number(normalize_phone_nr(parts[0][:5] + nr, ort))
                    else:
                        parts = phone_1.split(' ')
                        if parts[-1].isalpha():
                            add_number(normalize_phone_nr(parts[0], ort), comment=parts[-1])
                        else:
                            for nr in parts:
                                add_number(normalize_phone_nr(nr, ort), fax)
                if phone_3:
                    for nr in phone_3.split(','):
                        nr = nr.strip()
                        parts = nr.split(' ')
                        comment = None
                        # A trailing "(text)" or plain word is treated as a comment.
                        if parts[-1].startswith('(') and parts[-1].endswith(')'):
                            nr = nr[:nr.rindex(' ')].strip()
                            comment = parts[-1][1:-1].strip()
                        elif parts[-1].isalpha():
                            nr = nr[:nr.rindex(' ')].strip()
                            comment = parts[-1].strip()
                        add_number(normalize_phone_nr(nr, ort), comment=comment)
                # One output row per (number, type) combination, numbered consecutively.
                count = 0
                for nr, data in numbers.items():
                    if data['mobile']:
                        count += 1
                        f_tel.row(mgnr, count, 'mobile', nr, data['comment'])
                    if data['landline']:
                        count += 1
                        f_tel.row(mgnr, count, 'landline', nr, data['comment'])
                    if data['fax']:
                        count += 1
                        f_tel.row(mgnr, count, 'fax', nr, data['comment'])
            else:
                if phone_1:
                    phone_1, t, c = check_phone_nr(phone_1, mgnr, active)
                    if t is not None:
                        numbers.append(phone_1)
                        f_tel.row(mgnr, len(numbers), t, phone_1, c)
                if phone_2:
                    phone_2, t, c = check_phone_nr(phone_2, mgnr, active)
                    if t is not None:
                        numbers.append(phone_2)
                        # A landline number in the fax column is a fax number.
                        f_tel.row(mgnr, len(numbers), 'fax' if t == 'landline' else t, phone_2, c)
                if phone_3:
                    if phone_3.startswith('Handy'):
                        phone_3 = phone_3[5:].strip(':').strip()
                    phone_3, t, c = check_phone_nr(phone_3, mgnr, active)
                    if t is not None and phone_3 not in numbers:
                        numbers.append(phone_3)
                        f_tel.row(mgnr, len(numbers), t, phone_3, c)
                if phone_4:
                    phone_4, t, c = check_phone_nr(phone_4, mgnr, active)
                    if t is not None and phone_4 not in numbers:
                        numbers.append(phone_4)
                        f_tel.row(mgnr, len(numbers), t, phone_4, c)

            for i, email in enumerate(emails):
                f_email.row(mgnr, i + 1, email, None)

            MEMBER_MAP[mgnr] = {
                'default_kgnr': kgnr
            }
            if billing_name:
                f_mba.row(mgnr, billing_name, AUSTRIA, postal_dest, address or '-')
def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
    """Migrate the area commitments (Flaechenbindungen).

    Reads ``TFlaechenbindungen.csv`` from *in_dir* and writes
    ``area_commitment.csv`` to *out_dir*.  Reeds that are referenced for a KG
    in which they do not exist yet are appended to ``wb_rd.csv``.
    Commitments of members missing from ``MEMBER_MAP`` are skipped.
    """

    def parse_gstnrs(nr_str: str, kgnr: int, mgnr: int) -> List[str]:
        """Split a free-text parcel number field into single parcel numbers.

        Returns an empty list (after reporting the value via ``invalid``) if
        the text cannot be interpreted.
        """
        if nr_str is None:
            return []
        elif nr_str.isdigit() and len(nr_str) <= 6:
            return [nr_str]
        elif nr_str.count('/') == 1:
            parts = nr_str.split('/')
            if len(parts[0]) == len(parts[1]) and parts[0].isdigit() and parts[1].isdigit() and len(parts[0]) >= 3:
                # Two numbers of equal length: treated as two separate parcels.
                return [parts[0], parts[1]]
            elif parts[0].isdigit() and len(parts[0]) <= 6 and parts[1].isdigit() and len(parts[1]) <= 3:
                # Parcel number with sub-number.
                return [nr_str]

        if nr_str.count('/') > 1:
            parts = nr_str.split('/')
            if all([p.isdigit() for p in parts]):
                if all([len(p) <= 1 for p in parts[1:]]):
                    # "123/1/2/3" -> 123/1, 123/2, 123/3
                    return [f'{parts[0]}/{p}' for p in parts[1:]]
                elif all([len(p) == len(parts[0]) for p in parts]):
                    return parts

        # Strip a leading KG number or a trailing " 2000" and retry.
        if nr_str.startswith(f'{kgnr:05}'):
            return parse_gstnrs(nr_str[5:].strip(), kgnr, mgnr)
        if nr_str.endswith(' 2000'):
            return parse_gstnrs(nr_str[:-5].strip(), kgnr, mgnr)

        parts = re.split(r' *[,;+&] *', nr_str)
        if len(parts) == 1:
            parts = nr_str.split(' / ')
        # NOTE(review): as written, the next two fallbacks split on a character
        # that was just checked to be absent and therefore cannot change
        # `parts`; confirm the intended separators.
        if len(parts) == 1 and ' ' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) == 1 and ' ' not in nr_str and '-' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) > 1:
            return [gst for p in parts for gst in parse_gstnrs(p, kgnr, mgnr)]

        # Ranges like "100-105" or "12/1-3"; a shortened upper bound ("123-5")
        # is completed from the lower bound.
        m = re.fullmatch(r'([0-9]+/)?([0-9]+) *- *([0-9]+)', nr_str)
        if m is not None:
            b = m.group(1)
            f = int(m.group(2))
            t = int(m.group(3))
            if t < f:
                t += f - (f % pow(10, len(m.group(3))))
            if t - f < 50:
                return [
                    gst
                    for counter in range(f, t + 1)
                    for p in [f'{b or ""}{counter}']
                    for gst in parse_gstnrs(p, kgnr, mgnr)
                ]

        invalid(mgnr, 'GstNr.', f'{kgnr:05}-{nr_str}', None)
        return []

    def replace_nrs(m: re.Match, sep: str) -> str:
        """Collapse runs of three or more consecutive numbers in a match to "a-b"."""
        end = m.group(0).endswith(sep)
        # A match ending with the separator yields a trailing empty item,
        # which must not be converted to int.
        parts = [int(p) for p in m.group(0).split(sep) if p]
        text = ''
        last = None
        for i, p in enumerate(parts):
            if last is not None:
                if last + 1 == p:
                    # Still inside a run.
                    last = p
                    continue
                else:
                    # The run ended with the previous number.
                    text += f'{last}{sep}'
                    last = None
            if len(parts) > i + 2 and p + 1 == parts[i + 1] and p + 2 == parts[i + 2]:
                last = p
                text += f'{p}-'
            else:
                text += f'{p}{sep}'
        if last is not None:
            text += str(last)
        return text.strip().strip(sep) + (sep if end else '')

    def format_gstnr(nrs: List[str]) -> Optional[str]:
        """Build the compact text form of a list of parcel numbers (None if empty)."""
        if len(nrs) == 0:
            return None
        # Sort numerically by zero-padding every digit group, then strip the padding again.
        nrs = [re.sub(r'\b0+', '', nr)
               for nr in sorted([re.sub(r'[0-9]+', lambda m: m.group(0).rjust(6, '0'), nr) for nr in nrs])]
        last = None
        text = ''
        for nr in nrs:
            if last is None:
                text += nr
            elif '/' in last and last.split('/')[:-1] == nr.split('/')[:-1]:
                # Same base parcel: only append the sub-number.
                text += f'+{nr.split("/")[-1]}'
            else:
                text += f', {nr}'
            last = nr
        text = re.sub(r'[0-9]+\+[0-9]+(\+[0-9]+)+', lambda m: replace_nrs(m, '+'), text)
        text = re.sub(r'([0-9]+(, |$)){3,}', lambda m: replace_nrs(m, ', '), text)
        return text

    # KG number -> {reed number -> reed name}
    reeds: Dict[int, Dict[int, str]] = {k: {r: n for rk, r, n in REED_MAP.values() if rk == k}
                                        for k in set([k for k, _, _ in REED_MAP.values()])}
    # (KG number, legacy RNR) -> reed number assigned within that KG
    new_reeds: Dict[Tuple[int, int], int] = {}

    with utils.csv_open(f'{out_dir}/area_commitment.csv') as f_fb, \
            utils.csv_open(f'{out_dir}/wb_rd.csv', 'a+') as f_rd:
        f_fb.header('fbnr', 'mgnr', 'vtrgid', 'cultid', 'area', 'kgnr', 'gstnr', 'rdnr',
                    'year_from', 'year_to', 'comment')

        for fb in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
            if (fb['Von'] is None and fb['Bis'] is None) or fb['GNR'] is None:
                continue
            parz: str = fb['Parzellennummer']
            fbnr: int = fb['FBNR']
            mgnr: int = fb['MGNR']
            gem = GEM_MAP[fb['GNR']]
            kgnrs = [kgnr for kgnr, gkz in gem]
            rnr = fb['RNR']
            rd_kgnr, rdnr, _ = REED_MAP.get(rnr, (None, None, None)) if rnr else (None, None, None)
            if mgnr not in MEMBER_MAP:
                continue

            # Determine the KG (and, if necessary, a reed number inside that KG).
            kgnr = None
            if rd_kgnr is None:
                kgnr = kgnrs[0]
            elif rd_kgnr in kgnrs:
                kgnr = rd_kgnr
            elif (kgnrs[0], rnr) in new_reeds:
                kgnr = kgnrs[0]
                rdnr = new_reeds[(kgnr, rnr)]
            else:
                # The reed belongs to a KG outside this municipality: look for a
                # reed with the same name in one of the municipality's KGs.
                rname = lookup_rnr_name(rnr)
                for k in kgnrs:
                    if k not in reeds:
                        continue
                    try:
                        pos = list(reeds[k].values()).index(rname)
                        r = list(reeds[k].keys())[pos]
                        kgnr = k
                        rdnr = r
                        new_reeds[(kgnr, rnr)] = rdnr
                        break
                    except ValueError:
                        continue
                if kgnr is None:
                    # No such reed: create it in the first KG with the next free
                    # number of that KG (1 if the KG has no reeds yet).
                    kgnr = kgnrs[0]
                    rdnr = max([r for rk, r, _ in REED_MAP.values() if rk == kgnr] +
                               [r for (k, _), r in new_reeds.items() if k == kgnr], default=0) + 1
                    f_rd.row(kgnr, rdnr, rname)
                    new_reeds[(kgnr, rnr)] = rdnr
                    new('Reed', (kgnr, rdnr), rname)

            area = int(fb['Flaeche'])
            # Parcel numbers are only parsed for WG Matzen.
            if CLIENT == WG.MATZEN:
                gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR'])
            else:
                gstnrs = []
            comment, gstnr = None, None
            if parz is None or parz == '0000':
                if parz is not None:
                    invalid(mgnr, 'GstNr.', f'{lookup_kg_name(kgnr)} {kgnr or 0:05}-{parz}', None)
                gstnrs = []
                gstnr = '-'
            if CLIENT == WG.MATZEN and len(gstnrs) == 0:
                # Keep the original text so that no information is lost.
                comment = f'KG {kgnr or 0:05}: {parz}'
            gstnr = format_gstnr(gstnrs) or gstnr or parz
            if parz != gstnr.replace('+', '/'):
                convert(mgnr, f'GstNr. ({fbnr})', parz, gstnr)

            # End years of 3000 and above are written as "no end".
            to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None
            attrid = ATTRIBUTE_MAP[fb['SANR']] if fb['SANR'] else None
            if attrid == 'B':
                attrid = None
            f_fb.row(fbnr, mgnr, fb['SNR'] + (attrid or ''), CULTIVATION_MAP[fb['BANR'] or 1], area,
                     kgnr, gstnr, rdnr, fb['Von'], to, comment)
def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, List[int], datetime.date]]:
    """Group delivery rows by their (corrected) delivery note number.

    Cancelled rows and rows without a delivery note number are dropped (for
    WG Weinland rows without a number are kept if they have an Oechsle value
    and a member number).  Returns a list of ``(lsnr, [LINR, ...], date)``
    tuples sorted by delivery note number, where numbers starting with '9'
    are sorted as if prefixed with '19'.
    """
    dates: Dict[str, datetime.date] = {}
    fixed: Dict[str, List[int]] = {}

    def add(lsnr: str, linr: int, date: datetime.date, unique: bool = False) -> None:
        """Append *linr* to *lsnr*; with *unique*, a taken number gets the suffix '/2'."""
        if lsnr not in fixed:
            fixed[lsnr] = []
            dates[lsnr] = date
        elif unique:
            return add(lsnr + '/2', linr, date, unique)
        fixed[lsnr].append(linr)

    def get_lsnr(date: datetime.date, lsnr: str) -> str:
        """Return *lsnr* with its date prefix replaced by *date*."""
        if date.year < 2000:
            return date.strftime('%y%m%d00') + lsnr[8:]
        else:
            return date.strftime('%Y%m%d') + lsnr[8:]

    rows: List[Tuple[int, str, datetime.date, int]] = [
        (d['LINR'], d['Lieferscheinnummer'], d['Datum'], d['MGNR'])
        for d in deliveries
        if (d['Lieferscheinnummer'] or (CLIENT == WG.WEINLAND and d['Oechsle'] and d['MGNR']))
        and not d['Storniert']
    ]
    lsnrs = {r[1] for r in rows}

    local_lnr = 99
    for lnr, lsnr, date, mgnr in rows:
        # Missing (WG Weinland only) or too short numbers get a generated
        # number of the form <date>G<nnn>.
        # NOTE(review): the original condition was written without parentheses
        # and followed by an unreachable `elif len(lsnr) < 8: continue`; the
        # grouping below is the one Python actually applied - confirm that
        # short numbers of other clients should not be skipped instead.
        if (not lsnr and CLIENT == WG.WEINLAND) or len(lsnr) <= 8:
            local_lnr += 1
            add(f'{date:%Y%m%d}G{local_lnr:03}', lnr, date)
            continue

        if lsnr.startswith('22'):
            # Typo in the century.
            lsnr = '20' + lsnr[2:]
        # Numbers starting with '9' carry a two-digit year (19xx).
        lsdate = datetime.date(int(lsnr[:4]), int(lsnr[4:6]), int(lsnr[6:8])) if not lsnr.startswith('9') \
            else datetime.date(1900 + int(lsnr[:2]), int(lsnr[2:4]), int(lsnr[4:6]))

        if len(lsnr) == 12:
            if date != lsdate:
                if date.year == lsdate.year:
                    # Prefer the delivery date, unless that number is already taken.
                    lsnr_n = get_lsnr(date, lsnr)
                    if lsnr_n not in lsnrs:
                        lsnr = lsnr_n
                    else:
                        warning_delivery(lsnr, mgnr, 'date', date)
                else:
                    # Wrong year in the delivery date: trust the number.
                    date = datetime.date(lsdate.year, date.month, date.day)
            add(lsnr, lnr, date, unique=True)
        else:
            # Longer numbers are parts of one delivery, identified by the first 12 characters.
            add(lsnr[:12], lnr, date)

    return sorted([(f[0], f[1], dates[f[0]]) for f in fixed.items()],
                  key=lambda f: f[0] if not f[0].startswith('9') else '19' + f[0])
def migrate_deliveries(in_dir: str, out_dir: str) -> None:
    """Migrate deliveries, delivery parts, modifiers and seasons.

    Reads ``TAbschlaege.csv``, ``TLieferungen.csv`` and
    ``TLieferungAbschlag.csv`` from *in_dir* and writes ``delivery.csv``,
    ``delivery_part.csv``, ``delivery_part_modifier.csv``, ``season.csv`` and
    ``modifier.csv`` to *out_dir*.  As a side effect ``DELIVERY_MAP``
    (LINR -> (year, did, dpnr)), ``MODIFIER_MAP`` and ``SORT_MAP`` are rebuilt.

    Raises:
        NotImplementedError: if modifiers exist and ``CLIENT`` is not one of
            the known cooperatives.
    """
    global DELIVERY_MAP, MODIFIER_MAP, SORT_MAP
    DELIVERY_MAP, MODIFIER_MAP, SORT_MAP = {}, {}, {'HU/': 'GV/HU', 'SV/': 'SW/', 'MEF/B': 'ME/F', 'CSF/B': 'CS/F'}

    modifiers = {
        m['ASNR']: m
        for m in utils.csv_parse_dict(f'{in_dir}/TAbschlaege.csv')
        if m['Bezeichnung'] and m['Bezeichnung'] != '-'
    }
    seasons = {}
    branches = {}

    # Assign the new modifier id to every modifier, depending on the client.
    for mod in modifiers.values():
        name: str = mod['Bezeichnung'].replace('ausser', 'außer')
        nr: int = mod['ASNR']
        MODIFIER_MAP[name] = mod
        if CLIENT == WG.MATZEN:
            mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'PZS'
        elif CLIENT == WG.WINZERKELLER:
            mod['id'] = {
                1: 'KA', 2: 'LG', 3: 'MG', 4: 'SG',
                5: 'VT', 6: 'MV', 7: 'UP', 8: 'VL',
                9: 'DN', 10: 'SA', 11: 'DA', 12: 'EG',
                13: 'KU',
            }[nr]
        elif CLIENT == WG.BADEN:
            mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'UE'
        elif CLIENT == WG.WEINLAND:
            mod['id'] = {
                1: 'PZS', 2: 'TB', 3: 'LM'
            }[nr]
        else:
            raise NotImplementedError(f'Modifier migration for {CLIENT} not yet implemented')

    deliveries = list(utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv'))
    delivery_dict = {d['LINR']: d for d in deliveries}
    fixed = fix_deliveries(deliveries)
    # "old variety -> new variety" -> number of occurrences (printed at the end)
    updated_varieties = {}

    with utils.csv_open(f'{out_dir}/delivery.csv') as f_delivery, \
            utils.csv_open(f'{out_dir}/delivery_part.csv') as f_part:
        f_delivery.header('year', 'did', 'date', 'time', 'zwstid', 'lnr', 'lsnr', 'mgnr', 'comment')
        f_part.header('year', 'did', 'dpnr', 'sortid', 'attrid', 'cultid', 'weight', 'kmw', 'qualid', 'hkid', 'kgnr', 'rdnr',
                      'gerebelt', 'manual_weighing', 'spl_check', 'hand_picked', 'lesewagen', 'gebunden',
                      'temperature', 'acid', 'scale_id', 'weighing_id', 'weighing_reason', 'comment')

        for lsnr, linrs, date in fixed:
            if date.year not in seasons:
                seasons[date.year] = {
                    'currency': 'EUR' if date.year >= 2001 else 'ATS',
                    'precision': WGMASTER_PRECISION,
                    'start': date,
                    'end': date,
                    'nr': 0,
                }
            s = seasons[date.year]
            if date > s['end']:
                s['end'] = date
            # Deliveries are numbered consecutively per season.
            s['nr'] += 1
            snr = s['nr']
            mgnr = delivery_dict[linrs[0]]['MGNR']
            znr = delivery_dict[linrs[0]]['ZNR'] or (1 if CLIENT == WG.WEINLAND else None)
            glob_waage = set(delivery_dict[linr]['Waagentext'] for linr in linrs if delivery_dict[linr]['Waagentext'])

            # The ninth character of the delivery note number is the branch;
            # `lnr` counts the deliveries per branch and day.
            zwstid = lsnr[8]
            if zwstid not in branches:
                branches[zwstid] = {}
            if date not in branches[zwstid]:
                branches[zwstid][date] = 0
            branches[zwstid][date] += 1
            lnr = branches[zwstid][date]
            if BRANCH_MAP[znr] != zwstid:
                if zwstid not in BRANCH_MAP.values():
                    zwstid = BRANCH_MAP[znr]

            comments = []
            for dpnr, linr in enumerate(linrs, start=1):
                d = delivery_dict[linr]
                DELIVERY_MAP[linr] = (date.year, snr, dpnr)
                if lsnr != d['Lieferscheinnummer']:
                    renumber_delivery(d['Lieferscheinnummer'] or '', lsnr)

                oe = d['OechsleOriginal'] or d['Oechsle']
                kmw = GRADATION_MAP[oe]
                sortid = d['SNR'].upper()
                attrid = ATTRIBUTE_MAP[d['SANR']] if d['SANR'] else None
                cultid = None
                # Attributes belong to a single part, so they are collected per
                # part; leftovers of a part with several attributes must not
                # leak into the following parts.
                attributes = set()
                if attrid == 'B':
                    # 'B' is migrated as cultivation, not as attribute.
                    cultid = 'B'
                elif attrid:
                    attributes.add(attrid)
                if len(sortid) != 2:
                    # Legacy variety ids carry the attribute as suffix.
                    attributes.add(sortid[2:])
                    sortid = sortid[:2]

                if CLIENT == WG.MATZEN:
                    if sortid == 'HU':
                        # Gr.Veltliner (Huber)
                        sortid = 'GV'
                        # 'B' is only present if it came from the variety suffix.
                        attributes.discard('B')
                        attributes.add('HU')
                    elif sortid == 'SV':
                        sortid = 'SW'
                    elif sortid == 'WC':
                        # WEIẞBURGUNDER/CHARDONNAY
                        sortid = 'SW'
                    if 'H' in attributes:
                        attributes.remove('H')
                        attributes.add('HK')
                    if 'W' in attributes:
                        attributes.remove('W')
                elif CLIENT == WG.BADEN:
                    if sortid == 'GO':
                        sortid = 'SO'

                if d['SNR'] != sortid:
                    SORT_MAP[f'{d["SNR"]}/{attrid or ""}'] = f'{sortid}/{",".join(list(attributes)) or ""}'
                    line = f'{d["SNR"]}/{attrid} -> {sortid}/{",".join(list(attributes)) or None}'
                    if line not in updated_varieties:
                        updated_varieties[line] = 0
                    updated_varieties[line] += 1

                # NOTE(review): with an unset QSNR and less than 86 Oechsle,
                # `qualid` is not assigned here (it keeps the value of the
                # previous part or is unbound) - confirm the intended fallback.
                if d['QSNR'] is None:
                    warning_delivery(lsnr, mgnr, 'qualid', 'UNSET')
                    if d['Oechsle'] >= 86:
                        qualid = 'KAB'
                else:
                    qualid = QUAL_MAP[d['QSNR']]
                # Downgraded deliveries are always migrated as 'WEI'.
                if qualid != 'WEI' and d['Abgewertet']:
                    if qualid == 'RSW':
                        qualid = 'WEI'
                    else:
                        warning_delivery(lsnr, mgnr, 'qualid', f'{qualid} (abgewertet)')
                        qualid = 'WEI'

                # KG / reed: the reed wins over the municipality, the member's
                # default KG is the last resort.
                kgnr, rdnr = None, None
                if d['GNR']:
                    gem = GEM_MAP.get(d['GNR'], [])
                    if len(gem) == 1:
                        kgnr = gem[0][0]
                if d['RNR']:
                    kgnr, rdnr, _ = REED_MAP[d['RNR']]
                if kgnr is None:
                    m = MEMBER_MAP[mgnr]
                    kgnr = m['default_kgnr']
                if kgnr is None:
                    pass
                elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]:
                    warning_delivery(lsnr, mgnr, 'KGNr.', kgnr)
                    kgnr = None
                hkid = lookup_hkid(kgnr, qualid)

                handwiegung = d['Handwiegung'] or False
                # If all parts share one scale text, it is used for every part.
                waage = list(glob_waage)[0] if len(glob_waage) == 1 else d['Waagentext']
                scale_id, weighing_id = None, None
                if waage:
                    # Known formats of the scale text:
                    #   Waagenr: 1 ID: 19
                    #   Waagennummer: 1 Speichernummer: 9166
                    #   1
                    waage = re.split(r' +', waage)
                    scale_id = waage[1] if len(waage) > 2 else '1'
                    weighing_id = waage[-1] if len(waage) > 2 and waage[2] == 'Speichernummer:' else f'{date}/{waage[-1]}'
                elif len(glob_waage) == 0 and not handwiegung:
                    # No scale information at all: must have been weighed manually.
                    handwiegung = True

                # Well-known comments are converted to their dedicated columns.
                comment: Optional[str] = d['Anmerkung']
                acid = d['Säure']
                hand, lesewagen = None, None
                if comment:
                    comment = comment.replace('Söure', 'Säure')
                    if comment.startswith('Säure'):
                        acid = float(comment.split(' ')[-1].replace(',', '.'))
                        comment = None
                    elif comment in ('Maschine', 'Masschine'):
                        hand = False
                        comment = None
                    elif comment == 'Hand':
                        hand = True
                        comment = None
                    elif comment == '.':
                        comment = None
                    elif comment in ('LW', 'lw'):
                        lesewagen = True
                        comment = None
                    elif 'LW' in comment:
                        lesewagen = True
                        comment = comment.replace('LW', '').strip()
                    if comment == '':
                        comment = None
                if comment:
                    comments.append(comment)

                gerebelt = True if CLIENT == WG.MATZEN or (CLIENT == WG.WINZERKELLER and zwstid == 'W') else d['Gerebelt'] or False
                gebunden = None if CLIENT in (WG.MATZEN, WG.WINZERKELLER) else d['Gebunden']
                if len(attributes) > 1:
                    print("ERROR: ", attributes)
                attrid = attributes.pop() if len(attributes) == 1 else None
                f_part.row(
                    date.year, snr, dpnr, sortid, attrid, cultid, int(d['Gewicht']), kmw, qualid, hkid, kgnr, rdnr,
                    gerebelt, handwiegung, d['Spaetlese-Ueberpruefung'] or False,
                    hand, lesewagen, gebunden, d['Temperatur'], acid, scale_id, weighing_id, None, comment
                )
            # The time of the last part is used for the whole delivery.
            f_delivery.row(date.year, snr, date, d['Uhrzeit'], zwstid, lnr, lsnr, mgnr, '; '.join(comments) or None)

    for k, v in updated_varieties.items():
        print(k + (f' ({v} times)' if v > 1 else ''))

    with utils.csv_open(f'{out_dir}/delivery_part_modifier.csv') as f_part_mod:
        f_part_mod.header('year', 'did', 'dpnr', 'modid')
        for m in utils.csv_parse_dict(f'{in_dir}/TLieferungAbschlag.csv'):
            if m['LINR'] not in DELIVERY_MAP or m['ASNR'] not in modifiers:
                continue
            y, did, dpnr = DELIVERY_MAP[m['LINR']]
            f_part_mod.row(y, did, dpnr, modifiers[m['ASNR']]['id'])

    with utils.csv_open(f'{out_dir}/season.csv') as f_season, \
            utils.csv_open(f'{out_dir}/modifier.csv') as f_mod:
        f_season.header('year', 'currency', 'precision', 'max_kg_per_ha', 'vat_normal', 'vat_flatrate',
                        'min_kg_per_bs', 'max_kg_per_bs', 'penalty_per_kg', 'penalty_amount', 'penalty_none',
                        'penalty_per_bs_amount', 'penalty_per_bs_none', 'start_date', 'end_date')
        f_mod.header('year', 'modid', 'ordering', 'name', 'abs', 'rel', 'active')
        for y, s in seasons.items():
            f_season.row(y, s['currency'], s['precision'], 10_000, 0.10, 0.13,
                         PARAMETERS['LIEFERPFLICHT/GA1'], PARAMETERS['LIEFERRECHT/GA1'],
                         None, None, None, None, None, s['start'], s['end'])
            # Every modifier is written once per season.
            for m in modifiers.values():
                abs_v = round(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None
                rel_v = m['AZASProzent'] / 100.0 if m['AZASProzent'] is not None else None
                f_mod.row(y, m['id'], m['ASNR'], m['Bezeichnung'], abs_v, rel_v, True)
def migrate_payments(in_dir: str, out_dir: str) -> None:
    """Migrate payment variants and the amounts paid per delivery part.

    Reads ``TAuszahlung.csv``, ``TAuszahlungSorten.csv``,
    ``TAuszahlungSortenQualitätsstufe.csv`` and ``TLieferungen.csv`` from
    *in_dir* and writes ``payment_variant.csv``,
    ``payment_delivery_part.csv`` and ``delivery_part_bucket.csv`` to
    *out_dir*.  Relies on ``DELIVERY_MAP`` and ``SORT_MAP`` as filled by
    ``migrate_deliveries``.
    """
    # year -> [(AZNR, avnr, TeilzahlungNr)] of all non-test variants
    variant_year_map: Dict[int, List[Tuple[int, int, int]]] = {}
    # year -> number of variants seen so far (used as avnr)
    year_map = {}

    # Group the price rows by payment variant (AZNR) in a single pass.
    sort_map: Dict[int, List[Dict[str, Any]]] = {}
    for row in utils.csv_parse_dict(f'{in_dir}/TAuszahlungSorten.csv'):
        sort_map.setdefault(row['AZNR'], []).append(row)
    qual_map: Dict[int, List[Dict[str, Any]]] = {}
    for row in utils.csv_parse_dict(f'{in_dir}/TAuszahlungSortenQualitätsstufe.csv'):
        qual_map.setdefault(row['AZNR'], []).append(row)

    def collapse_data(data: dict[str, Any]):
        """Shorten a key -> value mapping by introducing default entries.

        Returns the single value if all keys share it; otherwise *data*
        (modified in place), where a value used by at least half of the
        entries becomes 'default' and values shared by keys with the same
        '/suffix' are merged into that suffix key.
        """
        rev = {}
        for k, v in data.items():
            if k == 'default' or k.startswith('/'):
                continue
            rev.setdefault(v, []).append(k)
        if 'default' not in data.keys():
            if len(rev) == 1:
                return set(rev.keys()).pop()
            for v, ks in rev.items():
                if len(ks) >= len(data) / 2:
                    for k in ks:
                        del data[k]
                    data['default'] = v
                    return collapse_data(data)
        for idx1 in {'/' + k.split('/')[1] for k in data.keys() if '/' in k and len(k) > 3}:
            len1 = len(list(k for k, _ in data.items() if k.endswith(idx1)))
            for v, ks in rev.items():
                my_ks = list(k for k in ks if k.endswith(idx1))
                if len(my_ks) > 1 and len(my_ks) >= len1 / 2:
                    for k in my_ks:
                        del data[k]
                    data[idx1] = v
        return data

    def collapse_curve(curve: list[float]):
        """Reduce a price-per-Oechsle list to the points where its slope changes."""
        n = {}
        d = 0
        for oe, p0, p1, p2 in zip(range(0, len(curve) + 1), [0] + curve, curve, curve[1:] + [curve[len(curve) - 1]]):
            d1, d2 = round(p1 - p0, WGMASTER_PRECISION), round(p2 - p1, WGMASTER_PRECISION)
            if d1 == d:
                # Same slope as before: not a supporting point.
                continue
            d = d2
            if p0 > 0:
                n[f'{oe - 1}oe'] = p0
            n[f'{oe}oe'] = p1
        if curve[len(curve) - 1] > 0:
            n[f'{len(curve) - 1}oe'] = curve[len(curve) - 1]
        keys = list(n.keys())
        vals = list(n.values())
        # Drop redundant points of a flat start ...
        while len(n) >= 2 and vals[0] == vals[1]:
            del n[keys[0]]
            del n[keys[1]]
            n = {keys[1]: vals[1], **n}
            keys = list(n.keys())
            vals = list(n.values())
        # ... and of a flat end.
        while len(n) >= 2 and vals[len(vals) - 1] == vals[len(vals) - 2]:
            del n[keys[len(vals) - 1]]
            del n[keys[len(vals) - 2]]
            n = {**n, keys[len(vals) - 2]: vals[len(vals) - 2]}
            keys = list(n.keys())
            vals = list(n.values())
        # A constant curve is expressed as a single point.
        if len(n) == 0:
            n = {'15kmw': 0}
        elif len(n) == 1:
            n = {'15kmw': list(n.values())[0]}
        return n

    with (utils.csv_open(f'{out_dir}/payment_variant.csv') as f_payment):
        f_payment.header('year', 'avnr', 'name', 'date', 'test_variant', 'calc_time', 'comment', 'data')
        for p in utils.csv_parse_dict(f'{in_dir}/TAuszahlung.csv'):
            year = p['Lesejahr']
            if year is None:
                continue
            if year not in year_map:
                year_map[year] = 0
            year_map[year] += 1

            # All columns that do not get their own output column end up in `data`.
            var = p.copy()
            del var['AZNR']
            del var['Datum']
            del var['Beschreibung']
            del var['Lesejahr']
            del var['Titel']
            del var['TeilzahlungNr']
            data = {
                'mode': 'wgmaster',
                **var,
                'AuszahlungSorten': {},
                'AuszahlungSortenQualitätsstufe': {},
            }
            gb = data['Grundbetrag'] or 0
            gbzs = data['GBZS']

            # --- prices per variety/attribute and Oechsle --------------------
            azs = data['AuszahlungSorten']
            for s in sort_map.get(p['AZNR'], []):
                del s['AZNR']
                del s['ID']
                if s['Oechsle'] is None:
                    continue
                attrid = ATTRIBUTE_MAP[s['SANR']] if s['SANR'] else None
                key = SORT_MAP.get(f'{s["SNR"]}/{attrid or ""}', f'{s["SNR"].upper()}/{attrid or ""}')
                if key is None or len(key) < 3:
                    continue
                azs.setdefault(key, {'Gebunden': {}, 'NichtGebunden': {}})
                azs[key]['Gebunden' if s['gebunden'] else 'NichtGebunden'][s['Oechsle']] = round(s['Betrag'] + gb, WGMASTER_PRECISION)

            # Replace the price tables by a constant or by a reference to a curve.
            curves = []
            curve_zero = False
            for key, d1 in azs.items():
                oe = [d1['NichtGebunden'].get(n, 0.0) for n in range(max(d1['NichtGebunden'].keys()) + 1)]
                if len(d1['Gebunden']) > 0:
                    oe_geb = [d1['Gebunden'].get(n, 0.0) for n in range(max(d1['Gebunden'].keys()) + 1)]
                else:
                    oe_geb = None
                if len(set(oe)) <= 2 and oe[0] == 0 and \
                        (oe_geb is None or (len(set(oe_geb)) <= 2 and oe[len(oe) - 1] == oe_geb[len(oe_geb) - 1])):
                    # Only one price besides zero: a constant.
                    azs[key] = oe[len(oe) - 1]
                    if azs[key] == 0:
                        azs[key] = 'curve:0'
                        curve_zero = True
                else:
                    c = (oe, oe_geb)
                    if c not in curves:
                        curves.append(c)
                    azs[key] = f'curve:{curves.index(c) + 1}'
            data['AuszahlungSorten'] = collapse_data(azs)

            for i, cs in enumerate(curves):
                c, c_geb = cs
                geb = None
                if c_geb is not None:
                    diff = {round(b - a, WGMASTER_PRECISION) for a, b in zip(c, c_geb)}
                    # A zero difference is not necessarily present.
                    diff.discard(0.0)
                    if len(diff) == 1:
                        # Constant surcharge.
                        geb = diff.pop()
                    elif len(diff) > 1:
                        geb = collapse_curve(c_geb)
                curves[i] = {
                    'id': i + 1,
                    'mode': 'oe',
                    'data': collapse_curve(c),
                }
                if geb is not None:
                    curves[i]['geb'] = geb
            if curve_zero:
                curves.insert(0, {
                    'id': 0,
                    'mode': 'oe',
                    'data': gb,
                    'geb': gbzs or 0
                })
            data['Kurven'] = curves

            # --- prices per quality level and variety/attribute --------------
            azq = data['AuszahlungSortenQualitätsstufe']
            for q in qual_map.get(p['AZNR'], []):
                del q['AZNR']
                del q['ID']
                qualid = QUAL_MAP[q['QSNR']]
                # Take the attribute from the current row (not from the last row
                # of the variety loop above).
                # NOTE(review): .get() because it is not visible here whether
                # this table has a SANR column at all.
                sanr = q.get('SANR')
                attrid = ATTRIBUTE_MAP[sanr] if sanr else None
                key = SORT_MAP.get(f'{q["SNR"]}/{attrid or ""}', f'{q["SNR"].upper()}/{attrid or ""}')
                if key is None or len(key) < 3:
                    continue
                azq.setdefault(qualid, {})
                azq[qualid][key] = round((q['Betrag'] or 0) + gb, WGMASTER_PRECISION)
            for qualid, d1 in azq.items():
                azq[qualid] = collapse_data(d1)

            # Unset values and False flags are omitted from the data.
            for data_k, data_v in data.copy().items():
                if data_v is None or isinstance(data_v, bool) and not data_v:
                    del data[data_k]

            # Payment number 7 marks a test variant.
            test = (p['TeilzahlungNr'] == 7)
            if not test:
                if year not in variant_year_map:
                    variant_year_map[year] = []
                variant_year_map[year].append((p['AZNR'], year_map[year], p['TeilzahlungNr']))
            dmp = json.dumps(data).replace('/B', '-B').replace('/"', '"').replace('/-', '-')
            f_payment.row(year, year_map[year], p['Titel'], p['Datum'], test, None, p['Beschreibung'], dmp)

    with utils.csv_open(f'{out_dir}/payment_delivery_part.csv') as f_del_pay, \
            utils.csv_open(f'{out_dir}/delivery_part_bucket.csv') as f_bucket:
        f_del_pay.header('year', 'did', 'dpnr', 'avnr', 'net_amount')
        f_bucket.header('year', 'did', 'dpnr', 'bktnr', 'discr', 'value')
        deliveries = {d['LINR']: d for d in utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv')}
        for linr, (y, did, dpnr) in DELIVERY_MAP.items():
            p = deliveries[linr]
            if y not in variant_year_map:
                continue
            # Bucket 0 holds the weight that is not bound, bucket 1 the bound weight.
            gew, geb_gew = int(p['Gewicht']), int(p['BGewichtGebunden'])
            b1 = gew - geb_gew
            b2 = geb_gew
            f_bucket.row(y, did, dpnr, 0, '_', b1)
            attrid = ATTRIBUTE_MAP[p['SANR']] if p['SANR'] else None
            f_bucket.row(y, did, dpnr, 1, attrid or '', b2)
            for aznr, avnr, tznr in variant_year_map[y]:
                val = p[f'BTeilzahlung{tznr}' if tznr < 6 else 'BEndauszahlung']
                val = round(val * pow(10, WGMASTER_PRECISION))
                f_del_pay.row(y, did, dpnr, avnr, val)
def migrate_parameters(in_dir: str, out_dir: str) -> None:
    """Migrate the client parameters.

    Reads ``TParameter.csv`` from *in_dir* into the global ``PARAMETERS``
    (name -> value) and writes the derived ``client_parameter.csv`` to
    *out_dir*.

    Raises:
        KeyError: if a mandatory parameter is missing or the legal form in
            ``MANDANTENNAME2`` is unknown.
    """
    global PARAMETERS
    PARAMETERS = {p['Bezeichnung']: p['Wert'] for p in utils.csv_parse_dict(f'{in_dir}/TParameter.csv')}

    name = PARAMETERS['MANDANTENNAME1'].title().replace('F.', 'für').replace('U.', 'und').replace(' Im ', ' im ')
    # The legal form is the last comma separated part, without spaces and dots.
    suffix = PARAMETERS['MANDANTENNAME2'].lower().replace(' ', '').replace('.', '').split(',')[-1]
    types = {
        'reggenmbh': 'reg. Gen.m.b.H.'
    }
    tokens = {
        WG.MATZEN: ('WGM', 'WG Matzen'),
        WG.WINZERKELLER: ('WKW', 'Winzerkeller'),
        WG.WEINLAND: ('WGW', 'WG Weinland'),
        WG.BADEN: ('WGB', 'WG Baden')
    }.get(CLIENT, (None, None))
    ort = PARAMETERS['MANDANTENORT'].title()

    # The delivery note text is optional like the other texts, so it must not
    # be modified unless it is present.
    text_delivery_note = PARAMETERS.get('LIEFERSCHEINTEXT', None)
    if text_delivery_note is not None:
        text_delivery_note = text_delivery_note.replace(' daß ', ' dass ') \
            .replace('obige Angaben maßgeblicher Veränderungen', 'maßgeblichen Veränderungen obiger Angaben')

    new_params: Dict[str, Optional[str]] = {
        'CLIENT_NAME_TOKEN': tokens[0],
        'CLIENT_NAME_SHORT': tokens[1],
        'CLIENT_NAME': name,
        'CLIENT_NAME_SUFFIX': None,
        'CLIENT_NAME_TYPE': types[suffix],
        'CLIENT_PLZ': PARAMETERS['MANDANTENPLZ'],
        'CLIENT_ORT': ort,
        'CLIENT_ADDRESS': PARAMETERS['MANDANTENSTRASSE'],
        'CLIENT_IBAN': None,
        'CLIENT_BIC': None,
        'CLIENT_USTIDNR': PARAMETERS['MANDANTENUID'].replace(' ', ''),
        'CLIENT_LFBISNR': PARAMETERS['MANDANTENBETRIEBSNUMMER'],
        'CLIENT_PHONE': normalize_phone_nr(PARAMETERS['MANDANTENTELEFON'], ort),
        'CLIENT_FAX': normalize_phone_nr(PARAMETERS['MANDANTENTELEFAX'], ort),
        'CLIENT_EMAIL': PARAMETERS['MANDANTENEMAIL'],
        'CLIENT_WEBSITE': PARAMETERS.get('MANDANTENHOMEPAGE', None),
        'DOCUMENT_SENDER': PARAMETERS.get('ABSENDERTEXT2', None),
        'TEXT_DELIVERYNOTE': text_delivery_note,
        'TEXT_DELIVERYCONFIRMATION': PARAMETERS.get('ANLIEFTEXT', None),
        'TEXT_CREDITNOTE': PARAMETERS.get('AUSZAHLUNGTEXT', None),
    }

    with utils.csv_open(f'{out_dir}/client_parameter.csv') as f:
        f.header('param', 'value')
        for param, value in new_params.items():
            f.row(param, value)
def main() -> None:
    """Parse the command line and run all migration steps in order.

    Sets the globals ``QUIET``, ``CLIENT`` and ``DB_CNX``; the database
    connection is closed again even if a migration step fails.
    """
    global DB_CNX, QUIET, CLIENT

    parser = argparse.ArgumentParser()
    parser.add_argument('in_dir', type=str,
                        help='The input directory where the exported csv files are stored')
    parser.add_argument('out_dir', type=str,
                        help='The output directory where the migrated csv file should be stored')
    parser.add_argument('-q', '--quiet', action='store_true', default=False,
                        help='Be less verbose')
    parser.add_argument('-d', '--database', metavar='DB', required=True,
                        help='The sqlite database file to look up information')
    parser.add_argument('-g', '--genossenschaft', metavar='WG', required=False, type=str,
                        choices=[wg.name for wg in WG])
    args = parser.parse_args()

    os.makedirs(args.out_dir, exist_ok=True)

    QUIET = args.quiet
    # The option is optional, so there may be no client at all.
    CLIENT = WG.from_str(args.genossenschaft) if args.genossenschaft is not None else None

    DB_CNX = sqlite3.connect(args.database)
    try:
        # The order matters: later steps use the lookup maps filled by earlier ones.
        migrate_parameters(args.in_dir, args.out_dir)
        migrate_gradation(args.in_dir, args.out_dir)
        migrate_branches(args.in_dir, args.out_dir)
        migrate_grosslagen(args.in_dir, args.out_dir)
        migrate_gemeinden(args.in_dir, args.out_dir)
        migrate_reeds(args.in_dir, args.out_dir)
        migrate_attributes(args.in_dir, args.out_dir)
        migrate_cultivations(args.in_dir, args.out_dir)
        migrate_area_commitment_types(args.in_dir, args.out_dir)
        migrate_members(args.in_dir, args.out_dir)
        migrate_area_commitments(args.in_dir, args.out_dir)
        migrate_deliveries(args.in_dir, args.out_dir)
        migrate_payments(args.in_dir, args.out_dir)
    finally:
        DB_CNX.close()


if __name__ == '__main__':
    main()