#!/bin/env python3
# -*- coding: utf-8 -*-
"""Convert exported legacy CSV tables into CSV files for the new schema.

The script reads ``T*.csv`` files from an input directory, normalizes and
validates the data (names, addresses, phone numbers, IBAN/BIC, UID, LFBIS
numbers, parcel numbers, delivery note numbers) and writes one CSV file per
target table into an output directory. Diagnostics are printed to stderr.
"""

from typing import Dict, Any, Tuple, Optional, List, Iterable
import argparse
import os
import re
import sys
import sqlite3
import datetime

import requests

# NOTE(review): this module calls csv.parse() and csv.format_row(), which the
# standard library `csv` module does not provide - presumably a project-local
# `csv.py` next to this script shadows it. Confirm before moving this file.
import csv


# Parsed command line arguments. The default keeps the logging helpers usable
# when the module is imported instead of being run as a script.
args: argparse.Namespace = argparse.Namespace(quiet=False)

DB_CNX: Optional[sqlite3.Connection] = None
HKID: Optional[str] = None

USTID_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}')
BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
IBAN_RE = re.compile(r'[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')
EMAIL_RE = re.compile(r'[^@\s]+@([a-z0-9_äöüß-]+\.)+[a-z]{2,}')

# Lookup tables filled by the migrate_*() functions and read by later steps.
GRADATION_MAP: Optional[Dict[float, float]] = None              # Oechsle -> KW
CULTIVATION_MAP: Optional[Dict[int, str]] = None                # BANR -> cultid
BRANCH_MAP: Optional[Dict[int, str]] = None                     # ZNR -> Kennbst
GEM_MAP: Optional[Dict[int, List[Tuple[int, int]]]] = None      # GNR -> [(kgnr, gkz)]
REED_MAP: Optional[Dict[int, Tuple[int, int]]] = None           # RNR -> (kgnr, rdnr)
GROSSLAGE_MAP: Optional[Dict[int, int]] = None                  # old GLNR -> new glnr
MEMBER_MAP: Optional[Dict[int, Dict[str, Any]]] = None          # MGNR -> {'default_kgnr': ...}

QUAL_MAP: Dict[int, str] = {
    0: 'WEI',
    1: 'RSW',
    2: 'LDW',
    3: 'QUW',
    4: 'KAB',
    5: 'SPL',
}

# Known misspelled or abbreviated street names and their corrected form.
STREET_NAMES: Dict[str, str] = {
    'Hans-Wagnerstraße': 'Hans-Wagner-Straße',
    'J.Seitzstraße': 'Josef-Seitz-Straße',
    'Kurhaus-Str.': 'Kurhausstraße',
    'Kurhaus-Straße': 'Kurhausstraße',
    'Pirawartherstraße': 'Pirawarther Straße',
    'Raggendorferstraße': 'Raggendorfer Straße',
    'Matznerstraße': 'Matzner Straße',
    'Stillfriederstraße': 'Stillfrieder Straße',
    'Harraserstraße': 'Harraser Straße',
    'Gänserndorferstraße': 'Gänserndorfer Straße',
    'Hofrat Döltlstraße': 'Hofrat-Döltl-Straße',
    'Sulzerstraße': 'Sulzer Straße',
    'Brünnerstraße': 'Brünner Straße',
    'Flustraße': 'Flurstraße',
    'Wienerstraße': 'Wiener Straße',
    'St.Laurentstraße': 'St.-Laurentstraße',
    'Angernerstraße': 'Angerner Straße',
    'Schweinbartherstraße': 'Schweinbarther Straße',
    'Hohenruppersdorferstraße': 'Hohenruppersdorfer Straße',
    'Gruberhauptstraße': 'Gruber Hauptstraße',
    'Josef Seitzstraße': 'Josef-Seitz-Straße',
    'Auersthalerstraße': 'Auersthaler Straße',
    'Ollersdorferstraße': 'Ollersdorfer Straße',
    'Ritter Zoppelstraße': 'Ritter-Zoppel-Straße',
    'Spannbergerstraße': 'Spannberger Straße',
    'Ritter Zoppel Straße': 'Ritter-Zoppel-Straße',
    'R. Virchow-Straße': 'Rudolf-Virchow-Straße',
    'Ebenthalerstraße': 'Ebenthaler Straße',
    'Bockfließerstraße': 'Bockfließer Straße',
    'Dörfleserstraße': 'Dörfleser Straße',
    'Dörflesserstraße': 'Dörfleser Straße',
    'Grubere Hauptstraße': 'Gruber Hauptstraße',
    'Groß Inzersdorf': 'Großinzersdorf',
}

# Substring replacements applied, in this exact order, to a title-cased address.
_ADDRESS_REPLACEMENTS: Tuple[Tuple[str, str], ...] = (
    ('strasse', 'straße'),
    ('strassse', 'straße'),
    ('Strasse', 'Straße'),
    ('Str.', 'Straße'),
    ('str.', 'straße'),
    ('ster.', 'straße'),
    ('g. ', 'gasse '),
    ('Gross', 'Groß'),
    ('Bockfliess', 'Bockfließ'),
    ('Weiss', 'Weiß'),
    ('Preussen', 'Preußen'),
    ('Schloss', 'Schloß'),
    ('luss', 'luß'),
    ('Haupstraße', 'Hauptstraße'),
)


