#!/usr/bin/env python3 # -*- coding: utf-8 -*- from typing import Dict, Any, Tuple, Optional, List, Iterable from enum import Enum import argparse import os import re import sys import sqlite3 import requests import datetime import json import string import utils class WG(Enum): MATZEN = 1 WINZERKELLER = 2 @classmethod def from_str(cls, name: str): return cls({wg.name: wg.value for wg in WG}[name]) DB_CNX: Optional[sqlite3.Connection] = None QUIET: bool = False CLIENT: Optional[WG] = None USTID_NR_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}') BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?') IBAN_RE = re.compile(r'[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}') EMAIL_RE = re.compile(r'[^@\s]+@([A-Za-z0-9_äöüß-]+\.)+[A-Za-z]{2,}') GRADATION_MAP: Optional[Dict[float, float]] = None CULTIVATION_MAP: Optional[Dict[int, str]] = None BRANCH_MAP: Optional[Dict[int, str]] = None GEM_MAP: Optional[Dict[int, List[Tuple[int, int]]]] = None REED_MAP: Optional[Dict[int, Tuple[int, int, str]]] = None GROSSLAGE_MAP: Optional[Dict[int, int]] = None MEMBER_MAP: Optional[Dict[int, Dict[str, Any]]] = None GROSSLAGE_KG_MAP: Optional[Dict[int, int]] = None DELIVERY_MAP: Optional[Dict[int, Tuple[int, int, int]]] = None MODIFIER_MAP: Optional[Dict[str, Dict]] = None PARAMETERS: Optional[Dict[str, str]] = None AUSTRIA = 40 WGMASTER_PRECISION = 4 QUAL_MAP: Dict[int, str] = { 0: 'WEI', 1: 'RSW', 2: 'LDW', 3: 'QUW', 4: 'KAB', 5: 'SPL', } ORT_NAMES: Dict[str, Optional[str]] = { 'Pirawarth': None, 'Raggendorf': None, 'Matzen': 'Matzner', 'Matzn': None, 'Stillfried': None, 'Harras': None, 'Gänserndorf': None, 'Sulz': None, 'Brünn': None, 'Wien': None, 'Angern': None, 'Schweinbarth': None, 'Hohenruppersdorf': None, 'Grub': None, 'Auersthal': None, 'Ollersdorf': None, 'Spannberg': None, 'Ebenthal': None, 'Bockfließ': None, 'Dörfless': 'Dörfleser', 'Dörfles': None, 'Ableiding': None, 'Absberg': None, 'Eibesbrunn': None, 'Engersdorf': None, 'Enzersfeld': None, 'Großebersdorf': None, 'Hollabrunn': None, 'Korneuburg': None, 'Königsbrunn': None, 'Laa': None, 'Leopoldau': None, 'Manhartsbrunn': None, 'Mannhartsbrunn': 'Manhartsbrunner', 'Münichsthal': None, 'Pernau': None, 'Pillichsdorf': None, 'Retz': None, 'Russbach': None, 'Schleinbach': None, 'Seefeld': None, 'Seyring': None, 'Stammersdorf': None, 'Stelzendorf': None, 'Traunfeld': None, 'Tresdorf': None, 'Trumau': None, 'Wolkersdorf': None, 'Znaim': None, 'Obersdorf': None, 'Sechshaus': None, 'Rußbach': None, } STREET_NAMES: Dict[str, str] = { 'Hans-Wagnerstraße': 'Hans-Wagner-Straße', 'J.Seitzstraße': 'Josef-Seitz-Straße', 'Kurhaus-Str.': 'Kurhausstraße', 'Kurhaus-Straße': 'Kurhausstraße', 'Hofrat Döltlstraße': 'Hofrat-Döltl-Straße', 'Flustraße': 'Flurstraße', 'St.Laurentstraße': 'St.-Laurentstraße', 'Josef Seitzstraße': 'Josef-Seitz-Straße', 'Ritter Zoppelstraße': 'Ritter-Zoppel-Straße', 'Ritter Zoppel Straße': 'Ritter-Zoppel-Straße', 'R. Virchow-Straße': 'Rudolf-Virchow-Straße', 'Grubere Hauptstraße': 'Gruber Hauptstraße', 'Groß Inzersdorf': 'Großinzersdorf', 'Erdpress': 'Erdpreß', 'Hochleitengasse': 'Hochleithengasse', 'Bei Der Gösselmühle': 'Bei der Gösslmühle', 'Dr. Peschlstraße': 'Dr.-Peschl-Straße', 'Dr.Peschlstraße': 'Dr.-Peschl-Straße', 'Dr. Salzbornstraße': 'Dr.-Salzborn-Straße', 'Elsa Brandström-Straße': 'Elsa-Brandström-Straße', 'Franz Ecker Siedlung': 'Franz-Ecker-Siedlung', 'Franz-Ecker Siedlung': 'Franz-Ecker-Siedlung', 'Franz Gillygasse': 'Franz-Gilly-Gasse', 'Franz V. Zülowstraße': 'Franz-von-Zülow-Straße', 'Gr. Nondorf': 'Großnondorf', 'In Der Trift': 'In der Trift', 'Johann Degengasse': 'Johann-Degen-Gasse', 'Josef Fürnkranz Siedlung': 'Josef-Fürnkranz-Siedlung', 'Kaiser Franz Josef Platz': 'Kaiser-Franz-Josef-Platz', 'Klein Haugsdorf': 'Kleinhaugsdorf', 'Leopold Leuthnerstraße': 'Leopold-Leuthner-Straße', 'Lh.-Mayer-Platz': 'Landeshauptmann-Mayer-Platz', 'Manhartsbr.Straße': 'Manhartsbrunner Straße', 'Maria Lourd Weg': 'Maria-Lourd-Weg', 'U. Weißgasse Straße': 'Untere Weißgerberstraße', 'Dr. Josef-Levit-Straße': 'Dr.-Josef-Levit-Straße', 'Karl Katschthaler-Straße': 'Karl-Katschthaler-Straße', } def new(t: str, ids: Any, name: str, comment: str = None) -> None: print(f'\x1B[1;32mNew {t:>6}: {str(ids):>10} ({name}{", " + comment if comment else ""})\x1B[0m', file=sys.stderr) def success(mgnr: int, key: str, value) -> None: if not QUIET: print(f'\x1B[1;32m{mgnr:>6} : {key:<12} {value}\x1B[0m', file=sys.stderr) def warning(mgnr: int, key: str, value, active: bool) -> None: act = 'A' if active else '?' if active is None else ' ' print(f'\x1B[1;33m{mgnr:>6} ({act}): {key:<12} {value}\x1B[0m', file=sys.stderr) def invalid(mgnr: int, key: str, value, active: Optional[bool]) -> None: act = 'A' if active else '?' if active is None else ' ' print(f'\x1B[1;31m{mgnr:>6} ({act}): {key:<12} {value}\x1B[0m', file=sys.stderr) def renumber_delivery(lsnr_1: str, lsnr_2: str) -> None: if not QUIET: print(f'\x1B[1m{lsnr_1:<15} -> {lsnr_2:<15}\x1B[0m', file=sys.stderr) def warning_delivery(lsnr: str, mgnr: int, key: str, value) -> None: print(f'\x1B[1;33m{lsnr:<15} ({mgnr:>6}): {key:<12} {value}\x1B[0m', file=sys.stderr) def invalid_delivery(lsnr: str, mgnr: int, key: str, value) -> None: print(f'\x1B[1;31m{lsnr:<15} ({mgnr:>6}): {key:<12} {value}\x1B[0m', file=sys.stderr) def convert(mgnr: int, key: str, old_value: str, new_value) -> None: if not QUIET: print(f'\x1B[1m{mgnr:>6} : {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr) def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str], billing: Optional[str] = None) -> None: if not QUIET: print(f'\x1B[1m{mgnr:>6} : ' f'{" / ".join([e or "" for e in old_name])} -> ' f'{" / ".join([e or "" for e in new_name])}' f'{"(" + billing + ")" if billing else ""}\x1B[0m', file=sys.stderr) def check_lfbis_nr(nr: str) -> bool: # https://statistik.at/fileadmin/shared/QM/Standarddokumentationen/RW/std_r_land-forstw_register.pdf#page=41 if len(nr) != 7 or not nr.isdigit(): return False s = 0 for i, ch in enumerate(nr[:-1]): s += int(ch) * (7 - i) v = (11 - (s % 11)) % 10 return v == int(nr[-1]) def check_ustid_nr_at(nr: str) -> bool: # http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml if not nr.startswith('ATU') or len(nr) != 11 or not nr[3:].isdigit(): return False s = 0 for i, ch in enumerate(nr[3:-1]): s += sum(map(int, str(int(ch) * (i % 2 + 1)))) v = (96 - s) % 10 return v == int(nr[-1]) def modulo(a: str, b: int) -> int: s = 0 for ch in a: s = (s * 10 + int(ch)) % b return s def check_iban(iban: str) -> bool: if not IBAN_RE.fullmatch(iban): return False s = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), (iban[4:] + iban[:4])) return modulo(s, 97) == 1 def normalize_phone_nr(nr: Optional[str], ort: str = None) -> Optional[str]: if nr is None: return None nr = nr.replace('/', ' ').strip() if nr.count('-') > 1 or len(nr.split('-')[-1]) > 3: nr = nr.replace('-', '') if nr[0] == '0': nr = '+43 ' + nr[1:] elif nr.startswith('43'): nr = '+' + nr elif CLIENT == WG.WINZERKELLER and ort: ort = ort.upper().strip() if ort in ('PILLICHSDORF', 'OBERSDORF', 'WOLKERSDORF', 'WOLFPASSING', 'PUTZING', 'GROSSENGERSDORF', 'EIBESBRUNN'): nr = f'+43 2245 {nr}' elif ort in ('ALBERNDORF', 'HAUGSDORF', 'AUGGENTHAL', 'HAUGSDORF'): nr = f'+43 2944 {nr}' elif ort in ('HADRES'): nr = f'+43 2943 {nr}' else: print(nr, ort) raise RuntimeError() if nr.startswith('+43'): if nr[4] == '6': nr = nr.replace(' ', '') nr = f'{nr[:3]} {nr[3:6]} {nr[6:]}' elif nr[4] == '1': nr = nr.replace(' ', '') nr = f'{nr[:3]} {nr[3]} {nr[3:]}' elif nr[4] == '2': nr = nr.replace(' ', '') nr = f'{nr[:3]} {nr[3:7]} {nr[7:]}' return nr.strip() def fix_street_name(name: str) -> str: if name in STREET_NAMES: return STREET_NAMES[name] orte = [(k, v) for k, v in ORT_NAMES.items() if name.startswith(k + 'er')] if (name.endswith('straße') or name.endswith('platz')) and len(orte) == 1: return f'{orte[0][1] or orte[0][0] + "er"} {name[len(orte[0][0]) + 2:].title()}'.replace(' ', ' ') return name def get_bev_gst_size(kgnr: int, gstnr: str) -> Optional[int]: r = requests.get(f'https://kataster.bev.gv.at/api/gst/{kgnr:05}/{gstnr}/') if r.status_code != 200: return None data = r.json() return sum([n['fl'] for n in data['properties']['nutzungen']]) def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]: members = {} for f in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'): if f['MGNR'] not in members: members[f['MGNR']] = {} members[f['MGNR']][f['FBNR']] = f return members def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] = None) -> Optional[int]: if plz is None or ort is None: return None ort = ort.replace('0', 'O').replace('SZ', 'SS') if ort.upper() == 'PILLICHSDORF' and plz == 2212: plz = 2211 elif ort.upper() == 'ENZERSFELD' and plz == 2203: plz = 2202 elif ort.upper() == 'GROSSEBERSDORF' and plz == 2212: ort = 'GROSSENGERSDORF' elif ort.upper() == 'MÜNICHSTHAL' and plz == 2123: plz = 2122 elif ort.upper() == 'FRAUENDORF' and plz == 3710: plz = 3714 elif ort.upper() == 'MAISSAU' and plz == 3721: ort = 'UNTERDÜRNBACH' elif ort.upper() in ('KLEINRIEDENTHAL', 'KLEINHÖFLEIN', 'KLEIN HÖFLEIN') and plz == 2074: plz = 2070 elif ort.upper() == 'DROSENDORF' and plz == 2095: ort = 'DROSENDORF ALTSTADT' elif ort.upper() == 'KLEINWEIKERSDORF' and plz == 2033: plz = 2023 elif ort.upper() == 'NIEDERSCHLEINZ' and plz == 3721: plz = 3714 cur = DB_CNX.cursor() cur.execute("SELECT o.okz, p.dest, o.name FROM AT_plz_dest p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?", (plz,)) rows: List[Tuple[int, str, str]] = cur.fetchall() cur.close() ort_m = re.sub(r'\d+', '', ort).lower().replace('gr.', 'groß').replace(' ', '').replace('-', '').replace('ß', 'ss') rows_m = [r[0] for r in rows if ort_m in r[2].lower().replace(' ', '').replace('-', '').replace('ß', 'ss')] if len(rows_m) > 1: rows_m = [r[0] for r in rows if ort_m == r[2].lower().replace(' ', '').replace('-', '').replace('ß', 'ss')] if len(rows_m) == 1: return plz * 100000 + rows_m[0] if ort == 'VELM-GÖTZENDORF': parts = address.split(' ') street = ' '.join(parts[:-1]) nr = int(parts[-1].split('-')[0]) if street == 'Landstraße' and nr <= 48 \ or street == 'Winterzeile' and (nr <= 49 or nr in (52, 54, 56)) \ or street == 'Hauptstraße' and (nr <= 106 or nr in (117, 115, 113, 111, 109, 107)): # Velm return plz * 100000 + 3572 else: # Götzendorf return plz * 100000 + 3571 raise RuntimeError(f'PLZ not found ({plz} {ort}, {address})') def lookup_kgnr(okz: Optional[int]) -> Optional[int]: if okz is None: return None cur = DB_CNX.cursor() cur.execute("SELECT kgnr FROM AT_ort WHERE okz = ?", (okz,)) rows: List[Tuple[int]] = cur.fetchall() cur.close() if len(rows) == 1: return rows[0][0] return None def lookup_gem_name(name: str) -> List[Tuple[int, int]]: gem_name, hkid = None, None if CLIENT == WG.MATZEN: hkid = "'WLWV'" if name.lower() == 'dörfles': gem_name = 'Weikendorf' elif name.lower() == 'velm-götzendorf': return [(6027, 30859), (6007, 30859)] elif name.lower() == 'grub': name = 'Grub an der March' elif CLIENT == WG.WINZERKELLER: hkid = "'WLWV', 'WIEN', 'WLWG'" if name.endswith('*'): name = name[:-1].strip() if name.lower() == 'joching': return [(12185, 31351)] elif name.lower() == 'kreuttal': return [(15206, 31627), (15221, 31627), (15226, 31627)] elif name.lower() == 'hochleithen': return [(15219, 31622), (15223, 31622), (15202, 31622)] elif name.lower() == 'wolfpassing': gem_name = 'Hochleithen' elif name.lower() == 'seebarn': gem_name = 'Harmannsdorf' elif name.lower() == 'königsbrunn': gem_name = 'Enzersfeld im Weinviertel' elif name.lower() == 'wien': return [(1616, 90001), (1617, 90001)] elif name.lower() in ('sitzendorf', 'roseldorf', 'frauendorf'): gem_name = 'Sitzendorf an der Schmida' elif name.lower() == 'dietersdorf': gem_name = 'Hollabrunn' elif name.lower() == 'altenmarkt': name = 'Altenmarkt im Thale' elif name.lower() == 'eitzerstal': name = 'Eitzersthal' elif name.lower() == 'gross': gem_name = 'Hollabrunn' elif name.lower() == 'auggenthal': name = 'Augenthal' elif name.lower() == 'karlsdorf': name = 'Pfaffendorf' elif name.lower() == 'kleinhaugsdorf': name = 'Augenthal' elif name.lower() == 'merkersdorf': gem_name = 'Hardegg' elif name.lower() == 'retz': name = 'Retz Altstadt' elif name.lower() == 'heldenberg': return [(9112, 31019), (9132, 31019), (9131, 31019), (9141, 31019), (9140, 31019)] elif name.lower() == 'retzbach': return [(18129, 31038), (18112, 31038), (18117, 31038)] elif name.lower() == 'dietmannsdorf': gem_name = 'Zellerndorf' elif name.lower() == 'sierndorf': gem_name = 'Sierndorf' elif name.lower() == 'waltersdorf': gem_name = 'Staatz' elif name.lower() == 'viendorf': name = 'Viendorf Weingebirge' elif name.lower() == 'stoitzendorf': return [(10137, 31105)] elif name.lower() in ('klein reinprechtsdorf', 'unter nalb', 'klein stelzendorf', 'klein kirchberg'): name = name.replace(' ', '') elif name.lower() == 'drosendorf': name = 'Drosendorf Stadt' elif name.lower() == 'etzmannsdorf': name = 'Etzmannsdorf bei Straning' elif name.lower() == 'roggendorf': gem_name = 'Röschitz' elif name.lower() == 'wilhelmsdorf': gem_name = 'Poysdorf' elif name.lower() == 'nappersdorf-kammersdorf': return [(9008, 31028), (9026, 31028), (9032, 31028), (9037, 31028), (9051, 31028), (9067, 31028)] cur = DB_CNX.cursor() cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name " "FROM AT_kg k " "JOIN AT_gem g ON g.gkz = k.gkz " "JOIN wb_gem wg ON wg.gkz = g.gkz " f"WHERE LOWER(k.name) LIKE (LOWER(?) || '%') AND wg.hkid IN ({hkid})", (name.replace('fliess', 'fließ').replace('ross', 'roß').replace('Gr.', 'Groß ') .replace('Groß ', 'Groß').replace('-', ''),)) rows: List[Tuple[int, str, int, str]] = cur.fetchall() cur.close() if gem_name: rows = [row for row in rows if row[3] == gem_name] if len(rows) == 1: return [(k, g) for k, _, g, _ in rows] print(name, rows) raise RuntimeError() def lookup_kg_name(kgnr: int) -> str: cur = DB_CNX.cursor() cur.execute("SELECT name FROM AT_kg WHERE kgnr = ?", (kgnr,)) rows = cur.fetchall() cur.close() return rows[0][0] if len(rows) > 0 else None def lookup_rnr_name(rnr: int) -> str: return REED_MAP[rnr][2] def lookup_hkid(kgnr: Optional[int], qualid: str) -> str: hkid = None if qualid in ('WEI', 'RSW'): return 'OEST' elif kgnr is None: if CLIENT in (WG.MATZEN, WG.WINZERKELLER): hkid = 'WLNO' else: cur = DB_CNX.cursor() cur.execute("SELECT wb.hkid FROM AT_kg kg JOIN AT_gem g ON g.gkz = kg.gkz JOIN wb_gem wb ON wb.gkz = g.gkz " "WHERE kg.kgnr = ?", (kgnr,)) rows = cur.fetchall() cur.close() hkid = rows[0][0] if qualid == 'LDW': if hkid == 'WIEN': hkid = 'WLXX' elif hkid[:2] in ('WL', 'BL', 'SL'): hkid = hkid[:2] + 'XX' else: if hkid.startswith('WL') and not hkid.endswith('XX'): hkid = 'WLNO' return hkid def guess_glnr(kgnr: int) -> Optional[int]: cur = DB_CNX.cursor() cur.execute("SELECT kgnr FROM AT_kg " "WHERE gkz / 100 != 900 AND gkz / 100 = (SELECT gkz / 100 FROM AT_kg WHERE kgnr = ?)", (kgnr,)) rows0 = cur.fetchall() cur.execute("SELECT kgnr FROM AT_kg " "WHERE gkz / 100 != 900 AND gkz = (SELECT gkz FROM AT_kg WHERE kgnr = ?)", (kgnr,)) rows1 = cur.fetchall() cur.close() glnrs = list(set([GROSSLAGE_KG_MAP[k] for k, in rows0 if k in GROSSLAGE_KG_MAP])) if len(glnrs) == 0: return None elif len(glnrs) == 1: return glnrs[0] glnrs = list(set([GROSSLAGE_KG_MAP[k] for k, in rows1 if k in GROSSLAGE_KG_MAP])) return glnrs[0] if len(glnrs) > 0 else None def migrate_gradation(in_dir: str, out_dir: str) -> None: global GRADATION_MAP GRADATION_MAP = {} for g in utils.csv_parse_dict(f'{in_dir}/TUmrechnung.csv'): GRADATION_MAP[g['Oechsle']] = g['KW'] def migrate_branches(in_dir: str, out_dir: str) -> None: global BRANCH_MAP BRANCH_MAP = {} with utils.csv_open(f'{out_dir}/branch.csv') as f: f.header('zwstid', 'name', 'country', 'postal_dest', 'address', 'phone_nr', 'fax_nr', 'mobile_nr') for b in utils.csv_parse_dict(f'{in_dir}/TZweigstellen.csv'): BRANCH_MAP[b['ZNR']] = b['Kennbst'] address = b['Straße'] postal_dest = lookup_plz(int(b['PLZ']) if b['PLZ'] else None, b['Ort'], address) if CLIENT == WG.MATZEN: address = 'Schloßstraße 6' postal_dest = 224303541 tel, mob = normalize_phone_nr(b['Telefon']), None if tel and tel[4] == '6': mob, tel = tel, None f.row(b['Kennbst'], b['Name'].strip().title(), AUSTRIA, postal_dest, address, tel, normalize_phone_nr(b['Telefax']), mob) def migrate_grosslagen(in_dir: str, out_dir: str) -> None: global GROSSLAGE_MAP GROSSLAGE_MAP = {} glnr = 0 with utils.csv_open(f'{out_dir}/wb_gl.csv') as f: f.header('glnr', 'name') for gl in utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv'): glnr += 1 if CLIENT == WG.WINZERKELLER and gl['GLNR'] == 8: GROSSLAGE_MAP[8] = 6 continue GROSSLAGE_MAP[gl['GLNR']] = glnr f.row(glnr, gl['Bezeichnung']) def migrate_gemeinden(in_dir: str, out_dir: str) -> None: global GEM_MAP, GROSSLAGE_KG_MAP GEM_MAP, GROSSLAGE_KG_MAP = {}, {} inserted = set() with utils.csv_open(f'{out_dir}/wb_kg.csv') as f: f.header('kgnr', 'glnr') for g in utils.csv_parse_dict(f'{in_dir}/TGemeinden.csv'): gems = lookup_gem_name(g['Bezeichnung']) GEM_MAP[g['GNR']] = gems for kgnr, gkz in gems: if kgnr in inserted: continue inserted.add(kgnr) glnr = GROSSLAGE_MAP[g['GLNR']] GROSSLAGE_KG_MAP[kgnr] = glnr f.row(kgnr, glnr) def migrate_reeds(in_dir: str, out_dir: str) -> None: global REED_MAP REED_MAP = {} with utils.csv_open(f'{out_dir}/wb_rd.csv') as f: f.header('kgnr', 'rdnr', 'name') for r in utils.csv_parse_dict(f'{in_dir}/TRiede.csv'): name: str = r['Bezeichnung'].strip() if name.isupper() or name.islower(): name = name.title() try: gem = GEM_MAP[r['GNR']] kgnr = gem[0][0] if len(gem) != 1: print(gem, name, '->', gem[0]) except KeyError: print(f'Invalid GNR {r["GNR"]} for reed {name}') continue rdnr = max([n for k, n, _ in REED_MAP.values() if k == kgnr] or [0]) + 1 REED_MAP[r['RNR']] = (kgnr, rdnr, name) f.row(kgnr, rdnr, name) def migrate_attributes(in_dir: str, out_dir: str) -> None: with utils.csv_open(f'{out_dir}/wine_attribute.csv') as f: f.header('attrid', 'name', 'active', 'max_kg_per_ha', 'strict', 'fill_lower') for a in utils.csv_parse_dict(f'{in_dir}/TSortenAttribute.csv'): if a['SANR'] is None: continue max_kg = int(a['KgProHa']) if a['KgProHa'] is not None else None if max_kg == 10_000: max_kg = None f.row(a['SANR'], a['Attribut'], True, max_kg, False, 0) if CLIENT == WG.MATZEN: f.row('M', 'Matzen', False, None, False, 0) f.row('HU', 'Huber', False, None, False, 0) def migrate_cultivations(in_dir: str, out_dir: str) -> None: global CULTIVATION_MAP CULTIVATION_MAP = {} with utils.csv_open(f'{out_dir}/wine_cultivation.csv') as f: f.header('cultid', 'name', 'description') for c in utils.csv_parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'): name: str = c['Bezeichnung'] cultid = name[0].upper() if name.isupper(): cultid = name elif 'biolog' in name.lower(): cultid = 'BIO' CULTIVATION_MAP[c['BANR']] = cultid f.row(cultid, name, None) def migrate_area_commitment_types(in_dir: str, out_dir: str) -> None: with utils.csv_open(f'{out_dir}/area_commitment_type.csv') as f: f.header('vtrgid', 'sortid', 'attrid', 'disc', 'min_kg_per_ha', 'penalty_per_kg', 'penalty_amount', 'penalty_none') for t in utils.csv_parse_dict(f'{in_dir}/TLiefermengen.csv'): sortid: str = t['SNR'] if not sortid or sortid == 'SV': continue menge = int(t['ErwarteteLiefermengeProHa']) f.row(sortid + (t['SANR'] or ''), sortid[:2], t['SANR'] or sortid[2:] or None, None, menge, None, None, None) bio = [] if CLIENT == WG.MATZEN: bio = ['GV', 'ZW', 'MT'] f.row('BM', 'BM', None, None, None, None, None, None) elif CLIENT == WG.WINZERKELLER: bio = ['GV', 'ZW', 'WR', 'MT', 'RR', 'WB', 'CH', 'MU'] for sortid in bio: f.row(f'{sortid}B', sortid, 'B', None, None, None, None, None) def normalize_name(family_name: str, given_name: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str]]: letters = string.ascii_letters + 'äöüßÄÖÜẞ-' double_names = ['eva maria', 'maria theresia'] def is_alpha(s: str) -> bool: return all(c in letters for c in s) if s.lower() not in double_names else True if CLIENT == WG.WINZERKELLER: if 'BEZIRKSBAUERNKAMMER' == family_name: return None, None, None, None, None, 'Bezirksbauernkammer Mistelbach' elif (given_name, family_name) == ('LANDW.', 'FACHSCHULE'): return None, None, None, None, None, 'Landwirtschaftliche Fachschule Mistelbach' elif 'lagerhaus' in family_name.lower() and given_name == 'HOLLABRUNN-HORN': return None, None, None, None, None, 'Raiffeisen-Lagerhaus Hollabrunn-Horn eGen' if given_name.lower() not in ('kg', 'gesbr', 'gesnbr') and \ len(family_name) > 0 and len(given_name) > 0 and is_alpha(family_name) and is_alpha(given_name): return None, given_name.title(), None, family_name.title(), None, None prefix: Optional[str] = None middle_names: Optional[str] = None suffix: Optional[str] = None billing_name: Optional[str] = None if given_name.startswith('z.H. '): billing_name = family_name.replace('AGRAR', 'Agrar').replace('GESBR', 'GesbR') parts = given_name.split(' ') given_name = parts[1] family_name = parts[2] given_name = given_name.replace('+', ' + ').replace('JOS ', 'JOS. ') given_name = re.sub(r' ?\((.+?)(, ?(.*?))?\)', lambda m: f' + {m.group(1)}{" + " + m.group(3) if m.group(2) else ""}', given_name) given_name = re.sub(r' u\. ?| und ', ' + ', given_name, flags=re.IGNORECASE) titles = '' def repl_title(m: re.Match) -> str: nonlocal titles, suffix t = m.group(1).lower().replace(' ', '').replace('.', '') match t: case 'jun': suffix = 'jun.' case 'sen': suffix = 'sen.' case 'dr': titles += 'Dr. ' case 'mag': titles += 'Mag. ' case 'ing': titles += 'Ing. ' case 'dipling': titles += 'Dipl.-Ing. ' case 'di': titles += 'Dipl.-Ing. ' case 'dkfm': titles += 'Dipl.-Kfm. ' case 'ökrat': titles += 'ÖkR ' case 'lkr': titles += 'ÖkR ' return ' ' title_re = re.compile(r',?\b(dr|ing|mag|jun|sen|dkfm|dipl\. ?-?ing|di|ök\.rat|lkr)\b\.?', re.IGNORECASE) given_name = utils.remove_spaces(re.sub(title_re, repl_title, given_name)) family_name = utils.remove_spaces(re.sub(title_re, repl_title, family_name)) if titles: prefix = titles.strip() family_parts = family_name.split(' ') last = family_parts[-1].lower() if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr'): family_name = ' '.join(family_parts[:-1]) if ' ' not in family_name and len(family_name) > 4: family_name = family_name.title() billing_name = family_name + ' ' + ('KG' if last == 'kg' else 'KEG' if last == 'keg.' else 'GesbR') if is_alpha(given_name): return prefix, given_name.title(), middle_names, family_name, suffix, billing_name given_parts = given_name.split(' ') last = given_parts[-1].lower() if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr'): given_name = ' '.join(given_parts[:-1]).title() family_name = family_name.title() billing_name = f'{family_name} {"KG" if last == "kg" else "KEG" if last == "keg." else "GesbR"}' return prefix, given_name, middle_names, family_name, suffix, billing_name if ' ' in family_name or '.' in family_name: if family_name.lower().startswith('weingut') or family_name.lower().startswith('weinbau'): billing_name = family_name.title() family_name = ' '.join(family_name.split(' ')[1:]).title() elif family_name.lower().endswith('veltlinerhof') or family_name.lower().endswith('biohof'): billing_name = ' '.join(family_name.split(' ')[::-1]).title() family_name = ' '.join(family_name.split(' ')[:-1]).title() elif 'u.' in family_name: billing_name = utils.remove_spaces(family_name.title().replace('U.', ' und ')) family_name = family_name.split(' ')[0].title() else: billing_name = family_name family_name = family_name.split(' ')[-1].title() if ' + ' in given_name: parts = given_name.split(' + ') family_name = family_name.title() billing_name = (', '.join(parts).title()[::-1].replace(',', ' und'[::-1], 1)[::-1] + f' {billing_name or family_name}') given_name = parts[0].title() else: family_name = family_name.title() given_name = given_name.title() return prefix, given_name, middle_names, family_name, suffix, billing_name def migrate_members(in_dir: str, out_dir: str) -> None: global MEMBER_MAP MEMBER_MAP = {} members = [m for m in utils.csv_parse_dict(f'{in_dir}/TMitglieder.csv')] mgnrs = [m['MGNR'] for m in members] fbs = parse_flaechenbindungen(in_dir) with utils.csv_open(f'{out_dir}/member.csv') as f_m, \ utils.csv_open(f'{out_dir}/member_billing_address.csv') as f_mba, \ utils.csv_open(f'{out_dir}/member_telephone_number.csv') as f_tel, \ utils.csv_open(f'{out_dir}/member_email_address.csv') as f_email, \ utils.csv_open(f'{out_dir}/wb_kg.csv', 'a') as f_kg: f_m.header( 'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix', 'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid', 'lfbis_nr', 'ustid_nr', 'volllieferant', 'buchführend', 'organic', 'funktionär', 'active', 'deceased', 'iban', 'bic', 'country', 'postal_dest', 'address', 'default_kgnr', 'comment') f_mba.header('mgnr', 'name', 'country', 'postal_dest', 'address') f_tel.header('mgnr', 'nr', 'type', 'number', 'comment') f_email.header('mgnr', 'nr', 'address', 'comment') for m in members: mgnr: int = m['MGNR'] family_name: str = m['Nachname'] given_name: str = m['Vorname'] funktionaer, deceased = False, False if family_name is None and given_name is None: continue given_name = given_name or '' if CLIENT == WG.MATZEN and given_name.startswith(' '): funktionaer = True if CLIENT == WG.WINZERKELLER and ('*' in family_name or '*' in given_name or '(+)' in family_name or '(+)' in given_name): deceased = True family_name = family_name.replace('*', '').replace('(+)', '') given_name = given_name.replace('*', '').replace('(+)', '') family_name = utils.remove_spaces(family_name) given_name = utils.remove_spaces(given_name).replace(', ', ',') ret = normalize_name(family_name, given_name) prefix, given_name, middle_names, family_name, suffix, billing_name = ret n1 = utils.remove_spaces(' '.join(r or '' for r in ret)) n2 = utils.remove_spaces((m['Vorname'] or '') + ' ' + (m['Nachname'] or '')) if billing_name or n1.lower() != n2.lower(): convert_name(mgnr, (m['Nachname'], m['Vorname']), (prefix, given_name, middle_names, family_name, suffix), billing_name) if not given_name or not family_name: given_name = given_name or '' family_name = family_name or '' invalid(mgnr, 'Name', n1, active) bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None if bnr is not None: bnr = bnr.replace('.', '') if len(bnr) == 10: bnr = bnr.removesuffix('000') elif len(bnr) == 6: bnr = '0' + bnr elif bnr.endswith(' inaktiv'): bnr = bnr.split(' ')[0] if not check_lfbis_nr(bnr): if bnr in ('0', '1234567'): warning(mgnr, 'BetriebsNr.', bnr, active) else: invalid(mgnr, 'BetriebsNr.', bnr, active) bnr = None ustid_nr: Optional[str] = m['UID'] if ustid_nr is not None: ustid_nr = ustid_nr.replace(' ', '') if len(ustid_nr) == 8 and ustid_nr.isdigit(): ustid_nr = 'ATU' + ustid_nr elif not USTID_NR_RE.fullmatch(ustid_nr): invalid(mgnr, 'UID', ustid_nr, active) ustid_nr = None if ustid_nr and not check_ustid_nr_at(ustid_nr): if ustid_nr == 'ATU11111111': warning(mgnr, 'UID', ustid_nr, active) else: invalid(mgnr, 'UID', ustid_nr, active) ustid_nr = None iban: Optional[str] = m['IBAN'] bic: Optional[str] = m['BIC'] blz: Optional[int] = m['BLZ'] kto_nr: Optional[str] = m['KontoNr'] if iban is not None: iban = iban.replace(' ', '') if not check_iban(iban): invalid(mgnr, 'IBAN', iban, active) iban = None if bic is not None: bic = bic.upper() if bic == 'RLNWATAUE': bic = 'RLNWATWWAUE' elif bic == 'RLNWATWMIB': bic = 'RLNWATWWMIB' if not BIC_RE.fullmatch(bic): invalid(mgnr, 'BIC', bic, active) bic = None if bic is not None: if len(bic) == 11 and bic.endswith('XXX'): bic = bic[:-3] plz = int(m['PLZ']) if m['PLZ'] else None ort: Optional[str] = m['Ort'] address: Optional[str] = m['Straße'] parts = ort.split(' ') if ort else [''] if parts[-1].isdigit() or (len(parts) > 1 and parts[-2].isdigit()): if len(parts) > 1 and parts[-2].isdigit(): ort = ' '.join(parts[:-2]) new_address = parts[-2] + parts[-1] else: ort = ' '.join(parts[:-1]) new_address = parts[-1] if address is not None and address != ' ' and address != new_address: print(address, new_address) raise RuntimeError() address = parts[-1] if CLIENT == WG.WINZERKELLER and ort == 'JETZELDORF': ort = 'JETZELSDORF' if ort: ort = ort.upper().strip() if address is not None: address_old = address address = re.sub(r'([0-9]) ?([A-Z])\b', lambda a: a.group(1) + a.group(2).lower(), utils.remove_spaces(address).title()) if address.startswith('Haus Nr.') or \ address.startswith('Nr. ') or \ address.startswith('Nr ') or \ (len(address) > 0 and address[0].isdigit()): address = ort.title() + ' ' + address.split(' ')[-1] address = address.replace('strasse', 'straße').replace('strassse', 'straße')\ .replace('Strasse', 'Straße').replace('Str.', 'Straße').replace('stasse', 'straße')\ .replace('str.', 'straße').replace('ster.', 'straße').replace('g. ', 'gasse ')\ .replace('Gross', 'Groß').replace('Bockfliess', 'Bockfließ').replace('Weiss', 'Weiß')\ .replace('Preussen', 'Preußen').replace('Schloss', 'Schloß').replace('luss', 'luß')\ .replace('Haupstraße', 'Hauptstraße').replace('Russ', 'Ruß').replace('Ross', 'Roß') address = re.sub('([a-z])([0-9])', lambda a: a.group(1) + ' ' + a.group(2), address) if address.startswith('Ob. '): address = address.replace('Ob. ', 'Obere ', 1) address = address.replace(' Nr. ', ' ') address = re.sub(r'([^0-9]+?)( [0-9])', lambda a: fix_street_name(a.group(1)) + a.group(2), address) address = utils.remove_spaces(address) if address_old != address: convert(mgnr, 'Adresse', address_old, address) email: Optional[str] = m['EMail'] emails = [] if email is not None: for email in email.split(' + '): if email.isupper(): email = email.lower() if not EMAIL_RE.fullmatch(email): invalid(mgnr, 'E-Mail', m['EMail'], active) else: parts = email.split('@') emails.append(f'{parts[0]}@{parts[1].lower()}') zwstid = m['ZNR'] and BRANCH_MAP[m['ZNR']] or len(BRANCH_MAP) == 1 and list(BRANCH_MAP.values())[0] if CLIENT == WG.WINZERKELLER and plz == 1228: plz = 1020 postal_dest = lookup_plz(plz, ort, address) #if mgnr in fbs: # gems = {v['GNR'] for k, v in fbs[mgnr].items() if v['Bis'] and int(v['Bis']) >= 2020} # if len(gems) == 1: # print(GEM_MAP[list(gems)[0]]) okz = postal_dest % 100000 if postal_dest else None kgnr = lookup_kgnr(okz) active = m['Aktives Mitglied'] or False if kgnr is None: invalid(mgnr, 'KGNr.', ort, active) elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]: glnr = guess_glnr(kgnr) if glnr: new('KG', kgnr, lookup_kg_name(kgnr), f'GL {glnr}') f_kg.row(kgnr, glnr) if 9999 not in GEM_MAP: GEM_MAP[9999] = [] GEM_MAP[9999].append((kgnr, 0)) else: kgnr = None if postal_dest is None: invalid(mgnr, 'PLZ', None, active) continue pred = m['MGNR-Vorgänger'] if m['MGNR-Vorgänger'] in mgnrs else None f_m.row( mgnr, pred, prefix, given_name, middle_names, family_name, suffix, m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0, m['BHKontonummer'], zwstid, bnr, ustid_nr, m['Volllieferant'] or False, m['Buchführend'] or False, False, funktionaer, active, deceased, iban, bic, AUSTRIA, postal_dest, address or '-', kgnr, m['Anmerkung'] ) phone_1: Optional[str] = m['Telefon'] phone_2: Optional[str] = m['Telefax'] phone_3: Optional[str] = m['Mobiltelefon'] numbers = [] if CLIENT == WG.WINZERKELLER: # Telefax (phone_2) not used numbers = {} def add_number(nr: str, fax: bool = False, comment: str = None, fax_only: bool = False) -> None: mob = nr[4] == '6' numbers[nr] = {'mobile': mob, 'landline': not mob and not fax_only, 'fax': fax, 'comment': None} if phone_1: phone_1 = phone_1.lower().replace('und', 'u.').replace('auch', 'u.').replace('u.', ' u. ')\ .replace('oder', 'od.').replace(';', 'od.').replace('od.', ' od. ') phone_1 = utils.remove_spaces(phone_1) fax = False if phone_1.endswith(' u. fax'): fax = True phone_1 = ' '.join(phone_1.split(' ')[:-2]) if phone_1.replace(' ', '').replace('/', '').replace('-', '').isdigit() and len(phone_1) <= 20: if phone_1[0] != '0' and '/' in phone_1: for nr in phone_1.split('/'): add_number(normalize_phone_nr(nr, ort), fax) else: add_number(normalize_phone_nr(phone_1, ort), fax) elif re.fullmatch(r'0[0-9/ -]+ od\. 0[0-9/ -]+', phone_1): parts = phone_1.split(' od. ') add_number(normalize_phone_nr(parts[0], ort), False) add_number(normalize_phone_nr(parts[1], ort), fax) elif re.fullmatch(r'0[0-9/ -]+ od\. [1-9][0-9/ -]+', phone_1): parts = phone_1.split(' od. ') add_number(normalize_phone_nr(parts[0], ort), False) if parts[0][1] == '6': add_number(normalize_phone_nr(parts[1], ort), fax) else: add_number(normalize_phone_nr(parts[0][:5] + parts[1], ort), fax) elif re.fullmatch(r'0[0-9/ -]+ fax 0[0-9/ -]+', phone_1): parts = phone_1.split(' fax ') add_number(normalize_phone_nr(parts[0], ort), False) add_number(normalize_phone_nr(parts[1], ort), True, fax_only=True) elif re.fullmatch(r'0[0-9/ -]+ fax [1-9][0-9/ -]+', phone_1): parts = phone_1.split(' fax ') add_number(normalize_phone_nr(parts[0], ort), False) add_number(normalize_phone_nr(parts[0][:5] + parts[1], ort), True, fax_only=True) elif '-' in phone_1 and phone_1.endswith('fax'): nr = re.sub(r'-+ ', '-', phone_1) nr = ' '.join(nr.split(' ')[:-1]) add_number(normalize_phone_nr(nr.split('-')[0], ort), False) add_number(normalize_phone_nr(nr, ort), True, fax_only=True) elif 'fax -' in phone_1: parts = phone_1.split('fax') add_number(normalize_phone_nr(parts[0], ort), False) add_number(normalize_phone_nr(parts[0].strip() + parts[1].strip(), ort), True, fax_only=True) elif phone_1.endswith('fax'): nr = phone_1[:-3].strip() add_number(normalize_phone_nr(nr), False) add_number(normalize_phone_nr(nr), True, fax_only=True) elif re.fullmatch(r'0[0-9/ -]+ u\. fax (od\. |u\. )?[0-9/ -]+', phone_1): parts = phone_1.split(' ') add_number(normalize_phone_nr(parts[0], ort), True) nr = parts[-1] if nr[0] == '0': add_number(normalize_phone_nr(nr, ort)) else: add_number(normalize_phone_nr(parts[0][:5] + nr, ort)) else: parts = phone_1.split(' ') if parts[-1].isalpha(): add_number(normalize_phone_nr(parts[0], ort), comment=parts[-1]) else: for nr in parts: add_number(normalize_phone_nr(nr, ort), fax) if phone_3: for nr in phone_3.split(','): nr = nr.strip() parts = nr.split(' ') comment = None if parts[-1].startswith('(') and parts[-1].endswith(')'): nr = nr[:nr.rindex(' ')].strip() comment = parts[-1][1:-1].strip() elif parts[-1].isalpha(): nr = nr[:nr.rindex(' ')].strip() comment = parts[-1].strip() add_number(normalize_phone_nr(nr, ort), comment=comment) count = 0 for nr, data in numbers.items(): if data['mobile']: count += 1 f_tel.row(mgnr, count, 'mobile', nr, data['comment']) if data['landline']: count += 1 f_tel.row(mgnr, count, 'landline', nr, data['comment']) if data['fax']: count += 1 f_tel.row(mgnr, count, 'fax', nr, data['comment']) else: if phone_1: phone_1 = normalize_phone_nr(phone_1) if len(phone_1) <= 10 or phone_1[0] != '+': invalid(mgnr, 'Tel.Nr.', m['Telefon'], active) else: numbers.append(phone_1) if phone_1[4] == '6': f_tel.row(mgnr, len(numbers), 'mobile', phone_1, None) else: f_tel.row(mgnr, len(numbers), 'landline', phone_1, None) if phone_2: phone_2 = normalize_phone_nr(phone_2) if len(phone_2) <= 8 or phone_2[0] != '+': invalid(mgnr, 'Fax.Nr.', m['Telefax'], active) else: numbers.append(phone_2) if phone_2[4] == '6': f_tel.row(mgnr, len(numbers), 'mobile', phone_2, None) else: f_tel.row(mgnr, len(numbers), 'fax', phone_2, None) if phone_3: phone_3 = normalize_phone_nr(phone_3) if len(phone_3) <= 10 or phone_3[0] != '+': invalid(mgnr, 'Tel.Nr.', m['Mobiltelefon'], active) elif phone_3 not in numbers: numbers.append(phone_3) if phone_3[4] == '6': f_tel.row(mgnr, len(numbers), 'mobile', phone_3, None) else: f_tel.row(mgnr, len(numbers), 'landline', phone_3, None) for i, email in enumerate(emails): f_email.row(mgnr, i + 1, email, None) MEMBER_MAP[mgnr] = { 'default_kgnr': kgnr } if billing_name: f_mba.row(mgnr, billing_name, AUSTRIA, postal_dest, address or '-') def migrate_area_commitments(in_dir: str, out_dir: str) -> None: def parse_gstnrs(nr_str: str, kgnr: int, mgnr: int) -> List[str]: if nr_str is None: return [] elif nr_str.isdigit() and len(nr_str) <= 6: return [nr_str] elif nr_str.count('/') == 1: parts = nr_str.split('/') if len(parts[0]) == len(parts[1]) and parts[0].isdigit() and parts[1].isdigit() and len(parts[0]) >= 3: return [parts[0], parts[1]] elif parts[0].isdigit() and len(parts[0]) <= 6 and parts[1].isdigit() and len(parts[1]) <= 3: return [nr_str] if nr_str.count('/') > 1: parts = nr_str.split('/') if all([p.isdigit() for p in parts]): if all([len(p) <= 1 for p in parts[1:]]): return [f'{parts[0]}/{p}' for p in parts[1:]] elif all([len(p) == len(parts[0]) for p in parts]): return parts if nr_str.startswith(f'{kgnr:05}'): return parse_gstnrs(nr_str[5:].strip(), kgnr, mgnr) if nr_str.endswith(' 2000'): return parse_gstnrs(nr_str[:-5].strip(), kgnr, mgnr) parts = re.split(r' *[,;+&] *', nr_str) if len(parts) == 1: parts = nr_str.split(' / ') if len(parts) == 1 and ' ' not in nr_str: parts = nr_str.split(' ') if len(parts) == 1 and ' ' not in nr_str and '-' not in nr_str: parts = nr_str.split(' ') if len(parts) > 1: return [gst for p in parts for gst in parse_gstnrs(p, kgnr, mgnr)] m = re.fullmatch(r'([0-9]+/)?([0-9]+) *- *([0-9]+)', nr_str) if m is not None: b = m.group(1) f = int(m.group(2)) t = int(m.group(3)) if t < f: t += f - (f % pow(10, len(m.group(3)))) if t - f < 50: return [ gst for counter in range(f, t + 1) for p in [f'{b or ""}{counter}'] for gst in parse_gstnrs(p, kgnr, mgnr) ] invalid(mgnr, 'GstNr.', f'{kgnr:05}-{nr_str}', None) return [] def replace_nrs(m: re.Match, sep: str) -> str: end = m.group(0).endswith(sep) parts = [int(p) for p in m.group(0).split(sep)] text = '' last = None for i, p in enumerate(parts): if last is not None: if last + 1 == p: last = p continue else: text += f'{last}{sep}' last = None if len(parts) > i + 2 and p + 1 == parts[i + 1] and p + 2 == parts[i + 2]: last = p text += f'{p}-' else: text += f'{p}{sep}' if last is not None: text += str(last) return text.strip().strip(sep) + (sep if end else '') def format_gstnr(nrs: List[str]) -> Optional[str]: if len(nrs) == 0: return None nrs = [re.sub(r'\b0+', '', nr) for nr in sorted([re.sub(r'[0-9]+', lambda m: m.group(0).rjust(6, '0'), nr) for nr in nrs])] last = None text = '' for nr in nrs: if last is None: text += nr elif '/' in last and last.split('/')[:-1] == nr.split('/')[:-1]: text += f'+{nr.split("/")[-1]}' else: text += f', {nr}' last = nr text = re.sub(r'[0-9]+\+[0-9]+(\+[0-9]+)+', lambda m: replace_nrs(m, '+'), text) text = re.sub(r'([0-9]+(, |$)){3,}', lambda m: replace_nrs(m, ', '), text) return text reeds: Dict[int, Dict[int, str]] = {k: {r: n for rk, r, n in REED_MAP.values() if rk == k} for k in set([k for k, _, _ in REED_MAP.values()])} new_reeds: Dict[Tuple[int, int], int] = {} with utils.csv_open(f'{out_dir}/area_commitment.csv') as f_fb, \ utils.csv_open(f'{out_dir}/wb_rd.csv', 'a+') as f_rd: f_fb.header('fbnr', 'mgnr', 'vtrgid', 'cultid', 'area', 'kgnr', 'gstnr', 'rdnr', 'year_from', 'year_to', 'comment') for fb in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'): if (fb['Von'] is None and fb['Bis'] is None) or fb['GNR'] is None: continue parz: str = fb['Parzellennummer'] fbnr: int = fb['FBNR'] mgnr: int = fb['MGNR'] gem = GEM_MAP[fb['GNR']] kgnrs = [kgnr for kgnr, gkz in gem] rnr = fb['RNR'] rd_kgnr, rdnr, _ = REED_MAP.get(rnr, (None, None, None)) if rnr else (None, None, None) if mgnr not in MEMBER_MAP: continue kgnr = None if rd_kgnr is None: kgnr = kgnrs[0] elif rd_kgnr in kgnrs: kgnr = rd_kgnr elif (kgnrs[0], rnr) in new_reeds: kgnr = kgnrs[0] rdnr = new_reeds[(kgnr, rnr)] else: rname = lookup_rnr_name(rnr) for k in kgnrs: if k not in reeds: continue try: pos = list(reeds[k].values()).index(rname) r = list(reeds[k].keys())[pos] kgnr = k rdnr = r new_reeds[(kgnr, rnr)] = rdnr break except ValueError: continue if kgnr is None: kgnr = kgnrs[0] rdnr = max([r for _, r, _ in REED_MAP.values() if k == kgnr] + [r for (k, _), r in new_reeds.items() if k == kgnr]) + 1 f_rd.row(kgnr, rdnr, rname) new_reeds[(kgnr, rnr)] = rdnr new('Reed', (kgnr, rdnr), rname) area = int(fb['Flaeche']) if CLIENT == WG.MATZEN: gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR']) else: gstnrs = [] comment, gstnr = None, None if parz is None or parz == '0000': if parz is not None: invalid(mgnr, 'GstNr.', f'{lookup_kg_name(kgnr)} {kgnr or 0:05}-{parz}', None) gstnrs = [] gstnr = '-' if CLIENT == WG.MATZEN and len(gstnrs) == 0: comment = f'KG {kgnr or 0:05}: {parz}' gstnr = format_gstnr(gstnrs) or gstnr or parz if parz != gstnr.replace('+', '/'): convert(mgnr, f'GstNr. ({fbnr})', parz, gstnr) to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None f_fb.row(fbnr, mgnr, fb['SNR'] + (fb['SANR'] or ''), CULTIVATION_MAP[fb['BANR'] or 1], area, kgnr, gstnr, rdnr, fb['Von'], to, comment) def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, List[int], datetime.date]]: dates = {} fixed = {} last_dates = {} def add(lsnr: str, linr: int, date: datetime.date, unique: bool = False) -> None: if lsnr not in fixed: fixed[lsnr] = [] dates[lsnr] = date elif unique: return add(lsnr + '/2', linr, date, unique) fixed[lsnr].append(linr) def get_lsnr(date: datetime.date, lsnr: str) -> str: if date.year < 2000: return date.strftime('%y%m%d00') + lsnr[8:] else: return date.strftime('%Y%m%d') + lsnr[8:] deliveries: List[Tuple[int, str, datetime.date, int, int]] = [ (d['LINR'], d['Lieferscheinnummer'], d['Datum'], d['ZNR'], d['MGNR']) for d in deliveries if d['Lieferscheinnummer'] and not d['Storniert'] ] lsnrs = {d[1] for d in deliveries} for lnr, lsnr, date, zwstid, mgnr in deliveries: if len(lsnr) < 8: continue if lsnr.startswith('22'): lsnr = '20' + lsnr[2:] lsdate = datetime.date(int(lsnr[:4]), int(lsnr[4:6]), int(lsnr[6:8])) if not lsnr.startswith('9') \ else datetime.date(1900 + int(lsnr[:2]), int(lsnr[2:4]), int(lsnr[4:6])) lsnr_zwstid = lsnr[8] if lsnr_zwstid != zwstid and lsnr_zwstid in BRANCH_MAP.values(): zwstid = lsnr_zwstid if len(lsnr) == 12: if date != lsdate: if date.year == lsdate.year: lsnr_n = get_lsnr(date, lsnr) if lsnr_n not in lsnrs: lsnr = lsnr_n else: warning_delivery(lsnr, mgnr, 'date', date) else: date = datetime.date(lsdate.year, date.month, date.day) if zwstid not in last_dates or not date < last_dates[zwstid]: last_dates[zwstid] = date add(lsnr, lnr, date, unique=True) else: add(lsnr[:12], lnr, date) return sorted([(f[0], f[1], dates[f[0]]) for f in fixed.items()], key=lambda f: f[0] if not f[0].startswith('9') else '19' + f[0]) def migrate_deliveries(in_dir: str, out_dir: str) -> None: global DELIVERY_MAP, MODIFIER_MAP DELIVERY_MAP, MODIFIER_MAP = {}, {} modifiers = { m['ASNR']: m for m in utils.csv_parse_dict(f'{in_dir}/TAbschlaege.csv') if m['Bezeichnung'] and m['Bezeichnung'] != '-' } seasons = {} branches = {} for mod in modifiers.values(): name: str = mod['Bezeichnung'].replace('ausser', 'außer') nr: int = mod['ASNR'] MODIFIER_MAP[name] = mod if CLIENT == WG.MATZEN: mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'PZS' elif CLIENT == WG.WINZERKELLER: mod['id'] = { 1: 'KA', 2: 'LG', 3: 'MG', 4: 'SG', 5: 'VT', 6: 'MV', 7: 'UP', 8: 'VL', 9: 'DN', 10: 'SA', 11: 'DA', 12: 'EG', }[nr] else: raise NotImplementedError() deliveries = list(utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv')) delivery_dict = {d['LINR']: d for d in deliveries} fixed = fix_deliveries(deliveries) updated_varieties = {} with utils.csv_open(f'{out_dir}/delivery.csv') as f_delivery, \ utils.csv_open(f'{out_dir}/delivery_part.csv') as f_part: f_delivery.header('year', 'did', 'date', 'time', 'zwstid', 'lnr', 'lsnr', 'mgnr', 'comment') f_part.header('year', 'did', 'dpnr', 'sortid', 'attrid', 'weight', 'kmw', 'qualid', 'hkid', 'kgnr', 'rdnr', 'gerebelt', 'manual_weighing', 'spl_check', 'hand_picked', 'lesewagen', 'gebunden', 'temperature', 'acid', 'scale_id', 'weighing_id', 'weighing_reason', 'comment') for lsnr, linrs, date in fixed: if date.year not in seasons: seasons[date.year] = { 'currency': 'EUR' if date.year >= 2001 else 'ATS', 'precision': WGMASTER_PRECISION, 'start': date, 'end': date, 'nr': 0, } s = seasons[date.year] if date > s['end']: s['end'] = date s['nr'] += 1 snr = s['nr'] mgnr = delivery_dict[linrs[0]]['MGNR'] znr = delivery_dict[linrs[0]]['ZNR'] glob_waage = set(delivery_dict[linr]['Waagentext'] for linr in linrs if delivery_dict[linr]['Waagentext']) zwstid = lsnr[8] if zwstid not in branches: branches[zwstid] = {} if date not in branches[zwstid]: branches[zwstid][date] = 0 branches[zwstid][date] += 1 lnr = branches[zwstid][date] if BRANCH_MAP[znr] != zwstid: if zwstid not in BRANCH_MAP.values(): zwstid = BRANCH_MAP[znr] comments = [] attributes = set() for dpnr, linr in enumerate(linrs, start=1): d = delivery_dict[linr] DELIVERY_MAP[linr] = (date.year, snr, dpnr) if lsnr != d['Lieferscheinnummer']: renumber_delivery(d['Lieferscheinnummer'], lsnr) oe = d['OechsleOriginal'] or d['Oechsle'] kmw = GRADATION_MAP[oe] sortid = d['SNR'].upper() if d['SANR']: attributes.add(d['SANR']) if len(sortid) != 2: attributes.add(sortid[2:]) sortid = sortid[:2] if CLIENT == WG.MATZEN: if sortid == 'HU': # Gr.Veltliner (Huber) sortid = 'GV' attributes.remove('B') attributes.add('HU') elif sortid == 'SV': sortid = 'SW' elif sortid == 'WC': # WEIẞBURGUNDER/CHARDONNAY sortid = 'SW' if 'H' in attributes: attributes.remove('H') attributes.add('HK') if 'W' in attributes: attributes.remove('W') elif CLIENT == WG.WINZERKELLER: if 'F' in attributes: attributes.remove('F') if d['SNR'] != sortid: line = f'{d["SNR"]}/{d["SANR"]} -> {sortid}/{",".join(list(attributes)) or None}' if line not in updated_varieties: updated_varieties[line] = 0 updated_varieties[line] += 1 if d['QSNR'] is None: warning_delivery(lsnr, mgnr, 'qualid', 'UNSET') if d['Oechsle'] >= 86: qualid = 'KAB' else: qualid = QUAL_MAP[d['QSNR']] if qualid != 'WEI' and d['Abgewertet']: if qualid == 'RSW': qualid = 'WEI' else: warning_delivery(lsnr, mgnr, 'qualid', f'{qualid} (abgewertet)') qualid = 'WEI' kgnr, rdnr = None, None if d['GNR']: gem = GEM_MAP.get(d['GNR'], []) if len(gem) == 1: kgnr = gem[0][0] if d['RNR']: kgnr, rdnr, _ = REED_MAP[d['RNR']] if kgnr is None: m = MEMBER_MAP[mgnr] kgnr = m['default_kgnr'] if kgnr is None: pass elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]: warning_delivery(lsnr, mgnr, 'KGNr.', kgnr) kgnr = None hkid = lookup_hkid(kgnr, qualid) handwiegung = d['Handwiegung'] or False waage = list(glob_waage)[0] if len(glob_waage) == 1 else d['Waagentext'] scale_id, weighing_id = None, None if waage: # Waagenr: 1 ID: 19 # Waagennummer: 1 Speichernummer: 9166 waage = re.split(r' +', waage) scale_id = waage[1] weighing_id = waage[3] if waage[2] == 'Speichernummer:' else f'{date}/{waage[3]}' elif len(glob_waage) == 0 and not handwiegung: handwiegung = True comment: Optional[str] = d['Anmerkung'] acid = d['Säure'] hand, lesewagen = None, None if comment: comment = comment.replace('Söure', 'Säure') if comment.startswith('Säure'): acid = float(comment.split(' ')[-1].replace(',', '.')) comment = None elif comment in ('Maschine', 'Masschine'): hand = False comment = None elif comment == 'Hand': hand = True comment = None elif comment == '.': comment = None if comment: comments.append(comment) gerebelt = True if CLIENT == WG.MATZEN or (CLIENT == WG.WINZERKELLER and zwstid == 'W') else d['Gerebelt'] or False gebunden = None if CLIENT in (WG.MATZEN, WG.WINZERKELLER) else d['Gebunden'] if len(attributes) > 1: print("ERROR: ", attributes) attrid = attributes.pop() if len(attributes) == 1 else None f_part.row( date.year, snr, dpnr, sortid, attrid, int(d['Gewicht']), kmw, qualid, hkid, kgnr, rdnr, gerebelt, handwiegung, d['Spaetlese-Ueberpruefung'] or False, hand, lesewagen, gebunden, d['Temperatur'], acid, scale_id, weighing_id, None, comment ) f_delivery.row(date.year, snr, date, d['Uhrzeit'], zwstid, lnr, lsnr, mgnr, '; '.join(comments) or None) for k, v in updated_varieties.items(): print(k + (f' ({v} times)' if v > 1 else '')) with utils.csv_open(f'{out_dir}/delivery_part_modifier.csv') as f_part_mod: f_part_mod.header('year', 'did', 'dpnr', 'modid') for m in utils.csv_parse_dict(f'{in_dir}/TLieferungAbschlag.csv'): if m['LINR'] not in DELIVERY_MAP or m['ASNR'] not in modifiers: continue y, did, dpnr = DELIVERY_MAP[m['LINR']] f_part_mod.row(y, did, dpnr, modifiers[m['ASNR']]['id']) with utils.csv_open(f'{out_dir}/season.csv') as f_season, \ utils.csv_open(f'{out_dir}/modifier.csv') as f_mod: f_season.header('year', 'currency', 'precision', 'max_kg_per_ha', 'vat_normal', 'vat_flatrate', 'min_kg_per_bs', 'max_kg_per_bs', 'penalty_per_kg', 'penalty_amount', 'penalty_none', 'start_date', 'end_date') f_mod.header('year', 'modid', 'ordering', 'name', 'abs', 'rel', 'standard', 'quick_select') for y, s in seasons.items(): f_season.row(y, s['currency'], s['precision'], 10_000, 0.10, 0.13, PARAMETERS['LIEFERPFLICHT/GA1'], PARAMETERS['LIEFERRECHT/GA1'], None, None, None, s['start'], s['end']) for m in modifiers.values(): abs_v = round(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None rel_v = m['AZASProzent'] / 100.0 if m['AZASProzent'] is not None else None f_mod.row(y, m['id'], m['ASNR'], m['Bezeichnung'], abs_v, rel_v, m.get('Standard', False), m['Schnellauswahl']) def migrate_payments(in_dir: str, out_dir: str) -> None: variant_map: Dict[int, Tuple[int, int]] = {} variant_year_map: Dict[int, List[Tuple[int, int, int]]] = {} year_map = {} az_map = {} p_sort = list(utils.csv_parse_dict(f'{in_dir}/TAuszahlungSorten.csv')) sort_map = {i: [s for s in p_sort if s['AZNR'] == i] for i in set([s['AZNR'] for s in p_sort])} p_qual = list(utils.csv_parse_dict(f'{in_dir}/TAuszahlungSortenQualitätsstufe.csv')) qual_map = {i: [s for s in p_qual if s['AZNR'] == i] for i in set([s['AZNR'] for s in p_qual])} def collapse_data(data: dict[str, Any]): rev = {} for k, v in data.items(): if k == 'default' or k.startswith('/'): continue rev[v] = rev.get(v, []) rev[v].append(k) if 'default' not in data.keys(): if len(rev) == 1: return set(rev.keys()).pop() for v, ks in rev.items(): if len(ks) >= len(data) / 2: for k in ks: del data[k] data['default'] = v return collapse_data(data) for idx1 in {'/' + k.split('/')[1] for k in data.keys() if '/' in k and len(k) > 3}: len1 = len(list(k for k, _ in data.items() if k.endswith(idx1))) for v, ks in rev.items(): my_ks = list(k for k in ks if k.endswith(idx1)) if len(my_ks) > 1 and len(my_ks) >= len1 / 2: for k in my_ks: del data[k] data[idx1] = v return data def collapse_curve(curve: list[float]): n = {} d = 0 for oe, p0, p1, p2 in zip(range(0, len(curve) + 1), [0] + curve, curve, curve[1:] + [curve[len(curve) - 1]]): d1, d2 = round(p1 - p0, WGMASTER_PRECISION), round(p2 - p1, WGMASTER_PRECISION) if d1 == d: continue d = d2 if p0 > 0: n[f'{oe - 1}oe'] = p0 n[f'{oe}oe'] = p1 if curve[len(curve) - 1] > 0: n[f'{len(curve) - 1}oe'] = curve[len(curve) - 1] keys = list(n.keys()) vals = list(n.values()) while len(n) >= 2 and vals[0] == vals[1]: del n[keys[0]] del n[keys[1]] n = {keys[1]: vals[1], **n} keys = list(n.keys()) vals = list(n.values()) while len(n) >= 2 and vals[len(vals) - 1] == vals[len(vals) - 2]: del n[keys[len(vals) - 1]] del n[keys[len(vals) - 2]] n = {**n, keys[len(vals) - 2]: vals[len(vals) - 2]} keys = list(n.keys()) vals = list(n.values()) if len(n) == 0: n = {'73oe': 0} elif len(n) == 1: n = {'73oe': list(n.values())[0]} return n with (utils.csv_open(f'{out_dir}/payment_variant.csv') as f_payment): f_payment.header('year', 'avnr', 'name', 'date', 'test_variant', 'calc_time', 'comment', 'data') for p in utils.csv_parse_dict(f'{in_dir}/TAuszahlung.csv'): year = p['Lesejahr'] if year is None: continue if year not in year_map: year_map[year] = 0 year_map[year] += 1 variant_map[p['AZNR']] = (year, year_map[year]) var = p.copy() del var['AZNR'] del var['Datum'] del var['Beschreibung'] del var['Lesejahr'] del var['Titel'] del var['TeilzahlungNr'] data = { 'mode': 'wgmaster', **var, 'AuszahlungSorten': {}, 'AuszahlungSortenQualitätsstufe': {}, } azs = data['AuszahlungSorten'] for s in sort_map.get(p['AZNR'], []): del s['AZNR'] del s['ID'] if s['Oechsle'] is None: continue key = f'{s["SNR"].upper()}/{s["SANR"] or ""}' azs[key] = azs.get(key, {'Gebunden': {}, 'NichtGebunden': {}}) azs[key]['Gebunden' if s['gebunden'] else 'NichtGebunden'][s['Oechsle']] = s['Betrag'] curves = [] curve_zero = False for key, d1 in azs.items(): oe = [d1['NichtGebunden'].get(n, 0.0) for n in range(max(d1['NichtGebunden'].keys()) + 1)] if len(d1['Gebunden']) > 0: oe_geb = [d1['Gebunden'].get(n, 0.0) for n in range(max(d1['Gebunden'].keys()) + 1)] else: oe_geb = None if len(set(oe)) <= 2 and oe[0] == 0 and \ (oe_geb is None or (len(set(oe_geb)) <= 2 and oe[len(oe) - 1] == oe_geb[len(oe_geb) - 1])): azs[key] = oe[len(oe) - 1] if azs[key] == 0: azs[key] = 'curve:0' curve_zero = True else: c = (oe, oe_geb) if c not in curves: curves.append(c) azs[key] = f'curve:{curves.index(c) + 1}' data['AuszahlungSorten'] = collapse_data(azs) for i, cs in enumerate(curves): c, c_geb = cs geb = None if c_geb is not None: diff = {round(b - a, WGMASTER_PRECISION) for a, b in zip(c, c_geb)} diff.remove(0.0) if len(diff) == 1: geb = diff.pop() elif len(diff) > 1: geb = collapse_curve(c_geb) curves[i] = { 'id': i + 1, 'mode': 'oe', 'data': collapse_curve(c), } if geb is not None: curves[i]['geb'] = geb if curve_zero: curves.insert(0, { 'id': 0, 'mode': 'oe', 'data': data['Grundbetrag'], 'geb': data['GBZS'] or 0 }) data['Kurven'] = curves azq = data['AuszahlungSortenQualitätsstufe'] for q in qual_map.get(p['AZNR'], []): del q['AZNR'] del q['ID'] qualid = QUAL_MAP[q['QSNR']] key = f'{q["SNR"].upper()}/{q["SANR"] or ""}' azq[qualid] = azq.get(qualid, {}) azq[qualid][key] = q['Betrag'] for qualid, d1 in azq.items(): azq[qualid] = collapse_data(d1) for data_k, data_v in data.copy().items(): if data_v is None or isinstance(data_v, bool) and not data_v: del data[data_k] az_map[p['AZNR']] = data test = (p['TeilzahlungNr'] == 7) if not test: if year not in variant_year_map: variant_year_map[year] = [] variant_year_map[year].append((p['AZNR'], year_map[year], p['TeilzahlungNr'])) f_payment.row(year, year_map[year], p['Titel'], p['Datum'], test, None, p['Beschreibung'], json.dumps(data)) with utils.csv_open(f'{out_dir}/payment_delivery_part.csv') as f_del_pay, \ utils.csv_open(f'{out_dir}/delivery_part_bucket.csv') as f_bucket: f_del_pay.header('year', 'did', 'dpnr', 'avnr', 'net_amount') f_bucket.header('year', 'did', 'dpnr', 'bktnr', 'discr', 'value') deliveries = {d['LINR']: d for d in utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv')} for linr, (y, did, dpnr) in DELIVERY_MAP.items(): p = deliveries[linr] if y not in variant_year_map: continue gew, geb_gew = int(p['Gewicht']), int(p['BGewichtGebunden']) b1 = gew - geb_gew b2 = geb_gew f_bucket.row(y, did, dpnr, 0, '_', b1) f_bucket.row(y, did, dpnr, 1, p['SANR'] or '', b2) for aznr, avnr, tznr in variant_year_map[y]: val = p[f'BTeilzahlung{tznr}' if tznr < 6 else 'BEndauszahlung'] val = round(val * pow(10, WGMASTER_PRECISION)) f_del_pay.row(y, did, dpnr, avnr, val) def migrate_parameters(in_dir: str, out_dir: str) -> None: global PARAMETERS PARAMETERS = {p['Bezeichnung']: p['Wert'] for p in utils.csv_parse_dict(f'{in_dir}/TParameter.csv')} name = PARAMETERS['MANDANTENNAME1'].title().replace('F.', 'für').replace('U.', 'und').replace(' Im ', ' im ') suffix = PARAMETERS['MANDANTENNAME2'].lower().replace(' ', '').replace('.', '') types = { 'reggenmbh': 'reg. Gen.m.b.H.' } tokens = { WG.MATZEN: ('WGM', 'WG Matzen'), WG.WINZERKELLER: ('WKW', 'Winzerkeller') }.get(CLIENT, (None, None)) ort = PARAMETERS['MANDANTENORT'].title() new_params: Dict[str, Optional[str]] = { 'CLIENT_NAME_TOKEN': tokens[0], 'CLIENT_NAME_SHORT': tokens[1], 'CLIENT_NAME': name, 'CLIENT_NAME_SUFFIX': None, 'CLIENT_NAME_TYPE': types[suffix], 'CLIENT_PLZ': PARAMETERS['MANDANTENPLZ'], 'CLIENT_ORT': ort, 'CLIENT_ADDRESS': PARAMETERS['MANDANTENSTRASSE'], 'CLIENT_IBAN': None, 'CLIENT_BIC': None, 'CLIENT_USTIDNR': PARAMETERS['MANDANTENUID'].replace(' ', ''), 'CLIENT_LFBISNR': PARAMETERS['MANDANTENBETRIEBSNUMMER'], 'CLIENT_PHONE': normalize_phone_nr(PARAMETERS['MANDANTENTELEFON'], ort), 'CLIENT_FAX': normalize_phone_nr(PARAMETERS['MANDANTENTELEFAX'], ort), 'CLIENT_EMAIL': PARAMETERS['MANDANTENEMAIL'], 'CLIENT_WEBSITE': PARAMETERS.get('MANDANTENHOMEPAGE', None), 'DOCUMENT_SENDER': PARAMETERS.get('ABSENDERTEXT2', None), 'TEXT_DELIVERYNOTE': PARAMETERS.get('LIEFERSCHEINTEXT', None).replace(' daß ', ' dass ').replace('obige Angaben maßgeblicher Veränderungen', 'maßgeblichen Veränderungen obiger Angaben'), 'TEXT_DELIVERYCONFIRMATION': PARAMETERS.get('ANLIEFTEXT', None), } with utils.csv_open(f'{out_dir}/client_parameter.csv') as f: f.header('param', 'value') for param, value in new_params.items(): f.row(param, value) def main() -> None: global DB_CNX, QUIET, CLIENT parser = argparse.ArgumentParser() parser.add_argument('in_dir', type=str, help='The input directory where the exported csv files are stored') parser.add_argument('out_dir', type=str, help='The output directory where the migrated csv file should be stored') parser.add_argument('-q', '--quiet', action='store_true', default=False, help='Be less verbose') parser.add_argument('-d', '--database', metavar='DB', required=True, help='The sqlite database file to look up information') parser.add_argument('-g', '--genossenschaft', metavar='WG', required=False, type=str, choices=[wg.name for wg in WG]) args = parser.parse_args() os.makedirs(args.out_dir, exist_ok=True) QUIET = args.quiet CLIENT = WG.from_str(args.genossenschaft) DB_CNX = sqlite3.connect(args.database) migrate_parameters(args.in_dir, args.out_dir) migrate_gradation(args.in_dir, args.out_dir) migrate_branches(args.in_dir, args.out_dir) migrate_grosslagen(args.in_dir, args.out_dir) migrate_gemeinden(args.in_dir, args.out_dir) migrate_reeds(args.in_dir, args.out_dir) migrate_attributes(args.in_dir, args.out_dir) migrate_cultivations(args.in_dir, args.out_dir) migrate_area_commitment_types(args.in_dir, args.out_dir) migrate_members(args.in_dir, args.out_dir) migrate_area_commitments(args.in_dir, args.out_dir) migrate_deliveries(args.in_dir, args.out_dir) migrate_payments(args.in_dir, args.out_dir) DB_CNX.close() if __name__ == '__main__': main()