#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from typing import Dict, Any, Tuple, Optional, List, Iterable
import argparse
import os
import re
import sys
import sqlite3
import requests
import datetime
import json
import string

import utils


# Module-wide state; all of it starts out unset and is assigned elsewhere.
DB_CNX: Optional[sqlite3.Connection] = None
QUIET: bool = False
WG: Optional[str] = None

# Syntax-only patterns; they are applied with fullmatch().
USTID_NR_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}')
BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
IBAN_RE = re.compile(r'[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')
EMAIL_RE = re.compile(r'[^@\s]+@([A-Za-z0-9_äöüß-]+\.)+[A-Za-z]{2,}')

# Lookup tables, initialised to None and filled by the migrate_* functions.
GRADATION_MAP: Optional[Dict[float, float]] = None
CULTIVATION_MAP: Optional[Dict[int, str]] = None
BRANCH_MAP: Optional[Dict[int, str]] = None
GEM_MAP: Optional[Dict[int, List[Tuple[int, int]]]] = None
REED_MAP: Optional[Dict[int, Tuple[int, int, str]]] = None
GROSSLAGE_MAP: Optional[Dict[int, int]] = None
MEMBER_MAP: Optional[Dict[int, Dict[str, Any]]] = None
GROSSLAGE_KG_MAP: Optional[Dict[int, int]] = None
DELIVERY_MAP: Optional[Dict[int, Tuple[int, int, int]]] = None
MODIFIER_MAP: Optional[Dict[str, Dict]] = None

AUSTRIA = 40
WGMASTER_PRECISION = 4

QUAL_MAP: Dict[int, str] = {
    0: 'WEI',
    1: 'RSW',
    2: 'LDW',
    3: 'QUW',
    4: 'KAB',
    5: 'SPL',
}

# Place name -> adjective form used in street names.
# None means the adjective is simply the place name + "er".
ORT_NAMES: Dict[str, Optional[str]] = {
    'Pirawarth': None,
    'Raggendorf': None,
    'Matzen': 'Matzner',
    'Matzn': None,
    'Stillfried': None,
    'Harras': None,
    'Gänserndorf': None,
    'Sulz': None,
    'Brünn': None,
    'Wien': None,
    'Angern': None,
    'Schweinbarth': None,
    'Hohenruppersdorf': None,
    'Grub': None,
    'Auersthal': None,
    'Ollersdorf': None,
    'Spannberg': None,
    'Ebenthal': None,
    'Bockfließ': None,
    'Dörfless': 'Dörfleser',
    'Dörfles': None,
    'Ableiding': None,
    'Absberg': None,
    'Eibesbrunn': None,
    'Engersdorf': None,
    'Enzersfeld': None,
    'Großebersdorf': None,
    'Hollabrunn': None,
    'Korneuburg': None,
    'Königsbrunn': None,
    'Laa': None,
    'Leopoldau': None,
    'Manhartsbrunn': None,
    'Mannhartsbrunn': 'Manhartsbrunner',
    'Münichsthal': None,
    'Pernau': None,
    'Pillichsdorf': None,
    'Retz': None,
    'Russbach': None,
    'Schleinbach': None,
    'Seefeld': None,
    'Seyring': None,
    'Stammersdorf': None,
    'Stelzendorf': None,
    'Traunfeld': None,
    'Tresdorf': None,
    'Trumau': None,
    'Wolkersdorf': None,
    'Znaim': None,
    'Obersdorf': None,
    'Sechshaus': None,
}

# Exact-match corrections for street names (misspelled/abbreviated -> canonical).
STREET_NAMES: Dict[str, str] = {
    'Hans-Wagnerstraße': 'Hans-Wagner-Straße',
    'J.Seitzstraße': 'Josef-Seitz-Straße',
    'Kurhaus-Str.': 'Kurhausstraße',
    'Kurhaus-Straße': 'Kurhausstraße',
    'Hofrat Döltlstraße': 'Hofrat-Döltl-Straße',
    'Flustraße': 'Flurstraße',
    'St.Laurentstraße': 'St.-Laurentstraße',
    'Josef Seitzstraße': 'Josef-Seitz-Straße',
    'Ritter Zoppelstraße': 'Ritter-Zoppel-Straße',
    'Ritter Zoppel Straße': 'Ritter-Zoppel-Straße',
    'R. Virchow-Straße': 'Rudolf-Virchow-Straße',
    'Grubere Hauptstraße': 'Gruber Hauptstraße',
    'Groß Inzersdorf': 'Großinzersdorf',
    'Erdpress': 'Erdpreß',
    'Hochleitengasse': 'Hochleithengasse',
    'Bei Der Gösselmühle': 'Bei der Gösslmühle',
    'Dr. Peschlstraße': 'Dr.-Peschl-Straße',
    'Dr.Peschlstraße': 'Dr.-Peschl-Straße',
    'Dr. Salzbornstraße': 'Dr.-Salzborn-Straße',
    'Elsa Brandström-Straße': 'Elsa-Brandström-Straße',
    'Franz Ecker Siedlung': 'Franz-Ecker-Siedlung',
    'Franz-Ecker Siedlung': 'Franz-Ecker-Siedlung',
    'Franz Gillygasse': 'Franz-Gilly-Gasse',
    'Franz V. Zülowstraße': 'Franz-von-Zülow-Straße',
    'Gr. Nondorf': 'Großnondorf',
    'In Der Trift': 'In der Trift',
    'Johann Degengasse': 'Johann-Degen-Gasse',
    'Josef Fürnkranz Siedlung': 'Josef-Fürnkranz-Siedlung',
    'Kaiser Franz Josef Platz': 'Kaiser-Franz-Josef-Platz',
    'Klein Haugsdorf': 'Kleinhaugsdorf',
    'Leopold Leuthnerstraße': 'Leopold-Leuthner-Straße',
    'Lh.-Mayer-Platz': 'Landeshauptmann-Mayer-Platz',
    'Manhartsbr.Straße': 'Manhartsbrunner Straße',
    'Maria Lourd Weg': 'Maria-Lourd-Weg',
    'U. Weißgasse Straße': 'Untere Weißgerberstraße',
}


def new(t: str, ids: Any, name: str, comment: Optional[str] = None) -> None:
    """Print a bold-green "New <t>" notice to stderr (not silenced by QUIET)."""
    print(f'\x1B[1;32mNew {t:>6}: {str(ids):>10} ({name}{", " + comment if comment else ""})\x1B[0m', file=sys.stderr)


def success(mgnr: int, key: str, value) -> None:
    """Print a bold-green per-member message to stderr unless QUIET is set."""
    if not QUIET:
        print(f'\x1B[1;32m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)


def warning(mgnr: int, key: str, value) -> None:
    """Print a bold-yellow per-member warning to stderr (always shown)."""
    print(f'\x1B[1;33m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)


def invalid(mgnr: int, key: str, value) -> None:
    """Print a bold-red per-member error to stderr (always shown)."""
    print(f'\x1B[1;31m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)


def renumber_delivery(lsnr_1: str, lsnr_2: str) -> None:
    """Print a bold "old -> new" delivery number notice to stderr unless QUIET is set."""
    if not QUIET:
        print(f'\x1B[1m{lsnr_1:<15} -> {lsnr_2:<15}\x1B[0m', file=sys.stderr)


def warning_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Print a bold-yellow per-delivery warning to stderr (always shown)."""
    print(f'\x1B[1;33m{lsnr:<15} ({mgnr:>6}): {key:<12} {value}\x1B[0m', file=sys.stderr)


def invalid_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Print a bold-red per-delivery error to stderr (always shown)."""
    print(f'\x1B[1;31m{lsnr:<15} ({mgnr:>6}): {key:<12} {value}\x1B[0m', file=sys.stderr)


def convert(mgnr: int, key: str, old_value: str, new_value) -> None:
    """Print a bold '"old" -> "new"' value conversion notice to stderr unless QUIET is set."""
    if not QUIET:
        print(f'\x1B[1m{mgnr:>6}: {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr)


def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str],
                 billing: Optional[str] = None) -> None:
    """Print a bold name conversion notice to stderr unless QUIET is set.

    Name parts are joined with " / " (None parts are shown as empty strings);
    a truthy billing name is appended in parentheses.
    """
    if not QUIET:
        print(f'\x1B[1m{mgnr:>6}: '
              f'{" / ".join([e or "" for e in old_name])} -> '
              f'{" / ".join([e or "" for e in new_name])}'
              f'{"(" + billing + ")" if billing else ""}\x1B[0m', file=sys.stderr)


def check_lfbis_nr(nr: str) -> bool:
    """Return True if nr is a 7-digit string whose last digit matches its check digit.

    The first six digits are weighted 7, 6, ..., 2; the check digit is
    (11 - sum % 11) % 10.
    """
    # https://statistik.at/fileadmin/shared/QM/Standarddokumentationen/RW/std_r_land-forstw_register.pdf#page=41
    if len(nr) != 7 or not nr.isdigit():
        return False
    s = 0
    for i, ch in enumerate(nr[:-1]):
        s += int(ch) * (7 - i)
    v = (11 - (s % 11)) % 10
    return v == int(nr[-1])


def check_ustid_nr_at(nr: str) -> bool:
    """Return True if nr has the form ATU + 8 digits and its last digit is the valid check digit.

    Digits 1-7 are weighted alternately 1 and 2, the digit sums of the
    products are added up, and the check digit is (96 - sum) % 10.
    """
    # http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
    if not nr.startswith('ATU') or len(nr) != 11 or not nr[3:].isdigit():
        return False
    s = 0
    for i, ch in enumerate(nr[3:-1]):
        s += sum(map(int, str(int(ch) * (i % 2 + 1))))
    v = (96 - s) % 10
    return v == int(nr[-1])


def modulo(a: str, b: int) -> int:
    """Return the decimal digit string a, read as an integer, modulo b (computed digit by digit)."""
    s = 0
    for ch in a:
        s = (s * 10 + int(ch)) % b
    return s


def check_iban(iban: str) -> bool:
    """Return True if iban matches IBAN_RE and passes the mod-97 checksum.

    The first four characters are moved to the end, letters are replaced
    by 10..35 (A..Z) and the resulting number must be congruent to 1 mod 97.
    """
    if not IBAN_RE.fullmatch(iban):
        return False
    s = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), (iban[4:] + iban[:4]))
    return modulo(s, 97) == 1