def success(mgnr: int, key: str, value: Any) -> None:
    """Print a green per-member message to stderr unless --quiet is set."""
    if not args.quiet:
        print(f'\x1B[1;32m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)


def warning(mgnr: int, key: str, value: Any) -> None:
    """Print a yellow per-member warning to stderr."""
    print(f'\x1B[1;33m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)


def invalid(mgnr: int, key: str, value: Any) -> None:
    """Print a red per-member "invalid value" message to stderr."""
    print(f'\x1B[1;31m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)


def warning_delivery(lsnr: str, mgnr: int, key: str, value: Any) -> None:
    """Print a yellow warning for a delivery note (lsnr) of a member to stderr."""
    print(f'\x1B[1;33m{lsnr:<13} ({mgnr:>6}): {key:<12} {value}\x1B[0m', file=sys.stderr)


def invalid_delivery(lsnr: str, mgnr: int, key: str, value: Any) -> None:
    """Print a red "invalid value" message for a delivery note to stderr."""
    print(f'\x1B[1;31m{lsnr:<13} ({mgnr:>6}): {key:<12} {value}\x1B[0m', file=sys.stderr)


def convert(mgnr: int, key: str, old_value: str, new_value: Any) -> None:
    """Log an automatic value conversion to stderr unless --quiet is set."""
    if not args.quiet:
        print(f'\x1B[1m{mgnr:>6}: {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr)


def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str],
                 billing: Optional[str] = None) -> None:
    """Log an automatic name conversion to stderr unless --quiet is set.

    ``None`` components of either name are printed as empty strings.
    """
    if not args.quiet:
        print(f'\x1B[1m{mgnr:>6}: '
              f'{" / ".join([e or "" for e in old_name])} -> '
              f'{" / ".join([e or "" for e in new_name])}'
              f'{"(" + billing + ")" if billing else ""}\x1B[0m', file=sys.stderr)


def check_lfbis_nr(nr: str) -> bool:
    """Return True if *nr* is a 7-digit string whose last digit is a valid check digit.

    The first six digits are weighted 7, 6, ..., 2; the check digit is
    ``(11 - sum % 11) % 10``.
    """
    # https://statistik.at/fileadmin/shared/QM/Standarddokumentationen/RW/std_r_land-forstw_register.pdf#page=41
    if len(nr) != 7 or not nr.isdigit():
        return False
    s = sum(int(ch) * (7 - i) for i, ch in enumerate(nr[:-1]))
    v = (11 - (s % 11)) % 10
    return v == int(nr[-1])


def check_ustid_at(nr: str) -> bool:
    """Return True if *nr* has the form ``ATU`` + 8 digits with a valid check digit."""
    # http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
    if not nr.startswith('ATU') or len(nr) != 11 or not nr[3:].isdigit():
        return False
    s = 0
    for i, ch in enumerate(nr[3:-1]):
        # weights alternate 1, 2, 1, 2, ...; the digit sum of each product is added
        s += sum(map(int, str(int(ch) * (i % 2 + 1))))
    v = (96 - s) % 10
    return v == int(nr[-1])


def modulo(a: str, b: int) -> int:
    """Return the decimal digit string *a* modulo *b*, processed digit by digit."""
    s = 0
    for ch in a:
        s = (s * 10 + int(ch)) % b
    return s


def check_iban(iban: str) -> bool:
    """Return True if *iban* matches IBAN_RE and passes the mod-97 check."""
    if not IBAN_RE.fullmatch(iban):
        return False
    # move the first four characters to the end and map A..Z to 10..35
    s = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), (iban[4:] + iban[:4]))
    return modulo(s, 97) == 1


def normalize_phone_nr(nr: str) -> str:
    """Strip spaces, slashes and dashes and rewrite a leading ``0`` as ``+43``.

    A leading ``00`` (international call prefix) is rewritten as ``+`` so that
    e.g. ``0043 664 ...`` does not end up as ``+43043664...``. An input that is
    empty after stripping is returned as an empty string.
    """
    nr = re.sub('[ /-]', '', nr)
    if nr.startswith('00'):
        nr = '+' + nr[2:]
    elif nr.startswith('0'):
        nr = '+43' + nr[1:]
    return nr


def get_bev_gst_size(kgnr: int, gstnr: str) -> Optional[int]:
    """Fetch a parcel from the BEV cadastre API and return the sum of its ``fl`` values.

    Returns None if the HTTP status is not 200. Network errors (including the
    30 second timeout) propagate as ``requests`` exceptions.
    """
    r = requests.get(f'https://kataster.bev.gv.at/api/gst/{kgnr:05}/{gstnr}/', timeout=30)
    if r.status_code != 200:
        return None
    data = r.json()
    return sum(n['fl'] for n in data['properties']['nutzungen'])


def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]:
    """Read ``TFlaechenbindungen.csv`` and group its rows as ``{MGNR: {FBNR: row}}``."""
    members: Dict[int, Dict[int, Dict[str, Any]]] = {}
    for f in csv.parse(f'{in_dir}/TFlaechenbindungen.csv'):
        members.setdefault(f['MGNR'], {})[f['FBNR']] = f
    return members


def _fetch_all(sql: str, params: Tuple[Any, ...] = ()) -> List[Tuple[Any, ...]]:
    """Run *sql* on DB_CNX and return all rows; the cursor is closed even on errors."""
    cur = DB_CNX.cursor()
    try:
        cur.execute(sql, params)
        return cur.fetchall()
    finally:
        cur.close()


def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] = None) -> Optional[int]:
    """Resolve a postal code and place name to a postal destination id.

    The result is ``plz * 100000 + okz``. Returns None if *plz* or *ort* is
    None. For Velm-Götzendorf the place is chosen by street and house number.

    Raises:
        RuntimeError: if no unique place can be determined.
    """
    if plz is None or ort is None:
        return None

    rows = _fetch_all("SELECT o.okz, p.dest, o.name FROM AT_plz_dest p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?",
                      (plz,))
    ort_m = ort.lower().replace('gr.', 'groß').replace(' ', '').replace('-', '').replace('ß', 'ss')
    rows_m = [r[0] for r in rows if ort_m in r[2].lower().replace(' ', '').replace('-', '').replace('ß', 'ss')]
    if len(rows_m) == 1:
        return plz * 100000 + rows_m[0]

    if address is not None and ort.upper() == 'VELM-GÖTZENDORF':
        parts = address.split(' ')
        # The street has to be a string: comparing the list slice with a
        # street name was always False, so "Velm" could never be selected.
        street = ' '.join(parts[:-1])
        nr_match = re.match(r'[0-9]+', parts[-1])
        if nr_match is not None:
            nr = int(nr_match.group(0))
            is_velm = street == 'Landstraße' and nr <= 48 \
                or street == 'Winterzeile' and (nr <= 49 or nr in (52, 54, 56)) \
                or street == 'Hauptstraße' and (nr <= 106 or nr in (117, 115, 113, 111, 109, 107))
            # 3572: Velm, 3571: Götzendorf
            return plz * 100000 + (3572 if is_velm else 3571)

    raise RuntimeError(f'PLZ not found ({plz} {ort}, {address})')


