From 12100e977e79aebfe8a37582d39cc474fadf0add Mon Sep 17 00:00:00 2001 From: Lorenz Stechauner Date: Sat, 27 May 2023 23:39:57 +0200 Subject: [PATCH] Import gwk --- wgmaster/migrate.py | 206 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 172 insertions(+), 34 deletions(-) diff --git a/wgmaster/migrate.py b/wgmaster/migrate.py index 5a21b8e..2443a7f 100755 --- a/wgmaster/migrate.py +++ b/wgmaster/migrate.py @@ -20,7 +20,7 @@ WG: Optional[str] = None USTID_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}') BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?') IBAN_RE = re.compile(r'[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}') -EMAIL_RE = re.compile(r'[^@\s]+@([a-z0-9_äöüß-]+\.)+[a-z]{2,}') +EMAIL_RE = re.compile(r'[^@\s]+@([A-Za-z0-9_äöüß-]+\.)+[A-Za-z]{2,}') GRADATION_MAP: Optional[Dict[float, float]] = None CULTIVATION_MAP: Optional[Dict[int, str]] = None @@ -39,6 +39,7 @@ QUAL_MAP: Dict[int, str] = { 5: 'SPL', } +# TODO GWK streetnames STREET_NAMES: Dict[str, str] = { 'Hans-Wagnerstraße': 'Hans-Wagner-Straße', 'J.Seitzstraße': 'Josef-Seitz-Straße', @@ -155,7 +156,7 @@ def check_iban(iban: str) -> bool: def normalize_phone_nr(nr: Optional[str]) -> Optional[str]: if nr is None: return None - nr = nr.replace('/', ' ') + nr = nr.replace('/', ' ').strip() if nr[0] == '0': nr = '+43 ' + nr[1:] if nr.startswith('+43'): @@ -188,22 +189,42 @@ def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]] def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] = None) -> Optional[int]: if plz is None or ort is None: return None + ort = ort.replace('0', 'O').replace('SZ', 'SS') + if ort.upper() == 'PILLICHSDORF' and plz == 2212: + plz = 2211 + elif ort.upper() == 'ENZERSFELD' and plz == 2203: + plz = 2202 + elif ort.upper() == 'GROSSEBERSDORF' and plz == 2212: + ort = 'GROSSENGERSDORF' + elif ort.upper() == 'MÜNICHSTHAL' and plz == 2123: + plz = 2122 + elif ort.upper() == 'FRAUENDORF' and plz == 3710: + plz = 3714 + elif ort.upper() == 'MAISSAU' and plz == 3721: + ort = 'UNTERDÜRNBACH' + elif ort.upper() in ('KLEINRIEDENTHAL', 'KLEINHÖFLEIN', 'KLEIN HÖFLEIN') and plz == 2074: + plz = 2070 + elif ort.upper() == 'DROSENDORF' and plz == 2095: + ort = 'DROSENDORF ALTSTADT' + elif ort.upper() == 'KLEINWEIKERSDORF' and plz == 2033: + plz = 2023 cur = DB_CNX.cursor() cur.execute("SELECT o.okz, p.dest, o.name FROM AT_plz_dest p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?", (plz,)) rows: List[Tuple[int, str, str]] = cur.fetchall() cur.close() - ort_m = ort.lower().replace('gr.', 'groß').replace(' ', '').replace('-', '').replace('ß', 'ss') + ort_m = re.sub(r'\d+', '', ort).lower().replace('gr.', 'groß').replace(' ', '').replace('-', '').replace('ß', 'ss') rows_m = [r[0] for r in rows if ort_m in r[2].lower().replace(' ', '').replace('-', '').replace('ß', 'ss')] + if len(rows_m) > 1: + rows_m = [r[0] for r in rows if ort_m == r[2].lower().replace(' ', '').replace('-', '').replace('ß', 'ss')] if len(rows_m) == 1: return plz * 100000 + rows_m[0] - parts = address.split(' ') - street = parts[:-1] - nr = int(parts[-1].split('-')[0]) - if ort == 'VELM-GÖTZENDORF': + parts = address.split(' ') + street = parts[:-1] + nr = int(parts[-1].split('-')[0]) if street == 'Landstraße' and nr <= 48 \ or street == 'Winterzeile' and (nr <= 49 or nr in (52, 54, 56)) \ or street == 'Hauptstraße' and (nr <= 106 or nr in (117, 115, 113, 111, 109, 107)): @@ -231,23 +252,90 @@ def lookup_kgnr(okz: Optional[int]) -> Optional[int]: def lookup_gem_name(name: str) -> List[Tuple[int, int]]: - if name.lower() == 'dörfles': - return [(6004, 30860)] - elif name.lower() == 'velm-götzendorf': - return [(6027, 30859), (6007, 30859)] - elif name.lower() == 'grub': - name = 'Grub an der March' + gem_name, hkid = None, None + if WG == 'MATZEN': + hkid = "'WLWV'" + if name.lower() == 'dörfles': + gem_name = 'Weikendorf' + elif name.lower() == 'velm-götzendorf': + return [(6027, 30859), (6007, 30859)] + elif name.lower() == 'grub': + name = 'Grub an der March' + elif WG == 'GWK': + hkid = "'WLWV', 'WIEN', 'WLWG', 'WLWA'" + if name.endswith('*'): + # TODO do something with * + name = name[:-1].strip() + if name.lower() == 'kreuttal': + return [(15206, 31627), (15221, 31627), (15226, 31627)] + elif name.lower() == 'hochleithen': + return [(15219, 31622), (15223, 31622), (15202, 31622)] + elif name.lower() == 'wolfpassing': + gem_name = 'Hochleithen' + elif name.lower() == 'seebarn': + gem_name = 'Harmannsdorf' + elif name.lower() == 'königsbrunn': + gem_name = 'Enzersfeld im Weinviertel' + elif name.lower() == 'wien': + return [(1616, 90001), (1617, 90001)] + elif name.lower() in ('sitzendorf', 'roseldorf', 'frauendorf'): + gem_name = 'Sitzendorf an der Schmida' + elif name.lower() == 'dietersdorf': + gem_name = 'Hollabrunn' + elif name.lower() == 'altenmarkt': + name = 'Altenmarkt im Thale' + elif name.lower() == 'eitzerstal': + name = 'Eitzersthal' + elif name.lower() == 'gross': + gem_name = 'Hollabrunn' + elif name.lower() == 'auggenthal': + name = 'Augenthal' + elif name.lower() == 'karlsdorf': + name = 'Pfaffendorf' + elif name.lower() == 'kleinhaugsdorf': + name = 'Augenthal' + elif name.lower() == 'merkersdorf': + gem_name = 'Hardegg' + elif name.lower() == 'retz': + name = 'Retz Altstadt' + elif name.lower() == 'heldenberg': + return [(9112, 31019), (9132, 31019), (9131, 31019), (9141, 31019), (9140, 31019)] + elif name.lower() == 'retzbach': + return [(18129, 31038), (18112, 31038), (18117, 31038)] + elif name.lower() == 'dietmannsdorf': + gem_name = 'Zellerndorf' + elif name.lower() == 'sierndorf': + gem_name = 'Sierndorf' + elif name.lower() == 'waltersdorf': + gem_name = 'Staatz' + elif name.lower() == 'viendorf': + name = 'Viendorf Weingebirge' + elif name.lower() == 'stoitzendorf': + return [(10137, 31105)] + elif name.lower() in ('klein reinprechtsdorf', 'unter nalb', 'klein stelzendorf', 'klein kirchberg'): + name = name.replace(' ', '') + elif name.lower() == 'drosendorf': + name = 'Drosendorf Stadt' + elif name.lower() == 'etzmannsdorf': + name = 'Etzmannsdorf bei Straning' + elif name.lower() == 'roggendorf': + gem_name = 'Röschitz' + elif name.lower() == 'wilhelmsdorf': + gem_name = 'Poysdorf' cur = DB_CNX.cursor() cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name " "FROM AT_kg k " "JOIN AT_gem g ON g.gkz = k.gkz " "JOIN wb_gem wg ON wg.gkz = g.gkz " - "WHERE LOWER(k.name) LIKE (LOWER(?) || '%') AND wg.hkid = 'WLWV'", - (name.replace('Gr.', 'Groß ').replace('Groß ', 'Groß').replace('-', ''),)) + f"WHERE LOWER(k.name) LIKE (LOWER(?) || '%') AND wg.hkid IN ({hkid})", + (name.replace('fliess', 'fließ').replace('ross', 'roß').replace('Gr.', 'Groß ') + .replace('Groß ', 'Groß').replace('-', ''),)) rows: List[Tuple[int, str, int, str]] = cur.fetchall() cur.close() + if gem_name: + rows = [row for row in rows if row[3] == gem_name] if len(rows) == 1: return [(k, g) for k, _, g, _ in rows] @@ -264,6 +352,7 @@ def lookup_kg_name(kgnr: int) -> str: def lookup_hkid(kgnr: Optional[int], qualid: str) -> str: + hkid = None if qualid in ('WEI', 'RSW'): return 'OEST' elif kgnr is None: @@ -325,12 +414,16 @@ def migrate_gemeinden(in_dir: str, out_dir: str) -> None: global GEM_MAP GEM_MAP = {} + inserted = set() with utils.csv_open(f'{out_dir}/wb_kg.csv') as f: f.header('kgnr', 'glnr') for g in utils.csv_parse_dict(f'{in_dir}/TGemeinden.csv'): gems = lookup_gem_name(g['Bezeichnung']) GEM_MAP[g['GNR']] = gems for kgnr, gkz in gems: + if kgnr in inserted: + continue + inserted.add(kgnr) f.row(kgnr, GROSSLAGE_MAP[g['GLNR']]) @@ -345,10 +438,14 @@ def migrate_reeds(in_dir: str, out_dir: str) -> None: if name.isupper(): name = name.title() - gem = GEM_MAP[r['GNR']] - kgnr = gem[0][0] - if len(gem) != 1: - print(gem, name, '->', gem[0]) + try: + gem = GEM_MAP[r['GNR']] + kgnr = gem[0][0] + if len(gem) != 1: + print(gem, name, '->', gem[0]) + except KeyError: + print(f'Invalid GNR {r["GNR"]} for reed {name}') + continue rdnr = max([n for k, n in REED_MAP.values() if k == kgnr] or [0]) + 1 REED_MAP[r['RNR']] = (kgnr, rdnr) @@ -359,7 +456,9 @@ def migrate_attributes(in_dir: str, out_dir: str) -> None: with utils.csv_open(f'{out_dir}/wine_attribute.csv') as f: f.header('attrid', 'name', 'kg_per_ha') for a in utils.csv_parse_dict(f'{in_dir}/TSortenAttribute.csv'): - f.row(a['SANR'], a['Attribut'], int(a['KgProHa'])) + if a['SANR'] is None: + continue + f.row(a['SANR'], a['Attribut'], int(a['KgProHa']) if a['KgProHa'] is not None else None) if WG == 'MATZEN': f.row('M', 'Matzen', 10000) f.row('HU', 'Huber', 10000) @@ -404,6 +503,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None: f_tel.header('mgnr', 'nr', 'type', 'number', 'comment') for m in members: + # TODO handle * in GWK mgnr: int = m['MGNR'] family_name: str = m['Nachname'] given_name: str = m['Vorname'] @@ -484,8 +584,10 @@ def migrate_members(in_dir: str, out_dir: str) -> None: bnr = bnr.removesuffix('000') elif len(bnr) == 6: bnr = '0' + bnr + elif bnr.endswith(' inaktiv'): + bnr = bnr.split(' ')[0] if not check_lfbis_nr(bnr): - if bnr == '1234567': + if bnr in ('0', '1234567'): warning(mgnr, 'BetriebsNr.', bnr) else: invalid(mgnr, 'BetriebsNr.', bnr) @@ -524,6 +626,8 @@ def migrate_members(in_dir: str, out_dir: str) -> None: bic = bic.upper() if bic == 'RLNWATAUE': bic = 'RLNWATWWAUE' + elif bic == 'RLNWATWMIB': + bic = 'RLNWATWWMIB' if not BIC_RE.fullmatch(bic): invalid(mgnr, 'BIC', bic) bic = None @@ -531,11 +635,28 @@ def migrate_members(in_dir: str, out_dir: str) -> None: if len(bic) == 11 and bic.endswith('XXX'): bic = bic[:-3] + plz = int(m['PLZ']) if m['PLZ'] else None ort: Optional[str] = m['Ort'] address: Optional[str] = m['Straße'] + + parts = ort.split(' ') if ort else [''] + if parts[-1].isdigit() or (len(parts) > 1 and parts[-2].isdigit()): + if len(parts) > 1 and parts[-2].isdigit(): + ort = ' '.join(parts[:-2]) + new_address = parts[-2] + parts[-1] + else: + ort = ' '.join(parts[:-1]) + new_address = parts[-1] + if address is not None and address != ' ' and address != new_address: + print(address, new_address) + raise RuntimeError() + address = parts[-1] + if WG == 'GWK' and ort == 'JETZELDORF': + ort = 'JETZELSDORF' + if address is not None: address_old = address - address = re.sub('([0-9])([A-Z])', lambda a: a.group(1) + a.group(2).lower(), + address = re.sub(r'([0-9]) ?([A-Z])\b', lambda a: a.group(1) + a.group(2).lower(), re.sub(r'\s+', ' ', address).strip().title()) address = address.replace('strasse', 'straße').replace('strassse', 'straße')\ .replace('Strasse', 'Straße').replace('Str.', 'Straße')\ @@ -561,9 +682,14 @@ def migrate_members(in_dir: str, out_dir: str) -> None: if not EMAIL_RE.fullmatch(email): invalid(mgnr, 'E-Mail', m['EMail']) email = None + else: + parts = email.split('@') + email = f'{parts[0]}@{parts[1].lower()}' zwstid = m['ZNR'] and BRANCH_MAP[m['ZNR']] or len(BRANCH_MAP) == 1 and list(BRANCH_MAP.values())[0] - postal_dest = lookup_plz(int(m['PLZ']) if m['PLZ'] else None, m['Ort'], address) + if WG == 'GWK' and plz == 1228: + plz = 1020 + postal_dest = lookup_plz(plz, ort, address) #if mgnr in fbs: # gems = {v['GNR'] for k, v in fbs[mgnr].items() if v['Bis'] and int(v['Bis']) >= 2020} @@ -587,6 +713,10 @@ def migrate_members(in_dir: str, out_dir: str) -> None: invalid(mgnr, 'PLZ', None) continue + if active and kgnr is None: + print(m) + raise RuntimeError('No default KgNr. set') + pred = m['MGNR-Vorgänger'] if m['MGNR-Vorgänger'] in mgnrs else None f_m.row( mgnr, pred, prefix, given_name, middle_names, family_name, suffix, @@ -739,7 +869,7 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None: f_attr.header('fbnr', 'attrid') for fb in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'): - if fb['Von'] is None and fb['Bis'] is None: + if (fb['Von'] is None and fb['Bis'] is None) or fb['GNR'] is None: continue parz: str = fb['Parzellennummer'] fbnr: int = fb['FBNR'] @@ -750,7 +880,11 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None: continue area = int(fb['Flaeche']) - gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR']) + if WG == 'MATZEN': + gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR']) + else: + # TODO GstNrs GWK + gstnrs = [] comment, gstnr = None, None if parz is None or parz == '0000': invalid(mgnr, 'GstNr.', f'{kgnr or 0:05}-{parz}') @@ -762,9 +896,9 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None: if parz != gstnr.replace('+', '/'): convert(mgnr, f'GstNr. ({fbnr})', parz, gstnr) - rdnr = REED_MAP[fb['RNR']][1] if fb['RNR'] else None + rdnr = REED_MAP.get(fb['RNR'], (None, None))[1] if fb['RNR'] else None to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None - f_fb.row(fbnr, mgnr, fb['SNR'], CULTIVATION_MAP[fb['BANR']], area, + f_fb.row(fbnr, mgnr, fb['SNR'], CULTIVATION_MAP[fb['BANR'] or 1], area, kgnr, gstnr, rdnr, fb['Von'], to, comment) if fb['SANR']: f_attr.row(fbnr, fb['SANR']) @@ -798,6 +932,8 @@ def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, lsnrs = {d[1] for d in deliveries} for lnr, lsnr, date, zwstnr, mgnr in deliveries: + if len(lsnr) < 8: + continue lsdate = datetime.date(int(lsnr[:4]), int(lsnr[4:6]), int(lsnr[6:8])) if not lsnr.startswith('9') \ else datetime.date(1900 + int(lsnr[:2]), int(lsnr[2:4]), int(lsnr[4:6])) @@ -830,10 +966,11 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None: for mod in modifiers.values(): name: str = mod['Bezeichnung'] - if WG is None: - mod['id'] = str(mod['ASNR']) - elif WG == 'MATZEN': + nr: int = mod['ASNR'] + if WG == 'MATZEN': mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'PZS' + elif WG == 'GWK': + mod['id'] = str(nr) else: raise NotImplementedError() @@ -912,7 +1049,7 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None: qualid = QUAL_MAP[d['QSNR']] kgnr, rdnr = None, None if d['GNR']: - gem = GEM_MAP[d['GNR']] + gem = GEM_MAP.get(d['GNR'], []) if len(gem) == 1: kgnr = gem[0][0] if d['RNR']: @@ -978,7 +1115,8 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None: f_season.row(y, s['currency'], s['precision'], s['start'], s['end']) for m in modifiers.values(): abs_v = int(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None - f_mod.row(y, m['id'], m['Bezeichnung'], abs_v, m['AZASProzent'], m['Standard'], m['Schnellauswahl']) + f_mod.row(y, m['id'], m['Bezeichnung'], abs_v, m['AZASProzent'], + m.get('Standard', None), m['Schnellauswahl']) def migrate_payments(in_dir: str, out_dir: str) -> None: @@ -1003,7 +1141,7 @@ def migrate_parameters(in_dir: str, out_dir: str) -> None: 'PLZ': params['MANDANTENPLZ'], 'ORT': params['MANDANTENORT'], 'ADDRESS': params['MANDANTENSTRASSE'], - 'DOCUMENT_SENDER': params['ABSENDERTEXT2'], + 'DOCUMENT_SENDER': params.get('ABSENDERTEXT2', None), 'IBAN': None, 'BIC': None, 'USTID': params['MANDANTENUID'].replace(' ', ''), @@ -1011,7 +1149,7 @@ def migrate_parameters(in_dir: str, out_dir: str) -> None: 'PHONE': params['MANDANTENTELEFON'], 'FAX': params['MANDANTENTELEFAX'], 'EMAIL': params['MANDANTENEMAIL'], - 'WEBSITE': params['MANDANTENHOMEPAGE'], + 'WEBSITE': params.get('MANDANTENHOMEPAGE', None), } with utils.csv_open(f'{out_dir}/client_parameter.csv') as f: