Files
elwig-misc/wgmaster/migrate.py

2047 lines
86 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Dict, Any, Tuple, Optional, List, Iterable
from enum import Enum
import argparse
import os
import re
import sys
import sqlite3
import requests
import datetime
import json
import string
import utils
class WG(Enum):
    """Enumeration of the WG variants this script distinguishes."""

    MATZEN = 1
    WINZERKELLER = 2
    WEINLAND = 3
    BADEN = 4

    @classmethod
    def from_str(cls, name: str):
        """Return the member called ``name``.

        Raises:
            KeyError: if no member has that exact (case-sensitive) name.
        """
        return cls[name]
# --- Module-wide state -------------------------------------------------------
# Database connection used by the lookup_* helpers (they call DB_CNX.cursor()).
DB_CNX: Optional[sqlite3.Connection] = None
# When true, the purely informational print helpers (success, convert, ...) stay silent.
QUIET: bool = False
# The WG whose data is being migrated; many functions branch on this value.
CLIENT: Optional[WG] = None

# --- Validation patterns -----------------------------------------------------
USTID_NR_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}')
BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
IBAN_RE = re.compile(r'[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')
EMAIL_RE = re.compile(r'[^@\s]+@([A-Za-z0-9_äöüß-]+\.)+[A-Za-z]{2,}')

# --- Lookup tables -----------------------------------------------------------
# All start out unset; several of them are (re)built by the migrate_* functions
# in this file and read by later migration steps.
GRADATION_MAP: Optional[Dict[float, float]] = None
CULTIVATION_MAP: Optional[Dict[int, str]] = None
BRANCH_MAP: Optional[Dict[int, str]] = None
GEM_MAP: Optional[Dict[int, List[Tuple[int, int]]]] = None
REED_MAP: Optional[Dict[int, Tuple[int, int, str]]] = None
GROSSLAGE_MAP: Optional[Dict[int, int]] = None
MEMBER_MAP: Optional[Dict[int, Dict[str, Any]]] = None
GROSSLAGE_KG_MAP: Optional[Dict[int, int]] = None
DELIVERY_MAP: Optional[Dict[int, Tuple[int, int, int]]] = None
MODIFIER_MAP: Optional[Dict[str, Dict]] = None
SORT_MAP: Optional[Dict[str, str]] = None
ATTRIBUTE_MAP: Optional[Dict[str, str]] = None
PARAMETERS: Optional[Dict[str, str]] = None

# Value written into the 'country' column of exported rows.
AUSTRIA = 40
# NOTE(review): not used in the visible part of the file; presumably the number
# of decimal places WGMaster works with -- confirm against its users.
WGMASTER_PRECISION = 4
# Maps a numeric quality code to a three-letter quality id. The ids 'WEI',
# 'RSW' and 'LDW' are special-cased by lookup_hkid().
# NOTE(review): the numeric keys presumably come from the source data -- confirm.
QUAL_MAP: Dict[int, str] = {
    0: 'WEI',
    1: 'RSW',
    2: 'LDW',
    3: 'QUW',
    4: 'KAB',
    5: 'SPL',
}
# Place names that fix_street_name() recognises as the first part of a street
# name ("<place>er..."). The value is the adjective form to emit; None means
# the adjective is simply the key followed by "er".
ORT_NAMES: Dict[str, Optional[str]] = {
    'Pirawarth': None,
    'Raggendorf': None,
    'Matzen': 'Matzner',
    'Matzn': None,
    'Stillfried': None,
    'Harras': None,
    'Gänserndorf': None,
    'Sulz': None,
    'Brünn': None,
    'Wien': None,
    'Angern': None,
    'Schweinbarth': None,
    'Hohenruppersdorf': None,
    'Grub': None,
    'Auersthal': None,
    'Ollersdorf': None,
    'Spannberg': None,
    'Ebenthal': None,
    'Bockfließ': None,
    'Dörfless': 'Dörfleser',
    'Dörfles': None,
    'Ableiding': None,
    'Absberg': None,
    'Eibesbrunn': None,
    'Engersdorf': None,
    'Enzersfeld': None,
    'Großebersdorf': None,
    'Hollabrunn': None,
    'Korneuburg': None,
    'Königsbrunn': None,
    'Laa': None,
    'Leopoldau': None,
    'Manhartsbrunn': None,
    'Mannhartsbrunn': 'Manhartsbrunner',
    'Münichsthal': None,
    'Pernau': None,
    'Pillichsdorf': None,
    'Retz': None,
    'Russbach': None,
    'Schleinbach': None,
    'Seefeld': None,
    'Seyring': None,
    'Stammersdorf': None,
    'Stelzendorf': None,
    'Traunfeld': None,
    'Tresdorf': None,
    'Trumau': None,
    'Wolkersdorf': None,
    'Znaim': None,
    'Obersdorf': None,
    'Sechshaus': None,
    'Rußbach': None,
    'Pfaffstätten': 'Pfaffstättner',
    'Berndorf': None,
    'Teesdorf': None,
    'Baden': 'Badner',
    'Dornau': None,
    'Pottendorf': None,
    'Möllersdorf': None,
    'Wienersdorf': None,
    'Münchendorf': None,
    'Hernstein': None,
    'Großau': None,
    'Oberwaltersdorf': None,
    'Vöslau': None,
    'Tribuswinkel': 'Tribuswinkler',
    'Sollenau': None,
    'Gutenbrunn': None,
    'Kottingbrunn': None,
    'Siebenhaus': None,
    'Mariazell': None,
}
# Explicit street-name corrections: the key is the spelling found in the source
# data, the value is the spelling to emit. fix_street_name() consults this
# table before applying any generic rule.
#
# Three replacement values previously contained typos of their own and were
# corrected to agree with their keys / the place name they derive from:
#   'Paffstättner Straße'        -> 'Pfaffstättner Straße'
#   'Franz-Borschek-Platz'       -> 'Franz-Broschek-Platz'
#   'Edgar-Penzing-Franz-Straße' -> 'Edgar-Penzig-Franz-Straße'
STREET_NAMES: Dict[str, str] = {
    'Hans-Wagnerstraße': 'Hans-Wagner-Straße',
    'J.Seitzstraße': 'Josef-Seitz-Straße',
    'Kurhaus-Str.': 'Kurhausstraße',
    'Kurhaus-Straße': 'Kurhausstraße',
    'Hofrat Döltlstraße': 'Hofrat-Döltl-Straße',
    'Flustraße': 'Flurstraße',
    'St.Laurentstraße': 'St.-Laurentstraße',
    'Josef Seitzstraße': 'Josef-Seitz-Straße',
    'Ritter Zoppelstraße': 'Ritter-Zoppel-Straße',
    'Ritter Zoppel Straße': 'Ritter-Zoppel-Straße',
    'R. Virchow-Straße': 'Rudolf-Virchow-Straße',
    'Grubere Hauptstraße': 'Gruber Hauptstraße',
    'Groß Inzersdorf': 'Großinzersdorf',
    'Erdpress': 'Erdpreß',
    'Hochleitengasse': 'Hochleithengasse',
    'Bei Der Gösselmühle': 'Bei der Gösslmühle',
    'Dr. Peschlstraße': 'Dr.-Peschl-Straße',
    'Dr.Peschlstraße': 'Dr.-Peschl-Straße',
    'Dr. Salzbornstraße': 'Dr.-Salzborn-Straße',
    'Elsa Brandström-Straße': 'Elsa-Brandström-Straße',
    'Franz Ecker Siedlung': 'Franz-Ecker-Siedlung',
    'Franz-Ecker Siedlung': 'Franz-Ecker-Siedlung',
    'Franz Gillygasse': 'Franz-Gilly-Gasse',
    'Franz V. Zülowstraße': 'Franz-von-Zülow-Straße',
    'Gr. Nondorf': 'Großnondorf',
    'In Der Trift': 'In der Trift',
    'Johann Degengasse': 'Johann-Degen-Gasse',
    'Josef Fürnkranz Siedlung': 'Josef-Fürnkranz-Siedlung',
    'Kaiser Franz Josef Platz': 'Kaiser-Franz-Josef-Platz',
    'Klein Haugsdorf': 'Kleinhaugsdorf',
    'Leopold Leuthnerstraße': 'Leopold-Leuthner-Straße',
    'Lh.-Mayer-Platz': 'Landeshauptmann-Mayer-Platz',
    'Manhartsbr.Straße': 'Manhartsbrunner Straße',
    'Maria Lourd Weg': 'Maria-Lourd-Weg',
    'U. Weißgasse Straße': 'Untere Weißgerberstraße',
    'Dr. Josef-Levit-Straße': 'Dr.-Josef-Levit-Straße',
    'Karl Katschthaler-Straße': 'Karl-Katschthaler-Straße',
    'Pfaffstättnerstraße': 'Pfaffstättner Straße',
    'Badnerstraße': 'Badner Straße',
    'Anton Krennstraße': 'Anton-Krenn-Straße',
    'Fr.Jonasstraße': 'Franz-Jonas-Straße',
    'Wr.Neustädterstraße': 'Wiener Neustädter Straße',
    'Wr. Neustädterstraße': 'Wiener Neustädter Straße',
    'Wr. Neustäderstraße': 'Wiener Neustädter Straße',
    'Ob.Ödlitzerstraße': 'Obere Ödlitzer Straße',
    'Obere Ödlitzerstraße': 'Obere Ödlitzer Straße',
    'Triesterstraße': 'Triester Straße',
    'Dr. Dolpstraße': 'Dr.-Dolp-Straße',
    'Wienersd.Hauptstr.': 'Wienersdorfer Hauptstraße',
    'Wienersd.Hauptstraße': 'Wienersdorfer Hauptstraße',
    'Tr.Bundesstr.': 'Triester Bundesstraße',
    'Tr.Bundesstraße': 'Triester Bundesstraße',
    'J.Brunastraße': 'Josef-Bruna-Straße',
    'J. Brunastraße': 'Josef-Bruna-Straße',
    'Ferdinand Pichlergasse': 'Ferdinand-Pichler-Gasse',
    'Dr. Figlstraße': 'Dr.-Figl-Straße',
    'Franz Broschekplatz': 'Franz-Broschek-Platz',
    'Tribuswinklerstraße': 'Tribuswinkler Straße',
    'Rudolf Kaspargasse': 'Rudolf-Kaspar-Gasse',
    'Traiskirchnerstraße': 'Traiskirchner Straße',
    'Dr. Theodor Körnerstraße': 'Dr.-Theodor-Körner-Straße',
    'Richard Klingerstraße': 'Richard-Klinger-Straße',
    'Karl Langegasse': 'Karl-Lange-Gasse',
    'Leopold Hörbingerstraße': 'Leopold-Hörbiger-Straße',
    'Leopold Hörbinger Straße': 'Leopold-Hörbiger-Straße',
    'Rudolf Zöllnergasse': 'Rudolf-Zöllner-Gasse',
    'Anton Rauchstraße': 'Anton-Rauch-Straße',
    'Isabellestraße': 'Erzherzogin-Isabelle-Straße',
    'Erzherzogin Isabelle Straße': 'Erzherzogin-Isabelle-Straße',
    'E. Penzig Franz Straße': 'Edgar-Penzig-Franz-Straße',
    'Hernsteinerstr Straße': 'Hernsteiner Straße',
}
def new(t: str, ids: Any, name: str, comment: str = None) -> None:
    """Print a bold green "New <t>: <ids> (<name>[, <comment>])" line to stderr.

    ``t`` is right-aligned to 6 columns and ``str(ids)`` to 10 columns. The
    comment part is omitted when ``comment`` is empty or None.
    """
    extra = ', ' + comment if comment else ''
    text = f'New {t:>6}: {str(ids):>10} ({name}{extra})'
    print(f'\x1B[1;32m{text}\x1B[0m', file=sys.stderr)
def success(mgnr: int, key: str, value) -> None:
    """Print a bold green "<mgnr> : <key> <value>" line to stderr unless QUIET is set."""
    if QUIET:
        return
    print(f'\x1B[1;32m{mgnr:>6} : {key:<12} {value}\x1B[0m', file=sys.stderr)
def warning(mgnr: int, key: str, value, active: bool) -> None:
    """Print a bold yellow warning line for member ``mgnr`` to stderr.

    The marker in parentheses is 'A' for a truthy ``active``, '?' when
    ``active`` is None and a blank otherwise.
    """
    if active:
        act = 'A'
    elif active is None:
        act = '?'
    else:
        act = ' '
    print(f'\x1B[1;33m{mgnr:>6} ({act}): {key:<12} {value}\x1B[0m', file=sys.stderr)
def invalid(mgnr: int, key: str, value, active: Optional[bool]) -> None:
    """Print a bold red "invalid value" line for member ``mgnr`` to stderr.

    The marker in parentheses is 'A' for a truthy ``active``, '?' when
    ``active`` is None and a blank otherwise.
    """
    if active:
        act = 'A'
    elif active is None:
        act = '?'
    else:
        act = ' '
    print(f'\x1B[1;31m{mgnr:>6} ({act}): {key:<12} {value}\x1B[0m', file=sys.stderr)
def renumber_delivery(lsnr_1: str, lsnr_2: str) -> None:
    """Print a bold "<lsnr_1> -> <lsnr_2>" line to stderr unless QUIET is set."""
    if QUIET:
        return
    print(f'\x1B[1m{lsnr_1:<15} -> {lsnr_2:<15}\x1B[0m', file=sys.stderr)
def warning_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Print a bold yellow warning line for delivery ``lsnr`` (member ``mgnr``) to stderr."""
    message = f'{lsnr:<15} ({mgnr:>6}): {key:<12} {value}'
    print(f'\x1B[1;33m{message}\x1B[0m', file=sys.stderr)
def invalid_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Print a bold red "invalid value" line for delivery ``lsnr`` (member ``mgnr``) to stderr."""
    message = f'{lsnr:<15} ({mgnr:>6}): {key:<12} {value}'
    print(f'\x1B[1;31m{message}\x1B[0m', file=sys.stderr)
def convert(mgnr: int, key: str, old_value: str, new_value) -> None:
    """Print a bold '<mgnr> : <key> "<old>" -> "<new>"' line to stderr unless QUIET is set."""
    if QUIET:
        return
    change = f'"{old_value}" -> "{new_value}"'
    print(f'\x1B[1m{mgnr:>6} : {key:<12} {change}\x1B[0m', file=sys.stderr)
def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str],
                 billing: Optional[str] = None) -> None:
    """Print how a member's name was rewritten (bold, to stderr) unless QUIET is set.

    The parts of ``old_name`` and ``new_name`` are joined with " / "; parts
    that are None or empty are shown as empty strings. A non-empty ``billing``
    name is appended in parentheses.
    """
    if QUIET:
        return
    before = ' / '.join(part or '' for part in old_name)
    after = ' / '.join(part or '' for part in new_name)
    billing_text = '(' + billing + ')' if billing else ''
    print(f'\x1B[1m{mgnr:>6} : {before} -> {after}{billing_text}\x1B[0m', file=sys.stderr)
def check_lfbis_nr(nr: str) -> bool:
    """Return whether ``nr`` is a 7-digit LFBIS number with a correct check digit.

    The first six digits are weighted 7, 6, 5, 4, 3, 2; the check digit is
    ``(11 - (weighted sum mod 11)) mod 10`` and must equal the last digit.
    """
    # https://statistik.at/fileadmin/shared/QM/Standarddokumentationen/RW/std_r_land-forstw_register.pdf#page=41
    if len(nr) != 7 or not nr.isdigit():
        return False
    *body, check = map(int, nr)
    weighted = sum(digit * weight for digit, weight in zip(body, range(7, 1, -1)))
    return (11 - weighted % 11) % 10 == check
def check_ustid_nr_at(nr: str) -> bool:
    """Return whether ``nr`` is a well-formed Austrian VAT id ("ATU" + 8 digits)
    whose last digit matches the check digit computed from the first seven.
    """
    # http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
    if not (nr.startswith('ATU') and len(nr) == 11 and nr[3:].isdigit()):
        return False
    total = 0
    for pos, digit in enumerate(map(int, nr[3:-1])):
        # Every second digit is doubled; the digit sum of the product is added.
        product = digit * 2 if pos % 2 else digit
        total += product // 10 + product % 10
    return (96 - total) % 10 == int(nr[-1])
def modulo(a: str, b: int) -> int:
    """Return the decimal number written in ``a`` modulo ``b``.

    The digits are consumed one at a time; an empty string yields 0.
    """
    remainder = 0
    for digit in map(int, a):
        remainder = (remainder * 10 + digit) % b
    return remainder
def check_iban(iban: str) -> bool:
    """Return whether ``iban`` matches IBAN_RE and passes the mod-97 check.

    The first four characters are moved to the end, each letter A-Z is
    replaced by its number 10-35, and the resulting digit string must leave a
    remainder of 1 when divided by 97.
    """
    if IBAN_RE.fullmatch(iban) is None:
        return False
    rearranged = iban[4:] + iban[:4]
    numeric = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), rearranged)
    return modulo(numeric, 97) == 1