def lookup_kgnr(okz: Optional[int]) -> Optional[int]:
    """Return the KG number for place *okz* if exactly one ``wb_kg`` row matches, else None."""
    if okz is None:
        return None
    rows = _fetch_all("SELECT k.kgnr FROM AT_ort o JOIN wb_kg k ON k.kgnr = o.kgnr WHERE okz = ?", (okz,))
    if len(rows) == 1:
        return rows[0][0]
    return None


def lookup_gem_name(name: str) -> List[Tuple[int, int]]:
    """Resolve a legacy municipality name to a list of ``(kgnr, gkz)`` tuples.

    A few names are hard-coded; all others are looked up by name prefix.

    Raises:
        RuntimeError: if the database lookup does not yield exactly one row.
    """
    if name.lower() == 'dörfles':
        return [(6004, 30860)]
    elif name.lower() == 'velm-götzendorf':
        return [(6027, 30859), (6007, 30859)]
    elif name.lower() == 'grub':
        name = 'Grub an der March'

    # NOTE(review): the origin 'WLWV' is hard-coded here although a global
    # HKID exists - confirm whether that is intended.
    rows = _fetch_all("SELECT k.kgnr, k.name, g.gkz, g.name "
                      "FROM AT_kg k "
                      "JOIN AT_gem g ON g.gkz = k.gkz "
                      "JOIN wb_gem wg ON wg.gkz = g.gkz "
                      "WHERE LOWER(k.name) LIKE (LOWER(?) || '%') AND wg.hkid = 'WLWV'",
                      (name.replace('Gr.', 'Groß ').replace('Groß ', 'Groß').replace('-', ''),))
    if len(rows) == 1:
        return [(k, g) for k, _, g, _ in rows]
    raise RuntimeError(f'Gemeinde not found or ambiguous: {name} {rows}')


def lookup_kg_name(kgnr: int) -> str:
    """Return the name of KG *kgnr* (IndexError if it does not exist in ``AT_kg``)."""
    rows = _fetch_all("SELECT name FROM AT_kg WHERE kgnr = ?", (kgnr,))
    return rows[0][0]


def migrate_gradation(in_dir: str, out_dir: str) -> None:
    """Fill GRADATION_MAP (Oechsle -> KW) from ``TUmrechnung.csv``; writes no file."""
    global GRADATION_MAP
    GRADATION_MAP = {g['Oechsle']: g['KW'] for g in csv.parse(f'{in_dir}/TUmrechnung.csv')}


def migrate_branches(in_dir: str, out_dir: str) -> None:
    """Write ``branch.csv`` from ``TZweigstellen.csv`` and fill BRANCH_MAP."""
    global BRANCH_MAP
    BRANCH_MAP = {}

    with open(f'{out_dir}/branch.csv', 'w+') as f:
        f.write('zwstid;name;country;postal_dest;address;phone_nr\n')
        for b in csv.parse(f'{in_dir}/TZweigstellen.csv'):
            BRANCH_MAP[b['ZNR']] = b['Kennbst']
            address = b['Straße']
            postal_dest = lookup_plz(int(b['PLZ']) if b['PLZ'] else None, b['Ort'], address)
            f.write(csv.format_row(b['Kennbst'], b['Name'].strip().title(), 'AT',
                                   postal_dest, address, b['Telefon']))


def migrate_grosslagen(in_dir: str, out_dir: str) -> None:
    """Write ``wb_gl.csv`` from ``TGrosslagen.csv``, renumbering from 1, and fill GROSSLAGE_MAP."""
    global GROSSLAGE_MAP
    GROSSLAGE_MAP = {}

    with open(f'{out_dir}/wb_gl.csv', 'w+') as f:
        f.write('glnr;name\n')
        for glnr, gl in enumerate(csv.parse(f'{in_dir}/TGrosslagen.csv'), start=1):
            GROSSLAGE_MAP[gl['GLNR']] = glnr
            f.write(csv.format_row(glnr, gl['Bezeichnung']))


def migrate_gemeinden(in_dir: str, out_dir: str) -> None:
    """Write ``wb_kg.csv`` from ``TGemeinden.csv`` and fill GEM_MAP.

    Requires GROSSLAGE_MAP to be filled.
    """
    global GEM_MAP
    GEM_MAP = {}

    with open(f'{out_dir}/wb_kg.csv', 'w+') as f:
        f.write('kgnr;glnr\n')
        for g in csv.parse(f'{in_dir}/TGemeinden.csv'):
            gems = lookup_gem_name(g['Bezeichnung'])
            GEM_MAP[g['GNR']] = gems
            for kgnr, _gkz in gems:
                f.write(csv.format_row(kgnr, GROSSLAGE_MAP[g['GLNR']]))


def migrate_reeds(in_dir: str, out_dir: str) -> None:
    """Write ``wb_rd.csv`` from ``TRiede.csv`` and fill REED_MAP.

    Reeds are numbered per KG starting at 1. If a municipality maps to several
    KGs, the first one is used and the choice is printed. Requires GEM_MAP.
    """
    global REED_MAP
    REED_MAP = {}
    # last rdnr handed out per kgnr; avoids rescanning REED_MAP for every reed
    last_rdnr: Dict[int, int] = {}

    with open(f'{out_dir}/wb_rd.csv', 'w+') as f:
        f.write('kgnr;rdnr;name\n')
        for r in csv.parse(f'{in_dir}/TRiede.csv'):
            name: str = r['Bezeichnung'].strip()
            if name.isupper():
                name = name.title()

            gem = GEM_MAP[r['GNR']]
            kgnr = gem[0][0]
            if len(gem) != 1:
                print(gem, name, '->', gem[0])

            rdnr = last_rdnr.get(kgnr, 0) + 1
            last_rdnr[kgnr] = rdnr
            REED_MAP[r['RNR']] = (kgnr, rdnr)
            f.write(csv.format_row(kgnr, rdnr, name))