def normalize_phone_nr(nr: Optional[str], ort: Optional[str] = None) -> Optional[str]:
    """Bring a phone number into the form "+43 <prefix> <rest>" where possible.

    Slashes become spaces; hyphens are dropped unless there is exactly one
    hyphen followed by at most three characters. A leading "0" is replaced
    by "+43 ". When WG is 'GWK' and a place name is given, numbers without
    a leading "0" get the area code of that place prepended.

    Numbers starting with "+43" are regrouped by the first digit after the
    country code: 6 -> three-digit prefix, 1 -> one-digit prefix,
    2 -> four-digit prefix. Other numbers are only stripped.

    :param nr: raw phone number, or None
    :param ort: place name used to pick an area code (only for WG == 'GWK')
    :return: None if nr is None, the empty string for blank input,
             otherwise the normalized number
    :raises RuntimeError: if WG is 'GWK', the number has no leading "0" and
                          the place name is not one of the known places
    """
    if nr is None:
        return None
    nr = nr.replace('/', ' ').strip()
    if nr.count('-') > 1 or len(nr.split('-')[-1]) > 3:
        nr = nr.replace('-', '')
    if not nr:
        # Blank input: nothing to normalize (previously an IndexError on nr[0]).
        return nr

    if nr[0] == '0':
        nr = '+43 ' + nr[1:]
    elif WG == 'GWK' and ort:
        ort = ort.upper().strip()
        if ort in ('PILLICHSDORF', 'OBERSDORF', 'WOLKERSDORF', 'WOLFPASSING', 'PUTZING',
                   'GROSSENGERSDORF', 'EIBESBRUNN'):
            nr = f'+43 2245 {nr}'
        elif ort in ('ALBERNDORF', 'HAUGSDORF', 'AUGGENTHAL'):
            nr = f'+43 2944 {nr}'
        elif ort == 'HADRES':
            # Was `ort in ('HADRES')`, i.e. a substring test on a plain string.
            nr = f'+43 2943 {nr}'
        else:
            print(nr, ort)
            raise RuntimeError(f'no area code known for {ort!r} (number {nr!r})')

    if nr.startswith('+43'):
        # Decide on the space-free form so that short or unspaced input cannot
        # raise an IndexError or be classified by the wrong digit.
        compact = nr.replace(' ', '')
        lead = compact[3:4]
        if lead == '6':
            nr = f'{compact[:3]} {compact[3:6]} {compact[6:]}'
        elif lead == '1':
            # Rest starts after the "1" (was compact[3:], which repeated the digit).
            nr = f'{compact[:3]} {compact[3]} {compact[4:]}'
        elif lead == '2':
            nr = f'{compact[:3]} {compact[3:7]} {compact[7:]}'
    return nr.strip()


def fix_street_name(name: str) -> str:
    """Return the corrected spelling of a street name.

    Exact matches in STREET_NAMES win. Otherwise, a name ending in
    "straße"/"platz" that starts with exactly one "<place>er" from
    ORT_NAMES is rewritten to "<adjective> <Rest>" with the rest
    title-cased. Anything else is returned unchanged.
    """
    if name in STREET_NAMES:
        return STREET_NAMES[name]
    orte = [(k, v) for k, v in ORT_NAMES.items() if name.startswith(k + 'er')]
    if name.endswith(('straße', 'platz')) and len(orte) == 1:
        ort, adjective = orte[0]
        rest = name[len(ort) + 2:].title()
        # rest keeps its leading space when the input was already two words,
        # so collapse the resulting double space.
        return f'{adjective or ort + "er"} {rest}'.replace('  ', ' ')
    return name


def get_bev_gst_size(kgnr: int, gstnr: str, timeout: float = 30.0) -> Optional[int]:
    """Fetch a parcel from the BEV cadastre API and return the sum of its 'fl' values.

    :param kgnr: KG number, zero-padded to five digits in the URL
    :param gstnr: parcel number as used in the URL path
    :param timeout: seconds to wait for the server before requests raises
    :return: sum of n['fl'] over data['properties']['nutzungen'],
             or None if the response status is not 200
    """
    r = requests.get(f'https://kataster.bev.gv.at/api/gst/{kgnr:05}/{gstnr}/', timeout=timeout)
    if r.status_code != 200:
        return None
    data = r.json()
    return sum(n['fl'] for n in data['properties']['nutzungen'])