def normalize_phone_nr(nr: Optional[str], ort: str = None) -> Optional[str]:
if nr is None:
return None
nr = nr.replace('/', ' ').strip()
if nr.count('-') > 1 or len(nr.split('-')[-1]) > 3:
nr = nr.replace('-', '')
if nr[0] == '0':
nr = '+43 ' + nr[1:]
elif nr.startswith('43'):
nr = '+' + nr
elif CLIENT == WG.WINZERKELLER and ort:
ort = ort.upper().strip()
if ort in ('PILLICHSDORF', 'OBERSDORF', 'WOLKERSDORF', 'WOLFPASSING', 'PUTZING', 'GROSSENGERSDORF', 'EIBESBRUNN'):
nr = f'+43 2245 {nr}'
elif ort in ('ALBERNDORF', 'HAUGSDORF', 'AUGGENTHAL', 'HAUGSDORF'):
nr = f'+43 2944 {nr}'
elif ort in ('HADRES'):
nr = f'+43 2943 {nr}'
else:
raise RuntimeError(f'Unable to find telephone number of "{ort}" ({nr})')
if nr.startswith('+43'):
if nr[4] == '6':
nr = nr.replace(' ', '')
nr = f'{nr[:3]} {nr[3:6]} {nr[6:]}'
elif nr[4] == '1':
nr = nr.replace(' ', '')
nr = f'{nr[:3]} {nr[3]} {nr[3:]}'
elif nr[4] == '2':
nr = nr.replace(' ', '')
nr = f'{nr[:3]} {nr[3:7]} {nr[7:]}'
return nr.strip()
def check_phone_nr(nr: str, mgnr: int, active: Optional[bool]) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Normalise a member's phone number and classify it.

    A trailing word separated by a space (letters only) is split off as a
    comment; the comment "Fi" is expanded to "Firma".

    Returns:
        ``(number, type, comment)``. ``type`` is 'mobile' when the character
        at index 4 of the normalised number is '6', otherwise 'landline'. If
        the normalised number is implausible (10 characters or fewer, no
        leading '+', or characters other than digits, '+', space and '-'),
        it is reported via invalid() and ``(number, None, None)`` is returned.
    """
    comment = None
    trailing_word = re.fullmatch(r'(.*?) ([A-Za-zäöüÄÖÜßẞ]+)$', nr)
    if trailing_word is not None:
        nr = trailing_word.group(1)
        comment = trailing_word.group(2).strip()
        if comment == 'Fi':
            comment = 'Firma'
    nnr = normalize_phone_nr(nr)
    plausible = len(nnr) > 10 and nnr[0] == '+' and re.fullmatch(r'[+0-9 \-]+', nnr) is not None
    if not plausible:
        invalid(mgnr, 'Tel.Nr.', nr, active)
        return nnr, None, None
    kind = 'mobile' if nnr[4] == '6' else 'landline'
    return nnr, kind, comment
def fix_street_name(name: str) -> str:
    """Return the corrected spelling of a street name.

    1. A name listed in STREET_NAMES is replaced by its mapped value.
    2. Otherwise, if the name ends in "straße" or "platz" and starts with
       exactly one place from ORT_NAMES followed by "er", it is split into the
       place adjective and the title-cased remainder
       (e.g. "Matzenerstraße" -> "Matzner Straße").
    3. Any other name is returned unchanged.
    """
    if name in STREET_NAMES:
        return STREET_NAMES[name]
    orte = [(k, v) for k, v in ORT_NAMES.items() if name.startswith(k + 'er')]
    if name.endswith(('straße', 'platz')) and len(orte) == 1:
        ort, adjective = orte[0]
        rest = name[len(ort) + 2:].title()
        # The remainder may itself start with a space ("Matzener straße");
        # collapse the resulting doubled space. The previous
        # `.replace(' ', ' ')` replaced a space by itself and had no effect.
        return f'{adjective or ort + "er"} {rest}'.replace('  ', ' ')
    return name
def get_bev_gst_size(kgnr: int, gstnr: str) -> Optional[int]:
    """Query the BEV cadastre API for a parcel and return the sum of the 'fl'
    values of all its 'nutzungen' entries.

    Returns None when the HTTP status is anything other than 200.
    """
    response = requests.get(f'https://kataster.bev.gv.at/api/gst/{kgnr:05}/{gstnr}/')
    if response.status_code == 200:
        nutzungen = response.json()['properties']['nutzungen']
        return sum(n['fl'] for n in nutzungen)
    return None
def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]:
    """Read ``TFlaechenbindungen.csv`` from ``in_dir`` and index its rows.

    Returns:
        ``{MGNR: {FBNR: row}}``; a later row with the same MGNR/FBNR pair
        replaces an earlier one.
    """
    by_member = {}
    for row in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
        by_member.setdefault(row['MGNR'], {})[row['FBNR']] = row
    return by_member
def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] = None) -> Optional[int]:
    """Resolve a postal code and place name to a postal destination id.

    The id is ``plz * 100000 + okz``, where ``okz`` comes from the ``AT_ort``
    table joined with ``AT_plz_dest`` for the given postal code.

    Args:
        plz: Postal code as found in the source data.
        ort: Place name as found in the source data.
        address: Street address; only needed to tell apart places that share
            a postal code and name (Velm-Götzendorf, Bad Fischau-Brunn).

    Returns:
        The postal destination id, or None if ``plz`` or ``ort`` is None.

    Raises:
        RuntimeError: if no unique place can be determined. This now also
            covers the address-based fallbacks being reached without an
            address (previously an AttributeError on ``None``).
    """
    if plz is None or ort is None:
        return None
    # Repair common typing errors in the source data (digit zero for the
    # letter O, "SZ" for "SS").
    ort = ort.replace('0', 'O').replace('SZ', 'SS')
    # Known wrong PLZ/place combinations in the source data.
    if ort.upper() == 'PILLICHSDORF' and plz == 2212:
        plz = 2211
    elif ort.upper() == 'ENZERSFELD' and plz == 2203:
        plz = 2202
    elif ort.upper() == 'GROSSEBERSDORF' and plz == 2212:
        ort = 'GROSSENGERSDORF'
    elif ort.upper() == 'MÜNICHSTHAL' and plz == 2123:
        plz = 2122
    elif ort.upper() == 'FRAUENDORF' and plz == 3710:
        plz = 3714
    elif ort.upper() == 'MAISSAU' and plz == 3721:
        ort = 'UNTERDÜRNBACH'
    elif ort.upper() in ('KLEINRIEDENTHAL', 'KLEINHÖFLEIN', 'KLEIN HÖFLEIN') and plz == 2074:
        plz = 2070
    elif ort.upper() == 'DROSENDORF' and plz == 2095:
        ort = 'DROSENDORF ALTSTADT'
    elif ort.upper() == 'KLEINWEIKERSDORF' and plz == 2033:
        plz = 2023
    elif ort.upper() == 'NIEDERSCHLEINZ' and plz == 3721:
        plz = 3714
    elif ort.upper() == 'OEYNHAUSEN' and plz == 2500:
        plz = 2512
    elif ort.upper() == 'MÖLLERSDORF' and plz == 2513:
        plz = 2514
    elif ort.upper() == 'SOOSS' and plz == 2500:
        ort = 'SOOẞ'
        plz = 2504
    elif ort.upper() == 'ÖDLITZ' and plz == 2560:
        ort = 'BERNDORF'
    elif ort.upper() == 'ST.VEIT' and plz == 2562:
        ort = 'BERNDORF'
        plz = 2560
    elif ort.upper() == 'SCHÖNAU/TRIESTING' and plz == 2525:
        ort = 'SCHÖNAU AN DER TRIESTING'
    elif ort.upper() == 'BAD FISCHAU - BRUNN' and plz == 2721:
        ort = 'BAD FISCHAU-BRUNN'
    elif ort.upper() == 'NEUSIEDL/ZAYA':
        ort = 'NEUSIEDL AN DER ZAYA'
    elif ort.upper() == 'SIERNDORF/MARCH':
        ort = 'SIERNDORF AN DER MARCH'

    cur = DB_CNX.cursor()
    cur.execute("SELECT o.okz, p.dest, o.name FROM AT_plz_dest p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?", (plz,))
    rows: List[Tuple[int, str, str]] = cur.fetchall()
    cur.close()

    # Compare names without digits, spaces or hyphens and with "ß" as "ss".
    ort_m = re.sub(r'\d+', '', ort).lower().replace('gr.', 'groß').replace(' ', '').replace('-', '').replace('ß', 'ss')
    # First try a substring match; if that is ambiguous, require equality.
    rows_m = [r[0] for r in rows if ort_m in r[2].lower().replace(' ', '').replace('-', '').replace('ß', 'ss')]
    if len(rows_m) > 1:
        rows_m = [r[0] for r in rows if ort_m == r[2].lower().replace(' ', '').replace('-', '').replace('ß', 'ss')]
    if len(rows_m) == 1:
        return plz * 100000 + rows_m[0]

    # Fallbacks that decide by street and house number; skipped without an address.
    if ort == 'VELM-GÖTZENDORF' and address:
        parts = address.split(' ')
        street = ' '.join(parts[:-1])
        nr = int(parts[-1].split('-')[0])
        if street == 'Landstraße' and nr <= 48 \
                or street == 'Winterzeile' and (nr <= 49 or nr in (52, 54, 56)) \
                or street == 'Hauptstraße' and (nr <= 106 or nr in (117, 115, 113, 111, 109, 107)):
            # Velm
            return plz * 100000 + 3572
        else:
            # Götzendorf
            return plz * 100000 + 3571
    elif ort == 'BAD FISCHAU-BRUNN' and address:
        if 'viaduktstraße' in address.lower():
            return plz * 100000 + 6560
        elif 'teichplatz' in address.lower():
            return plz * 100000 + 6560
    raise RuntimeError(f'PLZ not found ({plz} {ort}, {address})')
def lookup_kgnr(okz: Optional[int]) -> Optional[int]:
    """Return the ``kgnr`` stored in ``AT_ort`` for the given ``okz``.

    Returns None when ``okz`` is None or when the query does not yield
    exactly one row.
    """
    if okz is None:
        return None
    cur = DB_CNX.cursor()
    cur.execute("SELECT kgnr FROM AT_ort WHERE okz = ?", (okz,))
    found: List[Tuple[int]] = cur.fetchall()
    cur.close()
    return found[0][0] if len(found) == 1 else None
def lookup_gem_name(name: str) -> List[Tuple[int, int]]:
    """Resolve a Gemeinde name from the source data to ``(kgnr, gkz)`` pairs.

    Per client, a list of hand-maintained overrides is applied first. An
    override either
      * returns a fixed list of ``(kgnr, gkz)`` pairs (possibly empty),
      * replaces ``name`` with the spelling used in the database, or
      * sets ``gem_name`` to restrict the database hits to one Gemeinde.

    Afterwards ``AT_kg`` is searched for a KG whose name starts with ``name``
    (case-insensitively), restricted to the client's ``hkid`` values.

    Raises:
        NotImplementedError: if CLIENT is not one of the handled WG members.
        RuntimeError: if the database search does not yield exactly one row.
    """
    gem_name, hkid = None, None
    if CLIENT == WG.MATZEN:
        hkid = "'WLWV'"
        if name.lower() == 'dörfles':
            gem_name = 'Weikendorf'
        elif name.lower() == 'velm-götzendorf':
            return [(6027, 30859), (6007, 30859)]
        elif name.lower() == 'grub':
            name = 'Grub an der March'
    elif CLIENT == WG.WINZERKELLER:
        hkid = "'WLWV', 'WIEN', 'WLWG'"
        # A trailing asterisk in the source data is dropped before matching.
        if name.endswith('*'):
            name = name[:-1].strip()
        if name.lower() == 'joching':
            return [(12185, 31351)]
        elif name.lower() == 'kreuttal':
            return [(15206, 31627), (15221, 31627), (15226, 31627)]
        elif name.lower() == 'hochleithen':
            return [(15219, 31622), (15223, 31622), (15202, 31622)]
        elif name.lower() == 'wolfpassing':
            gem_name = 'Hochleithen'
        elif name.lower() == 'seebarn':
            gem_name = 'Harmannsdorf'
        elif name.lower() == 'königsbrunn':
            gem_name = 'Enzersfeld im Weinviertel'
        elif name.lower() == 'wien':
            return [(1616, 90001), (1617, 90001)]
        elif name.lower() in ('sitzendorf', 'roseldorf', 'frauendorf'):
            gem_name = 'Sitzendorf an der Schmida'
        elif name.lower() == 'dietersdorf':
            gem_name = 'Hollabrunn'
        elif name.lower() == 'altenmarkt':
            name = 'Altenmarkt im Thale'
        elif name.lower() == 'eitzerstal':
            name = 'Eitzersthal'
        elif name.lower() == 'gross':
            gem_name = 'Hollabrunn'
        elif name.lower() == 'auggenthal':
            name = 'Augenthal'
        elif name.lower() == 'karlsdorf':
            name = 'Pfaffendorf'
        elif name.lower() == 'kleinhaugsdorf':
            name = 'Augenthal'
        elif name.lower() == 'merkersdorf':
            gem_name = 'Hardegg'
        elif name.lower() == 'retz':
            name = 'Retz Altstadt'
        elif name.lower() == 'heldenberg':
            return [(9112, 31019), (9132, 31019), (9131, 31019), (9141, 31019), (9140, 31019)]
        elif name.lower() == 'retzbach':
            return [(18129, 31038), (18112, 31038), (18117, 31038)]
        elif name.lower() == 'dietmannsdorf':
            gem_name = 'Zellerndorf'
        elif name.lower() == 'sierndorf':
            gem_name = 'Sierndorf'
        elif name.lower() == 'waltersdorf':
            gem_name = 'Staatz'
        elif name.lower() == 'viendorf':
            name = 'Viendorf Weingebirge'
        elif name.lower() == 'stoitzendorf':
            return [(10137, 31105)]
        elif name.lower() in ('klein reinprechtsdorf', 'unter nalb', 'klein stelzendorf', 'klein kirchberg'):
            name = name.replace(' ', '')
        elif name.lower() == 'drosendorf':
            name = 'Drosendorf Stadt'
        elif name.lower() == 'etzmannsdorf':
            name = 'Etzmannsdorf bei Straning'
        elif name.lower() == 'roggendorf':
            gem_name = 'Röschitz'
        elif name.lower() == 'wilhelmsdorf':
            gem_name = 'Poysdorf'
        elif name.lower() == 'nappersdorf-kammersdorf':
            return [(9008, 31028), (9026, 31028), (9032, 31028), (9037, 31028), (9051, 31028), (9067, 31028)]
    elif CLIENT == WG.WEINLAND:
        hkid = "'WLWV'"
        if name.lower() == 'neusiedl/zaya':
            name = 'Neusiedl an der Zaya'
        elif name.lower() == 'bad pirawarth':
            name = 'pirawarth'
        elif name.lower() == 'sierndorf':
            gem_name = 'Jedenspeigen'
        elif name.lower() == 'velm-götzendorf':
            return [(6027, 30859), (6007, 30859)]
    elif CLIENT == WG.BADEN:
        hkid = "'WLTH'"
        if name.lower() == 'baden':
            gem_name = 'Baden'
        elif name.lower() in ('bad fischau-brunn', 'bad fischau - brunn'):
            return [(23402, 32301), (23401, 32301)]
        elif name.lower() == 'bad vöslau':
            return [(4005, 30603), (4009, 30603), (4035, 30603)]
        elif name.lower() == 'berndorf':
            return [(4303, 30605), (4304, 30605), (4032, 30605), (4305, 30605)]
        elif name.lower() in ('berndorf-ödlitz', 'ödlitz'):
            return [(4304, 30605)]
        elif name.lower() == 'eggendorf':
            return [(23437, 32305), (23426, 32305)]
        elif name.lower() == 'purkersdorf':
            return []
        elif name.lower() == 'schönau':
            gem_name = 'Schönau an der Triesting'
        elif name.lower() == 'siegersdorf':
            gem_name = 'Pottendorf'
        elif name.lower() == 'sooss':
            name = 'Sooß'
        elif name.lower() == 'st.veit':
            return [(4303, 30605)]
        elif name.lower() == 'wien':
            return []
        elif name.lower() == 'gramatneusiedl':
            return []
    else:
        raise NotImplementedError(f'Gemeinde lookup for {CLIENT} not yet implemented')

    cur = DB_CNX.cursor()
    # `hkid` is interpolated into the SQL text; it is always one of the
    # literal lists assigned above, never data from the input files.
    cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name "
                "FROM AT_kg k "
                "JOIN AT_gem g ON g.gkz = k.gkz "
                "JOIN wb_gem wg ON wg.gkz = g.gkz "
                f"WHERE LOWER(k.name) LIKE (LOWER(?) || '%') AND wg.hkid IN ({hkid})",
                (name.replace('fliess', 'fließ').replace('ross', 'roß').replace('Gr.', 'Groß ')
                 .replace('Groß ', 'Groß').replace('-', ''),))
    rows: List[Tuple[int, str, int, str]] = cur.fetchall()
    cur.close()

    # Narrow several prefix hits down to the requested Gemeinde, if one was set.
    if gem_name:
        rows = [row for row in rows if row[3] == gem_name]
    if len(rows) == 1:
        return [(k, g) for k, _, g, _ in rows]
    raise RuntimeError(f'Unable to find Gemeinde "{name}" ({rows})')
def lookup_kg_name(kgnr: int) -> str:
    """Return the name stored in ``AT_kg`` for ``kgnr``.

    Note that None is returned when no row matches, despite the ``str``
    return annotation.
    """
    cur = DB_CNX.cursor()
    cur.execute("SELECT name FROM AT_kg WHERE kgnr = ?", (kgnr,))
    matches = cur.fetchall()
    cur.close()
    if not matches:
        return None
    return matches[0][0]
def lookup_rnr_name(rnr: int) -> str:
    """Return the name component (index 2) of the REED_MAP entry for ``rnr``.

    Raises:
        KeyError: if ``rnr`` is not present in REED_MAP.
    """
    entry = REED_MAP[rnr]
    return entry[2]
def lookup_hkid(kgnr: Optional[int], qualid: str) -> str:
    """Determine the origin id (hkid) for a KG and a quality id.

    * 'WEI' and 'RSW' always map to 'OEST'.
    * Without a ``kgnr`` the default 'WLNO' is used (for the supported
      clients); otherwise the hkid is read from ``wb_gem`` via the KG's
      Gemeinde.
    * For 'LDW' the hkid is widened: 'WIEN' becomes 'WLXX', ids starting with
      'WL', 'BL' or 'SL' become that prefix plus 'XX'.
    * For every other quality, 'WL...' ids not ending in 'XX' become 'WLNO'.

    Raises:
        NotImplementedError: if ``kgnr`` is None and CLIENT has no default.
        IndexError: if ``kgnr`` is given but the database has no matching row.
    """
    if qualid in ('WEI', 'RSW'):
        return 'OEST'

    if kgnr is None:
        if CLIENT not in (WG.MATZEN, WG.WINZERKELLER, WG.BADEN, WG.WEINLAND):
            raise NotImplementedError(f'Default hkid for {CLIENT} not implemented yet')
        hkid = 'WLNO'
    else:
        cur = DB_CNX.cursor()
        cur.execute("SELECT wb.hkid FROM AT_kg kg JOIN AT_gem g ON g.gkz = kg.gkz JOIN wb_gem wb ON wb.gkz = g.gkz "
                    "WHERE kg.kgnr = ?", (kgnr,))
        rows = cur.fetchall()
        cur.close()
        hkid = rows[0][0]

    if qualid == 'LDW':
        if hkid == 'WIEN':
            return 'WLXX'
        region = hkid[:2]
        if region in ('WL', 'BL', 'SL'):
            return region + 'XX'
        return hkid

    if hkid.startswith('WL') and not hkid.endswith('XX'):
        return 'WLNO'
    return hkid
def guess_glnr(kgnr: int) -> Optional[int]:
    """Guess the Großlage number for a KG from neighbouring KGs.

    First all KGs whose ``gkz / 100`` equals that of ``kgnr`` are considered
    (excluding ``gkz / 100 == 900``). If the KGs among them that appear in
    GROSSLAGE_KG_MAP agree on one Großlage, that one is returned; if none is
    known, None is returned. If they disagree, the search is narrowed to KGs
    with exactly the same ``gkz`` and an arbitrary one of the Großlagen found
    there is returned (None if there is none).
    """
    cur = DB_CNX.cursor()
    cur.execute("SELECT kgnr FROM AT_kg "
                "WHERE gkz / 100 != 900 AND gkz / 100 = (SELECT gkz / 100 FROM AT_kg WHERE kgnr = ?)", (kgnr,))
    wide_rows = cur.fetchall()
    cur.execute("SELECT kgnr FROM AT_kg "
                "WHERE gkz / 100 != 900 AND gkz = (SELECT gkz FROM AT_kg WHERE kgnr = ?)", (kgnr,))
    narrow_rows = cur.fetchall()
    cur.close()

    wide = list({GROSSLAGE_KG_MAP[k] for k, in wide_rows if k in GROSSLAGE_KG_MAP})
    if len(wide) <= 1:
        return wide[0] if wide else None
    narrow = list({GROSSLAGE_KG_MAP[k] for k, in narrow_rows if k in GROSSLAGE_KG_MAP})
    return narrow[0] if narrow else None
def migrate_gradation(in_dir: str, out_dir: str) -> None:
    """Rebuild GRADATION_MAP from ``TUmrechnung.csv`` in ``in_dir``.

    The map goes from each row's 'Oechsle' value to its 'KW' value.
    ``out_dir`` is accepted but not used; nothing is written.
    """
    global GRADATION_MAP
    GRADATION_MAP = {}
    rows = utils.csv_parse_dict(f'{in_dir}/TUmrechnung.csv')
    GRADATION_MAP.update((row['Oechsle'], row['KW']) for row in rows)
def migrate_branches(in_dir: str, out_dir: str) -> None:
    """Convert ``TZweigstellen.csv`` into ``branch.csv`` and rebuild BRANCH_MAP.

    BRANCH_MAP maps each row's 'ZNR' to the branch id written to the output
    ('Kennbst', or 'G' as fallback for WG.WEINLAND). For WG.MATZEN address
    and postal destination are replaced by fixed values. A phone number whose
    character at index 4 is '6' is written to the mobile column instead of
    the phone column.
    """
    global BRANCH_MAP
    BRANCH_MAP = {}
    fallback_id = 'G' if CLIENT == WG.WEINLAND else None

    with utils.csv_open(f'{out_dir}/branch.csv') as f:
        f.header('zwstid', 'name', 'country', 'postal_dest', 'address', 'phone_nr', 'fax_nr', 'mobile_nr')
        for b in utils.csv_parse_dict(f'{in_dir}/TZweigstellen.csv'):
            kennbst = b['Kennbst'] or fallback_id
            BRANCH_MAP[b['ZNR']] = kennbst

            address = b['Straße']
            plz = int(b['PLZ']) if b['PLZ'] else None
            postal_dest = lookup_plz(plz, b['Ort'], address)
            if CLIENT == WG.MATZEN:
                address, postal_dest = 'Schloßstraße 6', 224303541

            phone = normalize_phone_nr(b['Telefon'])
            mobile = None
            if phone and phone[4] == '6':
                mobile, phone = phone, None

            branch_name = b['Name'].strip().title()
            fax = normalize_phone_nr(b['Telefax'])
            f.row(kennbst, branch_name, AUSTRIA, postal_dest, address, phone, fax, mobile)
def migrate_grosslagen(in_dir: str, out_dir: str) -> None:
    """Convert ``TGrosslagen.csv`` into ``wb_gl.csv`` and rebuild GROSSLAGE_MAP.

    Rows are numbered 1, 2, 3, ... in input order; GROSSLAGE_MAP maps the old
    'GLNR' to that new number. For WG.WINZERKELLER the old GLNR 8 is not
    written but mapped to 6 (its position in the numbering is still consumed).
    """
    global GROSSLAGE_MAP
    GROSSLAGE_MAP = {}
    with utils.csv_open(f'{out_dir}/wb_gl.csv') as f:
        f.header('glnr', 'name')
        rows = utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv')
        for glnr, gl in enumerate(rows, start=1):
            if CLIENT == WG.WINZERKELLER and gl['GLNR'] == 8:
                GROSSLAGE_MAP[8] = 6
            else:
                GROSSLAGE_MAP[gl['GLNR']] = glnr
                f.row(glnr, gl['Bezeichnung'])
def migrate_gemeinden(in_dir: str, out_dir: str) -> None:
    """Convert ``TGemeinden.csv`` into ``wb_kg.csv`` and rebuild GEM_MAP and
    GROSSLAGE_KG_MAP.

    GEM_MAP maps each 'GNR' to the ``(kgnr, gkz)`` pairs found by
    lookup_gem_name(). Each KG is written once, with the Großlage of the
    first Gemeinde row that mentions it; GROSSLAGE_KG_MAP records the same
    assignment.
    """
    global GEM_MAP, GROSSLAGE_KG_MAP
    GEM_MAP, GROSSLAGE_KG_MAP = {}, {}
    written = set()
    with utils.csv_open(f'{out_dir}/wb_kg.csv') as f:
        f.header('kgnr', 'glnr')
        for g in utils.csv_parse_dict(f'{in_dir}/TGemeinden.csv'):
            pairs = lookup_gem_name(g['Bezeichnung'])
            GEM_MAP[g['GNR']] = pairs
            for kgnr, _gkz in pairs:
                if kgnr not in written:
                    written.add(kgnr)
                    glnr = GROSSLAGE_MAP[g['GLNR']]
                    GROSSLAGE_KG_MAP[kgnr] = glnr
                    f.row(kgnr, glnr)
def migrate_reeds(in_dir: str, out_dir: str) -> None:
    """Convert ``TRiede.csv`` into ``wb_rd.csv`` and rebuild REED_MAP.

    Each reed is attached to the first KG of its Gemeinde (a note is printed
    when the Gemeinde has several KGs) and numbered consecutively within that
    KG. REED_MAP maps the old 'RNR' to ``(kgnr, rdnr, name)``. Names written
    entirely in upper or lower case are title-cased.

    Reeds are skipped with a printed message when their 'GNR' is unknown or
    when the Gemeinde has no KG at all. GEM_MAP may contain empty lists, and
    that second case used to abort the migration with an IndexError.
    """
    global REED_MAP
    REED_MAP = {}
    with utils.csv_open(f'{out_dir}/wb_rd.csv') as f:
        f.header('kgnr', 'rdnr', 'name')
        for r in utils.csv_parse_dict(f'{in_dir}/TRiede.csv'):
            name: str = r['Bezeichnung'].strip()
            if name.isupper() or name.islower():
                name = name.title()

            gem = GEM_MAP.get(r['GNR'])
            if gem is None:
                print(f'Invalid GNR {r["GNR"]} for reed {name}')
                continue
            if not gem:
                print(f'No KG known for GNR {r["GNR"]} of reed {name}')
                continue
            kgnr = gem[0][0]
            if len(gem) != 1:
                print(gem, name, '->', gem[0])

            # Next free number within this KG.
            rdnr = max((n for k, n, _ in REED_MAP.values() if k == kgnr), default=0) + 1
            REED_MAP[r['RNR']] = (kgnr, rdnr, name)
            f.row(kgnr, rdnr, name)
def migrate_attributes(in_dir: str, out_dir: str) -> None:
    """Convert ``TSortenAttribute.csv`` into ``wine_attribute.csv`` and rebuild
    ATTRIBUTE_MAP.

    * Rows without 'SANR' are ignored.
    * A 'KgProHa' of 10000 is written as "no limit" (None).
    * 'BIO' is mapped to the id 'B'; attributes with id 'B' are recorded in
      ATTRIBUTE_MAP but not written to the file.
    * Depending on CLIENT, additional inactive attributes are appended and
      registered in ATTRIBUTE_MAP.
    """
    global ATTRIBUTE_MAP
    ATTRIBUTE_MAP = {}
    with utils.csv_open(f'{out_dir}/wine_attribute.csv') as f:
        f.header('attrid', 'name', 'active', 'max_kg_per_ha', 'strict', 'fill_lower')
        for a in utils.csv_parse_dict(f'{in_dir}/TSortenAttribute.csv'):
            sanr = a['SANR']
            if sanr is None:
                continue
            max_kg = None if a['KgProHa'] is None else int(a['KgProHa'])
            if max_kg == 10_000:
                max_kg = None
            attrid = 'B' if sanr == 'BIO' else sanr
            ATTRIBUTE_MAP[sanr] = attrid
            if attrid != 'B':
                f.row(attrid, a['Attribut'], True, max_kg, False, 0)

        # Client-specific extra attributes: (attrid, name, max_kg_per_ha).
        extras = []
        if CLIENT == WG.MATZEN:
            extras = [('M', 'Matzen', None), ('HU', 'Huber', None)]
        elif CLIENT == WG.WINZERKELLER:
            extras = [('F', 'Fixpreis', None)]
        elif CLIENT == WG.BADEN:
            extras = [('D', 'DAC', 7500), ('K', 'Kabinett', None)]
        for attrid, label, max_kg in extras:
            f.row(attrid, label, False, max_kg, False, 0)
        for attrid, _label, _max_kg in extras:
            ATTRIBUTE_MAP[attrid] = attrid
def migrate_cultivations(in_dir: str, out_dir: str) -> None:
    """Convert ``TBewirtschaftungsarten.csv`` into ``wine_cultivation.csv`` and
    rebuild CULTIVATION_MAP.

    The id of a cultivation is, in order of precedence:
      * the name itself if it is written entirely in upper case,
      * None for the name 'Normal' (such rows are not written),
      * 'B' (and the name 'Bio') if the name contains "biolog",
      * otherwise the upper-cased first letter of the name.
    CULTIVATION_MAP maps each 'BANR' to that id.
    """
    global CULTIVATION_MAP
    CULTIVATION_MAP = {}
    with utils.csv_open(f'{out_dir}/wine_cultivation.csv') as f:
        f.header('cultid', 'name', 'description')
        for c in utils.csv_parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'):
            name: str = c['Bezeichnung']
            initial = name[0].upper()
            if name.isupper():
                cultid = name
            elif name == 'Normal':
                cultid = None
            elif 'biolog' in name.lower():
                cultid, name = 'B', 'Bio'
            else:
                cultid = initial
            CULTIVATION_MAP[c['BANR']] = cultid
            if cultid is not None:
                f.row(cultid, name, None)
def migrate_area_commitment_types(in_dir: str, out_dir: str) -> None:
    """Convert ``TLiefermengen.csv`` into ``area_commitment_type.csv``.

    The output file is always created with its header. If the input file does
    not exist, nothing else is written. Rows without 'SNR' or with SNR 'SV'
    are skipped. For WG.MATZEN an additional 'BM' type is appended.

    The extra 'BM' row used to carry nine values although the header (and
    every other row) has eight columns; it now has eight as well.
    """
    with utils.csv_open(f'{out_dir}/area_commitment_type.csv') as f:
        f.header('vtrgid', 'sortid', 'attrid', 'disc', 'min_kg_per_ha',
                 'penalty_per_kg', 'penalty_amount', 'penalty_none')
        if not os.path.exists(f'{in_dir}/TLiefermengen.csv'):
            return
        for t in utils.csv_parse_dict(f'{in_dir}/TLiefermengen.csv'):
            sortid: str = t['SNR']
            if not sortid or sortid == 'SV':
                continue
            menge = int(t['ErwarteteLiefermengeProHa'])
            attrid = ATTRIBUTE_MAP[t['SANR']] if t['SANR'] else None
            # Without an explicit attribute, anything after the first two
            # characters of the SNR is used as the attribute id.
            f.row(sortid + (attrid or ''), sortid[:2], attrid or sortid[2:] or None, None, menge,
                  None, None, None)
        if CLIENT == WG.MATZEN:
            f.row('BM', 'BM', None, None, None, None, None, None)
def normalize_name(family_name: str, given_name: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str]]:
letters = string.ascii_letters + 'äöüßÄÖÜẞ-'
double_names = ['eva maria', 'maria theresia']
def is_alpha(s: str) -> bool:
return all(c in letters for c in s) if s.lower() not in double_names else True
if CLIENT == WG.WINZERKELLER:
if 'BEZIRKSBAUERNKAMMER' == family_name:
return None, None, None, None, None, 'Bezirksbauernkammer Mistelbach'
elif (given_name, family_name) == ('LANDW.', 'FACHSCHULE'):
return None, None, None, None, None, 'Landwirtschaftliche Fachschule Mistelbach'
elif 'lagerhaus' in family_name.lower() and given_name == 'HOLLABRUNN-HORN':
return None, None, None, None, None, 'Raiffeisen-Lagerhaus Hollabrunn-Horn eGen'
elif CLIENT == WG.BADEN:
if family_name in ('Marktgemeinde', 'Weinbauverein'):
return None, None, None, None, None, f'{family_name} {given_name}'
if given_name.lower() not in ('kg', 'gesbr', 'gesnbr') and \
len(family_name) > 0 and len(given_name) > 0 and is_alpha(family_name) and is_alpha(given_name):
return None, given_name.title(), None, family_name.title(), None, None
prefix: Optional[str] = None
middle_names: Optional[str] = None
suffix: Optional[str] = None
billing_name: Optional[str] = None
if given_name.startswith('z.H. '):
billing_name = family_name.replace('AGRAR', 'Agrar').replace('GESBR', 'GesbR')
parts = given_name.split(' ')
given_name = parts[1]
family_name = parts[2]
given_name = given_name.replace('+', ' + ').replace('JOS ', 'JOS. ')
given_name = re.sub(r' ?\((.+?)(, ?(.*?))?\)',
lambda m: m.group(0) if m.group(1) == 'FH' else
f' + {m.group(1)}{" + " + m.group(3) if m.group(2) else ""}', given_name)
given_name = re.sub(r' u\. ?| und ', ' + ', given_name, flags=re.IGNORECASE)
titles = ''
def repl_title(m: re.Match) -> str:
nonlocal titles, suffix
t = m.group(1).lower().replace(' ', '').replace('.', '')
match t:
case 'jun': suffix = 'jun.'
case 'sen': suffix = 'sen.'
case 'dr': titles += 'Dr. '
case 'mag': titles += 'Mag. '
case 'ing': titles += 'Ing. '
case 'di(fh)': titles += 'DI (FH) '
case 'dipling': titles += 'DI '
case 'dipli': titles += 'DI '
case 'di': titles += 'DI '
case 'dkfm': titles += 'Dipl.-Kfm. '
case 'ökrat': titles += 'ÖkR '
case 'lkr': titles += 'ÖkR '
return ' '
title_re = re.compile(r',?((di ?\(fh\))|\b(dr|ing|mag|jun|sen|dkfm|dipl\. ?-?i(ng)?|di|ök\.rat|lkr)\b)\.?', re.IGNORECASE)
given_name = utils.remove_spaces(re.sub(title_re, repl_title, given_name))
family_name = utils.remove_spaces(re.sub(title_re, repl_title, family_name))
if titles:
prefix = titles.strip()
if given_name.lower() in ('weingut', 'weinbau'):
parts = family_name.split(' ')
return prefix, None, middle_names, ' '.join(parts[:-1]), suffix, given_name + ' ' + ' '.join(parts)
elif given_name.lower().startswith('weinbau ') or given_name.startswith('weingut '):
parts = given_name.split(' ')
return prefix, None, middle_names, family_name, suffix, ' '.join(parts[:-1]) + ' ' + family_name + ' ' + parts[-1]
elif family_name.lower() in ('weingut', 'weinbau'):
parts = given_name.split(' ')
return prefix, None, middle_names, ' '.join(parts[:-1]), suffix, family_name + ' ' + ' '.join(parts)
family_parts = family_name.split(' ')
last = family_parts[-1].lower()
if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr', 'og', 'gmbh'):
family_name = ' '.join(family_parts[:-1])
if ' ' not in family_name and len(family_name) > 4:
family_name = family_name.title()
if family_name.startswith('Gem.'):
family_name = 'GeM ' + family_name[5:]
billing_name = family_name + ' ' + {'kg': 'KG', 'keg.': 'KEG', 'og': 'OG', 'gmbh': 'GmbH'}.get(last, 'GesbR')
if given_name.count(' ') == 1:
parts = given_name.split(' ')
return None, parts[0], None, parts[1], None, billing_name
elif is_alpha(given_name):
return prefix, given_name.title(), middle_names, family_name, suffix, billing_name
given_parts = given_name.split(' ')
last = given_parts[-1].lower()
if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr', 'og', 'gmbh'):
given_name = ' '.join(given_parts[:-1]).title()
family_name = family_name.title()
billing_name = family_name + ' ' + {'kg': 'KG', 'keg.': 'KEG', 'og': 'OG', 'gmbh': 'GmbH'}.get(last, 'GesbR')
return prefix, given_name, middle_names, family_name, suffix, billing_name
if ' ' in family_name or '.' in family_name:
if family_name.lower().startswith('weingut') or family_name.lower().startswith('weinbau'):
billing_name = family_name.title()
family_name = ' '.join(family_name.split(' ')[1:]).title()
elif family_name.lower().endswith('veltlinerhof') or family_name.lower().endswith('biohof'):
billing_name = ' '.join(family_name.split(' ')[::-1]).title()
family_name = ' '.join(family_name.split(' ')[:-1]).title()
elif 'u.' in family_name:
billing_name = utils.remove_spaces(family_name.title().replace('U.', ' und '))
family_name = family_name.split(' ')[0].title()
else:
billing_name = family_name
family_name = family_name.split(' ')[-1].title()
if ' + ' in given_name:
parts = given_name.split(' + ')
family_name = family_name.title()
billing_name = (', '.join(parts).title()[::-1].replace(',', ' und'[::-1], 1)[::-1] +
f' {billing_name or family_name}')
given_name = parts[0].title()
else:
family_name = family_name.title()
given_name = given_name.title()
return prefix, given_name, middle_names, family_name, suffix, billing_name
def migrate_members(in_dir: str, out_dir: str) -> None:
    """Convert ``TMitglieder.csv`` into the member related CSV files.

    Every row of ``{in_dir}/TMitglieder.csv`` is cleaned up (name, LFBIS
    number, UID, IBAN/BIC, address, e-mail addresses, telephone numbers) and
    written to ``member.csv``, ``member_billing_address.csv``,
    ``member_telephone_number.csv`` and ``member_email_address.csv`` inside
    ``out_dir``. KGs that are not yet part of ``GEM_MAP`` are appended to
    ``wb_kg.csv``.

    Side effects: the global ``MEMBER_MAP`` is reset and filled with
    ``{mgnr: {'default_kgnr': ...}}``; ``GEM_MAP[9999]`` may receive new KGs.

    Raises:
        RuntimeError: if a house number embedded in the ``Ort`` column
            contradicts a non-empty ``Straße`` column.
    """
    global MEMBER_MAP
    MEMBER_MAP = {}

    members = [m for m in utils.csv_parse_dict(f'{in_dir}/TMitglieder.csv')]
    # A set makes the predecessor membership test O(1) per member.
    mgnrs = {m['MGNR'] for m in members}
    fbs = parse_flaechenbindungen(in_dir)

    # Known misspellings of BICs and their corrected form.
    bic_fixes = {
        'RLNWATAUE': 'RLNWATWWAUE',
        'RLNWATWMIB': 'RLNWATWWMIB',
        'VBÖEATWW': 'VBOEATWW',
        'RLNWATBAD': 'RLNWATWWBAD',
        'SPBDATXXX': 'SPBDAT21',
        'IBAATWWXXX': 'GIBAATWW',
    }

    with (utils.csv_open(f'{out_dir}/member.csv') as f_m,
          utils.csv_open(f'{out_dir}/member_billing_address.csv') as f_mba,
          utils.csv_open(f'{out_dir}/member_telephone_number.csv') as f_tel,
          utils.csv_open(f'{out_dir}/member_email_address.csv') as f_email,
          utils.csv_open(f'{out_dir}/wb_kg.csv', 'a') as f_kg):
        f_m.header(
            'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix',
            'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid',
            'lfbis_nr', 'ustid_nr', 'volllieferant', 'buchführend', 'organic', 'funktionär', 'active', 'deceased',
            'iban', 'bic', 'country', 'postal_dest', 'address', 'default_kgnr', 'comment')
        f_mba.header('mgnr', 'name', 'country', 'postal_dest', 'address')
        f_tel.header('mgnr', 'nr', 'type', 'number', 'comment')
        f_email.header('mgnr', 'nr', 'address', 'comment')

        for m in members:
            mgnr: int = m['MGNR']
            family_name: str = m['Nachname']
            given_name: str = m['Vorname']
            funktionaer, deceased = False, False

            # Skip empty rows, the sample business and the cooperative itself.
            if family_name is None and given_name is None:
                continue
            elif m['Anmerkung'] == 'Musterbetrieb':
                continue
            elif CLIENT == WG.BADEN and family_name == 'Winzergenoss.':
                continue

            # Determined up front: every invalid()/warning() call below reports
            # the activity state of *this* member.
            active = m['Aktives Mitglied'] or False

            given_name = given_name or ''
            if CLIENT == WG.MATZEN and given_name.startswith(' '):
                funktionaer = True
            if CLIENT == WG.WINZERKELLER and ('*' in family_name or '*' in given_name or '(+)' in family_name or '(+)' in given_name):
                deceased = True
                family_name = family_name.replace('*', '').replace('(+)', '')
                given_name = given_name.replace('*', '').replace('(+)', '')
            family_name = utils.remove_spaces(family_name)
            given_name = utils.remove_spaces(given_name).replace(', ', ',')

            ret = normalize_name(family_name, given_name)
            prefix, given_name, middle_names, family_name, suffix, billing_name = ret
            n1 = utils.remove_spaces(' '.join(r or '' for r in ret))
            n2 = utils.remove_spaces((m['Vorname'] or '') + ' ' + (m['Nachname'] or ''))
            if billing_name or n1.lower() != n2.lower():
                convert_name(mgnr, (m['Nachname'], m['Vorname']),
                             (prefix, given_name, middle_names, family_name, suffix), billing_name)
            if not given_name or not family_name:
                given_name = given_name or ''
                family_name = family_name or ''
                invalid(mgnr, 'Name', n1, active)

            # LFBIS number ("Betriebsnummer")
            bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None
            if bnr is not None:
                bnr = bnr.replace('.', '')
                if len(bnr) == 10:
                    bnr = bnr.removesuffix('000')
                elif len(bnr) == 6:
                    bnr = '0' + bnr
                elif bnr.endswith(' inaktiv'):
                    bnr = bnr.split(' ')[0]
                if not check_lfbis_nr(bnr):
                    # Obvious placeholders only produce a warning.
                    if bnr in ('0', '1234567'):
                        warning(mgnr, 'BetriebsNr.', bnr, active)
                    else:
                        invalid(mgnr, 'BetriebsNr.', bnr, active)
                    bnr = None

            # UID (VAT identification number)
            ustid_nr: Optional[str] = m['UID']
            if ustid_nr is not None:
                ustid_nr = ustid_nr.replace(' ', '')
                if len(ustid_nr) == 8 and ustid_nr.isdigit():
                    ustid_nr = 'ATU' + ustid_nr
                elif not USTID_NR_RE.fullmatch(ustid_nr):
                    invalid(mgnr, 'UID', ustid_nr, active)
                    ustid_nr = None
                if ustid_nr and not check_ustid_nr_at(ustid_nr):
                    if ustid_nr == 'ATU11111111':
                        warning(mgnr, 'UID', ustid_nr, active)
                    else:
                        invalid(mgnr, 'UID', ustid_nr, active)
                    ustid_nr = None

            # Bank details
            iban: Optional[str] = m['IBAN']
            bic: Optional[str] = m['BIC']
            blz: Optional[int] = m['BLZ']
            kto_nr: Optional[str] = m['KontoNr']
            if iban is not None:
                iban = iban.replace(' ', '')
                if not check_iban(iban):
                    invalid(mgnr, 'IBAN', iban, active)
                    iban = None
            if bic is not None:
                bic = bic.upper()
                bic = bic_fixes.get(bic, bic)
                if not BIC_RE.fullmatch(bic):
                    invalid(mgnr, 'BIC', bic, active)
                    bic = None
            if bic is not None:
                # Drop the optional "primary office" branch code.
                if len(bic) == 11 and bic.endswith('XXX'):
                    bic = bic[:-3]

            # Address: a trailing house number inside "Ort" is moved to the address.
            plz = int(m['PLZ']) if m['PLZ'] else None
            ort: Optional[str] = m['Ort']
            address: Optional[str] = m['Straße']
            parts = ort.split(' ') if ort else ['']
            if parts[-1].isdigit() or (len(parts) > 1 and parts[-2].isdigit()):
                if len(parts) > 1 and parts[-2].isdigit():
                    ort = ' '.join(parts[:-2])
                    new_address = parts[-2] + parts[-1]
                else:
                    ort = ' '.join(parts[:-1])
                    new_address = parts[-1]
                if address is not None and address != ' ' and address != new_address:
                    raise RuntimeError(f'Unable to rewrite address: "{address}" -> "{new_address}"')
                # Use the complete house number that was just validated
                # (previously only the last token was kept, e.g. "a" of "12 a").
                address = new_address
            if CLIENT == WG.WINZERKELLER and ort == 'JETZELDORF':
                ort = 'JETZELSDORF'
            if ort:
                ort = ort.upper().strip()
            if address is not None:
                address_old = address
                address = re.sub(r'([0-9]) ?([A-Z])\b', lambda a: a.group(1) + a.group(2).lower(),
                                 utils.remove_spaces(address).title())
                # A bare house number gets the (title cased) Ort as street name.
                if address.startswith('Haus Nr.') or \
                        address.startswith('Nr. ') or \
                        address.startswith('Nr ') or \
                        (len(address) > 0 and address[0].isdigit()):
                    address = ort.title() + ' ' + address.split(' ')[-1]
                address = address.replace('Pelz.', 'Pelzg.').replace('strasse', 'straße').replace('strassse', 'straße')\
                    .replace('Strasse', 'Straße').replace('Str.', 'Straße').replace('stasse', 'straße').replace('st.', 'straße ')\
                    .replace('str.', 'straße').replace('ster.', 'straße').replace('g.', 'gasse ').replace('pl.', 'platz ')\
                    .replace('Gross', 'Groß').replace('Bockfliess', 'Bockfließ').replace('Weiss', 'Weiß')\
                    .replace('Preussen', 'Preußen').replace('Schloss', 'Schloß').replace('luss', 'luß')\
                    .replace('Haupstraße', 'Hauptstraße').replace('Russ', 'Ruß').replace('Ross', 'Roß')
                address = re.sub('([a-z])([0-9])', lambda a: a.group(1) + ' ' + a.group(2), address)
                if address.startswith('Ob. '):
                    address = address.replace('Ob. ', 'Obere ', 1)
                address = address.replace(' Nr. ', ' ')
                address = re.sub(r'([^0-9]+?)( +[0-9])', lambda a: fix_street_name(a.group(1)) + a.group(2), address)
                address = utils.remove_spaces(address)
                if address_old != address:
                    convert(mgnr, 'Adresse', address_old, address)

            # E-mail addresses (the column may hold several of them)
            email: Optional[str] = m['EMail']
            emails = []
            if email is not None:
                for email in (email.split(' ') if CLIENT == WG.BADEN else email.split(' + ')):
                    if email.isupper():
                        email = email.lower()
                    if not EMAIL_RE.fullmatch(email):
                        invalid(mgnr, 'E-Mail', m['EMail'], active)
                    else:
                        parts = email.split('@')
                        emails.append(f'{parts[0]}@{parts[1].lower()}')

            # Branch of the member, falling back to the only branch if there is just one.
            zwstid = m['ZNR'] and BRANCH_MAP[m['ZNR']] or len(BRANCH_MAP) == 1 and list(BRANCH_MAP.values())[0]

            if CLIENT == WG.WINZERKELLER and plz == 1228:
                plz = 1020
            postal_dest = lookup_plz(plz, ort, address)

            #if mgnr in fbs:
            #    gems = {v['GNR'] for k, v in fbs[mgnr].items() if v['Bis'] and int(v['Bis']) >= 2020}
            #    if len(gems) == 1:
            #        print(GEM_MAP[list(gems)[0]])

            okz = postal_dest % 100000 if postal_dest else None
            kgnr = lookup_kgnr(okz)
            if kgnr is None:
                invalid(mgnr, 'KGNr.', ort, active)
            elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]:
                # Unknown KG: register it if a Großlage can be guessed, otherwise drop it.
                glnr = guess_glnr(kgnr)
                if glnr:
                    new('KG', kgnr, lookup_kg_name(kgnr), f'GL {glnr}')
                    f_kg.row(kgnr, glnr)
                    if 9999 not in GEM_MAP:
                        GEM_MAP[9999] = []
                    GEM_MAP[9999].append((kgnr, 0))
                else:
                    kgnr = None

            if postal_dest is None:
                invalid(mgnr, 'PLZ', None, active)
                continue

            pred = m['MGNR-Vorgänger'] if m['MGNR-Vorgänger'] in mgnrs else None
            f_m.row(
                mgnr, pred, prefix, given_name, middle_names, family_name, suffix,
                m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0,
                m['BHKontonummer'], zwstid, bnr, ustid_nr,
                m['Volllieferant'] or False, m['Buchführend'] or False, False, funktionaer, active, deceased,
                iban, bic, AUSTRIA, postal_dest, address or '-', kgnr, m['Anmerkung']
            )

            # Telephone numbers
            phone_1: Optional[str] = m['Telefon']
            phone_2: Optional[str] = m['Telefax']
            phone_3: Optional[str] = m['Mobiltelefon']
            # A value without '@' in the e-mail column is treated as a phone number.
            phone_4: Optional[str] = m['EMail'] if m['EMail'] and '@' not in m['EMail'] else None
            numbers = []

            if CLIENT == WG.WINZERKELLER:
                # Telefax (phone_2) not used
                numbers = {}

                def add_number(nr: str, fax: bool = False, comment: Optional[str] = None, fax_only: bool = False) -> None:
                    """Remember a normalized number together with its usage flags."""
                    mob = nr[4] == '6'
                    # The comment is stored so that it reaches the CSV row below
                    # (it used to be discarded).
                    numbers[nr] = {'mobile': mob, 'landline': not mob and not fax_only, 'fax': fax, 'comment': comment}

                if phone_1:
                    phone_1 = phone_1.lower().replace('und', 'u.').replace('auch', 'u.').replace('u.', ' u. ')\
                        .replace('oder', 'od.').replace(';', 'od.').replace('od.', ' od. ')
                    phone_1 = utils.remove_spaces(phone_1)
                    fax = False
                    if phone_1.endswith(' u. fax'):
                        fax = True
                        phone_1 = ' '.join(phone_1.split(' ')[:-2])
                    if phone_1.replace(' ', '').replace('/', '').replace('-', '').isdigit() and len(phone_1) <= 20:
                        if phone_1[0] != '0' and '/' in phone_1:
                            for nr in phone_1.split('/'):
                                add_number(normalize_phone_nr(nr, ort), fax)
                        else:
                            add_number(normalize_phone_nr(phone_1, ort), fax)
                    elif re.fullmatch(r'0[0-9/ -]+ od\. 0[0-9/ -]+', phone_1):
                        parts = phone_1.split(' od. ')
                        add_number(normalize_phone_nr(parts[0], ort), False)
                        add_number(normalize_phone_nr(parts[1], ort), fax)
                    elif re.fullmatch(r'0[0-9/ -]+ od\. [1-9][0-9/ -]+', phone_1):
                        parts = phone_1.split(' od. ')
                        add_number(normalize_phone_nr(parts[0], ort), False)
                        if parts[0][1] == '6':
                            add_number(normalize_phone_nr(parts[1], ort), fax)
                        else:
                            # Second number reuses the first five characters of the first one.
                            add_number(normalize_phone_nr(parts[0][:5] + parts[1], ort), fax)
                    elif re.fullmatch(r'0[0-9/ -]+ fax 0[0-9/ -]+', phone_1):
                        parts = phone_1.split(' fax ')
                        add_number(normalize_phone_nr(parts[0], ort), False)
                        add_number(normalize_phone_nr(parts[1], ort), True, fax_only=True)
                    elif re.fullmatch(r'0[0-9/ -]+ fax [1-9][0-9/ -]+', phone_1):
                        parts = phone_1.split(' fax ')
                        add_number(normalize_phone_nr(parts[0], ort), False)
                        add_number(normalize_phone_nr(parts[0][:5] + parts[1], ort), True, fax_only=True)
                    elif '-' in phone_1 and phone_1.endswith('fax'):
                        nr = re.sub(r'-+ ', '-', phone_1)
                        nr = ' '.join(nr.split(' ')[:-1])
                        add_number(normalize_phone_nr(nr.split('-')[0], ort), False)
                        add_number(normalize_phone_nr(nr, ort), True, fax_only=True)
                    elif 'fax -' in phone_1:
                        parts = phone_1.split('fax')
                        add_number(normalize_phone_nr(parts[0], ort), False)
                        add_number(normalize_phone_nr(parts[0].strip() + parts[1].strip(), ort), True, fax_only=True)
                    elif phone_1.endswith('fax'):
                        nr = phone_1[:-3].strip()
                        # NOTE(review): unlike every other call, `ort` is not passed here - confirm.
                        add_number(normalize_phone_nr(nr), False)
                        add_number(normalize_phone_nr(nr), True, fax_only=True)
                    elif re.fullmatch(r'0[0-9/ -]+ u\. fax (od\. |u\. )?[0-9/ -]+', phone_1):
                        parts = phone_1.split(' ')
                        add_number(normalize_phone_nr(parts[0], ort), True)
                        nr = parts[-1]
                        if nr[0] == '0':
                            add_number(normalize_phone_nr(nr, ort))
                        else:
                            add_number(normalize_phone_nr(parts[0][:5] + nr, ort))
                    else:
                        parts = phone_1.split(' ')
                        if parts[-1].isalpha():
                            add_number(normalize_phone_nr(parts[0], ort), comment=parts[-1])
                        else:
                            for nr in parts:
                                add_number(normalize_phone_nr(nr, ort), fax)

                if phone_3:
                    for nr in phone_3.split(','):
                        nr = nr.strip()
                        parts = nr.split(' ')
                        comment = None
                        # A trailing "(text)" or purely alphabetic token is a comment.
                        if parts[-1].startswith('(') and parts[-1].endswith(')'):
                            nr = nr[:nr.rindex(' ')].strip()
                            comment = parts[-1][1:-1].strip()
                        elif parts[-1].isalpha():
                            nr = nr[:nr.rindex(' ')].strip()
                            comment = parts[-1].strip()
                        add_number(normalize_phone_nr(nr, ort), comment=comment)

                # One row per usage type of every collected number.
                count = 0
                for nr, data in numbers.items():
                    if data['mobile']:
                        count += 1
                        f_tel.row(mgnr, count, 'mobile', nr, data['comment'])
                    if data['landline']:
                        count += 1
                        f_tel.row(mgnr, count, 'landline', nr, data['comment'])
                    if data['fax']:
                        count += 1
                        f_tel.row(mgnr, count, 'fax', nr, data['comment'])
            else:
                if phone_1:
                    phone_1, t, c = check_phone_nr(phone_1, mgnr, active)
                    if t is not None:
                        numbers.append(phone_1)
                        f_tel.row(mgnr, len(numbers), t, phone_1, c)
                if phone_2:
                    phone_2, t, c = check_phone_nr(phone_2, mgnr, active)
                    if t is not None:
                        numbers.append(phone_2)
                        # A landline number in the fax column is a fax number.
                        f_tel.row(mgnr, len(numbers), 'fax' if t == 'landline' else t, phone_2, c)
                if phone_3:
                    if phone_3.startswith('Handy'):
                        phone_3 = phone_3[5:].strip(':').strip()
                    phone_3, t, c = check_phone_nr(phone_3, mgnr, active)
                    if t is not None and phone_3 not in numbers:
                        numbers.append(phone_3)
                        f_tel.row(mgnr, len(numbers), t, phone_3, c)
                if phone_4:
                    phone_4, t, c = check_phone_nr(phone_4, mgnr, active)
                    if t is not None and phone_4 not in numbers:
                        numbers.append(phone_4)
                        f_tel.row(mgnr, len(numbers), t, phone_4, c)

            for i, email in enumerate(emails, start=1):
                f_email.row(mgnr, i, email, None)

            MEMBER_MAP[mgnr] = {
                'default_kgnr': kgnr
            }

            if billing_name:
                f_mba.row(mgnr, billing_name, AUSTRIA, postal_dest, address or '-')
def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
    """Convert ``TFlaechenbindungen.csv`` into ``area_commitment.csv``.

    For every area commitment of a member contained in ``MEMBER_MAP`` the KG
    and Ried are resolved (creating a new Ried in ``wb_rd.csv`` if needed) and
    one row is written to ``{out_dir}/area_commitment.csv``. For
    ``WG.MATZEN`` the free text parcel numbers are parsed and re-formatted.
    """

    def parse_gstnrs(nr_str: str, kgnr: int, mgnr: int) -> List[str]:
        """Split a free text parcel number field into single parcel numbers.

        Returns an empty list (after reporting via ``invalid``) if the text
        cannot be interpreted.
        """
        if nr_str is None:
            return []
        elif nr_str.isdigit() and len(nr_str) <= 6:
            return [nr_str]
        elif nr_str.count('/') == 1:
            parts = nr_str.split('/')
            # "123/456": two numbers of equal length are two separate parcels,
            # "123/4" is one parcel with a sub number.
            if len(parts[0]) == len(parts[1]) and parts[0].isdigit() and parts[1].isdigit() and len(parts[0]) >= 3:
                return [parts[0], parts[1]]
            elif parts[0].isdigit() and len(parts[0]) <= 6 and parts[1].isdigit() and len(parts[1]) <= 3:
                return [nr_str]

        if nr_str.count('/') > 1:
            parts = nr_str.split('/')
            if all([p.isdigit() for p in parts]):
                if all([len(p) <= 1 for p in parts[1:]]):
                    # "123/1/2" -> "123/1", "123/2"
                    return [f'{parts[0]}/{p}' for p in parts[1:]]
                elif all([len(p) == len(parts[0]) for p in parts]):
                    return parts

        # Strip a leading KG number or a trailing " 2000" and retry.
        if nr_str.startswith(f'{kgnr:05}'):
            return parse_gstnrs(nr_str[5:].strip(), kgnr, mgnr)
        if nr_str.endswith(' 2000'):
            return parse_gstnrs(nr_str[:-5].strip(), kgnr, mgnr)

        parts = re.split(r' *[,;+&] *', nr_str)
        if len(parts) == 1:
            parts = nr_str.split(' / ')
        # NOTE(review): a string that contains no ' ' cannot be split on ' ', so
        # as written the next two checks never change `parts` - confirm the
        # intended separator literals.
        if len(parts) == 1 and ' ' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) == 1 and ' ' not in nr_str and '-' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) > 1:
            return [gst for p in parts for gst in parse_gstnrs(p, kgnr, mgnr)]

        # Ranges such as "100-105", "100-5" or "12/3-7"
        m = re.fullmatch(r'([0-9]+/)?([0-9]+) *- *([0-9]+)', nr_str)
        if m is not None:
            b = m.group(1)
            f = int(m.group(2))
            t = int(m.group(3))
            if t < f:
                # Abbreviated upper bound: take the leading digits from the lower bound.
                t += f - (f % pow(10, len(m.group(3))))
            if t - f < 50:
                return [
                    gst
                    for counter in range(f, t + 1)
                    for p in [f'{b or ""}{counter}']
                    for gst in parse_gstnrs(p, kgnr, mgnr)
                ]

        invalid(mgnr, 'GstNr.', f'{kgnr:05}-{nr_str}', None)
        return []

    def replace_nrs(m: re.Match, sep: str) -> str:
        """Collapse runs of three or more consecutive numbers into "a-b"."""
        end = m.group(0).endswith(sep)
        # A match that ends with the separator yields an empty last element,
        # which must not be passed to int().
        parts = [int(p) for p in m.group(0).split(sep) if p.strip()]
        text = ''
        last = None
        for i, p in enumerate(parts):
            if last is not None:
                if last + 1 == p:
                    # Still inside a run.
                    last = p
                    continue
                else:
                    # Run ended with the previous number.
                    text += f'{last}{sep}'
                    last = None
            if len(parts) > i + 2 and p + 1 == parts[i + 1] and p + 2 == parts[i + 2]:
                last = p
                text += f'{p}-'
            else:
                text += f'{p}{sep}'
        if last is not None:
            text += str(last)
        return text.strip().strip(sep) + (sep if end else '')

    def format_gstnr(nrs: List[str]) -> Optional[str]:
        """Format parcel numbers as one compact string, ``None`` if there are none."""
        if len(nrs) == 0:
            return None
        # Sort numerically by zero padding every number, then remove the padding again.
        nrs = [re.sub(r'\b0+', '', nr)
               for nr in sorted([re.sub(r'[0-9]+', lambda m: m.group(0).rjust(6, '0'), nr)
                                 for nr in nrs])]
        last = None
        text = ''
        for nr in nrs:
            if last is None:
                text += nr
            elif '/' in last and last.split('/')[:-1] == nr.split('/')[:-1]:
                # Same base parcel: only append the sub number.
                text += f'+{nr.split("/")[-1]}'
            else:
                text += f', {nr}'
            last = nr
        text = re.sub(r'[0-9]+\+[0-9]+(\+[0-9]+)+', lambda m: replace_nrs(m, '+'), text)
        text = re.sub(r'([0-9]+(, |$)){3,}', lambda m: replace_nrs(m, ', '), text)
        return text

    # kgnr -> {rdnr: name} of all known Riede
    reeds: Dict[int, Dict[int, str]] = {k: {r: n
                                            for rk, r, n in REED_MAP.values() if rk == k}
                                        for k in set([k for k, _, _ in REED_MAP.values()])}
    # (kgnr, old RNR) -> rdnr for Riede that had to be re-assigned to another KG
    new_reeds: Dict[Tuple[int, int], int] = {}

    with utils.csv_open(f'{out_dir}/area_commitment.csv') as f_fb, \
            utils.csv_open(f'{out_dir}/wb_rd.csv', 'a+') as f_rd:
        f_fb.header('fbnr', 'mgnr', 'vtrgid', 'cultid', 'area', 'kgnr', 'gstnr', 'rdnr',
                    'year_from', 'year_to', 'comment')

        for fb in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
            if (fb['Von'] is None and fb['Bis'] is None) or fb['GNR'] is None:
                continue
            parz: str = fb['Parzellennummer']
            fbnr: int = fb['FBNR']
            mgnr: int = fb['MGNR']
            gem = GEM_MAP[fb['GNR']]
            kgnrs = [kgnr for kgnr, gkz in gem]
            rnr = fb['RNR']
            rd_kgnr, rdnr, _ = REED_MAP.get(rnr, (None, None, None)) if rnr else (None, None, None)
            if mgnr not in MEMBER_MAP:
                continue

            kgnr = None
            if rd_kgnr is None:
                kgnr = kgnrs[0]
            elif rd_kgnr in kgnrs:
                kgnr = rd_kgnr
            elif (kgnrs[0], rnr) in new_reeds:
                kgnr = kgnrs[0]
                rdnr = new_reeds[(kgnr, rnr)]
            else:
                # The Ried belongs to a KG outside of this Gemeinde: look for a
                # Ried with the same name in one of the KGs of the Gemeinde.
                rname = lookup_rnr_name(rnr)
                for k in kgnrs:
                    if k not in reeds:
                        continue
                    try:
                        pos = list(reeds[k].values()).index(rname)
                        r = list(reeds[k].keys())[pos]
                        kgnr = k
                        rdnr = r
                        new_reeds[(kgnr, rnr)] = rdnr
                        break
                    except ValueError:
                        continue
                if kgnr is None:
                    # No such Ried yet: create it in the first KG using the next
                    # free number of that KG. The filter has to use the KG of
                    # each REED_MAP entry (it used to read the leaked loop
                    # variable `k`), and an empty KG starts at 1.
                    kgnr = kgnrs[0]
                    rdnr = max([r for rk, r, _ in REED_MAP.values() if rk == kgnr] +
                               [r for (rk, _), r in new_reeds.items() if rk == kgnr], default=0) + 1
                    f_rd.row(kgnr, rdnr, rname)
                    new_reeds[(kgnr, rnr)] = rdnr
                    new('Reed', (kgnr, rdnr), rname)

            area = int(fb['Flaeche'])
            if CLIENT == WG.MATZEN:
                gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR'])
            else:
                gstnrs = []
            comment, gstnr = None, None
            if parz is None or parz == '0000':
                if parz is not None:
                    invalid(mgnr, 'GstNr.', f'{lookup_kg_name(kgnr)} {kgnr or 0:05}-{parz}', None)
                gstnrs = []
                gstnr = '-'
            if CLIENT == WG.MATZEN and len(gstnrs) == 0:
                # Keep the unparsable original text as comment.
                comment = f'KG {kgnr or 0:05}: {parz}'
            gstnr = format_gstnr(gstnrs) or gstnr or parz
            if parz != gstnr.replace('+', '/'):
                convert(mgnr, f'GstNr. ({fbnr})', parz, gstnr)

            # Years >= 3000 stand for "no end".
            to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None
            attrid = ATTRIBUTE_MAP[fb['SANR']] if fb['SANR'] else None
            if attrid == 'B':
                attrid = None
            f_fb.row(fbnr, mgnr, fb['SNR'] + (attrid or ''), CULTIVATION_MAP[fb['BANR'] or 1], area,
                     kgnr, gstnr, rdnr, fb['Von'], to, comment)
def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, List[int], datetime.date]]:
    """Group delivery rows by their (repaired) delivery note number.

    Cancelled rows and rows without a delivery note number are ignored (for
    ``WG.WEINLAND`` rows without a number are kept if they have an Oechsle
    value and a member number). Numbers of at most 8 characters are replaced
    by generated ones of the form ``<date>G<counter>``.

    Returns a list of ``(lsnr, [LINR, ...], date)`` tuples sorted by delivery
    note number, numbers starting with ``9`` being sorted as ``199...``.
    """
    date_of = {}
    linrs_of = {}
    newest_per_branch = {}

    def register(note_nr: str, linr: int, day: datetime.date, unique: bool = False) -> None:
        # A "unique" number that is already taken gets '/2' appended until it is free.
        if unique:
            while note_nr in linrs_of:
                note_nr += '/2'
        if note_nr not in linrs_of:
            linrs_of[note_nr] = []
            date_of[note_nr] = day
        linrs_of[note_nr].append(linr)

    rows: List[Tuple[int, str, datetime.date, int, int]] = [
        (d['LINR'], d['Lieferscheinnummer'], d['Datum'], d['ZNR'], d['MGNR'])
        for d in deliveries
        if (d['Lieferscheinnummer'] or (CLIENT == WG.WEINLAND and d['Oechsle'] and d['MGNR'])) and not d['Storniert']
    ]
    known_nrs = {row[1] for row in rows}
    generated = 99

    for linr, note_nr, day, branch, mgnr in rows:
        # NOTE(review): `and` binds tighter than `or`, so every number of at most
        # 8 characters gets a generated number for all clients; the original
        # follow-up branch `elif len(lsnr) < 8: continue` could therefore never
        # run and is omitted here - confirm the intended grouping.
        if (CLIENT == WG.WEINLAND and not note_nr) or len(note_nr) <= 8:
            generated += 1
            register(f'{day:%Y%m%d}G{generated:03}', linr, day)
            continue

        # Year typo "22.." instead of "20.."
        if note_nr.startswith('22'):
            note_nr = '20' + note_nr[2:]

        # Date encoded in the number: YYYYMMDD, or YYMMDD for numbers starting with 9.
        if note_nr.startswith('9'):
            note_day = datetime.date(1900 + int(note_nr[:2]), int(note_nr[2:4]), int(note_nr[4:6]))
        else:
            note_day = datetime.date(int(note_nr[:4]), int(note_nr[4:6]), int(note_nr[6:8]))

        note_branch = note_nr[8]
        if note_branch != branch and note_branch in BRANCH_MAP.values():
            branch = note_branch

        if len(note_nr) != 12:
            register(note_nr[:12], linr, day)
            continue

        if day != note_day:
            if day.year != note_day.year:
                # Trust the year of the number, keep month and day of the row.
                day = datetime.date(note_day.year, day.month, day.day)
            else:
                prefix = day.strftime('%y%m%d00') if day.year < 2000 else day.strftime('%Y%m%d')
                candidate = prefix + note_nr[8:]
                if candidate in known_nrs:
                    warning_delivery(note_nr, mgnr, 'date', day)
                else:
                    note_nr = candidate
        if branch not in newest_per_branch or not day < newest_per_branch[branch]:
            newest_per_branch[branch] = day
        register(note_nr, linr, day, unique=True)

    return sorted(((nr, linrs, date_of[nr]) for nr, linrs in linrs_of.items()),
                  key=lambda entry: entry[0] if not entry[0].startswith('9') else '19' + entry[0])
def migrate_deliveries(in_dir: str, out_dir: str) -> None:
    """Convert deliveries, modifiers and seasons.

    Reads ``TAbschlaege.csv``, ``TLieferungen.csv`` and
    ``TLieferungAbschlag.csv`` from ``in_dir`` and writes ``delivery.csv``,
    ``delivery_part.csv``, ``delivery_part_modifier.csv``, ``season.csv`` and
    ``modifier.csv`` to ``out_dir``. Renamed varieties are printed at the end.

    Side effects: resets and fills the globals ``DELIVERY_MAP``
    (LINR -> (year, did, dpnr)), ``MODIFIER_MAP`` and ``SORT_MAP``.

    Raises:
        NotImplementedError: if ``CLIENT`` has no modifier id mapping.
    """
    global DELIVERY_MAP, MODIFIER_MAP, SORT_MAP
    DELIVERY_MAP, MODIFIER_MAP, SORT_MAP = {}, {}, {'HU/': 'GV/HU', 'SV/': 'SW/', 'MEF/B': 'ME/F', 'CSF/B': 'CS/F'}

    modifiers = {
        m['ASNR']: m
        for m in utils.csv_parse_dict(f'{in_dir}/TAbschlaege.csv')
        if m['Bezeichnung'] and m['Bezeichnung'] != '-'
    }
    seasons = {}
    branches = {}

    # Assign the new modifier id to every modifier.
    for mod in modifiers.values():
        name: str = mod['Bezeichnung'].replace('ausser', 'außer')
        nr: int = mod['ASNR']
        MODIFIER_MAP[name] = mod
        if CLIENT == WG.MATZEN:
            mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'PZS'
        elif CLIENT == WG.WINZERKELLER:
            mod['id'] = {
                1: 'KA', 2: 'LG', 3: 'MG', 4: 'SG',
                5: 'VT', 6: 'MV', 7: 'UP', 8: 'VL',
                9: 'DN', 10: 'SA', 11: 'DA', 12: 'EG',
                13: 'KU',
            }[nr]
        elif CLIENT == WG.BADEN:
            mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'UE'
        elif CLIENT == WG.WEINLAND:
            mod['id'] = {
                1: 'PZS', 2: 'TB', 3: 'LM'
            }[nr]
        else:
            raise NotImplementedError(f'Modifier migration for {CLIENT} not yet implemented')

    deliveries = list(utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv'))
    delivery_dict = {d['LINR']: d for d in deliveries}
    fixed = fix_deliveries(deliveries)
    updated_varieties = {}
    # GEM_MAP is only read in this function, so the known KGs are collected once.
    known_kgnrs = {kg[0] for gem in GEM_MAP.values() for kg in gem}

    with utils.csv_open(f'{out_dir}/delivery.csv') as f_delivery, \
            utils.csv_open(f'{out_dir}/delivery_part.csv') as f_part:
        f_delivery.header('year', 'did', 'date', 'time', 'zwstid', 'lnr', 'lsnr', 'mgnr', 'comment')
        f_part.header('year', 'did', 'dpnr', 'sortid', 'attrid', 'cultid', 'weight', 'kmw', 'qualid',
                      'hkid', 'kgnr', 'rdnr', 'gerebelt', 'manual_weighing', 'spl_check', 'hand_picked', 'lesewagen',
                      'gebunden', 'temperature', 'acid', 'scale_id', 'weighing_data', 'weighing_reason', 'comment')

        for lsnr, linrs, date in fixed:
            if date.year not in seasons:
                seasons[date.year] = {
                    'currency': 'EUR' if date.year >= 2001 else 'ATS',
                    'precision': WGMASTER_PRECISION,
                    'start': date,
                    'end': date,
                    'nr': 0,
                }
            s = seasons[date.year]
            if date > s['end']:
                s['end'] = date
            # Running delivery number within the season
            s['nr'] += 1
            snr = s['nr']
            mgnr = delivery_dict[linrs[0]]['MGNR']
            znr = delivery_dict[linrs[0]]['ZNR'] or (1 if CLIENT == WG.WEINLAND else None)
            glob_waage = set(delivery_dict[linr]['Waagentext'] for linr in linrs if delivery_dict[linr]['Waagentext'])

            # Running delivery number per branch letter and day
            zwstid = lsnr[8]
            if zwstid not in branches:
                branches[zwstid] = {}
            if date not in branches[zwstid]:
                branches[zwstid][date] = 0
            branches[zwstid][date] += 1
            lnr = branches[zwstid][date]
            if BRANCH_MAP[znr] != zwstid:
                # Only trust the letter of the delivery note if it is a known branch.
                if zwstid not in BRANCH_MAP.values():
                    zwstid = BRANCH_MAP[znr]

            comments = []
            attributes = set()
            for dpnr, linr in enumerate(linrs, start=1):
                d = delivery_dict[linr]
                DELIVERY_MAP[linr] = (date.year, snr, dpnr)
                if lsnr != d['Lieferscheinnummer']:
                    renumber_delivery(d['Lieferscheinnummer'] or '', lsnr)

                oe = d['OechsleOriginal'] or d['Oechsle']
                kmw = GRADATION_MAP[oe]
                sortid = d['SNR'].upper()
                attrid = ATTRIBUTE_MAP[d['SANR']] if d['SANR'] else None
                cultid = None
                if attrid == 'B':
                    # 'B' is migrated as cultivation, not as attribute.
                    cultid = 'B'
                elif attrid:
                    attributes.add(attrid)
                if len(sortid) != 2:
                    # Everything after the two letter variety id is an attribute.
                    attributes.add(sortid[2:])
                    sortid = sortid[:2]

                if CLIENT == WG.MATZEN:
                    if sortid == 'HU':
                        # Gr.Veltliner (Huber)
                        sortid = 'GV'
                        # 'B' is normally not part of the set (see above), so a
                        # plain remove() would raise KeyError here.
                        attributes.discard('B')
                        attributes.add('HU')
                    elif sortid == 'SV':
                        sortid = 'SW'
                    elif sortid == 'WC':
                        # WEIẞBURGUNDER/CHARDONNAY
                        sortid = 'SW'
                    if 'H' in attributes:
                        attributes.remove('H')
                        attributes.add('HK')
                    if 'W' in attributes:
                        attributes.remove('W')
                elif CLIENT == WG.BADEN:
                    if sortid == 'GO':
                        sortid = 'SO'

                if d['SNR'] != sortid:
                    # Remember the renaming for the payment migration and the summary.
                    SORT_MAP[f'{d["SNR"]}/{attrid or ""}'] = f'{sortid}/{",".join(list(attributes)) or ""}'
                    line = f'{d["SNR"]}/{attrid} -> {sortid}/{",".join(list(attributes)) or None}'
                    if line not in updated_varieties:
                        updated_varieties[line] = 0
                    updated_varieties[line] += 1

                if d['QSNR'] is None:
                    warning_delivery(lsnr, mgnr, 'qualid', 'UNSET')
                    # NOTE(review): for an unset QSNR below 86 Oechsle `qualid` is
                    # not assigned here (unbound, or the value of the previous
                    # part) - confirm the intended fallback.
                    if d['Oechsle'] >= 86:
                        qualid = 'KAB'
                else:
                    qualid = QUAL_MAP[d['QSNR']]
                if qualid != 'WEI' and d['Abgewertet']:
                    # Downgraded parts are migrated as 'WEI'; anything but RSW is reported.
                    if qualid == 'RSW':
                        qualid = 'WEI'
                    else:
                        warning_delivery(lsnr, mgnr, 'qualid', f'{qualid} (abgewertet)')
                        qualid = 'WEI'

                # Origin: Gemeinde with exactly one KG, overridden by the Ried,
                # falling back to the default KG of the member.
                kgnr, rdnr = None, None
                if d['GNR']:
                    gem = GEM_MAP.get(d['GNR'], [])
                    if len(gem) == 1:
                        kgnr = gem[0][0]
                if d['RNR']:
                    kgnr, rdnr, _ = REED_MAP[d['RNR']]
                if kgnr is None:
                    m = MEMBER_MAP[mgnr]
                    kgnr = m['default_kgnr']
                if kgnr is None:
                    pass
                elif kgnr not in known_kgnrs:
                    warning_delivery(lsnr, mgnr, 'KGNr.', kgnr)
                    kgnr = None
                hkid = lookup_hkid(kgnr, qualid)

                handwiegung = d['Handwiegung'] or False
                waage = list(glob_waage)[0] if len(glob_waage) == 1 else d['Waagentext']
                scale_id, weighing_data = None, None
                if waage:
                    # Examples of the scale text:
                    # Waagenr: 1 ID: 19
                    # Waagennummer: 1 Speichernummer: 9166
                    # 1
                    waage = re.split(r' +', waage)
                    scale_id = waage[1] if len(waage) > 2 else '1'
                    weighing_id = waage[-1] if len(waage) > 2 and waage[2] == 'Speichernummer:' else f'{date}/{waage[-1]}'
                    weighing_data = f'{{"id":"{weighing_id}","nr":{waage[-1]}}}'
                elif len(glob_waage) == 0 and not handwiegung:
                    # No scale text at all: treated as manual weighing.
                    handwiegung = True

                # Well known comments are converted into their own columns.
                comment: Optional[str] = d['Anmerkung']
                acid = d['Säure']
                hand, lesewagen = None, None
                if comment:
                    comment = comment.replace('Söure', 'Säure')
                    if comment.startswith('Säure'):
                        acid = float(comment.split(' ')[-1].replace(',', '.'))
                        comment = None
                    elif comment in ('Maschine', 'Masschine'):
                        hand = False
                        comment = None
                    elif comment == 'Hand':
                        hand = True
                        comment = None
                    elif comment == '.':
                        comment = None
                    elif comment in ('LW', 'lw'):
                        lesewagen = True
                        comment = None
                    elif 'LW' in comment:
                        lesewagen = True
                        comment = comment.replace('LW', '').strip()
                        if comment == '':
                            comment = None
                if comment:
                    comments.append(comment)

                gerebelt = True if CLIENT == WG.MATZEN or (CLIENT == WG.WINZERKELLER and zwstid == 'W') else d['Gerebelt'] or False
                gebunden = None if CLIENT in (WG.MATZEN, WG.WINZERKELLER) else d['Gebunden']
                if len(attributes) > 1:
                    print("ERROR: ", attributes)
                attrid = attributes.pop() if len(attributes) == 1 else None
                f_part.row(
                    date.year, snr, dpnr, sortid, attrid, cultid, int(d['Gewicht']), kmw, qualid, hkid, kgnr, rdnr,
                    gerebelt, handwiegung, d['Spaetlese-Ueberpruefung'] or False,
                    hand, lesewagen, gebunden, d['Temperatur'], acid, scale_id, weighing_data, None, comment
                )
            # The time is taken from the last part of the delivery.
            f_delivery.row(date.year, snr, date, d['Uhrzeit'], zwstid, lnr, lsnr, mgnr, '; '.join(comments) or None)

    for k, v in updated_varieties.items():
        print(k + (f' ({v} times)' if v > 1 else ''))

    with utils.csv_open(f'{out_dir}/delivery_part_modifier.csv') as f_part_mod:
        f_part_mod.header('year', 'did', 'dpnr', 'modid')
        for m in utils.csv_parse_dict(f'{in_dir}/TLieferungAbschlag.csv'):
            if m['LINR'] not in DELIVERY_MAP or m['ASNR'] not in modifiers:
                continue
            y, did, dpnr = DELIVERY_MAP[m['LINR']]
            f_part_mod.row(y, did, dpnr, modifiers[m['ASNR']]['id'])

    with utils.csv_open(f'{out_dir}/season.csv') as f_season, \
            utils.csv_open(f'{out_dir}/modifier.csv') as f_mod:
        f_season.header('year', 'currency', 'precision', 'max_kg_per_ha', 'vat_normal', 'vat_flatrate',
                        'min_kg_per_bs', 'max_kg_per_bs', 'penalty_per_kg', 'penalty_amount', 'penalty_none',
                        'penalty_per_bs_amount', 'penalty_per_bs_none', 'start_date', 'end_date')
        f_mod.header('year', 'modid', 'ordering', 'name', 'abs', 'rel', 'active')
        for y, s in seasons.items():
            f_season.row(y, s['currency'], s['precision'], 10_000, 0.10, 0.13,
                         PARAMETERS['LIEFERPFLICHT/GA1'], PARAMETERS['LIEFERRECHT/GA1'],
                         None, None, None, None, None, s['start'], s['end'])
            # Every season gets a copy of all modifiers.
            for m in modifiers.values():
                abs_v = round(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None
                rel_v = m['AZASProzent'] / 100.0 if m['AZASProzent'] is not None else None
                f_mod.row(y, m['id'], m['ASNR'], m['Bezeichnung'], abs_v, rel_v, True)
def migrate_payments(in_dir: str, out_dir: str) -> None:
    """Convert payment variants and the per delivery part payment data.

    Reads ``TAuszahlung.csv``, ``TAuszahlungSorten.csv`` and
    ``TAuszahlungSortenQualitätsstufe.csv`` and writes one row per payment
    variant (including its JSON encoded price data) to
    ``payment_variant.csv``. Afterwards ``payment_delivery_part.csv`` and
    ``delivery_part_bucket.csv`` are written for every delivery part in
    ``DELIVERY_MAP`` whose year has at least one non-test variant.
    """
    variant_map: Dict[int, Tuple[int, int]] = {}
    variant_year_map: Dict[int, List[Tuple[int, int, int]]] = {}
    year_map = {}
    az_map = {}

    # Rows of both detail tables grouped by payment number (AZNR)
    p_sort = list(utils.csv_parse_dict(f'{in_dir}/TAuszahlungSorten.csv'))
    sort_map = {i: [s for s in p_sort if s['AZNR'] == i] for i in set([s['AZNR'] for s in p_sort])}
    p_qual = list(utils.csv_parse_dict(f'{in_dir}/TAuszahlungSortenQualitätsstufe.csv'))
    qual_map = {i: [s for s in p_qual if s['AZNR'] == i] for i in set([s['AZNR'] for s in p_qual])}

    def collapse_data(data: dict[str, Any]):
        """Replace frequent values by a ``default`` or ``/<attribute>`` entry.

        Returns the single value itself if all entries share it, otherwise the
        (modified in place) dictionary.
        """
        # value -> keys having that value (ignoring already collapsed entries)
        rev = {}
        for k, v in data.items():
            if k == 'default' or k.startswith('/'):
                continue
            rev[v] = rev.get(v, [])
            rev[v].append(k)
        if 'default' not in data.keys():
            if len(rev) == 1:
                return set(rev.keys()).pop()
            for v, ks in rev.items():
                if len(ks) >= len(data) / 2:
                    # A value used by at least half of the entries becomes the default.
                    for k in ks:
                        del data[k]
                    data['default'] = v
                    return collapse_data(data)
        for idx1 in {'/' + k.split('/')[1] for k in data.keys() if '/' in k and len(k) > 3}:
            len1 = len(list(k for k, _ in data.items() if k.endswith(idx1)))
            for v, ks in rev.items():
                my_ks = list(k for k in ks if k.endswith(idx1))
                if len(my_ks) > 1 and len(my_ks) >= len1 / 2:
                    for k in my_ks:
                        del data[k]
                    data[idx1] = v
        return data

    def collapse_curve(curve: list[float]):
        """Reduce a per Oechsle price list to the points where its slope changes."""
        n = {}
        d = 0
        for oe, p0, p1, p2 in zip(range(0, len(curve) + 1), [0] + curve, curve, curve[1:] + [curve[len(curve) - 1]]):
            d1, d2 = round(p1 - p0, WGMASTER_PRECISION), round(p2 - p1, WGMASTER_PRECISION)
            if d1 == d:
                continue
            d = d2
            if p0 > 0:
                n[f'{oe - 1}oe'] = p0
            n[f'{oe}oe'] = p1
        if curve[len(curve) - 1] > 0:
            n[f'{len(curve) - 1}oe'] = curve[len(curve) - 1]
        # Drop redundant points of a flat start ...
        keys = list(n.keys())
        vals = list(n.values())
        while len(n) >= 2 and vals[0] == vals[1]:
            del n[keys[0]]
            del n[keys[1]]
            n = {keys[1]: vals[1], **n}
            keys = list(n.keys())
            vals = list(n.values())
        # ... and of a flat end.
        while len(n) >= 2 and vals[len(vals) - 1] == vals[len(vals) - 2]:
            del n[keys[len(vals) - 1]]
            del n[keys[len(vals) - 2]]
            n = {**n, keys[len(vals) - 2]: vals[len(vals) - 2]}
            keys = list(n.keys())
            vals = list(n.values())
        if len(n) == 0:
            n = {'15kmw': 0}
        elif len(n) == 1:
            n = {'15kmw': list(n.values())[0]}
        return n

    with (utils.csv_open(f'{out_dir}/payment_variant.csv') as f_payment):
        f_payment.header('year', 'avnr', 'name', 'date', 'test_variant', 'calc_time', 'comment', 'data')
        for p in utils.csv_parse_dict(f'{in_dir}/TAuszahlung.csv'):
            year = p['Lesejahr']
            if year is None:
                continue
            # Running variant number within the year
            if year not in year_map:
                year_map[year] = 0
            year_map[year] += 1
            variant_map[p['AZNR']] = (year, year_map[year])

            # All remaining columns are copied into the JSON data.
            var = p.copy()
            del var['AZNR']
            del var['Datum']
            del var['Beschreibung']
            del var['Lesejahr']
            del var['Titel']
            del var['TeilzahlungNr']
            data = {
                'mode': 'wgmaster',
                **var,
                'AuszahlungSorten': {},
                'AuszahlungSortenQualitätsstufe': {},
            }
            gb = data['Grundbetrag'] or 0
            gbzs = data['GBZS']

            # Prices per variety/attribute and Oechsle
            azs = data['AuszahlungSorten']
            for s in sort_map.get(p['AZNR'], []):
                del s['AZNR']
                del s['ID']
                if s['Oechsle'] is None:
                    continue
                attrid = ATTRIBUTE_MAP[s['SANR']] if s['SANR'] else None
                key = SORT_MAP.get(f'{s["SNR"]}/{attrid or ""}', f'{s["SNR"].upper()}/{attrid or ""}')
                if key is None or len(key) < 3:
                    continue
                azs[key] = azs.get(key, {'Gebunden': {}, 'NichtGebunden': {}})
                azs[key]['Gebunden' if s['gebunden'] else 'NichtGebunden'][s['Oechsle']] = round(s['Betrag'] + gb, WGMASTER_PRECISION)

            # Replace every price list by a constant or a reference to a curve.
            curves = []
            curve_zero = False
            for key, d1 in azs.items():
                oe = [d1['NichtGebunden'].get(n, 0.0) for n in range(max(d1['NichtGebunden'].keys()) + 1)]
                if len(d1['Gebunden']) > 0:
                    oe_geb = [d1['Gebunden'].get(n, 0.0) for n in range(max(d1['Gebunden'].keys()) + 1)]
                else:
                    oe_geb = None
                if len(set(oe)) <= 2 and oe[0] == 0 and \
                        (oe_geb is None or (len(set(oe_geb)) <= 2 and oe[len(oe) - 1] == oe_geb[len(oe_geb) - 1])):
                    # Constant price
                    azs[key] = oe[len(oe) - 1]
                    if azs[key] == 0:
                        azs[key] = 'curve:0'
                        curve_zero = True
                else:
                    c = (oe, oe_geb)
                    if c not in curves:
                        curves.append(c)
                    azs[key] = f'curve:{curves.index(c) + 1}'
            data['AuszahlungSorten'] = collapse_data(azs)

            for i, cs in enumerate(curves):
                c, c_geb = cs
                geb = None
                if c_geb is not None:
                    diff = {round(b - a, WGMASTER_PRECISION) for a, b in zip(c, c_geb)}
                    # Points where both curves agree carry no information; there
                    # may be none of them, so discard() instead of remove().
                    diff.discard(0.0)
                    if len(diff) == 1:
                        # Constant surcharge
                        geb = diff.pop()
                    elif len(diff) > 1:
                        geb = collapse_curve(c_geb)
                curves[i] = {
                    'id': i + 1,
                    'mode': 'oe',
                    'data': collapse_curve(c),
                }
                if geb is not None:
                    curves[i]['geb'] = geb
            if curve_zero:
                curves.insert(0, {
                    'id': 0,
                    'mode': 'oe',
                    'data': gb,
                    'geb': gbzs or 0
                })
            data['Kurven'] = curves

            # Prices per quality level and variety/attribute
            azq = data['AuszahlungSortenQualitätsstufe']
            for q in qual_map.get(p['AZNR'], []):
                del q['AZNR']
                del q['ID']
                qualid = QUAL_MAP[q['QSNR']]
                # The attribute has to come from the current row `q` (the code
                # used to read the stale variable `s` of the loop above).
                # NOTE(review): assumes this table has a 'SANR' column like
                # TAuszahlungSorten; without it the attribute stays empty.
                sanr = q.get('SANR')
                attrid = ATTRIBUTE_MAP[sanr] if sanr else None
                key = SORT_MAP.get(f'{q["SNR"]}/{attrid or ""}', f'{q["SNR"].upper()}/{attrid or ""}')
                if key is None or len(key) < 3:
                    continue
                azq[qualid] = azq.get(qualid, {})
                azq[qualid][key] = round((q['Betrag'] or 0) + gb, WGMASTER_PRECISION)
            for qualid, d1 in azq.items():
                azq[qualid] = collapse_data(d1)

            # Unset values and False flags are not exported.
            for data_k, data_v in data.copy().items():
                if data_v is None or isinstance(data_v, bool) and not data_v:
                    del data[data_k]
            az_map[p['AZNR']] = data

            test = (p['TeilzahlungNr'] == 7)
            if not test:
                if year not in variant_year_map:
                    variant_year_map[year] = []
                variant_year_map[year].append((p['AZNR'], year_map[year], p['TeilzahlungNr']))
            dmp = json.dumps(data).replace('/B', '-B').replace('/"', '"').replace('/-', '-')
            f_payment.row(year, year_map[year], p['Titel'], p['Datum'], test, None, p['Beschreibung'], dmp)

    with utils.csv_open(f'{out_dir}/payment_delivery_part.csv') as f_del_pay, \
            utils.csv_open(f'{out_dir}/delivery_part_bucket.csv') as f_bucket:
        f_del_pay.header('year', 'did', 'dpnr', 'avnr', 'net_amount')
        f_bucket.header('year', 'did', 'dpnr', 'bktnr', 'discr', 'value')
        deliveries = {d['LINR']: d for d in utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv')}
        for linr, (y, did, dpnr) in DELIVERY_MAP.items():
            p = deliveries[linr]
            if y not in variant_year_map:
                continue
            # Bucket 0: weight without commitment, bucket 1: committed weight
            gew, geb_gew = int(p['Gewicht']), int(p['BGewichtGebunden'])
            b1 = gew - geb_gew
            b2 = geb_gew
            f_bucket.row(y, did, dpnr, 0, '_', b1)
            attrid = ATTRIBUTE_MAP[p['SANR']] if p['SANR'] else None
            f_bucket.row(y, did, dpnr, 1, attrid or '', b2)
            for aznr, avnr, tznr in variant_year_map[y]:
                val = p[f'BTeilzahlung{tznr}' if tznr < 6 else 'BEndauszahlung']
                val = round(val * pow(10, WGMASTER_PRECISION))
                f_del_pay.row(y, did, dpnr, avnr, val)
def migrate_parameters(in_dir: str, out_dir: str) -> None:
    """Convert the client parameters from ``TParameter.csv`` into ``client_parameter.csv``.

    Reads ``{in_dir}/TParameter.csv`` into the module-level ``PARAMETERS``
    dict (``Bezeichnung`` -> ``Wert``), derives the new ``CLIENT_*``,
    ``DOCUMENT_SENDER`` and ``TEXT_*`` parameters from it and writes them as
    ``param``/``value`` rows to ``{out_dir}/client_parameter.csv``.

    Args:
        in_dir: Directory containing the exported ``TParameter.csv``.
        out_dir: Directory the ``client_parameter.csv`` file is written to.

    Raises:
        KeyError: If one of the mandatory ``MANDANTEN*`` parameters is missing
            or the legal form derived from ``MANDANTENNAME2`` is not listed
            in ``types``.
    """
    global PARAMETERS

    PARAMETERS = {p['Bezeichnung']: p['Wert'] for p in utils.csv_parse_dict(f'{in_dir}/TParameter.csv')}

    # The name is title-cased first, so the abbreviations appear as 'F.'/'U.'.
    name = PARAMETERS['MANDANTENNAME1'].title().replace('F.', 'für').replace('U.', 'und').replace(' Im ', ' im ')
    # Only the part after the last comma is used, with spaces and dots removed,
    # as the lookup key for the legal form.
    suffix = PARAMETERS['MANDANTENNAME2'].lower().replace(' ', '').replace('.', '').split(',')[-1]
    types = {
        'reggenmbh': 'reg. Gen.m.b.H.'
    }
    # (token, short name) of the selected client; both None if no client is set.
    tokens = {
        WG.MATZEN: ('WGM', 'WG Matzen'),
        WG.WINZERKELLER: ('WKW', 'Winzerkeller'),
        WG.WEINLAND: ('WGW', 'WG Weinland'),
        WG.BADEN: ('WGB', 'WG Baden')
    }.get(CLIENT, (None, None))
    ort = PARAMETERS['MANDANTENORT'].title()

    # The delivery note text is optional: only apply the spelling/wording
    # corrections when it is actually present instead of calling .replace()
    # on None.
    delivery_note_text = PARAMETERS.get('LIEFERSCHEINTEXT', None)
    if delivery_note_text is not None:
        delivery_note_text = delivery_note_text \
            .replace(' daß ', ' dass ') \
            .replace('obige Angaben maßgeblicher Veränderungen', 'maßgeblichen Veränderungen obiger Angaben')

    new_params: Dict[str, Optional[str]] = {
        'CLIENT_NAME_TOKEN': tokens[0],
        'CLIENT_NAME_SHORT': tokens[1],
        'CLIENT_NAME': name,
        'CLIENT_NAME_SUFFIX': None,
        'CLIENT_NAME_TYPE': types[suffix],
        'CLIENT_PLZ': PARAMETERS['MANDANTENPLZ'],
        'CLIENT_ORT': ort,
        'CLIENT_ADDRESS': PARAMETERS['MANDANTENSTRASSE'],
        'CLIENT_IBAN': None,
        'CLIENT_BIC': None,
        'CLIENT_USTIDNR': PARAMETERS['MANDANTENUID'].replace(' ', ''),
        'CLIENT_LFBISNR': PARAMETERS['MANDANTENBETRIEBSNUMMER'],
        'CLIENT_PHONE': normalize_phone_nr(PARAMETERS['MANDANTENTELEFON'], ort),
        'CLIENT_FAX': normalize_phone_nr(PARAMETERS['MANDANTENTELEFAX'], ort),
        'CLIENT_EMAIL': PARAMETERS['MANDANTENEMAIL'],
        'CLIENT_WEBSITE': PARAMETERS.get('MANDANTENHOMEPAGE', None),
        'DOCUMENT_SENDER': PARAMETERS.get('ABSENDERTEXT2', None),
        'TEXT_DELIVERYNOTE': delivery_note_text,
        'TEXT_DELIVERYCONFIRMATION': PARAMETERS.get('ANLIEFTEXT', None),
        'TEXT_CREDITNOTE': PARAMETERS.get('AUSZAHLUNGTEXT', None),
    }

    with utils.csv_open(f'{out_dir}/client_parameter.csv') as f:
        f.header('param', 'value')
        for param, value in new_params.items():
            f.row(param, value)
def main() -> None:
    """Parse the command line and run all migration steps in order.

    Sets the module-level ``QUIET``, ``CLIENT`` and ``DB_CNX`` globals from
    the parsed arguments, creates the output directory if necessary and then
    calls every ``migrate_*`` step with the input and output directory.
    The sqlite connection is closed afterwards, even if a step fails.
    """
    global DB_CNX, QUIET, CLIENT

    parser = argparse.ArgumentParser()
    parser.add_argument('in_dir', type=str,
                        help='The input directory where the exported csv files are stored')
    parser.add_argument('out_dir', type=str,
                        help='The output directory where the migrated csv file should be stored')
    parser.add_argument('-q', '--quiet', action='store_true', default=False,
                        help='Be less verbose')
    parser.add_argument('-d', '--database', metavar='DB', required=True,
                        help='The sqlite database file to look up information')
    parser.add_argument('-g', '--genossenschaft', metavar='WG', required=False, type=str,
                        choices=[wg.name for wg in WG])
    args = parser.parse_args()

    os.makedirs(args.out_dir, exist_ok=True)

    QUIET = args.quiet
    # -g is optional: argparse yields None when it is omitted, and
    # WG.from_str(None) would fail with a KeyError. Leave CLIENT unset then.
    CLIENT = WG.from_str(args.genossenschaft) if args.genossenschaft is not None else None

    DB_CNX = sqlite3.connect(args.database)
    try:
        # The call order is kept as is; steps fill module-level maps
        # (e.g. PARAMETERS) that other code may rely on.
        migrate_parameters(args.in_dir, args.out_dir)
        migrate_gradation(args.in_dir, args.out_dir)
        migrate_branches(args.in_dir, args.out_dir)
        migrate_grosslagen(args.in_dir, args.out_dir)
        migrate_gemeinden(args.in_dir, args.out_dir)
        migrate_reeds(args.in_dir, args.out_dir)
        migrate_attributes(args.in_dir, args.out_dir)
        migrate_cultivations(args.in_dir, args.out_dir)
        migrate_area_commitment_types(args.in_dir, args.out_dir)
        migrate_members(args.in_dir, args.out_dir)
        migrate_area_commitments(args.in_dir, args.out_dir)
        migrate_deliveries(args.in_dir, args.out_dir)
        migrate_payments(args.in_dir, args.out_dir)
    finally:
        # Release the database file even when a migration step raises.
        DB_CNX.close()
# Run the migration only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()