def migrate_attributes(in_dir: str, out_dir: str) -> None:
    """Write ``wine_attribute.csv`` from ``TSortenAttribute.csv``."""
    with open(f'{out_dir}/wine_attribute.csv', 'w+') as f:
        f.write('attrid;name;kg_per_ha\n')
        for a in csv.parse(f'{in_dir}/TSortenAttribute.csv'):
            f.write(csv.format_row(a['SANR'], a['Attribut'], int(a['KgProHa'])))


def migrate_cultivations(in_dir: str, out_dir: str) -> None:
    """Write ``wine_cultivation.csv`` from ``TBewirtschaftungsarten.csv`` and fill CULTIVATION_MAP.

    The id is the whole name if it is upper case, ``BIO`` if the name contains
    "biolog", and the upper-cased first letter otherwise.
    """
    global CULTIVATION_MAP
    CULTIVATION_MAP = {}

    with open(f'{out_dir}/wine_cultivation.csv', 'w+') as f:
        f.write('cultid;name\n')
        for c in csv.parse(f'{in_dir}/TBewirtschaftungsarten.csv'):
            name: str = c['Bezeichnung']
            if name.isupper():
                cultid = name
            elif 'biolog' in name.lower():
                cultid = 'BIO'
            else:
                cultid = name[0].upper()
            CULTIVATION_MAP[c['BANR']] = cultid
            f.write(csv.format_row(cultid, name))


def _split_member_name(family_name: str, given_name: str) \
        -> Tuple[Optional[str], str, Optional[str], str, Optional[str], Optional[str], bool]:
    """Split whitespace-normalized legacy name fields into name components.

    Returns ``(prefix, given_name, middle_names, family_name, suffix,
    billing_name, special)``. ``special`` is True if one of the fields
    contained a space, dot or comma and was therefore handled by a special
    case that the caller should log.
    """
    prefix: Optional[str] = None
    middle_names: Optional[str] = None
    suffix: Optional[str] = None
    billing_name: Optional[str] = None
    given_lower = given_name.lower()

    if ' ' in family_name or '.' in family_name or ',' in family_name:
        special = True
        if family_name.endswith(' KG'):
            family_name = family_name.split(' ')[0].title()
            billing_name = f'{family_name} KG'
        # NOTE(review): other composite family names are left as they are
        # (not title-cased) - same as before, confirm that this is intended.
    elif (' ' in given_name or '.' in given_name or ',' in given_name) and given_name != 'EVA MARIA':
        special = True
        if ' u. ' in given_lower or ' u ' in given_lower or ' und ' in given_lower:
            # two persons: the first one becomes the member, both go on the bill
            parts = given_name.split(' ')
            family_name = family_name.title()
            billing_name = f'{family_name} {parts[0].title()} und {parts[-1].title()}'
            given_name = parts[0].title()
        elif given_lower.endswith(' gesbr'):
            family_name = family_name.title()
            given_name = given_name.split(' ')[0].title()
            billing_name = f'{family_name} {given_name} GesbR'
        elif given_name.endswith(' KeG.'):
            family_name = family_name.title()
            given_name = given_name.split(' ')[0].title()
            billing_name = f'{family_name} {given_name} KEG'
        elif given_lower.endswith((' jun', ' jun.', ' sen', ' sen.')):
            family_name = family_name.title()
            parts = given_name.split(' ')
            suffix = parts[-1].lower()
            if suffix[-1] != '.':
                suffix += '.'
            given_name = parts[0].title()
        elif ',' in given_name:
            family_name = family_name.title()
            parts = given_name.split(',')
            given_name = parts[0].title()
            prefix = ' '.join([p.title() for p in parts[1:]])
        elif given_name.endswith(' DI'):
            family_name = family_name.title()
            given_name = given_name.split(' ')[0].title()
            prefix = 'Dipl.-Ing.'
        elif given_lower.endswith((' ing', ' ing.', ' dr', ' dr.')):
            family_name = family_name.title()
            parts = given_name.split(' ')
            given_name = parts[0].title()
            prefix = parts[-1].title()
            if prefix[-1] != '.':
                prefix += '.'
    else:
        special = False
        family_name = family_name.title()
        given_name = given_name.title()

    return prefix, given_name, middle_names, family_name, suffix, billing_name, special


def _normalize_address(address: str, ort: Optional[str]) -> str:
    """Return a cleaned-up version of a legacy street address.

    Collapses whitespace, title-cases, applies _ADDRESS_REPLACEMENTS, puts a
    space between street and number, replaces a bare house number by
    "<Ort> <number>" and corrects street names listed in STREET_NAMES.
    """
    address = re.sub('([0-9])([A-Z])', lambda a: a.group(1) + a.group(2).lower(),
                     re.sub(r'\s+', ' ', address).strip().title())
    for old, new in _ADDRESS_REPLACEMENTS:
        address = address.replace(old, new)
    address = re.sub('([a-z])([0-9])', lambda a: a.group(1) + ' ' + a.group(2), address)
    # without a place name there is nothing to put in front of a bare number
    if ort is not None and (address.startswith('Nr. ') or address.startswith('Nr ') or address.isdigit()):
        address = ort.title() + ' ' + address.split(' ')[-1]
    elif address.startswith('Ob. '):
        address = address.replace('Ob. ', 'Obere ', 1)
    address = address.replace(' Nr. ', ' ')
    address = re.sub(r'([^0-9]+?)( [0-9])',
                     lambda a: STREET_NAMES.get(a.group(1), a.group(1)) + a.group(2), address)
    return address