def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]: members = {} for f in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'): if f['MGNR'] not in members: members[f['MGNR']] = {} members[f['MGNR']][f['FBNR']] = f return members def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] = None) -> Optional[int]: if plz is None or ort is None: return None ort = ort.replace('0', 'O').replace('SZ', 'SS') if ort.upper() == 'PILLICHSDORF' and plz == 2212: plz = 2211 elif ort.upper() == 'ENZERSFELD' and plz == 2203: plz = 2202 elif ort.upper() == 'GROSSEBERSDORF' and plz == 2212: ort = 'GROSSENGERSDORF' elif ort.upper() == 'MÜNICHSTHAL' and plz == 2123: plz = 2122 elif ort.upper() == 'FRAUENDORF' and plz == 3710: plz = 3714 elif ort.upper() == 'MAISSAU' and plz == 3721: ort = 'UNTERDÜRNBACH' elif ort.upper() in ('KLEINRIEDENTHAL', 'KLEINHÖFLEIN', 'KLEIN HÖFLEIN') and plz == 2074: plz = 2070 elif ort.upper() == 'DROSENDORF' and plz == 2095: ort = 'DROSENDORF ALTSTADT' elif ort.upper() == 'KLEINWEIKERSDORF' and plz == 2033: plz = 2023 cur = DB_CNX.cursor() cur.execute("SELECT o.okz, p.dest, o.name FROM AT_plz_dest p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?", (plz,)) rows: List[Tuple[int, str, str]] = cur.fetchall() cur.close() ort_m = re.sub(r'\d+', '', ort).lower().replace('gr.', 'groß').replace(' ', '').replace('-', '').replace('ß', 'ss') rows_m = [r[0] for r in rows if ort_m in r[2].lower().replace(' ', '').replace('-', '').replace('ß', 'ss')] if len(rows_m) > 1: rows_m = [r[0] for r in rows if ort_m == r[2].lower().replace(' ', '').replace('-', '').replace('ß', 'ss')] if len(rows_m) == 1: return plz * 100000 + rows_m[0] if ort == 'VELM-GÖTZENDORF': parts = address.split(' ') street = parts[:-1] nr = int(parts[-1].split('-')[0]) if street == 'Landstraße' and nr <= 48 \ or street == 'Winterzeile' and (nr <= 49 or nr in (52, 54, 56)) \ or street == 'Hauptstraße' and (nr <= 106 or nr in (117, 115, 113, 111, 109, 
107)): # Velm return plz * 100000 + 3572 else: # Götzendorf return plz * 100000 + 3571 raise RuntimeError(f'PLZ not found ({plz} {ort}, {address})') def lookup_kgnr(okz: Optional[int]) -> Optional[int]: if okz is None: return None cur = DB_CNX.cursor() cur.execute("SELECT kgnr FROM AT_ort WHERE okz = ?", (okz,)) rows: List[Tuple[int]] = cur.fetchall() cur.close() if len(rows) == 1: return rows[0][0] return None def lookup_gem_name(name: str) -> List[Tuple[int, int]]: gem_name, hkid = None, None if WG == 'MATZEN': hkid = "'WLWV'" if name.lower() == 'dörfles': gem_name = 'Weikendorf' elif name.lower() == 'velm-götzendorf': return [(6027, 30859), (6007, 30859)] elif name.lower() == 'grub': name = 'Grub an der March' elif WG == 'GWK': hkid = "'WLWV', 'WIEN', 'WLWG'" if name.endswith('*'): # TODO GWK do something with * in gemeinde name = name[:-1].strip() if name.lower() == 'joching': return [(12185, 31351)] elif name.lower() == 'kreuttal': return [(15206, 31627), (15221, 31627), (15226, 31627)] elif name.lower() == 'hochleithen': return [(15219, 31622), (15223, 31622), (15202, 31622)] elif name.lower() == 'wolfpassing': gem_name = 'Hochleithen' elif name.lower() == 'seebarn': gem_name = 'Harmannsdorf' elif name.lower() == 'königsbrunn': gem_name = 'Enzersfeld im Weinviertel' elif name.lower() == 'wien': return [(1616, 90001), (1617, 90001)] elif name.lower() in ('sitzendorf', 'roseldorf', 'frauendorf'): gem_name = 'Sitzendorf an der Schmida' elif name.lower() == 'dietersdorf': gem_name = 'Hollabrunn' elif name.lower() == 'altenmarkt': name = 'Altenmarkt im Thale' elif name.lower() == 'eitzerstal': name = 'Eitzersthal' elif name.lower() == 'gross': gem_name = 'Hollabrunn' elif name.lower() == 'auggenthal': name = 'Augenthal' elif name.lower() == 'karlsdorf': name = 'Pfaffendorf' elif name.lower() == 'kleinhaugsdorf': name = 'Augenthal' elif name.lower() == 'merkersdorf': gem_name = 'Hardegg' elif name.lower() == 'retz': name = 'Retz Altstadt' elif name.lower() == 
'heldenberg': return [(9112, 31019), (9132, 31019), (9131, 31019), (9141, 31019), (9140, 31019)] elif name.lower() == 'retzbach': return [(18129, 31038), (18112, 31038), (18117, 31038)] elif name.lower() == 'dietmannsdorf': gem_name = 'Zellerndorf' elif name.lower() == 'sierndorf': gem_name = 'Sierndorf' elif name.lower() == 'waltersdorf': gem_name = 'Staatz' elif name.lower() == 'viendorf': name = 'Viendorf Weingebirge' elif name.lower() == 'stoitzendorf': return [(10137, 31105)] elif name.lower() in ('klein reinprechtsdorf', 'unter nalb', 'klein stelzendorf', 'klein kirchberg'): name = name.replace(' ', '') elif name.lower() == 'drosendorf': name = 'Drosendorf Stadt' elif name.lower() == 'etzmannsdorf': name = 'Etzmannsdorf bei Straning' elif name.lower() == 'roggendorf': gem_name = 'Röschitz' elif name.lower() == 'wilhelmsdorf': gem_name = 'Poysdorf' cur = DB_CNX.cursor() cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name " "FROM AT_kg k " "JOIN AT_gem g ON g.gkz = k.gkz " "JOIN wb_gem wg ON wg.gkz = g.gkz " f"WHERE LOWER(k.name) LIKE (LOWER(?) 
|| '%') AND wg.hkid IN ({hkid})", (name.replace('fliess', 'fließ').replace('ross', 'roß').replace('Gr.', 'Groß ') .replace('Groß ', 'Groß').replace('-', ''),)) rows: List[Tuple[int, str, int, str]] = cur.fetchall() cur.close() if gem_name: rows = [row for row in rows if row[3] == gem_name] if len(rows) == 1: return [(k, g) for k, _, g, _ in rows] print(name, rows) raise RuntimeError() def lookup_kg_name(kgnr: int) -> str: cur = DB_CNX.cursor() cur.execute("SELECT name FROM AT_kg WHERE kgnr = ?", (kgnr,)) rows = cur.fetchall() cur.close() return rows[0][0] if len(rows) > 0 else None def lookup_rnr_name(rnr: int) -> str: return REED_MAP[rnr][2] def lookup_hkid(kgnr: Optional[int], qualid: str) -> str: hkid = None if qualid in ('WEI', 'RSW'): return 'OEST' elif kgnr is None: if WG in ('MATZEN', 'GWK'): hkid = 'WLWV' else: cur = DB_CNX.cursor() cur.execute("SELECT wb.hkid FROM AT_kg kg JOIN AT_gem g ON g.gkz = kg.gkz JOIN wb_gem wb ON wb.gkz = g.gkz " "WHERE kg.kgnr = ?", (kgnr,)) rows = cur.fetchall() cur.close() hkid = rows[0][0] if qualid == 'LDW': if hkid == 'WIEN': hkid = 'WLXX' elif hkid[:2] in ('WL', 'BL', 'SL'): hkid = hkid[:2] + 'XX' return hkid def guess_glnr(kgnr: int) -> Optional[int]: cur = DB_CNX.cursor() cur.execute("SELECT kgnr FROM AT_kg " "WHERE gkz / 100 != 900 AND gkz / 100 = (SELECT gkz / 100 FROM AT_kg WHERE kgnr = ?)", (kgnr,)) rows0 = cur.fetchall() cur.execute("SELECT kgnr FROM AT_kg " "WHERE gkz / 100 != 900 AND gkz = (SELECT gkz FROM AT_kg WHERE kgnr = ?)", (kgnr,)) rows1 = cur.fetchall() cur.close() glnrs = list(set([GROSSLAGE_KG_MAP[k] for k, in rows0 if k in GROSSLAGE_KG_MAP])) if len(glnrs) == 0: return None elif len(glnrs) == 1: return glnrs[0] glnrs = list(set([GROSSLAGE_KG_MAP[k] for k, in rows1 if k in GROSSLAGE_KG_MAP])) return glnrs[0] if len(glnrs) > 0 else None def migrate_gradation(in_dir: str, out_dir: str) -> None: global GRADATION_MAP GRADATION_MAP = {} for g in utils.csv_parse_dict(f'{in_dir}/TUmrechnung.csv'): 
GRADATION_MAP[g['Oechsle']] = g['KW'] def migrate_branches(in_dir: str, out_dir: str) -> None: global BRANCH_MAP BRANCH_MAP = {} with utils.csv_open(f'{out_dir}/branch.csv') as f: f.header('zwstid', 'name', 'country', 'postal_dest', 'address', 'phone_nr', 'fax_nr', 'mobile_nr') for b in utils.csv_parse_dict(f'{in_dir}/TZweigstellen.csv'): BRANCH_MAP[b['ZNR']] = b['Kennbst'] address = b['Straße'] postal_dest = lookup_plz(int(b['PLZ']) if b['PLZ'] else None, b['Ort'], address) tel, mob = normalize_phone_nr(b['Telefon']), None if tel and tel[4] == '6': mob, tel = tel, None f.row(b['Kennbst'], b['Name'].strip().title(), AUSTRIA, postal_dest, address, tel, normalize_phone_nr(b['Telefax']), mob) def migrate_grosslagen(in_dir: str, out_dir: str) -> None: global GROSSLAGE_MAP GROSSLAGE_MAP = {} glnr = 0 with utils.csv_open(f'{out_dir}/wb_gl.csv') as f: f.header('glnr', 'name') for gl in utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv'): glnr += 1 if WG == 'GWK' and gl['GLNR'] == 8: GROSSLAGE_MAP[8] = 6 continue GROSSLAGE_MAP[gl['GLNR']] = glnr f.row(glnr, gl['Bezeichnung']) def migrate_gemeinden(in_dir: str, out_dir: str) -> None: global GEM_MAP, GROSSLAGE_KG_MAP GEM_MAP, GROSSLAGE_KG_MAP = {}, {} inserted = set() with utils.csv_open(f'{out_dir}/wb_kg.csv') as f: f.header('kgnr', 'glnr') for g in utils.csv_parse_dict(f'{in_dir}/TGemeinden.csv'): gems = lookup_gem_name(g['Bezeichnung']) GEM_MAP[g['GNR']] = gems for kgnr, gkz in gems: if kgnr in inserted: continue inserted.add(kgnr) glnr = GROSSLAGE_MAP[g['GLNR']] GROSSLAGE_KG_MAP[kgnr] = glnr f.row(kgnr, glnr) def migrate_reeds(in_dir: str, out_dir: str) -> None: global REED_MAP REED_MAP = {} with utils.csv_open(f'{out_dir}/wb_rd.csv') as f: f.header('kgnr', 'rdnr', 'name') for r in utils.csv_parse_dict(f'{in_dir}/TRiede.csv'): name: str = r['Bezeichnung'].strip() if name.isupper() or name.islower(): name = name.title() try: gem = GEM_MAP[r['GNR']] kgnr = gem[0][0] if len(gem) != 1: print(gem, name, '->', gem[0]) except 
KeyError: print(f'Invalid GNR {r["GNR"]} for reed {name}') continue rdnr = max([n for k, n, _ in REED_MAP.values() if k == kgnr] or [0]) + 1 REED_MAP[r['RNR']] = (kgnr, rdnr, name) f.row(kgnr, rdnr, name) def migrate_attributes(in_dir: str, out_dir: str) -> None: with utils.csv_open(f'{out_dir}/wine_attribute.csv') as f: f.header('attrid', 'name', 'max_kg_per_ha', 'active') for a in utils.csv_parse_dict(f'{in_dir}/TSortenAttribute.csv'): if a['SANR'] is None: continue f.row(a['SANR'], a['Attribut'], int(a['KgProHa']) if a['KgProHa'] is not None else None, True) if WG == 'MATZEN': f.row('M', 'Matzen', None, False) f.row('HU', 'Huber', None, False) elif WG == 'GWK': # TODO GWK attribute F? f.row('F', '?', None, False) def migrate_cultivations(in_dir: str, out_dir: str) -> None: global CULTIVATION_MAP CULTIVATION_MAP = {} with utils.csv_open(f'{out_dir}/wine_cultivation.csv') as f: f.header('cultid', 'name') for c in utils.csv_parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'): name: str = c['Bezeichnung'] cultid = name[0].upper() if name.isupper(): cultid = name elif 'biolog' in name.lower(): cultid = 'BIO' CULTIVATION_MAP[c['BANR']] = cultid f.row(cultid, name) def migrate_area_commitment_types(in_dir: str, out_dir: str) -> None: with utils.csv_open(f'{out_dir}/area_commitment_type.csv') as f: f.header('vtrgid', 'sortid', 'attrid_1', 'attrid_2', 'disc', 'min_kg_per_ha', 'max_kg_per_ha', 'penalty_amount') for t in utils.csv_parse_dict(f'{in_dir}/TLiefermengen.csv'): sortid: str = t['SNR'] if not sortid or sortid == 'SV': continue menge = int(t['ErwarteteLiefermengeProHa']) f.row(sortid + (t['SANR'] or ''), sortid[:2], t['SANR'] or sortid[2:] or None, None, None, menge, menge, None) bio = [] if WG == 'MATZEN': bio = ['GV', 'ZW', 'MT'] f.row('BM', 'BM', None, None, None, None, None, None) elif WG == 'GWK': bio = ['GV', 'ZW', 'WR', 'MT', 'RR', 'WB', 'CH', 'MU'] for sortid in bio: f.row(f'{sortid}B', sortid, 'B', None, None, None, None, None) def 
normalize_name(family_name: str, given_name: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str]]: letters = string.ascii_letters + 'äöüßÄÖÜẞ-' double_names = ['eva maria', 'maria theresia'] def is_alpha(s: str) -> bool: return all(c in letters for c in s) if s.lower() not in double_names else True if WG == 'GWK': if 'BEZIRKSBAUERNKAMMER' == family_name: return None, None, None, None, None, 'Bezirksbauernkammer Mistelbach' elif (given_name, family_name) == ('LANDW.', 'FACHSCHULE'): return None, None, None, None, None, 'Landwirtschaftliche Fachschule Mistelbach' elif 'lagerhaus' in family_name.lower() and given_name == 'HOLLABRUNN-HORN': return None, None, None, None, None, 'Raiffeisen-Lagerhaus Hollabrunn-Horn eGen' if given_name.lower() not in ('kg', 'gesbr', 'gesnbr') and \ len(family_name) > 0 and len(given_name) > 0 and is_alpha(family_name) and is_alpha(given_name): return None, given_name.title(), None, family_name.title(), None, None prefix: Optional[str] = None middle_names: Optional[str] = None suffix: Optional[str] = None billing_name: Optional[str] = None if given_name.startswith('z.H. '): billing_name = family_name.replace('AGRAR', 'Agrar').replace('GESBR', 'GesbR') parts = given_name.split(' ') given_name = parts[1] family_name = parts[2] given_name = given_name.replace('+', ' + ').replace('JOS ', 'JOS. ') given_name = re.sub(r' ?\((.+?)(, ?(.*?))?\)', lambda m: f' + {m.group(1)}{" + " + m.group(3) if m.group(2) else ""}', given_name) given_name = re.sub(r' u\. ?| und ', ' + ', given_name, flags=re.IGNORECASE) titles = '' def repl_title(m: re.Match) -> str: nonlocal titles, suffix t = m.group(1).lower().replace(' ', '').replace('.', '') match t: case 'jun': suffix = 'jun.' case 'sen': suffix = 'sen.' case 'dr': titles += 'Dr. ' case 'mag': titles += 'Mag. ' case 'ing': titles += 'Ing. ' case 'dipling': titles += 'Dipl.-Ing. ' case 'di': titles += 'Dipl.-Ing. ' case 'dkfm': titles += 'Dipl.-Kfm. 
' case 'ökrat': titles += 'ÖkR ' case 'lkr': titles += 'ÖkR ' return ' ' title_re = re.compile(r',?\b(dr|ing|mag|jun|sen|dkfm|dipl\. ?-?ing|di|ök\.rat|lkr)\b\.?', re.IGNORECASE) given_name = utils.remove_spaces(re.sub(title_re, repl_title, given_name)) family_name = utils.remove_spaces(re.sub(title_re, repl_title, family_name)) if titles: prefix = titles.strip() family_parts = family_name.split(' ') last = family_parts[-1].lower() if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr'): family_name = ' '.join(family_parts[:-1]) if ' ' not in family_name and len(family_name) > 4: family_name = family_name.title() billing_name = family_name + ' ' + ('KG' if last == 'kg' else 'KEG' if last == 'keg.' else 'GesbR') if is_alpha(given_name): return prefix, given_name.title(), middle_names, family_name, suffix, billing_name given_parts = given_name.split(' ') last = given_parts[-1].lower() if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr'): given_name = ' '.join(given_parts[:-1]).title() family_name = family_name.title() billing_name = f'{family_name} {"KG" if last == "kg" else "KEG" if last == "keg." else "GesbR"}' return prefix, given_name, middle_names, family_name, suffix, billing_name if ' ' in family_name or '.' in family_name: if family_name.lower().startswith('weingut') or family_name.lower().startswith('weinbau'): billing_name = family_name.title() family_name = ' '.join(family_name.split(' ')[1:]).title() elif family_name.lower().endswith('veltlinerhof'): billing_name = ' '.join(family_name.split(' ')[::-1]).title() family_name = ' '.join(family_name.split(' ')[:-1]).title() elif 'u.' 
in family_name: billing_name = utils.remove_spaces(family_name.title().replace('U.', ' und ')) family_name = family_name.split(' ')[0].title() else: billing_name = family_name family_name = family_name.split(' ')[-1].title() if ' + ' in given_name: parts = given_name.split(' + ') family_name = family_name.title() billing_name = (', '.join(parts).title()[::-1].replace(',', ' und'[::-1], 1)[::-1] + f' {billing_name or family_name}') given_name = parts[0].title() else: family_name = family_name.title() given_name = given_name.title() return prefix, given_name, middle_names, family_name, suffix, billing_name def migrate_members(in_dir: str, out_dir: str) -> None: global MEMBER_MAP MEMBER_MAP = {} members = [m for m in utils.csv_parse_dict(f'{in_dir}/TMitglieder.csv')] mgnrs = [m['MGNR'] for m in members] fbs = parse_flaechenbindungen(in_dir) with utils.csv_open(f'{out_dir}/member.csv') as f_m,\ utils.csv_open(f'{out_dir}/member_billing_address.csv') as f_mba, \ utils.csv_open(f'{out_dir}/member_telephone_number.csv') as f_tel,\ utils.csv_open(f'{out_dir}/wb_kg.csv', 'a') as f_kg: f_m.header( 'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix', 'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid', 'lfbis_nr', 'ustid_nr', 'volllieferant', 'buchführend', 'funktionär', 'active', 'deceased', 'iban', 'bic', 'country', 'postal_dest', 'address', 'email', 'default_kgnr', 'comment') f_mba.header('mgnr', 'name', 'country', 'postal_dest', 'address') f_tel.header('mgnr', 'nr', 'type', 'number', 'comment') for m in members: mgnr: int = m['MGNR'] family_name: str = m['Nachname'] given_name: str = m['Vorname'] funktionaer, deceased = False, False if family_name is None and given_name is None: continue given_name = given_name or '' if WG == 'MATZEN' and given_name.startswith(' '): funktionaer = True if WG == 'GWK' and ('*' in family_name or '*' in given_name or '(+)' in family_name or '(+)' in given_name): deceased 
= True family_name = family_name.replace('*', '').replace('(+)', '') given_name = given_name.replace('*', '').replace('(+)', '') family_name = utils.remove_spaces(family_name) given_name = utils.remove_spaces(given_name).replace(', ', ',') ret = normalize_name(family_name, given_name) prefix, given_name, middle_names, family_name, suffix, billing_name = ret n1 = utils.remove_spaces(' '.join(r or '' for r in ret)) n2 = utils.remove_spaces((m['Vorname'] or '') + ' ' + (m['Nachname'] or '')) if billing_name or n1.lower() != n2.lower(): convert_name(mgnr, (m['Nachname'], m['Vorname']), (prefix, given_name, middle_names, family_name, suffix), billing_name) if not given_name or not family_name: given_name = given_name or '' family_name = family_name or '' invalid(mgnr, 'Name', n1) bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None if bnr is not None: bnr = bnr.replace('.', '') if len(bnr) == 10: bnr = bnr.removesuffix('000') elif len(bnr) == 6: bnr = '0' + bnr elif bnr.endswith(' inaktiv'): bnr = bnr.split(' ')[0] if not check_lfbis_nr(bnr): if bnr in ('0', '1234567'): warning(mgnr, 'BetriebsNr.', bnr) else: invalid(mgnr, 'BetriebsNr.', bnr) bnr = None ustid_nr: Optional[str] = m['UID'] if ustid_nr is not None: ustid_nr = ustid_nr.replace(' ', '') if len(ustid_nr) == 8 and ustid_nr.isdigit(): ustid_nr = 'ATU' + ustid_nr elif not USTID_NR_RE.fullmatch(ustid_nr): invalid(mgnr, 'UID', ustid_nr) ustid_nr = None if ustid_nr and not check_ustid_nr_at(ustid_nr): if ustid_nr == 'ATU11111111': warning(mgnr, 'UID', ustid_nr) else: invalid(mgnr, 'UID', ustid_nr) ustid_nr = None iban: Optional[str] = m['IBAN'] bic: Optional[str] = m['BIC'] blz: Optional[int] = m['BLZ'] kto_nr: Optional[str] = m['KontoNr'] if iban is not None: iban = iban.replace(' ', '') if not check_iban(iban): invalid(mgnr, 'IBAN', iban) iban = None if bic is not None: bic = bic.upper() if bic == 'RLNWATAUE': bic = 'RLNWATWWAUE' elif bic == 'RLNWATWMIB': bic = 'RLNWATWWMIB' if not 
BIC_RE.fullmatch(bic): invalid(mgnr, 'BIC', bic) bic = None if bic is not None: if len(bic) == 11 and bic.endswith('XXX'): bic = bic[:-3] plz = int(m['PLZ']) if m['PLZ'] else None ort: Optional[str] = m['Ort'] address: Optional[str] = m['Straße'] parts = ort.split(' ') if ort else [''] if parts[-1].isdigit() or (len(parts) > 1 and parts[-2].isdigit()): if len(parts) > 1 and parts[-2].isdigit(): ort = ' '.join(parts[:-2]) new_address = parts[-2] + parts[-1] else: ort = ' '.join(parts[:-1]) new_address = parts[-1] if address is not None and address != ' ' and address != new_address: print(address, new_address) raise RuntimeError() address = parts[-1] if WG == 'GWK' and ort == 'JETZELDORF': ort = 'JETZELSDORF' if ort: ort = ort.upper().strip() if address is not None: address_old = address address = re.sub(r'([0-9]) ?([A-Z])\b', lambda a: a.group(1) + a.group(2).lower(), utils.remove_spaces(address).title()) if address.startswith('Haus Nr.') or \ address.startswith('Nr. ') or \ address.startswith('Nr ') or \ address.isdigit() or (len(address) > 1 and address[:-1].isdigit()): address = ort.title() + ' ' + address.split(' ')[-1] address = address.replace('strasse', 'straße').replace('strassse', 'straße')\ .replace('Strasse', 'Straße').replace('Str.', 'Straße').replace('stasse', 'straße')\ .replace('str.', 'straße').replace('ster.', 'straße').replace('g. ', 'gasse ')\ .replace('Gross', 'Groß').replace('Bockfliess', 'Bockfließ').replace('Weiss', 'Weiß')\ .replace('Preussen', 'Preußen').replace('Schloss', 'Schloß').replace('luss', 'luß')\ .replace('Haupstraße', 'Hauptstraße').replace('Russ', 'Ruß').replace('Ross', 'Roß') address = re.sub('([a-z])([0-9])', lambda a: a.group(1) + ' ' + a.group(2), address) if address.startswith('Ob. '): address = address.replace('Ob. ', 'Obere ', 1) address = address.replace(' Nr. 
', ' ') address = re.sub(r'([^0-9]+?)( [0-9])', lambda a: fix_street_name(a.group(1)) + a.group(2), address) address = utils.remove_spaces(address) if address_old != address: convert(mgnr, 'Adresse', address_old, address) email: Optional[str] = m['EMail'] if email is not None: if email.isupper(): email = email.lower() if not EMAIL_RE.fullmatch(email): invalid(mgnr, 'E-Mail', m['EMail']) email = None else: parts = email.split('@') email = f'{parts[0]}@{parts[1].lower()}' zwstid = m['ZNR'] and BRANCH_MAP[m['ZNR']] or len(BRANCH_MAP) == 1 and list(BRANCH_MAP.values())[0] if WG == 'GWK' and plz == 1228: plz = 1020 postal_dest = lookup_plz(plz, ort, address) #if mgnr in fbs: # gems = {v['GNR'] for k, v in fbs[mgnr].items() if v['Bis'] and int(v['Bis']) >= 2020} # if len(gems) == 1: # print(GEM_MAP[list(gems)[0]]) okz = postal_dest % 100000 if postal_dest else None kgnr = lookup_kgnr(okz) active = m['Aktives Mitglied'] or False if kgnr is None: invalid(mgnr, 'KGNr.', ort) elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]: glnr = guess_glnr(kgnr) if glnr: new('KG', kgnr, lookup_kg_name(kgnr), f'GL {glnr}') f_kg.row(kgnr, glnr) if 9999 not in GEM_MAP: GEM_MAP[9999] = [] GEM_MAP[9999].append((kgnr, 0)) else: kgnr = None if postal_dest is None: invalid(mgnr, 'PLZ', None) continue pred = m['MGNR-Vorgänger'] if m['MGNR-Vorgänger'] in mgnrs else None f_m.row( mgnr, pred, prefix, given_name, middle_names, family_name, suffix, m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0, m['BHKontonummer'], zwstid, bnr, ustid_nr, m['Volllieferant'] or False, m['Buchführend'] or False, funktionaer, active, deceased, iban, bic, AUSTRIA, postal_dest, address or '-', email, kgnr, m['Anmerkung'] ) phone_1: Optional[str] = m['Telefon'] phone_2: Optional[str] = m['Telefax'] phone_3: Optional[str] = m['Mobiltelefon'] numbers = [] if WG == 'GWK': # Telefax (phone_2) not used numbers = {} def add_number(nr: str, fax: bool = False, comment: str = 
None, fax_only: bool = False) -> None: mob = nr[4] == '6' numbers[nr] = {'mobile': mob, 'landline': not mob and not fax_only, 'fax': fax, 'comment': None} if phone_1: phone_1 = phone_1.lower().replace('und', 'u.').replace('auch', 'u.').replace('u.', ' u. ')\ .replace('oder', 'od.').replace(';', 'od.').replace('od.', ' od. ') phone_1 = utils.remove_spaces(phone_1) fax = False if phone_1.endswith(' u. fax'): fax = True phone_1 = ' '.join(phone_1.split(' ')[:-2]) if phone_1.replace(' ', '').replace('/', '').replace('-', '').isdigit() and len(phone_1) <= 20: if phone_1[0] != '0' and '/' in phone_1: for nr in phone_1.split('/'): add_number(normalize_phone_nr(nr, ort), fax) else: add_number(normalize_phone_nr(phone_1, ort), fax) elif re.fullmatch(r'0[0-9/ -]+ od\. 0[0-9/ -]+', phone_1): parts = phone_1.split(' od. ') add_number(normalize_phone_nr(parts[0], ort), False) add_number(normalize_phone_nr(parts[1], ort), fax) elif re.fullmatch(r'0[0-9/ -]+ od\. [1-9][0-9/ -]+', phone_1): parts = phone_1.split(' od. 
') add_number(normalize_phone_nr(parts[0], ort), False) if parts[0][1] == '6': add_number(normalize_phone_nr(parts[1], ort), fax) else: add_number(normalize_phone_nr(parts[0][:5] + parts[1], ort), fax) elif re.fullmatch(r'0[0-9/ -]+ fax 0[0-9/ -]+', phone_1): parts = phone_1.split(' fax ') add_number(normalize_phone_nr(parts[0], ort), False) add_number(normalize_phone_nr(parts[1], ort), True, fax_only=True) elif re.fullmatch(r'0[0-9/ -]+ fax [1-9][0-9/ -]+', phone_1): parts = phone_1.split(' fax ') add_number(normalize_phone_nr(parts[0], ort), False) add_number(normalize_phone_nr(parts[0][:5] + parts[1], ort), True, fax_only=True) elif '-' in phone_1 and phone_1.endswith('fax'): nr = re.sub(r'-+ ', '-', phone_1) nr = ' '.join(nr.split(' ')[:-1]) add_number(normalize_phone_nr(nr.split('-')[0], ort), False) add_number(normalize_phone_nr(nr, ort), True, fax_only=True) elif 'fax -' in phone_1: parts = phone_1.split('fax') add_number(normalize_phone_nr(parts[0], ort), False) add_number(normalize_phone_nr(parts[0].strip() + parts[1].strip(), ort), True, fax_only=True) elif phone_1.endswith('fax'): nr = phone_1[:-3].strip() add_number(normalize_phone_nr(nr), False) add_number(normalize_phone_nr(nr), True, fax_only=True) elif re.fullmatch(r'0[0-9/ -]+ u\. fax (od\. |u\. 
)?[0-9/ -]+', phone_1): parts = phone_1.split(' ') add_number(normalize_phone_nr(parts[0], ort), True) nr = parts[-1] if nr[0] == '0': add_number(normalize_phone_nr(nr, ort)) else: add_number(normalize_phone_nr(parts[0][:5] + nr, ort)) else: parts = phone_1.split(' ') if parts[-1].isalpha(): add_number(normalize_phone_nr(parts[0], ort), comment=parts[-1]) else: for nr in parts: add_number(normalize_phone_nr(nr, ort), fax) if phone_3: for nr in phone_3.split(','): nr = nr.strip() parts = nr.split(' ') comment = None if parts[-1].startswith('(') and parts[-1].endswith(')'): nr = nr[:nr.rindex(' ')].strip() comment = parts[-1][1:-1].strip() elif parts[-1].isalpha(): nr = nr[:nr.rindex(' ')].strip() comment = parts[-1].strip() add_number(normalize_phone_nr(nr, ort), comment=comment) count = 0 for nr, data in numbers.items(): if data['mobile']: count += 1 f_tel.row(mgnr, count, 'mobile', nr, data['comment']) if data['landline']: count += 1 f_tel.row(mgnr, count, 'landline', nr, data['comment']) if data['fax']: count += 1 f_tel.row(mgnr, count, 'fax', nr, data['comment']) else: if phone_1: phone_1 = normalize_phone_nr(phone_1) if len(phone_1) <= 10 or phone_1[0] != '+': invalid(mgnr, 'Tel.Nr.', m['Telefon']) else: numbers.append(phone_1) if phone_1[4] == '6': f_tel.row(mgnr, len(numbers), 'mobile', phone_1, None) else: f_tel.row(mgnr, len(numbers), 'landline', phone_1, None) if phone_2: phone_2 = normalize_phone_nr(phone_2) if len(phone_2) <= 8 or phone_2[0] != '+': invalid(mgnr, 'Fax.Nr.', m['Telefax']) else: numbers.append(phone_2) if phone_2[4] == '6': f_tel.row(mgnr, len(numbers), 'mobile', phone_2, None) else: f_tel.row(mgnr, len(numbers), 'fax', phone_2, None) if phone_3: phone_3 = normalize_phone_nr(phone_3) if len(phone_3) <= 10 or phone_3[0] != '+': invalid(mgnr, 'Tel.Nr.', m['Mobiltelefon']) elif phone_3 not in numbers: numbers.append(phone_3) if phone_3[4] == '6': f_tel.row(mgnr, len(numbers), 'mobile', phone_3, None) else: f_tel.row(mgnr, len(numbers), 
def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
    """Convert ``TFlaechenbindungen.csv`` into ``area_commitment.csv``.

    Rows without any year bounds, without a ``GNR`` or belonging to a member
    that is not in ``MEMBER_MAP`` are skipped.  Reeds that cannot be matched
    to one of the cadastral districts (KG) of the row's municipality are
    appended to ``wb_rd.csv`` with a freshly allocated ``rdnr``.

    :param in_dir: directory containing the exported csv files
    :param out_dir: directory the migrated csv files are written to
    """

    def parse_gstnrs(nr_str: str, kgnr: int, mgnr: int) -> List[str]:
        """Split a free-text parcel number field into single parcel numbers.

        Reports the value via ``invalid()`` and returns ``[]`` when no rule
        applies.
        """
        if nr_str is None:
            return []
        elif nr_str.isdigit() and len(nr_str) <= 6:
            return [nr_str]
        elif nr_str.count('/') == 1:
            parts = nr_str.split('/')
            if len(parts[0]) == len(parts[1]) and parts[0].isdigit() and parts[1].isdigit() and len(parts[0]) >= 3:
                # two equally long numbers: treated as two separate parcels
                return [parts[0], parts[1]]
            elif parts[0].isdigit() and len(parts[0]) <= 6 and parts[1].isdigit() and len(parts[1]) <= 3:
                # number/sub-number
                return [nr_str]

        if nr_str.count('/') > 1:
            parts = nr_str.split('/')
            if all(p.isdigit() for p in parts):
                if all(len(p) <= 1 for p in parts[1:]):
                    return [f'{parts[0]}/{p}' for p in parts[1:]]
                elif all(len(p) == len(parts[0]) for p in parts):
                    return parts

        # strip a leading KG number or a trailing ' 2000' and retry
        if nr_str.startswith(f'{kgnr:05}'):
            return parse_gstnrs(nr_str[5:].strip(), kgnr, mgnr)
        if nr_str.endswith(' 2000'):
            return parse_gstnrs(nr_str[:-5].strip(), kgnr, mgnr)

        parts = re.split(r' *[,;+&] *', nr_str)
        if len(parts) == 1:
            parts = nr_str.split(' / ')
        # NOTE(review): as visible here the next two conditions can never
        # produce more than one part; the string literals may have lost
        # whitespace during extraction - confirm against the original file.
        if len(parts) == 1 and ' ' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) == 1 and ' ' not in nr_str and '-' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) > 1:
            return [gst for p in parts for gst in parse_gstnrs(p, kgnr, mgnr)]

        m = re.fullmatch(r'([0-9]+/)?([0-9]+) *- *([0-9]+)', nr_str)
        if m is not None:
            b = m.group(1)
            f = int(m.group(2))
            t = int(m.group(3))
            if t < f:
                # abbreviated upper bound: borrow the leading digits of the lower bound
                t += f - (f % pow(10, len(m.group(3))))
            if t - f < 50:
                return [
                    gst
                    for counter in range(f, t + 1)
                    for p in [f'{b or ""}{counter}']
                    for gst in parse_gstnrs(p, kgnr, mgnr)
                ]

        invalid(mgnr, 'GstNr.', f'{kgnr:05}-{nr_str}')
        return []

    def replace_nrs(m: re.Match, sep: str) -> str:
        """Collapse runs of three or more consecutive numbers into ``a-b``."""
        end = m.group(0).endswith(sep)
        # A match that ends with the separator yields a trailing empty string;
        # drop it, otherwise int('') raises ValueError.
        parts = [int(p) for p in m.group(0).split(sep) if p]
        text = ''
        last = None
        for i, p in enumerate(parts):
            if last is not None:
                if last + 1 == p:
                    last = p
                    continue
                else:
                    text += f'{last}{sep}'
                    last = None
            if len(parts) > i + 2 and p + 1 == parts[i + 1] and p + 2 == parts[i + 2]:
                last = p
                text += f'{p}-'
            else:
                text += f'{p}{sep}'
        if last is not None:
            text += str(last)
        return text.strip().strip(sep) + (sep if end else '')

    def format_gstnr(nrs: List[str]) -> Optional[str]:
        """Render parcel numbers as one compact, numerically sorted string."""
        if len(nrs) == 0:
            return None
        # zero-pad every digit group so that string sorting is numeric, then unpad
        padded = sorted(re.sub(r'[0-9]+', lambda m: m.group(0).rjust(6, '0'), nr) for nr in nrs)
        nrs = [re.sub(r'\b0+', '', nr) for nr in padded]
        last = None
        text = ''
        for nr in nrs:
            if last is None:
                text += nr
            elif '/' in last and last.split('/')[:-1] == nr.split('/')[:-1]:
                text += f'+{nr.split("/")[-1]}'
            else:
                text += f', {nr}'
            last = nr
        text = re.sub(r'[0-9]+\+[0-9]+(\+[0-9]+)+', lambda m: replace_nrs(m, '+'), text)
        # The look-behind keeps a run from starting at a sub-number or range
        # end (e.g. the "1" of "1/1, 2, 3, 4"), which would merge unrelated
        # parcels into "1/1-4".
        text = re.sub(r'(?<![0-9/+-])([0-9]+(, |$)){3,}', lambda m: replace_nrs(m, ', '), text)
        return text

    # kgnr -> {rdnr: reed name}
    reeds: Dict[int, Dict[int, str]] = {
        k: {r: n for rk, r, n in REED_MAP.values() if rk == k}
        for k in {k for k, _, _ in REED_MAP.values()}
    }
    # (kgnr, old RNR) -> rdnr that was looked up or created during this run
    new_reeds: Dict[Tuple[int, int], int] = {}

    with utils.csv_open(f'{out_dir}/area_commitment.csv') as f_fb, \
            utils.csv_open(f'{out_dir}/wb_rd.csv', 'a+') as f_rd:
        f_fb.header('fbnr', 'mgnr', 'vtrgid', 'cultid', 'area', 'kgnr', 'gstnr', 'rdnr',
                    'year_from', 'year_to', 'comment')

        for fb in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
            if (fb['Von'] is None and fb['Bis'] is None) or fb['GNR'] is None:
                continue
            parz: str = fb['Parzellennummer']
            fbnr: int = fb['FBNR']
            mgnr: int = fb['MGNR']
            gem = GEM_MAP[fb['GNR']]
            kgnrs = [kgnr for kgnr, gkz in gem]
            rnr = fb['RNR']
            rd_kgnr, rdnr, _ = REED_MAP.get(rnr, (None, None, None)) if rnr else (None, None, None)
            if mgnr not in MEMBER_MAP:
                continue

            kgnr = None
            if rd_kgnr is None:
                kgnr = kgnrs[0]
            elif rd_kgnr in kgnrs:
                kgnr = rd_kgnr
            elif (kgnrs[0], rnr) in new_reeds:
                kgnr = kgnrs[0]
                rdnr = new_reeds[(kgnr, rnr)]
            else:
                # the reed belongs to a different KG: look for a reed with the
                # same name in one of the KGs of this municipality
                rname = lookup_rnr_name(rnr)
                for k in kgnrs:
                    if k not in reeds:
                        continue
                    try:
                        pos = list(reeds[k].values()).index(rname)
                        r = list(reeds[k].keys())[pos]
                        kgnr = k
                        rdnr = r
                        new_reeds[(kgnr, rnr)] = rdnr
                        break
                    except ValueError:
                        continue
                if kgnr is None:
                    kgnr = kgnrs[0]
                    # Next free rdnr inside the target KG.  The filter must bind
                    # the KG number of each reed itself (it used to read the
                    # leaked loop variable ``k``), and ``default=0`` covers a KG
                    # that has no reeds yet.
                    used = [r for rk, r, _ in REED_MAP.values() if rk == kgnr] + \
                           [r for (nk, _), r in new_reeds.items() if nk == kgnr]
                    rdnr = max(used, default=0) + 1
                    f_rd.row(kgnr, rdnr, rname)
                    new_reeds[(kgnr, rnr)] = rdnr
                    new('Reed', (kgnr, rdnr), rname)

            area = int(fb['Flaeche'])
            if WG == 'MATZEN':
                gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR'])
            else:
                gstnrs = []
            comment, gstnr = None, None
            if parz is None or parz == '0000':
                if parz is not None:
                    invalid(mgnr, 'GstNr.', f'{lookup_kg_name(kgnr)} {kgnr or 0:05}-{parz}')
                gstnrs = []
                gstnr = '-'
            if WG == 'MATZEN' and len(gstnrs) == 0:
                # keep the unparsable original text as comment
                comment = f'KG {kgnr or 0:05}: {parz}'
            gstnr = format_gstnr(gstnrs) or gstnr or parz
            if parz != gstnr.replace('+', '/'):
                convert(mgnr, f'GstNr. ({fbnr})', parz, gstnr)

            # end years of 3000 and above are written as "no end"
            to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None
            f_fb.row(fbnr, mgnr, fb['SNR'] + (fb['SANR'] or ''), CULTIVATION_MAP[fb['BANR'] or 1], area,
                     kgnr, gstnr, rdnr, fb['Von'], to, comment)
def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, List[int], datetime.date]]:
    """Group delivery rows by their (repaired) delivery note number.

    Rows without a delivery note number, cancelled rows (``Storniert``) and
    rows whose number is too short to contain a branch character are ignored.

    :param deliveries: rows with the keys ``LINR``, ``Lieferscheinnummer``,
        ``Datum``, ``ZNR``, ``MGNR`` and ``Storniert``
    :return: list of ``(lsnr, [linr, ...], date)`` sorted by ``lsnr``, where
        numbers starting with ``9`` are ordered as if prefixed with ``19``
    """
    dates: Dict[str, datetime.date] = {}
    fixed: Dict[str, List[int]] = {}
    last_dates: Dict[Any, datetime.date] = {}

    def add(lsnr: str, linr: int, date: datetime.date, unique: bool = False) -> None:
        """Register ``linr`` under ``lsnr``; with ``unique`` a taken number gets a ``/2`` suffix."""
        if lsnr not in fixed:
            fixed[lsnr] = []
            dates[lsnr] = date
        elif unique:
            return add(lsnr + '/2', linr, date, unique)
        fixed[lsnr].append(linr)

    def get_lsnr(date: datetime.date, lsnr: str) -> str:
        """Rebuild the date prefix of ``lsnr`` from ``date``."""
        if date.year < 2000:
            return date.strftime('%y%m%d00') + lsnr[8:]
        else:
            return date.strftime('%Y%m%d') + lsnr[8:]

    rows: List[Tuple[int, str, datetime.date, int, int]] = [
        (d['LINR'], d['Lieferscheinnummer'], d['Datum'], d['ZNR'], d['MGNR'])
        for d in deliveries
        if d['Lieferscheinnummer'] and not d['Storniert']
    ]
    lsnrs = {row[1] for row in rows}

    for lnr, lsnr, date, zwstid, mgnr in rows:
        # lsnr[8] (the branch character) is read below, so at least nine
        # characters are required; a bare 8-digit date used to raise IndexError.
        if len(lsnr) < 9:
            continue
        if lsnr.startswith('22'):
            lsnr = '20' + lsnr[2:]
        if lsnr.startswith('9'):
            # two-digit year prefix
            lsdate = datetime.date(1900 + int(lsnr[:2]), int(lsnr[2:4]), int(lsnr[4:6]))
        else:
            lsdate = datetime.date(int(lsnr[:4]), int(lsnr[4:6]), int(lsnr[6:8]))

        lsnr_zwstid = lsnr[8]
        if lsnr_zwstid != zwstid and lsnr_zwstid in BRANCH_MAP.values():
            zwstid = lsnr_zwstid

        if len(lsnr) == 12:
            if date != lsdate:
                if date.year == lsdate.year:
                    # same year: prefer the row's date if the resulting number is free
                    lsnr_n = get_lsnr(date, lsnr)
                    if lsnr_n not in lsnrs:
                        lsnr = lsnr_n
                    else:
                        warning_delivery(lsnr, mgnr, 'date', date)
                else:
                    # different year: keep day and month, take the year of the number
                    date = datetime.date(lsdate.year, date.month, date.day)
            if zwstid not in last_dates or not date < last_dates[zwstid]:
                last_dates[zwstid] = date
            add(lsnr, lnr, date, unique=True)
        else:
            # anything after the 12th character is dropped; such rows are grouped
            add(lsnr[:12], lnr, date)

    return sorted([(f[0], f[1], dates[f[0]]) for f in fixed.items()],
                  key=lambda f: f[0] if not f[0].startswith('9') else '19' + f[0])