def migrate_members(in_dir: str, out_dir: str) -> None:
    """Write ``member.csv`` and ``member_billing_address.csv`` from ``TMitglieder.csv``.

    Names, LFBIS number, UID, IBAN, BIC, address, e-mail and phone numbers are
    normalized and validated; invalid values are reported and dropped. KGs
    that are not yet known are appended to ``wb_kg.csv`` and to GEM_MAP (under
    the artificial key 9999). Members without a resolvable postal destination
    are reported and skipped. Fills MEMBER_MAP.

    Requires BRANCH_MAP, GROSSLAGE_MAP and GEM_MAP to be filled.
    """
    global MEMBER_MAP
    MEMBER_MAP = {}

    members = csv.parse(f'{in_dir}/TMitglieder.csv')
    fbs = parse_flaechenbindungen(in_dir)
    # all KG numbers written to wb_kg.csv so far
    known_kgnrs = {kg[0] for gem in GEM_MAP.values() for kg in gem}

    with open(f'{out_dir}/member.csv', 'w+') as f_m,\
            open(f'{out_dir}/member_billing_address.csv', 'w+') as f_mba,\
            open(f'{out_dir}/wb_kg.csv', 'a') as f_kg:
        f_m.write('mgnr;predecessor_mgnr;prefix;given_name;middle_names;family_name;suffix;'
                  'birthday;entry_date;exit_date;business_shares;accounting_nr;zwstid;'
                  'lfbis_nr;ustid;volllieferant;buchführend;funktionär;active;iban;bic;'
                  'country;postal_dest;address;'
                  'email;phone_landline;phone_mobile_1;phone_mobile_2;'
                  'default_kgnr;comment\n')
        f_mba.write('mgnr;name;country;postal_dest;address\n')

        for m in members:
            mgnr: int = m['MGNR']
            family_name: Optional[str] = m['Nachname']
            given_name: Optional[str] = m['Vorname']

            if family_name is None and given_name is None:
                continue

            # Either field alone may be missing; treat it as empty instead of
            # crashing in re.sub().
            family_name = re.sub(r'\s+', ' ', family_name or '').strip()
            given_name = re.sub(r'\s+', ' ', given_name or '').strip().replace(', ', ',')

            prefix, given_name, middle_names, family_name, suffix, billing_name, special = \
                _split_member_name(family_name, given_name)
            if special:
                convert_name(mgnr, (m['Nachname'], m['Vorname']),
                             (prefix, given_name, middle_names, family_name, suffix), billing_name)

            bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None
            if bnr is not None:
                bnr = bnr.replace('.', '')
                if len(bnr) == 10:
                    bnr = bnr.removesuffix('000')
                elif len(bnr) == 6:
                    bnr = '0' + bnr
                if not check_lfbis_nr(bnr):
                    if bnr == '1234567':
                        warning(mgnr, 'BetriebsNr.', bnr)
                    else:
                        invalid(mgnr, 'BetriebsNr.', bnr)
                    # NOTE(review): indentation reconstructed from a whitespace-collapsed
                    # source; confirm that the placeholder is meant to be dropped as well.
                    bnr = None

            ustid: Optional[str] = m['UID']
            if ustid is not None:
                ustid = ustid.replace(' ', '')
                if len(ustid) == 8 and ustid.isdigit():
                    ustid = 'ATU' + ustid
                elif not USTID_RE.fullmatch(ustid):
                    invalid(mgnr, 'UID', ustid)
                    ustid = None
                if ustid and not check_ustid_at(ustid):
                    if ustid == 'ATU11111111':
                        warning(mgnr, 'UID', ustid)
                    else:
                        invalid(mgnr, 'UID', ustid)
                    # NOTE(review): see the note on `bnr = None` above.
                    ustid = None

            iban: Optional[str] = m['IBAN']
            bic: Optional[str] = m['BIC']
            if iban is not None:
                iban = iban.replace(' ', '')
                if not check_iban(iban):
                    invalid(mgnr, 'IBAN', iban)
                    iban = None
            if bic is not None:
                bic = bic.upper()
                if bic == 'RLNWATAUE':
                    bic = 'RLNWATWWAUE'
                if not BIC_RE.fullmatch(bic):
                    invalid(mgnr, 'BIC', bic)
                    bic = None
            if bic is not None and len(bic) == 11 and bic.endswith('XXX'):
                # 'XXX' is dropped so that only the 8 character form is stored
                bic = bic[:-3]

            ort: Optional[str] = m['Ort']
            address: Optional[str] = m['Straße']
            if address is not None:
                address_old = address
                address = _normalize_address(address, ort)
                if address_old != address:
                    convert(mgnr, 'Adresse', address_old, address)

            phone_1: Optional[str] = m['Telefon']
            phone_2: Optional[str] = m['Mobiltelefon']
            email: Optional[str] = m['EMail']
            phone_landline: Optional[str] = None
            phone_mobile: List[str] = []
            if email is not None:
                if email.isupper():
                    email = email.lower()
                if not EMAIL_RE.fullmatch(email):
                    invalid(mgnr, 'E-Mail', m['EMail'])
                    email = None
            if phone_1:
                phone_1 = normalize_phone_nr(phone_1)
                if len(phone_1) <= 8 or phone_1[0] != '+':
                    invalid(mgnr, 'Tel.Nr.', m['Telefon'])
                elif phone_1[3] == '6':
                    # '+436...' is treated as a mobile number
                    phone_mobile.append(phone_1)
                else:
                    phone_landline = phone_1
            if phone_2:
                phone_2 = normalize_phone_nr(phone_2)
                if len(phone_2) <= 8 or phone_2[0] != '+':
                    invalid(mgnr, 'Tel.Nr.', m['Mobiltelefon'])
                elif phone_2[3] == '6':
                    if phone_2 not in phone_mobile:
                        phone_mobile.append(phone_2)
                elif phone_landline is None:
                    phone_landline = phone_2
                elif phone_landline != phone_2:
                    invalid(mgnr, 'Tel.Nr.', phone_2)

            # The member's branch; falls back to the only branch if there is
            # exactly one. None (not False) if no branch can be determined.
            zwstid = BRANCH_MAP[m['ZNR']] if m['ZNR'] else None
            if not zwstid and len(BRANCH_MAP) == 1:
                zwstid = next(iter(BRANCH_MAP.values()))
            zwstid = zwstid or None

            postal_dest = lookup_plz(int(m['PLZ']) if m['PLZ'] else None, m['Ort'], address)

            #if mgnr in fbs:
            #    gems = {v['GNR'] for k, v in fbs[mgnr].items() if v['Bis'] and int(v['Bis']) >= 2020}
            #    if len(gems) == 1:
            #        print(GEM_MAP[list(gems)[0]])

            okz = postal_dest % 100000 if postal_dest else None
            kgnr = lookup_kgnr(okz)
            active = m['Aktives Mitglied'] or False
            if kgnr is None:
                invalid(mgnr, 'KGNr.', ort)
                active = False
            elif kgnr not in known_kgnrs:
                # new KGs are assigned to the first Großlage
                glnr = next(iter(GROSSLAGE_MAP.values()))
                print(f'New KG: {lookup_kg_name(kgnr)} ({kgnr}, GL {glnr})')
                f_kg.write(csv.format_row(kgnr, glnr))
                GEM_MAP.setdefault(9999, []).append((kgnr, 0))
                known_kgnrs.add(kgnr)

            if postal_dest is None:
                invalid(mgnr, 'PLZ', None)
                continue

            f_m.write(csv.format_row(
                mgnr, m['MGNR-Vorgänger'], prefix, given_name, middle_names, family_name, suffix,
                m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0,
                m['BHKontonummer'], zwstid, bnr, ustid,
                m['Volllieferant'] or False, m['Buchführend'] or False, False, active,
                iban, bic, 'AT', postal_dest, address or '-', email, phone_landline,
                phone_mobile[0] if len(phone_mobile) > 0 else None,
                phone_mobile[1] if len(phone_mobile) > 1 else None,
                kgnr, m['Anmerkung']
            ))
            MEMBER_MAP[mgnr] = {
                'default_kgnr': kgnr
            }
            if billing_name:
                f_mba.write(csv.format_row(mgnr, billing_name, 'AT', postal_dest, address or '-'))