def migrate_deliveries(in_dir: str, out_dir: str) -> None:
    """Convert deliveries, their parts, attributes and modifiers.

    Reads ``TAbschlaege.csv``, ``TLieferungen.csv`` and
    ``TLieferungAbschlag.csv`` from ``in_dir`` and writes ``delivery.csv``,
    ``delivery_part.csv``, ``delivery_part_attribute.csv``,
    ``delivery_part_modifier.csv``, ``season.csv`` and ``modifier.csv`` to
    ``out_dir``.  Resets and fills the globals ``DELIVERY_MAP`` and
    ``MODIFIER_MAP``.

    :raises NotImplementedError: if ``WG`` is neither ``'MATZEN'`` nor ``'GWK'``
        and at least one modifier exists
    """
    # NOTE(review): block nesting was reconstructed from a whitespace-collapsed
    # source - confirm against the original file.
    global DELIVERY_MAP, MODIFIER_MAP
    DELIVERY_MAP, MODIFIER_MAP = {}, {}

    # ASNR -> modifier row; rows without a real name are ignored
    modifiers = {
        m['ASNR']: m
        for m in utils.csv_parse_dict(f'{in_dir}/TAbschlaege.csv')
        if m['Bezeichnung'] and m['Bezeichnung'] != '-'
    }
    seasons = {}
    branches = {}

    # assign the new textual id of every modifier
    for mod in modifiers.values():
        name: str = mod['Bezeichnung'].replace('ausser', 'außer')
        nr: int = mod['ASNR']
        MODIFIER_MAP[name] = mod
        if WG == 'MATZEN':
            mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'PZS'
        elif WG == 'GWK':
            mod['id'] = {
                1: 'KA', 2: 'LG', 3: 'MG', 4: 'SG', 5: 'VT', 6: 'MV', 7: 'UP', 8: 'VL', 9: 'DN', 10: 'SA',
                11: 'DA', 12: 'EG',
            }[nr]
        else:
            raise NotImplementedError()

    deliveries = list(utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv'))
    delivery_dict = {d['LINR']: d for d in deliveries}
    fixed = fix_deliveries(deliveries)
    # "old -> new" variety description -> number of occurrences (printed at the end)
    updated_varieties = {}

    with utils.csv_open(f'{out_dir}/delivery.csv') as f_delivery, \
            utils.csv_open(f'{out_dir}/delivery_part.csv') as f_part, \
            utils.csv_open(f'{out_dir}/delivery_part_attribute.csv') as f_attr:
        f_delivery.header('year', 'did', 'date', 'time', 'zwstid', 'lnr', 'lsnr', 'mgnr', 'comment')
        f_part.header('year', 'did', 'dpnr', 'sortid', 'weight', 'kmw', 'qualid', 'hkid', 'kgnr', 'rdnr',
                      'gerebelt', 'manual_weighing', 'spl_check', 'hand_picked', 'lesewagen',
                      'temperature', 'acid', 'scale_id', 'weighing_id', 'comment')
        f_attr.header('year', 'did', 'dpnr', 'attrid')

        for lsnr, linrs, date in fixed:
            # first delivery of a year creates the season entry
            if date.year not in seasons:
                seasons[date.year] = {
                    'currency': 'EUR' if date.year >= 2001 else 'ATS',
                    'precision': WGMASTER_PRECISION,
                    'start': date,
                    'end': date,
                    'nr': 0,
                }
            s = seasons[date.year]
            if date > s['end']:
                s['end'] = date
            # running delivery number within the year
            s['nr'] += 1
            snr = s['nr']
            mgnr = delivery_dict[linrs[0]]['MGNR']
            znr = delivery_dict[linrs[0]]['ZNR']
            # distinct non-empty scale texts of all parts of this delivery
            glob_waage = set(delivery_dict[linr]['Waagentext'] for linr in linrs if delivery_dict[linr]['Waagentext'])

            # running number per branch character and day
            zwstid = lsnr[8]
            if zwstid not in branches:
                branches[zwstid] = {}
            if date not in branches[zwstid]:
                branches[zwstid][date] = 0
            branches[zwstid][date] += 1
            lnr = branches[zwstid][date]
            # fall back to the branch of the row if the character is no known branch
            if BRANCH_MAP[znr] != zwstid:
                if zwstid not in BRANCH_MAP.values():
                    zwstid = BRANCH_MAP[znr]

            comments = []
            # NOTE(review): this set is created once per delivery, so attributes
            # collected for an earlier part are also written for later parts of
            # the same delivery - confirm this is intended.
            attributes = set()
            for dpnr, linr in enumerate(linrs, start=1):
                d = delivery_dict[linr]
                DELIVERY_MAP[linr] = (date.year, snr, dpnr)
                if lsnr != d['Lieferscheinnummer']:
                    renumber_delivery(d['Lieferscheinnummer'], lsnr)

                oe = d['OechsleOriginal'] or d['Oechsle']
                kmw = GRADATION_MAP[oe]
                sortid = d['SNR'].upper()
                if d['SANR']:
                    attributes.add(d['SANR'])
                # everything after the first two characters is treated as an attribute
                if len(sortid) != 2:
                    attributes.add(sortid[2:])
                    sortid = sortid[:2]
                if WG == 'MATZEN':
                    if sortid == 'HU':
                        # Gr.Veltliner (Huber)
                        sortid = 'GV'
                        attributes.add('HU')
                    elif sortid == 'SV':
                        sortid = 'SW'
                    elif sortid == 'WC':
                        # Weißburgunder/Chardonnay
                        sortid = 'SW'
                    if 'H' in attributes:
                        attributes.remove('H')
                        attributes.add('HK')
                    if 'W' in attributes:
                        attributes.remove('W')
                # remember every changed variety for the summary printed below
                if d['SNR'] != sortid:
                    line = f'{d["SNR"]}/{d["SANR"]} -> {sortid}/{",".join(list(attributes)) or None}'
                    if line not in updated_varieties:
                        updated_varieties[line] = 0
                    updated_varieties[line] += 1

                qualid = QUAL_MAP[d['QSNR']]
                if qualid != 'WEI' and d['Abgewertet']:
                    # only 'RSW' is downgraded automatically, anything else is reported
                    if qualid == 'RSW':
                        qualid = 'WEI'
                    else:
                        warning_delivery(lsnr, mgnr, 'qualid', f'{qualid} (abgewertet)')

                # KG: unambiguous municipality, overridden by the reed,
                # otherwise the member's default
                kgnr, rdnr = None, None
                if d['GNR']:
                    gem = GEM_MAP.get(d['GNR'], [])
                    if len(gem) == 1:
                        kgnr = gem[0][0]
                if d['RNR']:
                    kgnr, rdnr, _ = REED_MAP[d['RNR']]
                if kgnr is None:
                    m = MEMBER_MAP[mgnr]
                    kgnr = m['default_kgnr']
                if kgnr is None:
                    pass
                elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]:
                    warning_delivery(lsnr, mgnr, 'KGNr.', kgnr)
                    kgnr = None
                hkid = lookup_hkid(kgnr, qualid)

                handwiegung = d['Handwiegung'] or False
                # a single scale text shared by the whole delivery wins over the part's own
                waage = list(glob_waage)[0] if len(glob_waage) == 1 else d['Waagentext']
                scale_id, weighing_id = None, None
                if waage:
                    # Waagenr: 1 ID: 19
                    # Waagennummer: 1 Speichernummer: 9166
                    # NOTE(review): if the token really carries a trailing colon
                    # as in the samples above, the comparison with
                    # 'Speichernummer' never matches - confirm against the data.
                    waage = re.split(r' +', waage)
                    scale_id = waage[1]
                    weighing_id = waage[3] if waage[2] == 'Speichernummer' else f'{date}/{waage[3]}'
                elif len(glob_waage) == 0 and not handwiegung:
                    # no scale text anywhere in the delivery: mark as manual weighing
                    handwiegung = True

                comment: Optional[str] = d['Anmerkung']
                acid = d['Säure']
                # lesemaschine is never assigned a value other than None in this function
                hand, lesemaschine = None, None
                if comment:
                    comment = comment.replace('Söure', 'Säure')
                    if comment.startswith('Säure'):
                        # the last word of the comment is the acid value with a decimal comma
                        acid = float(comment.split(' ')[-1].replace(',', '.'))
                        comment = None
                    elif comment == 'Maschine':
                        hand = False
                        comment = None
                    elif comment == 'Hand':
                        hand = True
                        comment = None
                if comment:
                    comments.append(comment)

                gerebelt = True if WG == 'MATZEN' else d['Gerebelt'] or False
                f_part.row(
                    date.year, snr, dpnr, sortid, int(d['Gewicht']), kmw, qualid, hkid, kgnr, rdnr,
                    gerebelt, handwiegung, d['Spaetlese-Ueberpruefung'] or False,
                    hand, lesemaschine, d['Temperatur'], acid, scale_id, weighing_id, comment
                )
                for attrid in attributes:
                    f_attr.row(date.year, snr, dpnr, attrid)
            # the time of day is taken from the last part of the delivery
            f_delivery.row(date.year, snr, date, d['Uhrzeit'], zwstid, lnr, lsnr, mgnr, '; '.join(comments) or None)

    for k, v in updated_varieties.items():
        print(k + (f' ({v} times)' if v > 1 else ''))

    with utils.csv_open(f'{out_dir}/delivery_part_modifier.csv') as f_part_mod:
        f_part_mod.header('year', 'did', 'dpnr', 'modid')
        for m in utils.csv_parse_dict(f'{in_dir}/TLieferungAbschlag.csv'):
            # skip rows of unmigrated deliveries or unknown modifiers
            if m['LINR'] not in DELIVERY_MAP or m['ASNR'] not in modifiers:
                continue
            y, did, dpnr = DELIVERY_MAP[m['LINR']]
            f_part_mod.row(y, did, dpnr, modifiers[m['ASNR']]['id'])

    with utils.csv_open(f'{out_dir}/season.csv') as f_season, \
            utils.csv_open(f'{out_dir}/modifier.csv') as f_mod:
        f_season.header('year', 'currency', 'precision', 'start_date', 'end_date')
        f_mod.header('year', 'modid', 'ordering', 'name', 'abs', 'rel', 'standard', 'quick_select')
        for y, s in seasons.items():
            f_season.row(y, s['currency'], s['precision'], s['start'], s['end'])
            # every modifier is written once per season
            for m in modifiers.values():
                # absolute value as integer scaled by the season's precision
                abs_v = round(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None
                rel_v = m['AZASProzent'] / 100.0 if m['AZASProzent'] is not None else None
                f_mod.row(y, m['id'], m['ASNR'], m['Bezeichnung'], abs_v, rel_v,
                          m.get('Standard', False), m['Schnellauswahl'])
def migrate_payments(in_dir: str, out_dir: str) -> None:
    """Convert payment variants and the per-delivery-part payment amounts.

    Reads ``TAuszahlung.csv``, ``TAuszahlungSorten.csv``,
    ``TAuszahlungSortenQualitätsstufe.csv`` and ``TLieferungen.csv`` from
    ``in_dir`` and writes ``payment_variant.csv`` and
    ``payment_delivery_part.csv`` to ``out_dir``.  Relies on ``DELIVERY_MAP``
    being filled already.
    """
    # NOTE(review): block nesting was reconstructed from a whitespace-collapsed
    # source - confirm against the original file.
    # AZNR -> (year, avnr)
    variant_map: Dict[int, Tuple[int, int]] = {}
    # year -> [(AZNR, avnr, TeilzahlungNr)] of all non-test variants
    variant_year_map: Dict[int, List[Tuple[int, int, int]]] = {}
    # year -> number of variants seen so far (used as avnr)
    year_map = {}
    # AZNR -> json data of the variant
    az_map = {}

    # price rows grouped by AZNR
    p_sort = list(utils.csv_parse_dict(f'{in_dir}/TAuszahlungSorten.csv'))
    sort_map = {i: [s for s in p_sort if s['AZNR'] == i] for i in set([s['AZNR'] for s in p_sort])}
    p_qual = list(utils.csv_parse_dict(f'{in_dir}/TAuszahlungSortenQualitätsstufe.csv'))
    qual_map = {i: [s for s in p_qual if s['AZNR'] == i] for i in set([s['AZNR'] for s in p_qual])}

    with utils.csv_open(f'{out_dir}/payment_variant.csv') as f_payment:
        f_payment.header('year', 'avnr', 'name', 'date', 'test_variant', 'calc_time',
                         'bucket_1_name', 'bucket_2_name', 'bucket_3_name', 'comment', 'data')
        for p in utils.csv_parse_dict(f'{in_dir}/TAuszahlung.csv'):
            year = p['Lesejahr']
            if year is None:
                continue
            if year not in year_map:
                year_map[year] = 0
            year_map[year] += 1
            variant_map[p['AZNR']] = (year, year_map[year])

            # every column that has no dedicated output column goes into the json data
            var = p.copy()
            del var['AZNR']
            del var['Datum']
            del var['Beschreibung']
            del var['Lesejahr']
            del var['Titel']
            del var['TeilzahlungNr']
            data = {
                'mode': 'wgmaster',
                **var,
                'AuszahlungSorten': {},
                'AuszahlungSortenQualitätsstufe': {},
            }

            # build SNR -> SANR -> 'Gebunden'/'NichtGebunden' -> {Oechsle: Betrag}
            azs = data['AuszahlungSorten']
            for s in sort_map.get(p['AZNR'], []):
                del s['AZNR']
                del s['ID']
                if s['Oechsle'] is None:
                    continue
                snr = s['SNR'].upper()
                sanr = s['SANR'] or ''
                azs[snr] = azs.get(snr, {})
                azs[snr][sanr] = azs[snr].get(sanr, {})
                geb = 'Gebunden' if s['gebunden'] else 'NichtGebunden'
                azs[snr][sanr][geb] = azs[snr][sanr].get(geb, {})
                azs[snr][sanr][geb][s['Oechsle']] = s['Betrag']

            # replace every price table by the index of its (deduplicated) curve
            curves = []
            for sortid, d1 in azs.items():
                for attrid, d2 in d1.items():
                    for geb, d3 in d2.items():
                        # dense list indexed by Oechsle, missing values are 0.0
                        oe = [d3.get(n, 0.0) for n in range(max(d3.keys()) + 1)]
                        if oe not in curves:
                            curves.append(oe)
                        azs[sortid][attrid][geb] = curves.index(oe)

            # reduce every dense curve to the points where its slope changes
            for i, c in enumerate(curves):
                n = {}
                d = 0
                # p0/p1/p2: previous, current and next value at Oechsle ``oe``
                for oe, p0, p1, p2 in zip(range(0, len(c) + 1), [0] + c, c, c[1:] + [c[len(c) - 1]]):
                    d1, d2 = round(p1 - p0, 4), round(p2 - p1, 4)
                    if d1 == d:
                        continue
                    d = d2
                    if p0 > 0:
                        n[f'{oe - 1}oe'] = p0
                    n[f'{oe}oe'] = p1
                # always keep the last point of a curve that ends above zero
                if c[len(c) - 1] > 0:
                    n[f'{len(c) - 1}oe'] = c[len(c) - 1]
                keys = list(n.keys())
                vals = list(n.values())
                # two equal leading points: keep only the second one, in first position
                if len(n) >= 2 and vals[0] == vals[1]:
                    del n[keys[0]]
                    del n[keys[1]]
                    n = {keys[1]: vals[1], **n}
                # a single remaining point is always stored under the key '73oe'
                if len(n) == 1:
                    n = {'73oe': list(n.values())[0]}
                curves[i] = n
            azs['Kurven'] = curves

            # build qualid -> SNR -> SANR -> Betrag
            azq = data['AuszahlungSortenQualitätsstufe']
            for q in qual_map.get(p['AZNR'], []):
                del q['AZNR']
                del q['ID']
                qualid = QUAL_MAP[q['QSNR']]
                snr = q['SNR']
                sanr = q['SANR'] or ''
                azq[qualid] = azq.get(qualid, {})
                azq[qualid][snr] = azq[qualid].get(snr, {})
                azq[qualid][snr][sanr] = q['Betrag']
            # collapse levels whose entries all share the same value
            for qualid, d1 in azq.items():
                for sortid, d2 in d1.items():
                    if len(set(d2.values())) == 1:
                        azq[qualid][sortid] = list(d2.values())[0]
            for qualid, d1 in azq.items():
                try:
                    if len(set(d1.values())) == 1:
                        azq[qualid] = list(d1.values())[0]
                except TypeError:
                    # values that are still dicts are unhashable: leave this level as is
                    pass

            # drop empty and False settings from the json data
            for k, v in data.copy().items():
                if v is None or (type(v) == bool and not v):
                    del data[k]

            az_map[p['AZNR']] = data
            # TeilzahlungNr 7 is written as a test variant and gets no per-part rows below
            test = (p['TeilzahlungNr'] == 7)
            if not test:
                if year not in variant_year_map:
                    variant_year_map[year] = []
                variant_year_map[year].append((p['AZNR'], year_map[year], p['TeilzahlungNr']))
            f_payment.row(year, year_map[year], p['Titel'], p['Datum'], test, None,
                          'Gebunden', 'Nicht gebunden', 'Abgewertet', p['Beschreibung'], json.dumps(data))

    # only referenced by the commented-out consistency check further down
    def get_modifiers(modifiers: str) -> Tuple[int, float]:
        """Sum the absolute (scaled integer) and relative values of a ' / ' separated modifier list."""
        if modifiers is None or modifiers == '':
            return 0, 0.0
        a, r = 0, 0.0
        for name in modifiers.split(' / '):
            mod = MODIFIER_MAP[name]
            if mod['AZASProzent']:
                r += mod['AZASProzent'] / 100.0
            if mod['AZAS']:
                a += round(mod['AZAS'] * pow(10, WGMASTER_PRECISION))
        return a, r

    # only referenced by the commented-out consistency check further down
    def get_prices(aznr: int, sortid: str, attribute: Optional[str], oe: int) -> Tuple[int, int, int]:
        """Look up the three bucket prices of a variant as scaled integers."""
        az = az_map[aznr]
        qs = az['AuszahlungSortenQualitätsstufe']
        so = az['AuszahlungSorten']
        p1, p2, p3 = 0, 0, 0
        if qs:
            # the 'WEI' entry may be a plain value or nested by variety and attribute
            p3 = qs['WEI']
            if type(p3) == dict:
                p3 = p3[sortid]
            if type(p3) == dict:
                p3 = p3[attribute or '']
        if sortid.upper() in so:
            so = so[sortid.upper()][attribute or '']
            p2 = so['NichtGebunden'][oe]
            p1 = so['Gebunden'][oe] if 'Gebunden' in so else p2
        prec = pow(10, WGMASTER_PRECISION)
        return round(p1 * prec), round(p2 * prec), round(p3 * prec)

    with utils.csv_open(f'{out_dir}/payment_delivery_part.csv') as f_del_pay:
        f_del_pay.header('year', 'did', 'dpnr', 'avnr', 'bucket_1', 'bucket_2', 'bucket_3', 'amount')
        deliveries = {d['LINR']: d for d in utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv')}
        for linr, (y, did, dpnr) in DELIVERY_MAP.items():
            p = deliveries[linr]
            if y not in variant_year_map:
                continue
            for aznr, avnr, tznr in variant_year_map[y]:
                # instalments 1-5 have their own column, everything else uses the final payment
                val = p[f'BTeilzahlung{tznr}' if tznr < 6 else 'BEndauszahlung']
                val = round(val * pow(10, WGMASTER_PRECISION))
                b1, b2, b3 = 0, 0, 0

                # disabled consistency check (kept as found):
                # prices = get_prices(aznr, p['SNR'], p['SANR'], int(p['Oechsle']))
                # mod = get_modifiers(p['BAbschlaegeString'])
                # if not az_map[aznr].get('AbschlägeBerücksichtigen', False):
                #     mod = 0, 0.0

                # 'WEI' weight goes to bucket 3, otherwise the weight is split
                # into the BGewichtGebunden part (bucket 1) and the rest (bucket 2)
                gew, geb_gew = int(p['Gewicht']), int(p['BGewichtGebunden'])
                if QUAL_MAP[p['QSNR']] == 'WEI':
                    b3 += gew
                else:
                    b2 += gew - geb_gew
                    b1 += geb_gew

                # check_val = b1 * (prices[0] + mod[0]) + b2 * (prices[1] + mod[0]) + b3 * (prices[2] + mod[0])
                # check_val *= 1 + mod[1]
                # check_val = round(check_val / 100) * 100
                # if check_val != val:
                #     print(p['LINR'], y, did, dpnr, avnr, val, check_val)
                # else:
                #     print("GOOD")

                f_del_pay.row(y, did, dpnr, avnr, b1, b2, b3, val)
def migrate_parameters(in_dir: str, out_dir: str) -> None:
    """Convert the client parameters into ``client_parameter.csv``.

    Reads ``TParameter.csv`` (columns ``Bezeichnung`` and ``Wert``) from
    ``in_dir`` and writes one ``param``/``value`` row per new parameter to
    ``out_dir``.

    :raises KeyError: if a mandatory ``MANDANTEN*`` parameter is missing or the
        legal form in ``MANDANTENNAME2`` is unknown
    """
    params: Dict[str, str] = {p['Bezeichnung']: p['Wert']
                              for p in utils.csv_parse_dict(f'{in_dir}/TParameter.csv')}

    # title-case the name and expand the abbreviations that title() capitalised
    name = params['MANDANTENNAME1'].title().replace('F.', 'für').replace('U.', 'und').replace(' Im ', ' im ')
    # legal form, reduced to lower-case letters for the lookup below
    suffix = params['MANDANTENNAME2'].lower().replace(' ', '').replace('.', '')
    types = {
        'reggenmbh': 'reg. Gen.m.b.H.'
    }
    tokens = {
        'MATZEN': ('WGM', 'WG Matzen'),
        'GWK': ('GWK', 'GWK')
    }.get(WG, (None, None))
    ort = params['MANDANTENORT'].title()

    # The delivery note text is optional; the spelling fixes may only be
    # applied when it is present (calling .replace() on None raised
    # AttributeError before).
    delivery_note_text = params.get('LIEFERSCHEINTEXT', None)
    if delivery_note_text is not None:
        delivery_note_text = delivery_note_text \
            .replace(' daß ', ' dass ') \
            .replace('obige Angaben maßgeblicher Veränderungen', 'maßgeblichen Veränderungen obiger Angaben')

    new_params: Dict[str, Optional[str]] = {
        'CLIENT_NAME_TOKEN': tokens[0],
        'CLIENT_NAME_SHORT': tokens[1],
        'CLIENT_NAME': name,
        'CLIENT_NAME_SUFFIX': None,
        'CLIENT_NAME_TYPE': types[suffix],
        'CLIENT_PLZ': params['MANDANTENPLZ'],
        'CLIENT_ORT': ort,
        'CLIENT_ADDRESS': params['MANDANTENSTRASSE'],
        'CLIENT_IBAN': None,
        'CLIENT_BIC': None,
        'CLIENT_USTIDNR': params['MANDANTENUID'].replace(' ', ''),
        'CLIENT_LFBISNR': params['MANDANTENBETRIEBSNUMMER'],
        'CLIENT_PHONE': normalize_phone_nr(params['MANDANTENTELEFON'], ort),
        'CLIENT_FAX': normalize_phone_nr(params['MANDANTENTELEFAX'], ort),
        'CLIENT_EMAIL': params['MANDANTENEMAIL'],
        'CLIENT_WEBSITE': params.get('MANDANTENHOMEPAGE', None),
        'DELIVERY_OBLIGATION': params.get('LIEFERPFLICHT/GA1', None),
        'DELIVERY_RIGHT': params.get('LIEFERRECHT/GA1', None),
        'VAT_NORMAL': '0.20',
        'VAT_REDUCED': '0.10',
        'VAT_FLATRATE': '0.13',
        'DOCUMENT_SENDER': params.get('ABSENDERTEXT2', None),
        'TEXT_DELIVERY_NOTE': delivery_note_text,
    }

    with utils.csv_open(f'{out_dir}/client_parameter.csv') as f:
        f.header('param', 'value')
        for param, value in new_params.items():
            f.row(param, value)
def main() -> None:
    """Parse the command line and run all migration steps in order.

    Sets the globals ``QUIET``, ``WG`` and ``DB_CNX``; the output directory is
    created if it does not exist.
    """
    global DB_CNX, QUIET, WG

    parser = argparse.ArgumentParser()
    parser.add_argument('in_dir', type=str,
                        help='The input directory where the exported csv files are stored')
    parser.add_argument('out_dir', type=str,
                        help='The output directory where the migrated csv file should be stored')
    parser.add_argument('-q', '--quiet', action='store_true', default=False,
                        help='Be less verbose')
    parser.add_argument('-d', '--database', metavar='DB', required=True,
                        help='The sqlite database file to look up information')
    parser.add_argument('-g', '--genossenschaft', metavar='WG', required=False, type=str,
                        choices=('MATZEN', 'GWK'))
    args = parser.parse_args()

    os.makedirs(args.out_dir, exist_ok=True)

    QUIET = args.quiet
    WG = args.genossenschaft

    DB_CNX = sqlite3.connect(args.database)
    try:
        # The order matters: later steps read the lookup maps that earlier
        # steps fill (e.g. MEMBER_MAP, REED_MAP, DELIVERY_MAP).
        migrate_gradation(args.in_dir, args.out_dir)
        migrate_branches(args.in_dir, args.out_dir)
        migrate_grosslagen(args.in_dir, args.out_dir)
        migrate_gemeinden(args.in_dir, args.out_dir)
        migrate_reeds(args.in_dir, args.out_dir)
        migrate_attributes(args.in_dir, args.out_dir)
        migrate_cultivations(args.in_dir, args.out_dir)
        migrate_area_commitment_types(args.in_dir, args.out_dir)
        migrate_members(args.in_dir, args.out_dir)
        migrate_area_commitments(args.in_dir, args.out_dir)
        migrate_deliveries(args.in_dir, args.out_dir)
        migrate_payments(args.in_dir, args.out_dir)
        migrate_parameters(args.in_dir, args.out_dir)
    finally:
        # close the database even if one of the steps raises
        DB_CNX.close()


if __name__ == '__main__':
    main()