def migrate_contracts(in_dir: str, out_dir: str) -> None:
    """Write ``contract.csv`` and ``area_commitment.csv`` from ``TFlaechenbindungen.csv``.

    The free-text parcel number of each row is split into single parcel
    numbers; the area is divided evenly among them (the first parcel gets the
    remainder). Requires GEM_MAP, REED_MAP and CULTIVATION_MAP to be filled.
    """

    def parse_gstnrs(nr_str: Optional[str], kgnr: int, mgnr: int) -> List[str]:
        """Split a free-text parcel number field into a list of parcel numbers.

        Handles plain numbers, ``a/b`` notations, a leading KG number, a
        trailing `` 2000``, separator lists and ranges (``a-b``, fewer than 50
        apart). Anything else is reported as invalid and yields an empty list.
        """
        if nr_str is None:
            return []
        elif nr_str.isdigit() and len(nr_str) <= 6:
            return [nr_str]
        elif nr_str.count('/') == 1:
            parts = nr_str.split('/')
            if len(parts[0]) == len(parts[1]) and parts[0].isdigit() and parts[1].isdigit():
                # same length: two separate parcels
                return [parts[0], parts[1]]
            elif parts[0].isdigit() and len(parts[0]) <= 6 and parts[1].isdigit() and len(parts[1]) <= 3:
                # parcel with sub-number
                return [nr_str]

        if nr_str.count('/') > 1:
            parts = nr_str.split('/')
            if all(p.isdigit() for p in parts):
                if all(len(p) <= 3 for p in parts[1:]):
                    return [f'{parts[0]}/{p}' for p in parts[1:]]
                elif all(len(p) == len(parts[0]) for p in parts):
                    return parts

        if nr_str.startswith(f'{kgnr:05}'):
            return parse_gstnrs(nr_str[5:].strip(), kgnr, mgnr)
        if nr_str.endswith(' 2000'):
            return parse_gstnrs(nr_str[:-5].strip(), kgnr, mgnr)

        parts = re.split(r' *[,;+&] *', nr_str)
        if len(parts) == 1:
            parts = nr_str.split(' / ')
        # NOTE(review): as written, the next two branches cannot split anything
        # (no ' ' in nr_str, then split(' ')). The literals possibly contained
        # different whitespace before this file was whitespace-collapsed -
        # verify against the original source.
        if len(parts) == 1 and ' ' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) == 1 and ' ' not in nr_str and '-' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) > 1:
            return [gst for p in parts for gst in parse_gstnrs(p, kgnr, mgnr)]

        m = re.fullmatch(r'([0-9]+/)?([0-9]+) *- *([0-9]+)', nr_str)
        if m is not None:
            b = m.group(1)
            f = int(m.group(2))
            t = int(m.group(3))
            if t < f:
                # abbreviated upper bound: take the leading digits from the lower bound
                t += f - (f % pow(10, len(m.group(3))))
            if t - f < 50:
                return [
                    gst
                    for counter in range(f, t + 1)
                    for p in [f'{b or ""}{counter}']
                    for gst in parse_gstnrs(p, kgnr, mgnr)
                ]

        invalid(mgnr, 'GstNr.', f'{kgnr:05}-{nr_str}')
        return []

    with open(f'{out_dir}/contract.csv', 'w+') as f_c, open(f'{out_dir}/area_commitment.csv', 'w+') as f_fb:
        f_c.write('vnr;mgnr;year_from;year_to\n')
        f_fb.write('vnr;kgnr;gstnr;rdnr;area;sortid;attrid;cultid\n')

        for fb in csv.parse(f'{in_dir}/TFlaechenbindungen.csv'):
            if fb['Von'] is None and fb['Bis'] is None:
                continue
            parz: Optional[str] = fb['Parzellennummer']
            vnr: int = fb['FBNR']
            gem = GEM_MAP[fb['GNR']]
            kgnr = gem[0][0]

            f_c.write(csv.format_row(
                vnr, fb['MGNR'], fb['Von'], fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None
            ))

            gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR'])
            area = int(fb['Flaeche'])
            gst_area = int(area / (len(gstnrs) or 1))

            if parz is None or parz == '0000':
                invalid(fb['MGNR'], 'GstNr.', f'{kgnr or 0:05}-{parz}')
                gstnrs = ['99999']
            elif len(gstnrs) > 1 or (len(gstnrs) == 1 and gstnrs[0] != parz):
                convert(fb['MGNR'], 'GstNr.', f'{kgnr or 0:05}-{parz or ""}', ', '.join(gstnrs))

            for i, gstnr in enumerate(gstnrs):
                # the first parcel takes the rounding remainder
                a = area - gst_area * (len(gstnrs) - 1) if i == 0 else gst_area
                rdnr = REED_MAP[fb['RNR']][1] if fb['RNR'] else None
                f_fb.write(csv.format_row(
                    vnr, kgnr, gstnr, rdnr, a, fb['SNR'], fb['SANR'], CULTIVATION_MAP[fb['BANR']]
                ))


def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, List[int], datetime.date]]:
    """Group delivery rows by delivery note number and reconcile dates.

    Rows without a delivery note number and cancelled rows are ignored. For
    12 character numbers whose embedded date differs from the row's date the
    number is rewritten (same year, new number unused) or the year of the date
    is corrected (different year). A 12 character number that occurs again
    gets ``/2`` appended; longer numbers are grouped by their first 12
    characters.

    Returns:
        ``(lsnr, [LINR, ...], date)`` tuples sorted by lsnr, where numbers
        starting with ``9`` are sorted as if prefixed with ``19``.
    """
    dates: Dict[str, datetime.date] = {}
    fixed: Dict[str, List[int]] = {}
    last_dates: Dict[int, datetime.date] = {}

    def add(lsnr: str, linr: int, date: datetime.date, unique: bool = False) -> None:
        """Append *linr* to group *lsnr*; with *unique*, use ``lsnr + '/2'`` if the group exists."""
        if lsnr not in fixed:
            fixed[lsnr] = []
            dates[lsnr] = date
        elif unique:
            return add(lsnr + '/2', linr, date, unique)
        fixed[lsnr].append(linr)

    def get_lsnr(date: datetime.date, lsnr: str) -> str:
        """Return *lsnr* with its first 8 characters replaced according to *date*."""
        if date.year < 2000:
            return date.strftime('%y%m%d00') + lsnr[8:]
        else:
            return date.strftime('%Y%m%d') + lsnr[8:]

    entries: List[Tuple[int, str, datetime.date, int, int]] = [
        (d['LINR'], d['Lieferscheinnummer'], d['Datum'], d['ZNR'], d['MGNR'])
        for d in deliveries
        if d['Lieferscheinnummer'] and not d['Storniert']
    ]
    lsnrs = {e[1] for e in entries}

    for lnr, lsnr, date, zwstnr, mgnr in entries:
        # numbers starting with '9' carry a two-digit year (19xx)
        if not lsnr.startswith('9'):
            lsdate = datetime.date(int(lsnr[:4]), int(lsnr[4:6]), int(lsnr[6:8]))
        else:
            lsdate = datetime.date(1900 + int(lsnr[:2]), int(lsnr[2:4]), int(lsnr[4:6]))

        if len(lsnr) == 12:
            if date != lsdate:
                if date.year == lsdate.year:
                    lsnr_n = get_lsnr(date, lsnr)
                    if lsnr_n not in lsnrs:
                        print(f'{lsnr} -> {lsnr_n}')
                        lsnr = lsnr_n
                    else:
                        warning_delivery(lsnr, mgnr, 'date', date)
                else:
                    date = datetime.date(lsdate.year, date.month, date.day)
            if zwstnr not in last_dates or not date < last_dates[zwstnr]:
                last_dates[zwstnr] = date
            add(lsnr, lnr, date, unique=True)
        else:
            add(lsnr[:12], lnr, date)

    return sorted([(f[0], f[1], dates[f[0]]) for f in fixed.items()],
                  key=lambda f: f[0] if not f[0].startswith('9') else '19' + f[0])


def migrate_deliveries(in_dir: str, out_dir: str) -> None:
    """Write the delivery, delivery part, modifier and season CSV files.

    Reads ``TLieferungen.csv``, ``TAbschlaege.csv`` and
    ``TLieferungAbschlag.csv``. Deliveries are numbered per year, and per
    branch and day. Requires GRADATION_MAP, GEM_MAP, REED_MAP, MEMBER_MAP and
    BRANCH_MAP to be filled.
    """
    modifiers = {m['ASNR']: m for m in csv.parse(f'{in_dir}/TAbschlaege.csv') if m['Bezeichnung']}
    delivery_map: Dict[int, Tuple[int, int, int]] = {}
    seasons: Dict[int, Dict[str, Any]] = {}
    branches: Dict[Any, Dict[datetime.date, int]] = {}
    # GEM_MAP is not modified in this function, so the set can be built once
    known_kgnrs = {kg[0] for gem in GEM_MAP.values() for kg in gem}

    deliveries = list(csv.parse(f'{in_dir}/TLieferungen.csv'))
    delivery_dict = {d['LINR']: d for d in deliveries}
    fixed = fix_deliveries(deliveries)

    with open(f'{out_dir}/delivery.csv', 'w+') as f_delivery, \
            open(f'{out_dir}/delivery_part.csv', 'w+') as f_part:
        f_delivery.write('year;did;date;time;zwstid;lnr;lsnr;mgnr;comment\n')
        f_part.write('year;did;dpnr;sortid;attrid;weight;kmw;qualid;hkid;kgnr;rdnr;gerebelt;manual_weighing;spl_check;'
                     'hand_picked;lesemaschine;temperature;acid;scale_id;weighing_id;comment\n')

        for lsnr, linrs, date in fixed:
            if date.year not in seasons:
                seasons[date.year] = {
                    'currency': 'EUR' if date.year >= 2001 else 'ATS',
                    'precision': 4,
                    'start': date,
                    'end': date,
                    'nr': 0,
                }
            s = seasons[date.year]
            # The list is sorted by lsnr, not by date, so the first delivery
            # seen is not necessarily the earliest one.
            if date < s['start']:
                s['start'] = date
            if date > s['end']:
                s['end'] = date
            s['nr'] += 1
            snr = s['nr']

            znr = delivery_dict[linrs[0]]['ZNR']
            day_counters = branches.setdefault(znr, {})
            day_counters[date] = day_counters.get(date, 0) + 1
            lnr = day_counters[date]

            comments = []

            for dpnr, linr in enumerate(linrs, start=1):
                d = delivery_dict[linr]
                delivery_map[linr] = (date.year, snr, dpnr)

                oe = d['OechsleOriginal'] or d['Oechsle']
                kmw = GRADATION_MAP[oe]
                sortid, attrid = d['SNR'], d['SANR']
                if len(sortid) != 2:
                    # the attribute was appended to the variety id
                    attrid = sortid[-1]
                    sortid = sortid[:2]
                    print(f'{d["SNR"]} -> {sortid}/{attrid}')

                kgnr, rdnr = None, None
                if d['GNR']:
                    gem = GEM_MAP[d['GNR']]
                    if len(gem) == 1:
                        kgnr = gem[0][0]
                if d['RNR']:
                    rd = REED_MAP[d['RNR']]
                    # TODO reed nr
                if kgnr is None:
                    m = MEMBER_MAP[d['MGNR']]
                    kgnr = m['default_kgnr']
                if kgnr is None:
                    warning_delivery(lsnr, d['MGNR'], 'KGNr.', None)
                elif kgnr not in known_kgnrs:
                    warning_delivery(lsnr, d['MGNR'], 'KGNr.', kgnr)
                    kgnr = None

                waage = d['Waagentext']
                scale_id, weighing_id = None, None
                if waage:
                    waage = re.split(r' +', waage)
                    scale_id = waage[1]
                    weighing_id = waage[3]

                comment: Optional[str] = d['Anmerkung']
                acid = d['Säure']
                # NOTE(review): `lesemaschine` is never set, not even for the
                # comment 'Maschine' - confirm whether that is intended.
                hand, lesemaschine = None, None
                if comment:
                    comment = comment.replace('Söure', 'Säure')
                    if comment.startswith('Säure'):
                        acid = float(comment.split(' ')[-1].replace(',', '.'))
                        comment = None
                    elif comment == 'Maschine':
                        hand = False
                        comment = None
                    elif comment == 'Hand':
                        hand = True
                        comment = None
                if comment:
                    comments.append(comment)

                f_part.write(csv.format_row(
                    date.year, snr, dpnr, sortid, attrid, int(d['Gewicht']), kmw, QUAL_MAP[d['QSNR']], HKID, kgnr, rdnr,
                    d['Gerebelt'] or False, d['Handwiegung'] or False, d['Spaetlese-Ueberpruefung'] or False,
                    hand, lesemaschine, d['Temperatur'], acid, scale_id, weighing_id, comment
                ))

            # `d` is the last part of this delivery here
            f_delivery.write(csv.format_row(
                date.year, snr, date, d['Uhrzeit'], BRANCH_MAP[d['ZNR']], lnr, lsnr, d['MGNR'],
                '; '.join(comments) or None
            ))

    with open(f'{out_dir}/delivery_part_modifier.csv', 'w+') as f_part_mod:
        f_part_mod.write('year;did;dpnr;mnr\n')
        for m in csv.parse(f'{in_dir}/TLieferungAbschlag.csv'):
            if m['LINR'] not in delivery_map:
                continue
            nid = delivery_map[m['LINR']]
            f_part_mod.write(csv.format_row(nid[0], nid[1], nid[2], m['ASNR']))

    with open(f'{out_dir}/season.csv', 'w+') as f_season, open(f'{out_dir}/modifier.csv', 'w+') as f_mod:
        f_season.write('year;currency;precision;start_date;end_date\n')
        f_mod.write('year;mnr;name;abs;rel;standard;quick_select\n')
        for y, s in seasons.items():
            f_season.write(csv.format_row(y, s['currency'], s['precision'], s['start'], s['end']))
            for m in modifiers.values():
                # round() instead of int(): e.g. 0.0003 * 10000 is
                # 2.9999999999999996 as a float and would be truncated to 2
                abs_v = round(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None
                f_mod.write(csv.format_row(
                    y, m['ASNR'], m['Bezeichnung'], abs_v, m['AZASProzent'], m['Standard'], m['Schnellauswahl']
                ))


def migrate_payments(in_dir: str, out_dir: str) -> None:
    """Placeholder; payments are not migrated yet."""
    pass  # TODO migrate payments


def main() -> None:
    """Parse the command line and run all migration steps in dependency order."""
    global args, HKID, DB_CNX

    parser = argparse.ArgumentParser()
    parser.add_argument('in_dir', type=str,
                        help='The input directory where the exported csv files are stored')
    parser.add_argument('out_dir', type=str,
                        help='The output directory where the migrated csv file should be stored')
    parser.add_argument('-q', '--quiet', action='store_true', default=False,
                        help='Be less verbose')
    parser.add_argument('-d', '--database', metavar='DB', required=True,
                        help='The sqlite database file to look up information')
    parser.add_argument('-o', '--origin', metavar='HKID', required=True,
                        help='The default wine origin identifier '
                             '(consider that the origin is ALWAYS set according to the KGNr if available)')
    args = parser.parse_args()

    os.makedirs(args.out_dir, exist_ok=True)

    HKID = args.origin
    DB_CNX = sqlite3.connect(args.database)
    try:
        # later steps read the lookup maps filled by earlier ones
        migrate_gradation(args.in_dir, args.out_dir)
        migrate_branches(args.in_dir, args.out_dir)
        migrate_grosslagen(args.in_dir, args.out_dir)
        migrate_gemeinden(args.in_dir, args.out_dir)
        migrate_reeds(args.in_dir, args.out_dir)
        migrate_attributes(args.in_dir, args.out_dir)
        migrate_cultivations(args.in_dir, args.out_dir)
        migrate_members(args.in_dir, args.out_dir)
        migrate_contracts(args.in_dir, args.out_dir)
        migrate_deliveries(args.in_dir, args.out_dir)
        migrate_payments(args.in_dir, args.out_dir)
    finally:
        # close the connection even if a migration step fails
        DB_CNX.close()


if __name__ == '__main__':
    main()