1825 lines
76 KiB
Python
Executable File
1825 lines
76 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from typing import Dict, Any, Tuple, Optional, List, Iterable
|
|
from enum import Enum
|
|
import argparse
|
|
import os
|
|
import re
|
|
import sys
|
|
import sqlite3
|
|
import requests
|
|
import datetime
|
|
import json
|
|
import string
|
|
|
|
import utils
|
|
|
|
|
|
class WG(Enum):
    """Clients (Winzergenossenschaften) this migration script knows about."""

    MATZEN = 1
    WINZERKELLER = 2

    @classmethod
    def from_str(cls, name: str):
        """Return the member whose name equals *name*; raises KeyError otherwise."""
        return cls[name]
|
|
|
|
|
|
# Shared SQLite connection; the lookup_* helpers open cursors on it.
DB_CNX: Optional[sqlite3.Connection] = None
# When set, the purely informational messages (success, convert, convert_name,
# renumber_delivery) are suppressed; warnings and errors are still printed.
QUIET: bool = False
# Client whose data is being migrated; selects the client-specific rules.
CLIENT: Optional[WG] = None

# Format checks; in this file they are applied with fullmatch().
# VAT id: two capital letters followed by 2-12 capital letters/digits.
USTID_NR_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}')
# BIC: 4 alphanumerics, 2 letters, 2 alphanumerics, optional 3-character suffix.
BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
# IBAN: two letters, two digits, then 8-30 alphanumerics (checksum: check_iban).
IBAN_RE = re.compile(r'[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')
# E-mail: local part without '@'/whitespace, dotted domain, alphabetic TLD.
EMAIL_RE = re.compile(r'[^@\s]+@([A-Za-z0-9_äöüß-]+\.)+[A-Za-z]{2,}')

# Lookup tables filled by the migrate_* functions while they run.
# migrate_gradation: 'Oechsle' value -> 'KW' value.
GRADATION_MAP: Optional[Dict[float, float]] = None
# migrate_cultivations: BANR -> cultid.
CULTIVATION_MAP: Optional[Dict[int, str]] = None
# migrate_branches: ZNR -> Kennbst.
BRANCH_MAP: Optional[Dict[int, str]] = None
# migrate_gemeinden: GNR -> list of (kgnr, gkz).
GEM_MAP: Optional[Dict[int, List[Tuple[int, int]]]] = None
# migrate_reeds: RNR -> (kgnr, rdnr, name).
REED_MAP: Optional[Dict[int, Tuple[int, int, str]]] = None
# migrate_grosslagen: source GLNR -> newly assigned glnr.
GROSSLAGE_MAP: Optional[Dict[int, int]] = None
# Reset by migrate_members.
MEMBER_MAP: Optional[Dict[int, Dict[str, Any]]] = None
# migrate_gemeinden: kgnr -> glnr.
GROSSLAGE_KG_MAP: Optional[Dict[int, int]] = None
DELIVERY_MAP: Optional[Dict[int, Tuple[int, int, int]]] = None
MODIFIER_MAP: Optional[Dict[str, Dict]] = None
SORT_MAP: Optional[Dict[str, str]] = None
PARAMETERS: Optional[Dict[str, str]] = None
|
|
|
|
# Value written into the 'country' column of exported rows.
AUSTRIA = 40
WGMASTER_PRECISION = 4

# Numeric quality level -> three-letter quality id.
QUAL_MAP: Dict[int, str] = {
    0: 'WEI',
    1: 'RSW',
    2: 'LDW',
    3: 'QUW',
    4: 'KAB',
    5: 'SPL',
}
|
|
|
|
# Place-name stems used by fix_street_name() to split street names written as
# one word ("<place>er..." followed by "straße"/"platz").
# The value is the adjective to emit for that place; None means key + 'er'.
# Misspelt stems (e.g. 'Dörfless', 'Mannhartsbrunn') carry the corrected adjective.
ORT_NAMES: Dict[str, Optional[str]] = {
    'Pirawarth': None,
    'Raggendorf': None,
    'Matzen': 'Matzner',
    'Matzn': None,
    'Stillfried': None,
    'Harras': None,
    'Gänserndorf': None,
    'Sulz': None,
    'Brünn': None,
    'Wien': None,
    'Angern': None,
    'Schweinbarth': None,
    'Hohenruppersdorf': None,
    'Grub': None,
    'Auersthal': None,
    'Ollersdorf': None,
    'Spannberg': None,
    'Ebenthal': None,
    'Bockfließ': None,
    'Dörfless': 'Dörfleser',
    'Dörfles': None,
    'Ableiding': None,
    'Absberg': None,
    'Eibesbrunn': None,
    'Engersdorf': None,
    'Enzersfeld': None,
    'Großebersdorf': None,
    'Hollabrunn': None,
    'Korneuburg': None,
    'Königsbrunn': None,
    'Laa': None,
    'Leopoldau': None,
    'Manhartsbrunn': None,
    'Mannhartsbrunn': 'Manhartsbrunner',
    'Münichsthal': None,
    'Pernau': None,
    'Pillichsdorf': None,
    'Retz': None,
    'Russbach': None,
    'Schleinbach': None,
    'Seefeld': None,
    'Seyring': None,
    'Stammersdorf': None,
    'Stelzendorf': None,
    'Traunfeld': None,
    'Tresdorf': None,
    'Trumau': None,
    'Wolkersdorf': None,
    'Znaim': None,
    'Obersdorf': None,
    'Sechshaus': None,
    'Rußbach': None,
}
|
|
|
|
# Exact-match corrections for street names: source spelling -> corrected spelling.
# fix_street_name() consults this table before applying any generic rule.
STREET_NAMES: Dict[str, str] = {
    'Hans-Wagnerstraße': 'Hans-Wagner-Straße',
    'J.Seitzstraße': 'Josef-Seitz-Straße',
    'Kurhaus-Str.': 'Kurhausstraße',
    'Kurhaus-Straße': 'Kurhausstraße',
    'Hofrat Döltlstraße': 'Hofrat-Döltl-Straße',
    'Flustraße': 'Flurstraße',
    'St.Laurentstraße': 'St.-Laurentstraße',
    'Josef Seitzstraße': 'Josef-Seitz-Straße',
    'Ritter Zoppelstraße': 'Ritter-Zoppel-Straße',
    'Ritter Zoppel Straße': 'Ritter-Zoppel-Straße',
    'R. Virchow-Straße': 'Rudolf-Virchow-Straße',
    'Grubere Hauptstraße': 'Gruber Hauptstraße',
    'Groß Inzersdorf': 'Großinzersdorf',
    'Erdpress': 'Erdpreß',
    'Hochleitengasse': 'Hochleithengasse',
    'Bei Der Gösselmühle': 'Bei der Gösslmühle',
    'Dr. Peschlstraße': 'Dr.-Peschl-Straße',
    'Dr.Peschlstraße': 'Dr.-Peschl-Straße',
    'Dr. Salzbornstraße': 'Dr.-Salzborn-Straße',
    'Elsa Brandström-Straße': 'Elsa-Brandström-Straße',
    'Franz Ecker Siedlung': 'Franz-Ecker-Siedlung',
    'Franz-Ecker Siedlung': 'Franz-Ecker-Siedlung',
    'Franz Gillygasse': 'Franz-Gilly-Gasse',
    'Franz V. Zülowstraße': 'Franz-von-Zülow-Straße',
    'Gr. Nondorf': 'Großnondorf',
    'In Der Trift': 'In der Trift',
    'Johann Degengasse': 'Johann-Degen-Gasse',
    'Josef Fürnkranz Siedlung': 'Josef-Fürnkranz-Siedlung',
    'Kaiser Franz Josef Platz': 'Kaiser-Franz-Josef-Platz',
    'Klein Haugsdorf': 'Kleinhaugsdorf',
    'Leopold Leuthnerstraße': 'Leopold-Leuthner-Straße',
    'Lh.-Mayer-Platz': 'Landeshauptmann-Mayer-Platz',
    'Manhartsbr.Straße': 'Manhartsbrunner Straße',
    'Maria Lourd Weg': 'Maria-Lourd-Weg',
    'U. Weißgasse Straße': 'Untere Weißgerberstraße',
    'Dr. Josef-Levit-Straße': 'Dr.-Josef-Levit-Straße',
    'Karl Katschthaler-Straße': 'Karl-Katschthaler-Straße',
}
|
|
|
|
|
|
def new(t: str, ids: Any, name: str, comment: str = None) -> None:
    """Print a green "New <type>: <ids> (<name>[, <comment>])" line to stderr."""
    detail = name
    if comment:
        detail += ", " + comment
    print(f'\x1B[1;32mNew {t:>6}: {str(ids):>10} ({detail})\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def success(mgnr: int, key: str, value) -> None:
    """Print a green per-member success line to stderr unless QUIET is set."""
    if QUIET:
        return
    print(f'\x1B[1;32m{mgnr:>6} : {key:<12} {value}\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def warning(mgnr: int, key: str, value, active: bool) -> None:
    """Print a yellow per-member warning line to stderr.

    The marker in parentheses is 'A' for a truthy *active*, '?' when it is
    None and a blank otherwise.
    """
    if active:
        marker = 'A'
    elif active is None:
        marker = '?'
    else:
        marker = ' '
    print(f'\x1B[1;33m{mgnr:>6} ({marker}): {key:<12} {value}\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def invalid(mgnr: int, key: str, value, active: Optional[bool]) -> None:
    """Print a red per-member "invalid value" line to stderr.

    The marker in parentheses is 'A' for a truthy *active*, '?' when it is
    None and a blank otherwise.
    """
    if active:
        marker = 'A'
    elif active is None:
        marker = '?'
    else:
        marker = ' '
    print(f'\x1B[1;31m{mgnr:>6} ({marker}): {key:<12} {value}\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def renumber_delivery(lsnr_1: str, lsnr_2: str) -> None:
    """Print a bold "<old> -> <new>" delivery renumbering line to stderr unless QUIET."""
    if QUIET:
        return
    print(f'\x1B[1m{lsnr_1:<15} -> {lsnr_2:<15}\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def warning_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Print a yellow per-delivery warning line to stderr."""
    text = f'{lsnr:<15} ({mgnr:>6}): {key:<12} {value}'
    print(f'\x1B[1;33m{text}\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def invalid_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Print a red per-delivery "invalid value" line to stderr."""
    text = f'{lsnr:<15} ({mgnr:>6}): {key:<12} {value}'
    print(f'\x1B[1;31m{text}\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def convert(mgnr: int, key: str, old_value: str, new_value) -> None:
    """Print a bold '"old" -> "new"' conversion line to stderr unless QUIET."""
    if QUIET:
        return
    print(f'\x1B[1m{mgnr:>6} : {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str],
                 billing: Optional[str] = None) -> None:
    """Print how a member's name was rewritten (bold, stderr) unless QUIET.

    Name parts are joined with " / ", missing parts are shown as empty
    strings, and a billing name, if any, is appended in parentheses.
    """
    if QUIET:
        return
    before = " / ".join(part or "" for part in old_name)
    after = " / ".join(part or "" for part in new_name)
    tail = "(" + billing + ")" if billing else ""
    print(f'\x1B[1m{mgnr:>6} : {before} -> {after}{tail}\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def check_lfbis_nr(nr: str) -> bool:
    """Validate the check digit of a seven-digit LFBIS number.

    The first six digits are weighted 7, 6, 5, 4, 3, 2; the check digit is
    (11 - (sum % 11)) % 10 and must equal the seventh digit.
    """
    # https://statistik.at/fileadmin/shared/QM/Standarddokumentationen/RW/std_r_land-forstw_register.pdf#page=41
    if len(nr) != 7 or not nr.isdigit():
        return False
    *body, check = (int(ch) for ch in nr)
    weighted = sum(digit * weight for digit, weight in zip(body, range(7, 1, -1)))
    return (11 - weighted % 11) % 10 == check
|
|
|
|
|
|
def check_ustid_nr_at(nr: str) -> bool:
    """Validate the check digit of an Austrian VAT id ("ATU" + 8 digits).

    The first seven digits are alternately taken as-is and doubled (using the
    digit sum of the doubled value); the check digit is (96 - sum) % 10.
    """
    # http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
    well_formed = nr.startswith('ATU') and len(nr) == 11 and nr[3:].isdigit()
    if not well_formed:
        return False
    total = 0
    for pos, ch in enumerate(nr[3:-1]):
        product = int(ch) * (2 if pos % 2 else 1)
        # product is at most 18, so this is its digit sum
        total += product // 10 + product % 10
    return (96 - total) % 10 == int(nr[-1])
|
|
|
|
|
|
def modulo(a: str, b: int) -> int:
    """Return the decimal number written in string *a* modulo *b*, digit by digit."""
    remainder = 0
    for digit in map(int, a):
        remainder = (10 * remainder + digit) % b
    return remainder
|
|
|
|
|
|
def check_iban(iban: str) -> bool:
    """Check an IBAN's format (IBAN_RE) and its mod-97 checksum.

    The first four characters are moved to the end, letters are replaced by
    numbers (A=10 ... Z=35) and the resulting number must be 1 modulo 97.
    """
    if IBAN_RE.fullmatch(iban) is None:
        return False
    rearranged = iban[4:] + iban[:4]
    numeric = ''.join(
        str(ord(ch) - ord('A') + 10) if 'A' <= ch <= 'Z' else ch
        for ch in rearranged
    )
    return modulo(numeric, 97) == 1
|
|
|
|
|
|
def normalize_phone_nr(nr: Optional[str], ort: str = None) -> Optional[str]:
    """Normalize an Austrian phone number to "+43 <area> <number>".

    :param nr: raw phone number, or None
    :param ort: place of the subscriber; for WG.WINZERKELLER it selects the
        area code of numbers that were stored without one
    :return: the normalized number, or None if *nr* is None or blank
    :raises RuntimeError: if a number without area code cannot be assigned
        to a known place (WG.WINZERKELLER only)
    """
    if nr is None:
        return None
    nr = nr.replace('/', ' ').strip()
    if not nr:
        # nothing left to normalize (indexing below would fail)
        return None
    # keep a single dash only when it separates a short (<= 3 chars) extension
    if nr.count('-') > 1 or len(nr.split('-')[-1]) > 3:
        nr = nr.replace('-', '')
    if nr[0] == '0':
        nr = '+43 ' + nr[1:]
    elif nr.startswith('43'):
        nr = '+' + nr
    elif CLIENT == WG.WINZERKELLER and ort:
        ort = ort.upper().strip()
        if ort in ('PILLICHSDORF', 'OBERSDORF', 'WOLKERSDORF', 'WOLFPASSING', 'PUTZING', 'GROSSENGERSDORF',
                   'EIBESBRUNN'):
            nr = f'+43 2245 {nr}'
        elif ort in ('ALBERNDORF', 'HAUGSDORF', 'AUGGENTHAL'):
            nr = f'+43 2944 {nr}'
        elif ort == 'HADRES':
            # was `ort in ('HADRES')`, i.e. a substring test on a plain string
            nr = f'+43 2943 {nr}'
        else:
            print(nr, ort)
            raise RuntimeError(f'no area code known for {ort!r} (number {nr!r})')
    if nr.startswith('+43') and len(nr) > 4:
        if nr[4] == '6':
            # mobile networks: three-digit prefix
            nr = nr.replace(' ', '')
            nr = f'{nr[:3]} {nr[3:6]} {nr[6:]}'
        elif nr[4] == '1':
            # Vienna: one-digit area code; the digit must not be repeated
            nr = nr.replace(' ', '')
            nr = f'{nr[:3]} {nr[3]} {nr[4:]}'
        elif nr[4] == '2':
            # four-digit area codes starting with 2
            nr = nr.replace(' ', '')
            nr = f'{nr[:3]} {nr[3:7]} {nr[7:]}'
    return nr.strip()
|
|
|
|
|
|
def fix_street_name(name: str) -> str:
    """Return the corrected spelling of a street name.

    Exact matches in STREET_NAMES win. Otherwise, a name ending in
    "straße"/"platz" that starts with exactly one "<place>er" from ORT_NAMES
    is split into "<adjective> <Rest>". Any other name is returned unchanged.
    """
    if name in STREET_NAMES:
        return STREET_NAMES[name]
    orte = [(k, v) for k, v in ORT_NAMES.items() if name.startswith(k + 'er')]
    if name.endswith(('straße', 'platz')) and len(orte) == 1:
        ort, adjective = orte[0]
        rest = name[len(ort) + 2:].title()
        # rest may already start with a space; collapse the resulting double
        # space (the previous replace(' ', ' ') was a no-op)
        return f'{adjective or ort + "er"} {rest}'.replace('  ', ' ')
    return name
|
|
|
|
|
|
def get_bev_gst_size(kgnr: int, gstnr: str) -> Optional[int]:
    """Query the BEV cadastre API for a parcel and return the sum of the 'fl'
    values of its 'nutzungen', or None if the response status is not 200."""
    response = requests.get(f'https://kataster.bev.gv.at/api/gst/{kgnr:05}/{gstnr}/')
    if response.status_code == 200:
        usages = response.json()['properties']['nutzungen']
        return sum([usage['fl'] for usage in usages])
    return None
|
|
|
|
|
|
def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]:
    """Read TFlaechenbindungen.csv and group its rows as MGNR -> FBNR -> row."""
    by_member = {}
    for row in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
        by_member.setdefault(row['MGNR'], {})[row['FBNR']] = row
    return by_member
|
|
|
|
|
|
def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] = None) -> Optional[int]:
    """Resolve a postal code and place name to a postal destination id.

    The result is ``plz * 100000 + okz`` with the okz taken from the database
    tables AT_plz_dest/AT_ort. A few known wrong plz/place combinations are
    corrected first. Returns None if *plz* or *ort* is None.

    :raises RuntimeError: if no unique place can be determined
    """
    if plz is None or ort is None:
        return None
    ort = ort.replace('0', 'O').replace('SZ', 'SS')

    # (place in upper case, plz) -> (corrected plz, corrected place); None = keep
    corrections = {
        ('PILLICHSDORF', 2212): (2211, None),
        ('ENZERSFELD', 2203): (2202, None),
        ('GROSSEBERSDORF', 2212): (None, 'GROSSENGERSDORF'),
        ('MÜNICHSTHAL', 2123): (2122, None),
        ('FRAUENDORF', 3710): (3714, None),
        ('MAISSAU', 3721): (None, 'UNTERDÜRNBACH'),
        ('KLEINRIEDENTHAL', 2074): (2070, None),
        ('KLEINHÖFLEIN', 2074): (2070, None),
        ('KLEIN HÖFLEIN', 2074): (2070, None),
        ('DROSENDORF', 2095): (None, 'DROSENDORF ALTSTADT'),
        ('KLEINWEIKERSDORF', 2033): (2023, None),
        ('NIEDERSCHLEINZ', 3721): (3714, None),
    }
    fixed_plz, fixed_ort = corrections.get((ort.upper(), plz), (None, None))
    if fixed_plz is not None:
        plz = fixed_plz
    if fixed_ort is not None:
        ort = fixed_ort

    cur = DB_CNX.cursor()
    cur.execute("SELECT o.okz, p.dest, o.name FROM AT_plz_dest p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?", (plz,))
    rows: List[Tuple[int, str, str]] = cur.fetchall()
    cur.close()

    def squash(text: str) -> str:
        # comparison form: lower case, no spaces or hyphens, 'ß' written as 'ss'
        return text.lower().replace(' ', '').replace('-', '').replace('ß', 'ss')

    wanted = squash(re.sub(r'\d+', '', ort).lower().replace('gr.', 'groß'))
    # substring match first; fall back to exact match if that is ambiguous
    candidates = [okz for okz, _, ort_name in rows if wanted in squash(ort_name)]
    if len(candidates) > 1:
        candidates = [okz for okz, _, ort_name in rows if wanted == squash(ort_name)]
    if len(candidates) == 1:
        return plz * 100000 + candidates[0]

    if ort == 'VELM-GÖTZENDORF':
        # the two places share one name here; decide by street and house number
        parts = address.split(' ')
        street = ' '.join(parts[:-1])
        nr = int(parts[-1].split('-')[0])
        in_velm = (
            (street == 'Landstraße' and nr <= 48)
            or (street == 'Winterzeile' and (nr <= 49 or nr in (52, 54, 56)))
            or (street == 'Hauptstraße' and (nr <= 106 or nr in (117, 115, 113, 111, 109, 107)))
        )
        # 3572: Velm, 3571: Götzendorf
        return plz * 100000 + (3572 if in_velm else 3571)

    raise RuntimeError(f'PLZ not found ({plz} {ort}, {address})')
|
|
|
|
|
|
def lookup_kgnr(okz: Optional[int]) -> Optional[int]:
    """Return the kgnr stored in AT_ort for *okz*.

    Returns None if *okz* is None or the query does not yield exactly one row.
    """
    if okz is None:
        return None

    cur = DB_CNX.cursor()
    cur.execute("SELECT kgnr FROM AT_ort WHERE okz = ?", (okz,))
    rows: List[Tuple[int]] = cur.fetchall()
    cur.close()

    if len(rows) != 1:
        return None
    (kgnr,), = rows
    return kgnr
|
|
|
|
|
|
def lookup_gem_name(name: str) -> List[Tuple[int, int]]:
    """Map a municipality name from the source data to (kgnr, gkz) pairs.

    Client-specific special cases are applied first: some names map to a
    fixed list of pairs, some restrict the database hit to a given Gemeinde,
    and some are replaced by the spelling used in the database. All other
    names are searched in AT_kg by name prefix, limited to the client's hkids.

    :raises RuntimeError: if the database lookup is not unique
    """
    gem_name, hkid = None, None
    fixed: Dict[str, List[Tuple[int, int]]] = {}  # name -> fixed result
    in_gem: Dict[str, str] = {}                   # name -> required Gemeinde name
    renamed: Dict[str, str] = {}                  # name -> name to search for
    squeezed: Tuple[str, ...] = ()                # names searched without spaces

    if CLIENT == WG.MATZEN:
        hkid = "'WLWV'"
        fixed = {'velm-götzendorf': [(6027, 30859), (6007, 30859)]}
        in_gem = {'dörfles': 'Weikendorf'}
        renamed = {'grub': 'Grub an der March'}
    elif CLIENT == WG.WINZERKELLER:
        hkid = "'WLWV', 'WIEN', 'WLWG'"
        if name.endswith('*'):
            name = name[:-1].strip()
        fixed = {
            'joching': [(12185, 31351)],
            'kreuttal': [(15206, 31627), (15221, 31627), (15226, 31627)],
            'hochleithen': [(15219, 31622), (15223, 31622), (15202, 31622)],
            'wien': [(1616, 90001), (1617, 90001)],
            'heldenberg': [(9112, 31019), (9132, 31019), (9131, 31019), (9141, 31019), (9140, 31019)],
            'retzbach': [(18129, 31038), (18112, 31038), (18117, 31038)],
            'stoitzendorf': [(10137, 31105)],
            'nappersdorf-kammersdorf': [(9008, 31028), (9026, 31028), (9032, 31028), (9037, 31028),
                                        (9051, 31028), (9067, 31028)],
        }
        in_gem = {
            'wolfpassing': 'Hochleithen',
            'seebarn': 'Harmannsdorf',
            'königsbrunn': 'Enzersfeld im Weinviertel',
            'sitzendorf': 'Sitzendorf an der Schmida',
            'roseldorf': 'Sitzendorf an der Schmida',
            'frauendorf': 'Sitzendorf an der Schmida',
            'dietersdorf': 'Hollabrunn',
            'gross': 'Hollabrunn',
            'merkersdorf': 'Hardegg',
            'dietmannsdorf': 'Zellerndorf',
            'sierndorf': 'Sierndorf',
            'waltersdorf': 'Staatz',
            'roggendorf': 'Röschitz',
            'wilhelmsdorf': 'Poysdorf',
        }
        renamed = {
            'altenmarkt': 'Altenmarkt im Thale',
            'eitzerstal': 'Eitzersthal',
            'auggenthal': 'Augenthal',
            'karlsdorf': 'Pfaffendorf',
            'kleinhaugsdorf': 'Augenthal',
            'retz': 'Retz Altstadt',
            'viendorf': 'Viendorf Weingebirge',
            'drosendorf': 'Drosendorf Stadt',
            'etzmannsdorf': 'Etzmannsdorf bei Straning',
        }
        squeezed = ('klein reinprechtsdorf', 'unter nalb', 'klein stelzendorf', 'klein kirchberg')

    key = name.lower()
    if key in fixed:
        # hand out a copy so callers never share the table entry
        return list(fixed[key])
    if key in in_gem:
        gem_name = in_gem[key]
    elif key in renamed:
        name = renamed[key]
    elif key in squeezed:
        name = name.replace(' ', '')

    search = (name.replace('fliess', 'fließ').replace('ross', 'roß').replace('Gr.', 'Groß ')
              .replace('Groß ', 'Groß').replace('-', ''))
    cur = DB_CNX.cursor()
    cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name "
                "FROM AT_kg k "
                "JOIN AT_gem g ON g.gkz = k.gkz "
                "JOIN wb_gem wg ON wg.gkz = g.gkz "
                f"WHERE LOWER(k.name) LIKE (LOWER(?) || '%') AND wg.hkid IN ({hkid})",
                (search,))
    rows: List[Tuple[int, str, int, str]] = cur.fetchall()
    cur.close()

    if gem_name:
        rows = [row for row in rows if row[3] == gem_name]
    if len(rows) == 1:
        return [(kgnr, gkz) for kgnr, _, gkz, _ in rows]

    print(name, rows)
    raise RuntimeError()
|
|
|
|
|
|
def lookup_kg_name(kgnr: int) -> str:
    """Return the name stored in AT_kg for *kgnr*, or None if there is no such row."""
    cur = DB_CNX.cursor()
    cur.execute("SELECT name FROM AT_kg WHERE kgnr = ?", (kgnr,))
    rows = cur.fetchall()
    cur.close()
    if not rows:
        return None
    return rows[0][0]
|
|
|
|
|
|
def lookup_rnr_name(rnr: int) -> str:
    """Return the name recorded in REED_MAP for the source reed number *rnr*."""
    _, _, name = REED_MAP[rnr]
    return name
|
|
|
|
|
|
def lookup_hkid(kgnr: Optional[int], qualid: str) -> str:
    """Determine the hkid (origin id) for a KG and quality level.

    'WEI' and 'RSW' always yield 'OEST'. Otherwise the hkid is read from
    wb_gem for *kgnr* (or defaults to 'WLNO' for the known clients when
    *kgnr* is None) and then adjusted: for 'LDW' it is widened to the
    '..XX' id, for all other qualities any 'WL..' id becomes 'WLNO'.
    """
    if qualid in ('WEI', 'RSW'):
        return 'OEST'

    hkid = None
    if kgnr is not None:
        cur = DB_CNX.cursor()
        cur.execute("SELECT wb.hkid FROM AT_kg kg JOIN AT_gem g ON g.gkz = kg.gkz JOIN wb_gem wb ON wb.gkz = g.gkz "
                    "WHERE kg.kgnr = ?", (kgnr,))
        rows = cur.fetchall()
        cur.close()
        hkid = rows[0][0]
    elif CLIENT in (WG.MATZEN, WG.WINZERKELLER):
        hkid = 'WLNO'

    if qualid != 'LDW':
        if hkid.startswith('WL') and not hkid.endswith('XX'):
            return 'WLNO'
        return hkid
    if hkid == 'WIEN':
        return 'WLXX'
    if hkid[:2] in ('WL', 'BL', 'SL'):
        return hkid[:2] + 'XX'
    return hkid
|
|
|
|
|
|
def guess_glnr(kgnr: int) -> Optional[int]:
    """Guess the glnr of a KG that is not yet in GROSSLAGE_KG_MAP.

    Looks at the already mapped KGs whose gkz shares the leading digits
    (gkz / 100) with this KG. If they disagree, only the KGs with the same
    gkz are considered. Rows with gkz / 100 == 900 are never used.
    Returns None if nothing suitable is mapped.
    """
    cur = DB_CNX.cursor()
    cur.execute("SELECT kgnr FROM AT_kg "
                "WHERE gkz / 100 != 900 AND gkz / 100 = (SELECT gkz / 100 FROM AT_kg WHERE kgnr = ?)", (kgnr,))
    same_prefix = cur.fetchall()
    cur.execute("SELECT kgnr FROM AT_kg "
                "WHERE gkz / 100 != 900 AND gkz = (SELECT gkz FROM AT_kg WHERE kgnr = ?)", (kgnr,))
    same_gkz = cur.fetchall()
    cur.close()

    def distinct_glnrs(rows) -> List[int]:
        return list(set([GROSSLAGE_KG_MAP[k] for k, in rows if k in GROSSLAGE_KG_MAP]))

    glnrs = distinct_glnrs(same_prefix)
    if len(glnrs) <= 1:
        return glnrs[0] if glnrs else None

    glnrs = distinct_glnrs(same_gkz)
    return glnrs[0] if glnrs else None
|
|
|
|
|
|
def migrate_gradation(in_dir: str, out_dir: str) -> None:
    """Fill GRADATION_MAP ('Oechsle' -> 'KW') from TUmrechnung.csv; writes no file."""
    global GRADATION_MAP
    GRADATION_MAP = {}
    GRADATION_MAP.update(
        (row['Oechsle'], row['KW'])
        for row in utils.csv_parse_dict(f'{in_dir}/TUmrechnung.csv')
    )
|
|
|
|
|
|
def migrate_branches(in_dir: str, out_dir: str) -> None:
    """Convert TZweigstellen.csv to branch.csv and fill BRANCH_MAP (ZNR -> Kennbst)."""
    global BRANCH_MAP
    BRANCH_MAP = {}

    with utils.csv_open(f'{out_dir}/branch.csv') as f:
        f.header('zwstid', 'name', 'country', 'postal_dest', 'address', 'phone_nr', 'fax_nr', 'mobile_nr')
        for b in utils.csv_parse_dict(f'{in_dir}/TZweigstellen.csv'):
            zwstid = b['Kennbst']
            BRANCH_MAP[b['ZNR']] = zwstid

            address = b['Straße']
            plz = int(b['PLZ']) if b['PLZ'] else None
            postal_dest = lookup_plz(plz, b['Ort'], address)
            if CLIENT == WG.MATZEN:
                # address and postal destination are fixed for this client
                address, postal_dest = 'Schloßstraße 6', 224303541

            tel = normalize_phone_nr(b['Telefon'])
            mob = None
            if tel and tel[4] == '6':
                # a '6' right after "+43 " is exported as mobile number
                mob, tel = tel, None
            fax = normalize_phone_nr(b['Telefax'])

            f.row(zwstid, b['Name'].strip().title(), AUSTRIA, postal_dest, address,
                  tel, fax, mob)
|
|
|
|
|
|
def migrate_grosslagen(in_dir: str, out_dir: str) -> None:
    """Convert TGrosslagen.csv to wb_gl.csv and fill GROSSLAGE_MAP.

    New glnrs are the 1-based row positions in the source file. For
    WG.WINZERKELLER the source GLNR 8 is not exported but mapped to glnr 6;
    its row position is still consumed.
    """
    global GROSSLAGE_MAP
    GROSSLAGE_MAP = {}

    with utils.csv_open(f'{out_dir}/wb_gl.csv') as f:
        f.header('glnr', 'name')
        rows = utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv')
        for glnr, gl in enumerate(rows, start=1):
            if CLIENT == WG.WINZERKELLER and gl['GLNR'] == 8:
                GROSSLAGE_MAP[8] = 6
                continue
            GROSSLAGE_MAP[gl['GLNR']] = glnr
            f.row(glnr, gl['Bezeichnung'])
|
|
|
|
|
|
def migrate_gemeinden(in_dir: str, out_dir: str) -> None:
    """Convert TGemeinden.csv to wb_kg.csv.

    Fills GEM_MAP (GNR -> list of (kgnr, gkz)) and GROSSLAGE_KG_MAP
    (kgnr -> glnr). Each kgnr is written only once, for the first
    municipality that refers to it.
    """
    global GEM_MAP, GROSSLAGE_KG_MAP
    GEM_MAP, GROSSLAGE_KG_MAP = {}, {}

    seen = set()
    with utils.csv_open(f'{out_dir}/wb_kg.csv') as f:
        f.header('kgnr', 'glnr')
        for g in utils.csv_parse_dict(f'{in_dir}/TGemeinden.csv'):
            gems = lookup_gem_name(g['Bezeichnung'])
            GEM_MAP[g['GNR']] = gems
            for kgnr, _gkz in gems:
                if kgnr not in seen:
                    seen.add(kgnr)
                    glnr = GROSSLAGE_MAP[g['GLNR']]
                    GROSSLAGE_KG_MAP[kgnr] = glnr
                    f.row(kgnr, glnr)
|
|
|
|
|
|
def migrate_reeds(in_dir: str, out_dir: str) -> None:
    """Convert TRiede.csv to wb_rd.csv and fill REED_MAP (RNR -> (kgnr, rdnr, name)).

    A reed is attached to the first KG of its municipality; rdnr counts up
    per kgnr. Rows whose GNR is not in GEM_MAP are reported and skipped.
    """
    global REED_MAP
    REED_MAP = {}

    with utils.csv_open(f'{out_dir}/wb_rd.csv') as f:
        f.header('kgnr', 'rdnr', 'name')
        for r in utils.csv_parse_dict(f'{in_dir}/TRiede.csv'):
            name: str = r['Bezeichnung'].strip()
            # only names written entirely in one case are re-cased
            if name.isupper() or name.islower():
                name = name.title()

            try:
                gem = GEM_MAP[r['GNR']]
                kgnr = gem[0][0]
                if len(gem) != 1:
                    print(gem, name, '->', gem[0])
            except KeyError:
                print(f'Invalid GNR {r["GNR"]} for reed {name}')
                continue

            taken = [nr for kg, nr, _ in REED_MAP.values() if kg == kgnr]
            rdnr = max(taken, default=0) + 1
            REED_MAP[r['RNR']] = (kgnr, rdnr, name)
            f.row(kgnr, rdnr, name)
|
|
|
|
|
|
def migrate_attributes(in_dir: str, out_dir: str) -> None:
    """Convert TSortenAttribute.csv to wine_attribute.csv.

    Rows without SANR are skipped; a KgProHa of 10000 is exported as
    "no limit" (None). For WG.MATZEN two inactive attributes are appended.
    """
    with utils.csv_open(f'{out_dir}/wine_attribute.csv') as f:
        f.header('attrid', 'name', 'active', 'max_kg_per_ha', 'strict', 'fill_lower')
        for a in utils.csv_parse_dict(f'{in_dir}/TSortenAttribute.csv'):
            if a['SANR'] is None:
                continue
            max_kg = None
            if a['KgProHa'] is not None:
                max_kg = int(a['KgProHa'])
                if max_kg == 10_000:
                    max_kg = None
            f.row(a['SANR'], a['Attribut'], True, max_kg, False, 0)
        if CLIENT == WG.MATZEN:
            for attrid, attr_name in (('M', 'Matzen'), ('HU', 'Huber')):
                f.row(attrid, attr_name, False, None, False, 0)
|
|
|
|
|
|
def migrate_cultivations(in_dir: str, out_dir: str) -> None:
    """Convert TBewirtschaftungsarten.csv to wine_cultivation.csv.

    Fills CULTIVATION_MAP (BANR -> cultid). The cultid is the name itself if
    it is all upper case, 'BIO' if it contains "biolog", and otherwise the
    upper-cased first letter of the name.
    """
    global CULTIVATION_MAP
    CULTIVATION_MAP = {}

    with utils.csv_open(f'{out_dir}/wine_cultivation.csv') as f:
        f.header('cultid', 'name', 'description')
        for c in utils.csv_parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'):
            name: str = c['Bezeichnung']
            initial = name[0].upper()
            if name.isupper():
                cultid = name
            elif 'biolog' in name.lower():
                cultid = 'BIO'
            else:
                cultid = initial
            CULTIVATION_MAP[c['BANR']] = cultid
            f.row(cultid, name, None)
|
|
|
|
|
|
def migrate_area_commitment_types(in_dir: str, out_dir: str) -> None:
    """Convert TLiefermengen.csv to area_commitment_type.csv.

    Rows without SNR or with SNR 'SV' are skipped. After the converted rows,
    client-specific types are appended: 'BM' for WG.MATZEN and a '<sort>B'
    type (attribute 'B') for each of the client's listed sorts.
    """
    with utils.csv_open(f'{out_dir}/area_commitment_type.csv') as f:
        f.header('vtrgid', 'sortid', 'attrid', 'disc', 'min_kg_per_ha',
                 'penalty_per_kg', 'penalty_amount', 'penalty_none')
        for t in utils.csv_parse_dict(f'{in_dir}/TLiefermengen.csv'):
            sortid: str = t['SNR']
            if not sortid or sortid == 'SV':
                continue
            menge = int(t['ErwarteteLiefermengeProHa'])
            vtrgid = sortid + (t['SANR'] or '')
            # attribute: explicit SANR, else whatever follows the two-letter sort id
            attrid = t['SANR'] or sortid[2:] or None
            f.row(vtrgid, sortid[:2], attrid, None, menge,
                  None, None, None)

        bio_sorts = {
            WG.MATZEN: ['GV', 'ZW', 'MT'],
            WG.WINZERKELLER: ['GV', 'ZW', 'WR', 'MT', 'RR', 'WB', 'CH', 'MU'],
        }
        if CLIENT == WG.MATZEN:
            f.row('BM', 'BM', None, None, None, None, None, None)
        for sortid in bio_sorts.get(CLIENT, []):
            f.row(f'{sortid}B', sortid, 'B', None, None, None, None, None)
|
|
|
|
|
|
def normalize_name(family_name: str, given_name: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str]]:
|
|
letters = string.ascii_letters + 'äöüßÄÖÜẞ-'
|
|
double_names = ['eva maria', 'maria theresia']
|
|
|
|
def is_alpha(s: str) -> bool:
|
|
return all(c in letters for c in s) if s.lower() not in double_names else True
|
|
|
|
if CLIENT == WG.WINZERKELLER:
|
|
if 'BEZIRKSBAUERNKAMMER' == family_name:
|
|
return None, None, None, None, None, 'Bezirksbauernkammer Mistelbach'
|
|
elif (given_name, family_name) == ('LANDW.', 'FACHSCHULE'):
|
|
return None, None, None, None, None, 'Landwirtschaftliche Fachschule Mistelbach'
|
|
elif 'lagerhaus' in family_name.lower() and given_name == 'HOLLABRUNN-HORN':
|
|
return None, None, None, None, None, 'Raiffeisen-Lagerhaus Hollabrunn-Horn eGen'
|
|
|
|
if given_name.lower() not in ('kg', 'gesbr', 'gesnbr') and \
|
|
len(family_name) > 0 and len(given_name) > 0 and is_alpha(family_name) and is_alpha(given_name):
|
|
return None, given_name.title(), None, family_name.title(), None, None
|
|
|
|
prefix: Optional[str] = None
|
|
middle_names: Optional[str] = None
|
|
suffix: Optional[str] = None
|
|
billing_name: Optional[str] = None
|
|
|
|
if given_name.startswith('z.H. '):
|
|
billing_name = family_name.replace('AGRAR', 'Agrar').replace('GESBR', 'GesbR')
|
|
parts = given_name.split(' ')
|
|
given_name = parts[1]
|
|
family_name = parts[2]
|
|
|
|
given_name = given_name.replace('+', ' + ').replace('JOS ', 'JOS. ')
|
|
given_name = re.sub(r' ?\((.+?)(, ?(.*?))?\)',
|
|
lambda m: f' + {m.group(1)}{" + " + m.group(3) if m.group(2) else ""}', given_name)
|
|
given_name = re.sub(r' u\. ?| und ', ' + ', given_name, flags=re.IGNORECASE)
|
|
|
|
titles = ''
|
|
|
|
def repl_title(m: re.Match) -> str:
|
|
nonlocal titles, suffix
|
|
t = m.group(1).lower().replace(' ', '').replace('.', '')
|
|
match t:
|
|
case 'jun': suffix = 'jun.'
|
|
case 'sen': suffix = 'sen.'
|
|
case 'dr': titles += 'Dr. '
|
|
case 'mag': titles += 'Mag. '
|
|
case 'ing': titles += 'Ing. '
|
|
case 'dipling': titles += 'Dipl.-Ing. '
|
|
case 'di': titles += 'Dipl.-Ing. '
|
|
case 'dkfm': titles += 'Dipl.-Kfm. '
|
|
case 'ökrat': titles += 'ÖkR '
|
|
case 'lkr': titles += 'ÖkR '
|
|
return ' '
|
|
|
|
title_re = re.compile(r',?\b(dr|ing|mag|jun|sen|dkfm|dipl\. ?-?ing|di|ök\.rat|lkr)\b\.?', re.IGNORECASE)
|
|
given_name = utils.remove_spaces(re.sub(title_re, repl_title, given_name))
|
|
family_name = utils.remove_spaces(re.sub(title_re, repl_title, family_name))
|
|
if titles:
|
|
prefix = titles.strip()
|
|
|
|
family_parts = family_name.split(' ')
|
|
last = family_parts[-1].lower()
|
|
if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr'):
|
|
family_name = ' '.join(family_parts[:-1])
|
|
if ' ' not in family_name and len(family_name) > 4:
|
|
family_name = family_name.title()
|
|
billing_name = family_name + ' ' + ('KG' if last == 'kg' else 'KEG' if last == 'keg.' else 'GesbR')
|
|
if is_alpha(given_name):
|
|
return prefix, given_name.title(), middle_names, family_name, suffix, billing_name
|
|
|
|
given_parts = given_name.split(' ')
|
|
last = given_parts[-1].lower()
|
|
if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr'):
|
|
given_name = ' '.join(given_parts[:-1]).title()
|
|
family_name = family_name.title()
|
|
billing_name = f'{family_name} {"KG" if last == "kg" else "KEG" if last == "keg." else "GesbR"}'
|
|
return prefix, given_name, middle_names, family_name, suffix, billing_name
|
|
|
|
if ' ' in family_name or '.' in family_name:
|
|
if family_name.lower().startswith('weingut') or family_name.lower().startswith('weinbau'):
|
|
billing_name = family_name.title()
|
|
family_name = ' '.join(family_name.split(' ')[1:]).title()
|
|
elif family_name.lower().endswith('veltlinerhof') or family_name.lower().endswith('biohof'):
|
|
billing_name = ' '.join(family_name.split(' ')[::-1]).title()
|
|
family_name = ' '.join(family_name.split(' ')[:-1]).title()
|
|
elif 'u.' in family_name:
|
|
billing_name = utils.remove_spaces(family_name.title().replace('U.', ' und '))
|
|
family_name = family_name.split(' ')[0].title()
|
|
else:
|
|
billing_name = family_name
|
|
family_name = family_name.split(' ')[-1].title()
|
|
if ' + ' in given_name:
|
|
parts = given_name.split(' + ')
|
|
family_name = family_name.title()
|
|
billing_name = (', '.join(parts).title()[::-1].replace(',', ' und'[::-1], 1)[::-1] +
|
|
f' {billing_name or family_name}')
|
|
given_name = parts[0].title()
|
|
else:
|
|
family_name = family_name.title()
|
|
given_name = given_name.title()
|
|
return prefix, given_name, middle_names, family_name, suffix, billing_name
|
|
|
|
|
|
def migrate_members(in_dir: str, out_dir: str) -> None:
|
|
global MEMBER_MAP
|
|
MEMBER_MAP = {}
|
|
|
|
members = [m for m in utils.csv_parse_dict(f'{in_dir}/TMitglieder.csv')]
|
|
mgnrs = [m['MGNR'] for m in members]
|
|
fbs = parse_flaechenbindungen(in_dir)
|
|
|
|
with utils.csv_open(f'{out_dir}/member.csv') as f_m, \
|
|
utils.csv_open(f'{out_dir}/member_billing_address.csv') as f_mba, \
|
|
utils.csv_open(f'{out_dir}/member_telephone_number.csv') as f_tel, \
|
|
utils.csv_open(f'{out_dir}/member_email_address.csv') as f_email, \
|
|
utils.csv_open(f'{out_dir}/wb_kg.csv', 'a') as f_kg:
|
|
f_m.header(
|
|
'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix',
|
|
'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid',
|
|
'lfbis_nr', 'ustid_nr', 'volllieferant', 'buchführend', 'organic', 'funktionär', 'active', 'deceased',
|
|
'iban', 'bic', 'country', 'postal_dest', 'address', 'default_kgnr', 'comment')
|
|
f_mba.header('mgnr', 'name', 'country', 'postal_dest', 'address')
|
|
f_tel.header('mgnr', 'nr', 'type', 'number', 'comment')
|
|
f_email.header('mgnr', 'nr', 'address', 'comment')
|
|
|
|
for m in members:
|
|
mgnr: int = m['MGNR']
|
|
family_name: str = m['Nachname']
|
|
given_name: str = m['Vorname']
|
|
funktionaer, deceased = False, False
|
|
|
|
if family_name is None and given_name is None:
|
|
continue
|
|
|
|
given_name = given_name or ''
|
|
if CLIENT == WG.MATZEN and given_name.startswith(' '):
|
|
funktionaer = True
|
|
if CLIENT == WG.WINZERKELLER and ('*' in family_name or '*' in given_name or '(+)' in family_name or '(+)' in given_name):
|
|
deceased = True
|
|
family_name = family_name.replace('*', '').replace('(+)', '')
|
|
given_name = given_name.replace('*', '').replace('(+)', '')
|
|
|
|
family_name = utils.remove_spaces(family_name)
|
|
given_name = utils.remove_spaces(given_name).replace(', ', ',')
|
|
|
|
ret = normalize_name(family_name, given_name)
|
|
prefix, given_name, middle_names, family_name, suffix, billing_name = ret
|
|
n1 = utils.remove_spaces(' '.join(r or '' for r in ret))
|
|
n2 = utils.remove_spaces((m['Vorname'] or '') + ' ' + (m['Nachname'] or ''))
|
|
if billing_name or n1.lower() != n2.lower():
|
|
convert_name(mgnr, (m['Nachname'], m['Vorname']),
|
|
(prefix, given_name, middle_names, family_name, suffix), billing_name)
|
|
if not given_name or not family_name:
|
|
given_name = given_name or ''
|
|
family_name = family_name or ''
|
|
invalid(mgnr, 'Name', n1, active)
|
|
|
|
bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None
|
|
if bnr is not None:
|
|
bnr = bnr.replace('.', '')
|
|
if len(bnr) == 10:
|
|
bnr = bnr.removesuffix('000')
|
|
elif len(bnr) == 6:
|
|
bnr = '0' + bnr
|
|
elif bnr.endswith(' inaktiv'):
|
|
bnr = bnr.split(' ')[0]
|
|
if not check_lfbis_nr(bnr):
|
|
if bnr in ('0', '1234567'):
|
|
warning(mgnr, 'BetriebsNr.', bnr, active)
|
|
else:
|
|
invalid(mgnr, 'BetriebsNr.', bnr, active)
|
|
bnr = None
|
|
|
|
ustid_nr: Optional[str] = m['UID']
|
|
if ustid_nr is not None:
|
|
ustid_nr = ustid_nr.replace(' ', '')
|
|
if len(ustid_nr) == 8 and ustid_nr.isdigit():
|
|
ustid_nr = 'ATU' + ustid_nr
|
|
elif not USTID_NR_RE.fullmatch(ustid_nr):
|
|
invalid(mgnr, 'UID', ustid_nr, active)
|
|
ustid_nr = None
|
|
if ustid_nr and not check_ustid_nr_at(ustid_nr):
|
|
if ustid_nr == 'ATU11111111':
|
|
warning(mgnr, 'UID', ustid_nr, active)
|
|
else:
|
|
invalid(mgnr, 'UID', ustid_nr, active)
|
|
ustid_nr = None
|
|
|
|
iban: Optional[str] = m['IBAN']
|
|
bic: Optional[str] = m['BIC']
|
|
blz: Optional[int] = m['BLZ']
|
|
kto_nr: Optional[str] = m['KontoNr']
|
|
|
|
if iban is not None:
|
|
iban = iban.replace(' ', '')
|
|
if not check_iban(iban):
|
|
invalid(mgnr, 'IBAN', iban, active)
|
|
iban = None
|
|
|
|
if bic is not None:
|
|
bic = bic.upper()
|
|
if bic == 'RLNWATAUE':
|
|
bic = 'RLNWATWWAUE'
|
|
elif bic == 'RLNWATWMIB':
|
|
bic = 'RLNWATWWMIB'
|
|
if not BIC_RE.fullmatch(bic):
|
|
invalid(mgnr, 'BIC', bic, active)
|
|
bic = None
|
|
if bic is not None:
|
|
if len(bic) == 11 and bic.endswith('XXX'):
|
|
bic = bic[:-3]
|
|
|
|
plz = int(m['PLZ']) if m['PLZ'] else None
|
|
ort: Optional[str] = m['Ort']
|
|
address: Optional[str] = m['Straße']
|
|
|
|
parts = ort.split(' ') if ort else ['']
|
|
if parts[-1].isdigit() or (len(parts) > 1 and parts[-2].isdigit()):
|
|
if len(parts) > 1 and parts[-2].isdigit():
|
|
ort = ' '.join(parts[:-2])
|
|
new_address = parts[-2] + parts[-1]
|
|
else:
|
|
ort = ' '.join(parts[:-1])
|
|
new_address = parts[-1]
|
|
if address is not None and address != ' ' and address != new_address:
|
|
print(address, new_address)
|
|
raise RuntimeError()
|
|
address = parts[-1]
|
|
if CLIENT == WG.WINZERKELLER and ort == 'JETZELDORF':
|
|
ort = 'JETZELSDORF'
|
|
if ort:
|
|
ort = ort.upper().strip()
|
|
|
|
if address is not None:
|
|
address_old = address
|
|
address = re.sub(r'([0-9]) ?([A-Z])\b', lambda a: a.group(1) + a.group(2).lower(),
|
|
utils.remove_spaces(address).title())
|
|
if address.startswith('Haus Nr.') or \
|
|
address.startswith('Nr. ') or \
|
|
address.startswith('Nr ') or \
|
|
(len(address) > 0 and address[0].isdigit()):
|
|
address = ort.title() + ' ' + address.split(' ')[-1]
|
|
address = address.replace('strasse', 'straße').replace('strassse', 'straße')\
|
|
.replace('Strasse', 'Straße').replace('Str.', 'Straße').replace('stasse', 'straße')\
|
|
.replace('str.', 'straße').replace('ster.', 'straße').replace('g. ', 'gasse ')\
|
|
.replace('Gross', 'Groß').replace('Bockfliess', 'Bockfließ').replace('Weiss', 'Weiß')\
|
|
.replace('Preussen', 'Preußen').replace('Schloss', 'Schloß').replace('luss', 'luß')\
|
|
.replace('Haupstraße', 'Hauptstraße').replace('Russ', 'Ruß').replace('Ross', 'Roß')
|
|
address = re.sub('([a-z])([0-9])', lambda a: a.group(1) + ' ' + a.group(2), address)
|
|
if address.startswith('Ob. '):
|
|
address = address.replace('Ob. ', 'Obere ', 1)
|
|
address = address.replace(' Nr. ', ' ')
|
|
address = re.sub(r'([^0-9]+?)( [0-9])', lambda a: fix_street_name(a.group(1)) + a.group(2), address)
|
|
address = utils.remove_spaces(address)
|
|
if address_old != address:
|
|
convert(mgnr, 'Adresse', address_old, address)
|
|
|
|
email: Optional[str] = m['EMail']
|
|
emails = []
|
|
if email is not None:
|
|
for email in email.split(' + '):
|
|
if email.isupper():
|
|
email = email.lower()
|
|
if not EMAIL_RE.fullmatch(email):
|
|
invalid(mgnr, 'E-Mail', m['EMail'], active)
|
|
else:
|
|
parts = email.split('@')
|
|
emails.append(f'{parts[0]}@{parts[1].lower()}')
|
|
|
|
zwstid = m['ZNR'] and BRANCH_MAP[m['ZNR']] or len(BRANCH_MAP) == 1 and list(BRANCH_MAP.values())[0]
|
|
if CLIENT == WG.WINZERKELLER and plz == 1228:
|
|
plz = 1020
|
|
postal_dest = lookup_plz(plz, ort, address)
|
|
|
|
#if mgnr in fbs:
|
|
# gems = {v['GNR'] for k, v in fbs[mgnr].items() if v['Bis'] and int(v['Bis']) >= 2020}
|
|
# if len(gems) == 1:
|
|
# print(GEM_MAP[list(gems)[0]])
|
|
|
|
okz = postal_dest % 100000 if postal_dest else None
|
|
kgnr = lookup_kgnr(okz)
|
|
active = m['Aktives Mitglied'] or False
|
|
if kgnr is None:
|
|
invalid(mgnr, 'KGNr.', ort, active)
|
|
elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]:
|
|
glnr = guess_glnr(kgnr)
|
|
if glnr:
|
|
new('KG', kgnr, lookup_kg_name(kgnr), f'GL {glnr}')
|
|
f_kg.row(kgnr, glnr)
|
|
if 9999 not in GEM_MAP:
|
|
GEM_MAP[9999] = []
|
|
GEM_MAP[9999].append((kgnr, 0))
|
|
else:
|
|
kgnr = None
|
|
|
|
if postal_dest is None:
|
|
invalid(mgnr, 'PLZ', None, active)
|
|
continue
|
|
|
|
pred = m['MGNR-Vorgänger'] if m['MGNR-Vorgänger'] in mgnrs else None
|
|
f_m.row(
|
|
mgnr, pred, prefix, given_name, middle_names, family_name, suffix,
|
|
m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0,
|
|
m['BHKontonummer'], zwstid, bnr, ustid_nr,
|
|
m['Volllieferant'] or False, m['Buchführend'] or False, False, funktionaer, active, deceased,
|
|
iban, bic, AUSTRIA, postal_dest, address or '-', kgnr, m['Anmerkung']
|
|
)
|
|
|
|
phone_1: Optional[str] = m['Telefon']
|
|
phone_2: Optional[str] = m['Telefax']
|
|
phone_3: Optional[str] = m['Mobiltelefon']
|
|
numbers = []
|
|
|
|
if CLIENT == WG.WINZERKELLER:
|
|
# Telefax (phone_2) not used
|
|
numbers = {}
|
|
|
|
def add_number(nr: str, fax: bool = False, comment: Optional[str] = None, fax_only: bool = False) -> None:
    """Register a phone number in the enclosing ``numbers`` dict.

    :param nr: phone number as returned by ``normalize_phone_nr``
    :param fax: whether the number is (also) a fax number
    :param comment: optional free-text remark stored alongside the number
    :param fax_only: if set, the number is never flagged as a landline

    An entry that already exists for ``nr`` is overwritten.
    """
    # Index 4 is presumably the first digit after the country prefix
    # (NOTE(review): confirm against normalize_phone_nr); '6' marks a mobile number.
    mob = nr[4] == '6'
    numbers[nr] = {
        'mobile': mob,
        'landline': not mob and not fax_only,
        'fax': fax,
        # Keep the caller-supplied comment so it reaches the tel CSV rows.
        'comment': comment,
    }
|
|
|
|
if phone_1:
|
|
phone_1 = phone_1.lower().replace('und', 'u.').replace('auch', 'u.').replace('u.', ' u. ')\
|
|
.replace('oder', 'od.').replace(';', 'od.').replace('od.', ' od. ')
|
|
phone_1 = utils.remove_spaces(phone_1)
|
|
fax = False
|
|
if phone_1.endswith(' u. fax'):
|
|
fax = True
|
|
phone_1 = ' '.join(phone_1.split(' ')[:-2])
|
|
if phone_1.replace(' ', '').replace('/', '').replace('-', '').isdigit() and len(phone_1) <= 20:
|
|
if phone_1[0] != '0' and '/' in phone_1:
|
|
for nr in phone_1.split('/'):
|
|
add_number(normalize_phone_nr(nr, ort), fax)
|
|
else:
|
|
add_number(normalize_phone_nr(phone_1, ort), fax)
|
|
elif re.fullmatch(r'0[0-9/ -]+ od\. 0[0-9/ -]+', phone_1):
|
|
parts = phone_1.split(' od. ')
|
|
add_number(normalize_phone_nr(parts[0], ort), False)
|
|
add_number(normalize_phone_nr(parts[1], ort), fax)
|
|
elif re.fullmatch(r'0[0-9/ -]+ od\. [1-9][0-9/ -]+', phone_1):
|
|
parts = phone_1.split(' od. ')
|
|
add_number(normalize_phone_nr(parts[0], ort), False)
|
|
if parts[0][1] == '6':
|
|
add_number(normalize_phone_nr(parts[1], ort), fax)
|
|
else:
|
|
add_number(normalize_phone_nr(parts[0][:5] + parts[1], ort), fax)
|
|
elif re.fullmatch(r'0[0-9/ -]+ fax 0[0-9/ -]+', phone_1):
|
|
parts = phone_1.split(' fax ')
|
|
add_number(normalize_phone_nr(parts[0], ort), False)
|
|
add_number(normalize_phone_nr(parts[1], ort), True, fax_only=True)
|
|
elif re.fullmatch(r'0[0-9/ -]+ fax [1-9][0-9/ -]+', phone_1):
|
|
parts = phone_1.split(' fax ')
|
|
add_number(normalize_phone_nr(parts[0], ort), False)
|
|
add_number(normalize_phone_nr(parts[0][:5] + parts[1], ort), True, fax_only=True)
|
|
elif '-' in phone_1 and phone_1.endswith('fax'):
|
|
nr = re.sub(r'-+ ', '-', phone_1)
|
|
nr = ' '.join(nr.split(' ')[:-1])
|
|
add_number(normalize_phone_nr(nr.split('-')[0], ort), False)
|
|
add_number(normalize_phone_nr(nr, ort), True, fax_only=True)
|
|
elif 'fax -' in phone_1:
|
|
parts = phone_1.split('fax')
|
|
add_number(normalize_phone_nr(parts[0], ort), False)
|
|
add_number(normalize_phone_nr(parts[0].strip() + parts[1].strip(), ort), True, fax_only=True)
|
|
elif phone_1.endswith('fax'):
|
|
nr = phone_1[:-3].strip()
|
|
add_number(normalize_phone_nr(nr), False)
|
|
add_number(normalize_phone_nr(nr), True, fax_only=True)
|
|
elif re.fullmatch(r'0[0-9/ -]+ u\. fax (od\. |u\. )?[0-9/ -]+', phone_1):
|
|
parts = phone_1.split(' ')
|
|
add_number(normalize_phone_nr(parts[0], ort), True)
|
|
nr = parts[-1]
|
|
if nr[0] == '0':
|
|
add_number(normalize_phone_nr(nr, ort))
|
|
else:
|
|
add_number(normalize_phone_nr(parts[0][:5] + nr, ort))
|
|
else:
|
|
parts = phone_1.split(' ')
|
|
if parts[-1].isalpha():
|
|
add_number(normalize_phone_nr(parts[0], ort), comment=parts[-1])
|
|
else:
|
|
for nr in parts:
|
|
add_number(normalize_phone_nr(nr, ort), fax)
|
|
if phone_3:
|
|
for nr in phone_3.split(','):
|
|
nr = nr.strip()
|
|
parts = nr.split(' ')
|
|
comment = None
|
|
if parts[-1].startswith('(') and parts[-1].endswith(')'):
|
|
nr = nr[:nr.rindex(' ')].strip()
|
|
comment = parts[-1][1:-1].strip()
|
|
elif parts[-1].isalpha():
|
|
nr = nr[:nr.rindex(' ')].strip()
|
|
comment = parts[-1].strip()
|
|
add_number(normalize_phone_nr(nr, ort), comment=comment)
|
|
count = 0
|
|
for nr, data in numbers.items():
|
|
if data['mobile']:
|
|
count += 1
|
|
f_tel.row(mgnr, count, 'mobile', nr, data['comment'])
|
|
if data['landline']:
|
|
count += 1
|
|
f_tel.row(mgnr, count, 'landline', nr, data['comment'])
|
|
if data['fax']:
|
|
count += 1
|
|
f_tel.row(mgnr, count, 'fax', nr, data['comment'])
|
|
else:
|
|
if phone_1:
|
|
phone_1 = normalize_phone_nr(phone_1)
|
|
if len(phone_1) <= 10 or phone_1[0] != '+':
|
|
invalid(mgnr, 'Tel.Nr.', m['Telefon'], active)
|
|
else:
|
|
numbers.append(phone_1)
|
|
if phone_1[4] == '6':
|
|
f_tel.row(mgnr, len(numbers), 'mobile', phone_1, None)
|
|
else:
|
|
f_tel.row(mgnr, len(numbers), 'landline', phone_1, None)
|
|
if phone_2:
|
|
phone_2 = normalize_phone_nr(phone_2)
|
|
if len(phone_2) <= 8 or phone_2[0] != '+':
|
|
invalid(mgnr, 'Fax.Nr.', m['Telefax'], active)
|
|
else:
|
|
numbers.append(phone_2)
|
|
if phone_2[4] == '6':
|
|
f_tel.row(mgnr, len(numbers), 'mobile', phone_2, None)
|
|
else:
|
|
f_tel.row(mgnr, len(numbers), 'fax', phone_2, None)
|
|
if phone_3:
|
|
phone_3 = normalize_phone_nr(phone_3)
|
|
if len(phone_3) <= 10 or phone_3[0] != '+':
|
|
invalid(mgnr, 'Tel.Nr.', m['Mobiltelefon'], active)
|
|
elif phone_3 not in numbers:
|
|
numbers.append(phone_3)
|
|
if phone_3[4] == '6':
|
|
f_tel.row(mgnr, len(numbers), 'mobile', phone_3, None)
|
|
else:
|
|
f_tel.row(mgnr, len(numbers), 'landline', phone_3, None)
|
|
|
|
for i, email in enumerate(emails):
|
|
f_email.row(mgnr, i + 1, email, None)
|
|
|
|
MEMBER_MAP[mgnr] = {
|
|
'default_kgnr': kgnr
|
|
}
|
|
if billing_name:
|
|
f_mba.row(mgnr, billing_name, AUSTRIA, postal_dest, address or '-')
|
|
|
|
|
|
def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
    """Migrate area commitments (``TFlaechenbindungen.csv``) to ``area_commitment.csv``.

    For every commitment of a known member the cadastral municipality (KG) and
    reed are resolved, the free-text parcel number is normalized (only for
    ``WG.MATZEN``) and one row is written.  Reeds that cannot be matched to an
    existing reed of one of the municipality's KGs are appended to ``wb_rd.csv``.

    :param in_dir: directory containing the exported csv files
    :param out_dir: directory the migrated csv files are written to
    """

    def parse_gstnrs(nr_str: str, kgnr: int, mgnr: int) -> List[str]:
        """Split a free-text parcel number string into single parcel numbers.

        Reports the string via ``invalid`` and returns ``[]`` if it cannot be parsed.
        """
        if nr_str is None:
            return []
        elif nr_str.isdigit() and len(nr_str) <= 6:
            return [nr_str]
        elif nr_str.count('/') == 1:
            parts = nr_str.split('/')
            if len(parts[0]) == len(parts[1]) and parts[0].isdigit() and parts[1].isdigit() and len(parts[0]) >= 3:
                # Two equally long numbers: treated as two separate parcels
                return [parts[0], parts[1]]
            elif parts[0].isdigit() and len(parts[0]) <= 6 and parts[1].isdigit() and len(parts[1]) <= 3:
                # Parcel number with sub-number
                return [nr_str]
        if nr_str.count('/') > 1:
            parts = nr_str.split('/')
            if all([p.isdigit() for p in parts]):
                if all([len(p) <= 1 for p in parts[1:]]):
                    # "12/1/2/3" -> 12/1, 12/2, 12/3
                    return [f'{parts[0]}/{p}' for p in parts[1:]]
                elif all([len(p) == len(parts[0]) for p in parts]):
                    return parts
        if nr_str.startswith(f'{kgnr:05}'):
            # Drop a leading KG number
            return parse_gstnrs(nr_str[5:].strip(), kgnr, mgnr)
        if nr_str.endswith(' 2000'):
            return parse_gstnrs(nr_str[:-5].strip(), kgnr, mgnr)
        parts = re.split(r' *[,;+&] *', nr_str)
        if len(parts) == 1:
            parts = nr_str.split(' / ')
        # NOTE(review): as written the next two fallbacks cannot produce more than one
        # part (the string is split on a separator it was just shown not to contain);
        # verify the intended separators.
        if len(parts) == 1 and ' ' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) == 1 and ' ' not in nr_str and '-' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) > 1:
            return [gst for p in parts for gst in parse_gstnrs(p, kgnr, mgnr)]

        m = re.fullmatch(r'([0-9]+/)?([0-9]+) *- *([0-9]+)', nr_str)
        if m is not None:
            b = m.group(1)
            f = int(m.group(2))
            t = int(m.group(3))
            if t < f:
                # Abbreviated upper bound, e.g. "123-27" means 123 to 127
                t += f - (f % pow(10, len(m.group(3))))
            if t - f < 50:
                return [
                    gst
                    for counter in range(f, t + 1)
                    for p in [f'{b or ""}{counter}']
                    for gst in parse_gstnrs(p, kgnr, mgnr)
                ]

        invalid(mgnr, 'GstNr.', f'{kgnr:05}-{nr_str}', None)
        return []

    def replace_nrs(m: re.Match, sep: str) -> str:
        """Collapse runs of three or more consecutive numbers into ``from-to``."""
        end = m.group(0).endswith(sep)
        # A match that ends with the separator yields an empty trailing piece,
        # which must not be passed to int().
        parts = [int(p) for p in m.group(0).split(sep) if p]
        text = ''
        last = None
        for i, p in enumerate(parts):
            if last is not None:
                if last + 1 == p:
                    # Still inside a consecutive run
                    last = p
                    continue
                else:
                    text += f'{last}{sep}'
                    last = None
            if len(parts) > i + 2 and p + 1 == parts[i + 1] and p + 2 == parts[i + 2]:
                # Start of a run of at least three consecutive numbers
                last = p
                text += f'{p}-'
            else:
                text += f'{p}{sep}'
        if last is not None:
            text += str(last)
        return text.strip().strip(sep) + (sep if end else '')

    def format_gstnr(nrs: List[str]) -> Optional[str]:
        """Format parcel numbers as one compact string, or ``None`` if there are none."""
        if len(nrs) == 0:
            return None
        # Sort numerically by zero-padding every digit group, then strip the padding again
        nrs = [re.sub(r'\b0+', '', nr)
               for nr in sorted([re.sub(r'[0-9]+', lambda m: m.group(0).rjust(6, '0'), nr)
                                 for nr in nrs])]
        last = None
        text = ''
        for nr in nrs:
            if last is None:
                text += nr
            elif '/' in last and last.split('/')[:-1] == nr.split('/')[:-1]:
                # Same base number: only append the sub-number
                text += f'+{nr.split("/")[-1]}'
            else:
                text += f', {nr}'
            last = nr
        text = re.sub(r'[0-9]+\+[0-9]+(\+[0-9]+)+', lambda m: replace_nrs(m, '+'), text)
        text = re.sub(r'([0-9]+(, |$)){3,}', lambda m: replace_nrs(m, ', '), text)
        return text

    # KG number -> {reed number -> reed name}
    reeds: Dict[int, Dict[int, str]] = {k: {r: n
                                            for rk, r, n in REED_MAP.values() if rk == k}
                                        for k in set([k for k, _, _ in REED_MAP.values()])}
    # (KG number, old reed number) -> reed number within that KG
    new_reeds: Dict[Tuple[int, int], int] = {}

    with utils.csv_open(f'{out_dir}/area_commitment.csv') as f_fb, \
            utils.csv_open(f'{out_dir}/wb_rd.csv', 'a+') as f_rd:
        f_fb.header('fbnr', 'mgnr', 'vtrgid', 'cultid', 'area', 'kgnr', 'gstnr', 'rdnr',
                    'year_from', 'year_to', 'comment')

        for fb in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
            if (fb['Von'] is None and fb['Bis'] is None) or fb['GNR'] is None:
                continue
            parz: str = fb['Parzellennummer']
            fbnr: int = fb['FBNR']
            mgnr: int = fb['MGNR']
            gem = GEM_MAP[fb['GNR']]
            kgnrs = [kgnr for kgnr, gkz in gem]
            rnr = fb['RNR']
            rd_kgnr, rdnr, _ = REED_MAP.get(rnr, (None, None, None)) if rnr else (None, None, None)
            if mgnr not in MEMBER_MAP:
                continue

            kgnr = None
            if rd_kgnr is None:
                kgnr = kgnrs[0]
            elif rd_kgnr in kgnrs:
                kgnr = rd_kgnr
            elif (kgnrs[0], rnr) in new_reeds:
                kgnr = kgnrs[0]
                rdnr = new_reeds[(kgnr, rnr)]
            else:
                # The reed belongs to a KG outside this municipality:
                # look for a reed with the same name in one of its KGs
                rname = lookup_rnr_name(rnr)
                for k in kgnrs:
                    if k not in reeds:
                        continue
                    try:
                        pos = list(reeds[k].values()).index(rname)
                        r = list(reeds[k].keys())[pos]
                        kgnr = k
                        rdnr = r
                        new_reeds[(kgnr, rnr)] = rdnr
                        break
                    except ValueError:
                        continue
                if kgnr is None:
                    # No match: create a new reed in the first KG, numbered after the
                    # highest reed number already used in that KG (1 if there is none)
                    kgnr = kgnrs[0]
                    used = [r for rk, r, _ in REED_MAP.values() if rk == kgnr] + \
                           [r for (nk, _), r in new_reeds.items() if nk == kgnr]
                    rdnr = max(used, default=0) + 1
                    f_rd.row(kgnr, rdnr, rname)
                    new_reeds[(kgnr, rnr)] = rdnr
                    new('Reed', (kgnr, rdnr), rname)

            area = int(fb['Flaeche'])
            if CLIENT == WG.MATZEN:
                gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR'])
            else:
                gstnrs = []
            comment, gstnr = None, None
            if parz is None or parz == '0000':
                if parz is not None:
                    invalid(mgnr, 'GstNr.', f'{lookup_kg_name(kgnr)} {kgnr or 0:05}-{parz}', None)
                gstnrs = []
                gstnr = '-'
            if CLIENT == WG.MATZEN and len(gstnrs) == 0:
                # Keep the unparsed original text as comment
                comment = f'KG {kgnr or 0:05}: {parz}'
            gstnr = format_gstnr(gstnrs) or gstnr or parz
            if parz != gstnr.replace('+', '/'):
                convert(mgnr, f'GstNr. ({fbnr})', parz, gstnr)

            # End years of 3000 and above are treated as "no end"
            to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None
            f_fb.row(fbnr, mgnr, fb['SNR'] + (fb['SANR'] or ''), CULTIVATION_MAP[fb['BANR'] or 1], area,
                     kgnr, gstnr, rdnr, fb['Von'], to, comment)
|
|
|
|
|
|
def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, List[int], datetime.date]]:
    """Group delivery rows by (corrected) delivery note number.

    Cancelled rows and rows without a delivery note number are ignored, as are
    numbers too short to contain the branch character at index 8.

    :param deliveries: rows with the keys ``LINR``, ``Lieferscheinnummer``,
        ``Datum``, ``ZNR``, ``MGNR`` and ``Storniert``
    :return: list of ``(lsnr, [LINR, ...], date)`` sorted by delivery note number,
        where two-digit-year numbers starting with ``9`` are ordered as ``19..``
    """
    dates = {}
    fixed = {}
    # NOTE(review): only written below, never consulted for a decision
    last_dates = {}

    def add(lsnr: str, linr: int, date: datetime.date, unique: bool = False) -> None:
        """Append ``linr`` to ``lsnr``; with ``unique`` a taken number gets a ``/2`` suffix."""
        if lsnr not in fixed:
            fixed[lsnr] = []
            dates[lsnr] = date
        elif unique:
            return add(lsnr + '/2', linr, date, unique)
        fixed[lsnr].append(linr)

    def get_lsnr(date: datetime.date, lsnr: str) -> str:
        """Rebuild the date prefix of ``lsnr`` from ``date``, keeping everything after index 8."""
        if date.year < 2000:
            return date.strftime('%y%m%d00') + lsnr[8:]
        else:
            return date.strftime('%Y%m%d') + lsnr[8:]

    rows: List[Tuple[int, str, datetime.date, int, int]] = [
        (d['LINR'], d['Lieferscheinnummer'], d['Datum'], d['ZNR'], d['MGNR'])
        for d in deliveries
        if d['Lieferscheinnummer'] and not d['Storniert']
    ]

    lsnrs = {row[1] for row in rows}

    for lnr, lsnr, date, zwstid, mgnr in rows:
        # Index 8 (the branch character) is read below, so at least 9 characters are needed
        if len(lsnr) < 9:
            continue
        if lsnr.startswith('22'):
            # Century typo: "22.." -> "20.."
            lsnr = '20' + lsnr[2:]
        lsdate = datetime.date(int(lsnr[:4]), int(lsnr[4:6]), int(lsnr[6:8])) if not lsnr.startswith('9') \
            else datetime.date(1900 + int(lsnr[:2]), int(lsnr[2:4]), int(lsnr[4:6]))

        lsnr_zwstid = lsnr[8]
        if lsnr_zwstid != zwstid and lsnr_zwstid in BRANCH_MAP.values():
            zwstid = lsnr_zwstid

        if len(lsnr) == 12:
            if date != lsdate:
                if date.year == lsdate.year:
                    # Prefer the recorded date, unless the resulting number is already taken
                    lsnr_n = get_lsnr(date, lsnr)
                    if lsnr_n not in lsnrs:
                        lsnr = lsnr_n
                    else:
                        warning_delivery(lsnr, mgnr, 'date', date)
                else:
                    # Year mismatch: trust the year encoded in the number
                    date = datetime.date(lsdate.year, date.month, date.day)

            if zwstid not in last_dates or not date < last_dates[zwstid]:
                last_dates[zwstid] = date
            add(lsnr, lnr, date, unique=True)
        else:
            # Longer numbers are grouped under their first 12 characters
            add(lsnr[:12], lnr, date)

    return sorted([(f[0], f[1], dates[f[0]]) for f in fixed.items()],
                  key=lambda f: f[0] if not f[0].startswith('9') else '19' + f[0])
|
|
|
|
|
|
def migrate_deliveries(in_dir: str, out_dir: str) -> None:
    """Migrate deliveries, delivery parts, modifiers and seasons.

    Reads ``TAbschlaege.csv``, ``TLieferungen.csv`` and ``TLieferungAbschlag.csv``
    from ``in_dir`` and writes ``delivery.csv``, ``delivery_part.csv``,
    ``delivery_part_modifier.csv``, ``season.csv`` and ``modifier.csv`` to
    ``out_dir``.  Resets and fills the globals ``DELIVERY_MAP``, ``MODIFIER_MAP``
    and ``SORT_MAP``.

    :param in_dir: directory containing the exported csv files
    :param out_dir: directory the migrated csv files are written to
    :raises NotImplementedError: if modifiers exist and ``CLIENT`` is neither
        ``WG.MATZEN`` nor ``WG.WINZERKELLER``
    """
    global DELIVERY_MAP, MODIFIER_MAP, SORT_MAP
    DELIVERY_MAP, MODIFIER_MAP, SORT_MAP = {}, {}, {}

    # ASNR -> modifier row; rows without a real name are skipped
    modifiers = {
        m['ASNR']: m
        for m in utils.csv_parse_dict(f'{in_dir}/TAbschlaege.csv')
        if m['Bezeichnung'] and m['Bezeichnung'] != '-'
    }
    # year -> season info (currency, precision, first/last delivery date, delivery counter)
    seasons = {}
    # branch character -> {date -> number of deliveries on that day}
    branches = {}

    # Assign the new textual modifier id to every modifier
    for mod in modifiers.values():
        name: str = mod['Bezeichnung'].replace('ausser', 'außer')
        nr: int = mod['ASNR']
        MODIFIER_MAP[name] = mod
        if CLIENT == WG.MATZEN:
            mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'PZS'
        elif CLIENT == WG.WINZERKELLER:
            mod['id'] = {
                1: 'KA', 2: 'LG', 3: 'MG', 4: 'SG',
                5: 'VT', 6: 'MV', 7: 'UP', 8: 'VL',
                9: 'DN', 10: 'SA', 11: 'DA', 12: 'EG',
            }[nr]
        else:
            raise NotImplementedError()

    deliveries = list(utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv'))
    delivery_dict = {d['LINR']: d for d in deliveries}
    # (lsnr, [LINR, ...], date) per delivery note
    fixed = fix_deliveries(deliveries)
    # "old -> new" variety line -> number of occurrences (printed at the end)
    updated_varieties = {}

    with utils.csv_open(f'{out_dir}/delivery.csv') as f_delivery, \
            utils.csv_open(f'{out_dir}/delivery_part.csv') as f_part:
        f_delivery.header('year', 'did', 'date', 'time', 'zwstid', 'lnr', 'lsnr', 'mgnr', 'comment')
        f_part.header('year', 'did', 'dpnr', 'sortid', 'attrid', 'weight', 'kmw', 'qualid', 'hkid', 'kgnr', 'rdnr',
                      'gerebelt', 'manual_weighing', 'spl_check', 'hand_picked', 'lesewagen', 'gebunden',
                      'temperature', 'acid', 'scale_id', 'weighing_id', 'weighing_reason', 'comment')

        for lsnr, linrs, date in fixed:
            if date.year not in seasons:
                seasons[date.year] = {
                    'currency': 'EUR' if date.year >= 2001 else 'ATS',
                    'precision': WGMASTER_PRECISION,
                    'start': date,
                    'end': date,
                    'nr': 0,
                }
            s = seasons[date.year]
            if date > s['end']:
                s['end'] = date
            # Running delivery number within the season
            s['nr'] += 1
            snr = s['nr']

            # Member and branch are taken from the first part of the delivery
            mgnr = delivery_dict[linrs[0]]['MGNR']
            znr = delivery_dict[linrs[0]]['ZNR']
            # Distinct scale texts over all parts of this delivery
            glob_waage = set(delivery_dict[linr]['Waagentext'] for linr in linrs if delivery_dict[linr]['Waagentext'])

            # Running delivery number per branch character and day
            zwstid = lsnr[8]
            if zwstid not in branches:
                branches[zwstid] = {}
            if date not in branches[zwstid]:
                branches[zwstid][date] = 0
            branches[zwstid][date] += 1
            lnr = branches[zwstid][date]

            # Fall back to the branch of the row if the character in the number is no known branch
            if BRANCH_MAP[znr] != zwstid:
                if zwstid not in BRANCH_MAP.values():
                    zwstid = BRANCH_MAP[znr]

            comments = []
            # NOTE(review): created once per delivery, not per part; if a part ends up with
            # more than one attribute, the set is not emptied and carries over to the next part.
            attributes = set()
            for dpnr, linr in enumerate(linrs, start=1):
                d = delivery_dict[linr]
                DELIVERY_MAP[linr] = (date.year, snr, dpnr)
                if lsnr != d['Lieferscheinnummer']:
                    renumber_delivery(d['Lieferscheinnummer'], lsnr)

                oe = d['OechsleOriginal'] or d['Oechsle']
                kmw = GRADATION_MAP[oe]
                sortid = d['SNR'].upper()
                if d['SANR']:
                    attributes.add(d['SANR'])
                if len(sortid) != 2:
                    # Everything after the first two characters is treated as an attribute
                    attributes.add(sortid[2:])
                    sortid = sortid[:2]

                if CLIENT == WG.MATZEN:
                    if sortid == 'HU':
                        # Gr.Veltliner (Huber)
                        sortid = 'GV'
                        # NOTE(review): raises KeyError if 'B' is not among the attributes
                        attributes.remove('B')
                        attributes.add('HU')
                    elif sortid == 'SV':
                        sortid = 'SW'
                    elif sortid == 'WC':
                        # WEIẞBURGUNDER/CHARDONNAY
                        sortid = 'SW'
                    if 'H' in attributes:
                        attributes.remove('H')
                        attributes.add('HK')
                    if 'W' in attributes:
                        attributes.remove('W')
                elif CLIENT == WG.WINZERKELLER:
                    if 'F' in attributes:
                        attributes.remove('F')

                if d['SNR'] != sortid:
                    # Remember the variety/attribute mapping (SORT_MAP is read again by migrate_payments)
                    SORT_MAP[f'{d["SNR"]}/{d["SANR"] or ""}'] = f'{sortid}/{",".join(list(attributes)) or ""}'
                    line = f'{d["SNR"]}/{d["SANR"]} -> {sortid}/{",".join(list(attributes)) or None}'
                    if line not in updated_varieties:
                        updated_varieties[line] = 0
                    updated_varieties[line] += 1

                if d['QSNR'] is None:
                    warning_delivery(lsnr, mgnr, 'qualid', 'UNSET')
                    # NOTE(review): below 86 °Oe no quality level is assigned in this branch,
                    # so qualid keeps the value of an earlier part (or is unbound) - verify.
                    if d['Oechsle'] >= 86:
                        qualid = 'KAB'
                else:
                    qualid = QUAL_MAP[d['QSNR']]
                if qualid != 'WEI' and d['Abgewertet']:
                    # Downgraded parts always end up as 'WEI'; anything above 'RSW' is reported
                    if qualid == 'RSW':
                        qualid = 'WEI'
                    else:
                        warning_delivery(lsnr, mgnr, 'qualid', f'{qualid} (abgewertet)')
                        qualid = 'WEI'

                # Origin: single-KG municipality, overridden by the reed's KG, else the member's default KG
                kgnr, rdnr = None, None
                if d['GNR']:
                    gem = GEM_MAP.get(d['GNR'], [])
                    if len(gem) == 1:
                        kgnr = gem[0][0]
                if d['RNR']:
                    kgnr, rdnr, _ = REED_MAP[d['RNR']]
                if kgnr is None:
                    m = MEMBER_MAP[mgnr]
                    kgnr = m['default_kgnr']
                if kgnr is None:
                    pass
                elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]:
                    # KG is not part of any known municipality
                    warning_delivery(lsnr, mgnr, 'KGNr.', kgnr)
                    kgnr = None
                hkid = lookup_hkid(kgnr, qualid)

                handwiegung = d['Handwiegung'] or False
                # If all parts share one scale text, use it for every part
                waage = list(glob_waage)[0] if len(glob_waage) == 1 else d['Waagentext']
                scale_id, weighing_id = None, None
                if waage:
                    # Expected formats (whitespace separated):
                    # Waagenr: 1 ID: 19
                    # Waagennummer: 1 Speichernummer: 9166
                    waage = re.split(r' +', waage)
                    scale_id = waage[1]
                    weighing_id = waage[3] if waage[2] == 'Speichernummer:' else f'{date}/{waage[3]}'
                elif len(glob_waage) == 0 and not handwiegung:
                    # No scale text at all for this delivery: flag as manual weighing
                    handwiegung = True

                comment: Optional[str] = d['Anmerkung']
                acid = d['Säure']

                # Some comments carry structured information (acid value, picking method)
                hand, lesewagen = None, None
                if comment:
                    comment = comment.replace('Söure', 'Säure')
                    if comment.startswith('Säure'):
                        acid = float(comment.split(' ')[-1].replace(',', '.'))
                        comment = None
                    elif comment in ('Maschine', 'Masschine'):
                        hand = False
                        comment = None
                    elif comment == 'Hand':
                        hand = True
                        comment = None
                    elif comment == '.':
                        comment = None
                    if comment:
                        comments.append(comment)
                gerebelt = True if CLIENT == WG.MATZEN or (CLIENT == WG.WINZERKELLER and zwstid == 'W') else d['Gerebelt'] or False
                gebunden = None if CLIENT in (WG.MATZEN, WG.WINZERKELLER) else d['Gebunden']
                if len(attributes) > 1:
                    print("ERROR: ", attributes)
                # pop() also empties the set for the next part
                attrid = attributes.pop() if len(attributes) == 1 else None
                f_part.row(
                    date.year, snr, dpnr, sortid, attrid, int(d['Gewicht']), kmw, qualid, hkid, kgnr, rdnr,
                    gerebelt, handwiegung, d['Spaetlese-Ueberpruefung'] or False,
                    hand, lesewagen, gebunden, d['Temperatur'], acid, scale_id, weighing_id, None, comment
                )
            # The time of day is taken from the last part of the delivery
            f_delivery.row(date.year, snr, date, d['Uhrzeit'], zwstid, lnr, lsnr, mgnr, '; '.join(comments) or None)
        for k, v in updated_varieties.items():
            print(k + (f' ({v} times)' if v > 1 else ''))

    with utils.csv_open(f'{out_dir}/delivery_part_modifier.csv') as f_part_mod:
        f_part_mod.header('year', 'did', 'dpnr', 'modid')
        for m in utils.csv_parse_dict(f'{in_dir}/TLieferungAbschlag.csv'):
            # Skip modifiers of deliveries that were not migrated and unknown modifiers
            if m['LINR'] not in DELIVERY_MAP or m['ASNR'] not in modifiers:
                continue
            y, did, dpnr = DELIVERY_MAP[m['LINR']]
            f_part_mod.row(y, did, dpnr, modifiers[m['ASNR']]['id'])

    with utils.csv_open(f'{out_dir}/season.csv') as f_season, \
            utils.csv_open(f'{out_dir}/modifier.csv') as f_mod:
        f_season.header('year', 'currency', 'precision', 'max_kg_per_ha', 'vat_normal', 'vat_flatrate',
                        'min_kg_per_bs', 'max_kg_per_bs', 'penalty_per_kg', 'penalty_amount', 'penalty_none',
                        'start_date', 'end_date')
        f_mod.header('year', 'modid', 'ordering', 'name', 'abs', 'rel', 'standard', 'quick_select')
        for y, s in seasons.items():
            f_season.row(y, s['currency'], s['precision'], 10_000, 0.10, 0.13,
                         PARAMETERS['LIEFERPFLICHT/GA1'], PARAMETERS['LIEFERRECHT/GA1'],
                         None, None, None, s['start'], s['end'])
            # Every modifier is written once per season
            for m in modifiers.values():
                # Absolute values are stored as integers scaled by the season's precision
                abs_v = round(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None
                rel_v = m['AZASProzent'] / 100.0 if m['AZASProzent'] is not None else None
                f_mod.row(y, m['id'], m['ASNR'], m['Bezeichnung'], abs_v, rel_v,
                          m.get('Standard', False), m['Schnellauswahl'])
|
|
|
|
|
|
def migrate_payments(in_dir: str, out_dir: str) -> None:
    """Migrate payment variants and the per-delivery-part payment amounts.

    Reads ``TAuszahlung.csv``, ``TAuszahlungSorten.csv``,
    ``TAuszahlungSortenQualitätsstufe.csv`` and ``TLieferungen.csv`` from
    ``in_dir`` and writes ``payment_variant.csv``, ``payment_delivery_part.csv``
    and ``delivery_part_bucket.csv`` to ``out_dir``.  Relies on ``SORT_MAP`` and
    ``DELIVERY_MAP`` as filled by ``migrate_deliveries``.

    :param in_dir: directory containing the exported csv files
    :param out_dir: directory the migrated csv files are written to
    """
    # AZNR -> (year, variant number within that year)
    variant_map: Dict[int, Tuple[int, int]] = {}
    # year -> [(AZNR, variant number, TeilzahlungNr), ...] of all non-test variants
    variant_year_map: Dict[int, List[Tuple[int, int, int]]] = {}
    # year -> number of variants seen so far
    year_map = {}
    # AZNR -> generated data object
    az_map = {}

    # Price rows grouped by payment variant (AZNR)
    p_sort = list(utils.csv_parse_dict(f'{in_dir}/TAuszahlungSorten.csv'))
    sort_map = {i: [s for s in p_sort if s['AZNR'] == i] for i in set([s['AZNR'] for s in p_sort])}
    p_qual = list(utils.csv_parse_dict(f'{in_dir}/TAuszahlungSortenQualitätsstufe.csv'))
    qual_map = {i: [s for s in p_qual if s['AZNR'] == i] for i in set([s['AZNR'] for s in p_qual])}

    def collapse_data(data: dict[str, Any]):
        """Compact a key -> value mapping by folding frequent values into wildcard keys.

        Returns the single value if all entries share it; otherwise ``data`` itself,
        modified in place, with a ``'default'`` entry and/or ``'/<attr>'`` entries
        replacing the keys they cover.
        """
        # value -> keys having that value (wildcard keys excluded)
        rev = {}
        for k, v in data.items():
            if k == 'default' or k.startswith('/'):
                continue
            rev[v] = rev.get(v, [])
            rev[v].append(k)
        if 'default' not in data.keys():
            if len(rev) == 1:
                return set(rev.keys()).pop()
            for v, ks in rev.items():
                # A value covering at least half of all entries becomes the default
                if len(ks) >= len(data) / 2:
                    for k in ks:
                        del data[k]
                    data['default'] = v
                    return collapse_data(data)
        # Same idea per attribute suffix ('/<attr>') among the remaining keys
        for idx1 in {'/' + k.split('/')[1] for k in data.keys() if '/' in k and len(k) > 3}:
            len1 = len(list(k for k, _ in data.items() if k.endswith(idx1)))
            for v, ks in rev.items():
                my_ks = list(k for k in ks if k.endswith(idx1))
                if len(my_ks) > 1 and len(my_ks) >= len1 / 2:
                    for k in my_ks:
                        del data[k]
                    data[idx1] = v
        return data

    def collapse_curve(curve: list[float]):
        """Reduce a price list indexed by Oechsle to a dict of ``'<n>oe'`` -> price.

        Points are recorded where the difference between neighbouring prices
        changes; equal-valued points at both ends are merged afterwards.
        """
        n = {}
        d = 0
        # oe: index, p0/p1/p2: previous, current and next price
        for oe, p0, p1, p2 in zip(range(0, len(curve) + 1), [0] + curve, curve, curve[1:] + [curve[len(curve) - 1]]):
            d1, d2 = round(p1 - p0, WGMASTER_PRECISION), round(p2 - p1, WGMASTER_PRECISION)
            if d1 == d:
                continue
            d = d2
            if p0 > 0:
                n[f'{oe - 1}oe'] = p0
            n[f'{oe}oe'] = p1
        if curve[len(curve) - 1] > 0:
            n[f'{len(curve) - 1}oe'] = curve[len(curve) - 1]
        keys = list(n.keys())
        vals = list(n.values())
        # Merge leading points with equal value, keeping the second one
        while len(n) >= 2 and vals[0] == vals[1]:
            del n[keys[0]]
            del n[keys[1]]
            n = {keys[1]: vals[1], **n}
            keys = list(n.keys())
            vals = list(n.values())
        # Merge trailing points with equal value, keeping the second to last one
        while len(n) >= 2 and vals[len(vals) - 1] == vals[len(vals) - 2]:
            del n[keys[len(vals) - 1]]
            del n[keys[len(vals) - 2]]
            n = {**n, keys[len(vals) - 2]: vals[len(vals) - 2]}
            keys = list(n.keys())
            vals = list(n.values())
        # Flat curves are anchored at 73 °Oe
        if len(n) == 0:
            n = {'73oe': 0}
        elif len(n) == 1:
            n = {'73oe': list(n.values())[0]}
        return n

    with (utils.csv_open(f'{out_dir}/payment_variant.csv') as f_payment):
        f_payment.header('year', 'avnr', 'name', 'date', 'test_variant', 'calc_time', 'comment', 'data')
        for p in utils.csv_parse_dict(f'{in_dir}/TAuszahlung.csv'):
            year = p['Lesejahr']
            if year is None:
                continue
            if year not in year_map:
                year_map[year] = 0
            year_map[year] += 1
            variant_map[p['AZNR']] = (year, year_map[year])
            # Everything not written to its own column goes into the data object
            var = p.copy()
            del var['AZNR']
            del var['Datum']
            del var['Beschreibung']
            del var['Lesejahr']
            del var['Titel']
            del var['TeilzahlungNr']
            data = {
                'mode': 'wgmaster',
                **var,
                'AuszahlungSorten': {},
                'AuszahlungSortenQualitätsstufe': {},
            }

            # Collect prices per variety/attribute key, split by 'gebunden', indexed by Oechsle
            azs = data['AuszahlungSorten']
            for s in sort_map.get(p['AZNR'], []):
                # NOTE: mutates the shared row dicts of p_sort
                del s['AZNR']
                del s['ID']
                if s['Oechsle'] is None:
                    continue
                key = SORT_MAP.get(f'{s["SNR"]}/{s["SANR"] or ""}', f'{s["SNR"].upper()}/{s["SANR"] or ""}')
                azs[key] = azs.get(key, {'Gebunden': {}, 'NichtGebunden': {}})
                azs[key]['Gebunden' if s['gebunden'] else 'NichtGebunden'][s['Oechsle']] = s['Betrag']
            # Distinct (curve, gebunden curve) pairs; referenced as 'curve:<index + 1>'
            curves = []
            curve_zero = False
            for key, d1 in azs.items():
                # Dense price lists from 0 °Oe up to the highest listed value
                # NOTE(review): max() raises ValueError if a key only has 'Gebunden' prices
                oe = [d1['NichtGebunden'].get(n, 0.0) for n in range(max(d1['NichtGebunden'].keys()) + 1)]
                if len(d1['Gebunden']) > 0:
                    oe_geb = [d1['Gebunden'].get(n, 0.0) for n in range(max(d1['Gebunden'].keys()) + 1)]
                else:
                    oe_geb = None

                if len(set(oe)) <= 2 and oe[0] == 0 and \
                        (oe_geb is None or (len(set(oe_geb)) <= 2 and oe[len(oe) - 1] == oe_geb[len(oe_geb) - 1])):
                    # At most one non-zero price: store it directly instead of a curve
                    azs[key] = oe[len(oe) - 1]
                    if azs[key] == 0:
                        azs[key] = 'curve:0'
                        curve_zero = True
                else:
                    c = (oe, oe_geb)
                    if c not in curves:
                        curves.append(c)
                    azs[key] = f'curve:{curves.index(c) + 1}'
            data['AuszahlungSorten'] = collapse_data(azs)
            for i, cs in enumerate(curves):
                c, c_geb = cs
                geb = None
                if c_geb is not None:
                    # Distinct differences between the 'gebunden' and the normal curve
                    diff = {round(b - a, WGMASTER_PRECISION) for a, b in zip(c, c_geb)}
                    # NOTE(review): raises KeyError if the curves differ at every point
                    diff.remove(0.0)
                    if len(diff) == 1:
                        # Constant surcharge
                        geb = diff.pop()
                    elif len(diff) > 1:
                        geb = collapse_curve(c_geb)
                curves[i] = {
                    'id': i + 1,
                    'mode': 'oe',
                    'data': collapse_curve(c),
                }
                if geb is not None:
                    curves[i]['geb'] = geb
            if curve_zero:
                # 'curve:0' is backed by the variant's base amount
                curves.insert(0, {
                    'id': 0,
                    'mode': 'oe',
                    'data': data['Grundbetrag'],
                    'geb': data['GBZS'] or 0
                })
            data['Kurven'] = curves
            # Prices per quality level and variety/attribute key
            azq = data['AuszahlungSortenQualitätsstufe']
            for q in qual_map.get(p['AZNR'], []):
                # NOTE: mutates the shared row dicts of p_qual
                del q['AZNR']
                del q['ID']
                qualid = QUAL_MAP[q['QSNR']]
                key = SORT_MAP.get(f'{q["SNR"]}/{q["SANR"] or ""}', f'{q["SNR"].upper()}/{q["SANR"] or ""}')
                azq[qualid] = azq.get(qualid, {})
                azq[qualid][key] = q['Betrag']
            for qualid, d1 in azq.items():
                azq[qualid] = collapse_data(d1)

            # Drop unset values and False flags from the data object
            for data_k, data_v in data.copy().items():
                if data_v is None or isinstance(data_v, bool) and not data_v:
                    del data[data_k]
            az_map[p['AZNR']] = data

            # TeilzahlungNr 7 marks a test variant; those get no per-part payment rows below
            test = (p['TeilzahlungNr'] == 7)
            if not test:
                if year not in variant_year_map:
                    variant_year_map[year] = []
                variant_year_map[year].append((p['AZNR'], year_map[year], p['TeilzahlungNr']))
            f_payment.row(year, year_map[year], p['Titel'], p['Datum'], test, None,
                          p['Beschreibung'], json.dumps(data))

    with utils.csv_open(f'{out_dir}/payment_delivery_part.csv') as f_del_pay, \
            utils.csv_open(f'{out_dir}/delivery_part_bucket.csv') as f_bucket:
        f_del_pay.header('year', 'did', 'dpnr', 'avnr', 'net_amount')
        f_bucket.header('year', 'did', 'dpnr', 'bktnr', 'discr', 'value')
        deliveries = {d['LINR']: d for d in utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv')}
        for linr, (y, did, dpnr) in DELIVERY_MAP.items():
            p = deliveries[linr]
            # Only years with at least one non-test payment variant
            if y not in variant_year_map:
                continue
            gew, geb_gew = int(p['Gewicht']), int(p['BGewichtGebunden'])
            # Bucket 0: weight not counted as 'gebunden', bucket 1: 'gebunden' weight
            b1 = gew - geb_gew
            b2 = geb_gew
            f_bucket.row(y, did, dpnr, 0, '_', b1)
            f_bucket.row(y, did, dpnr, 1, p['SANR'] or '', b2)
            for aznr, avnr, tznr in variant_year_map[y]:
                # Payments numbered below 6 have their own column, everything else is the final payment
                val = p[f'BTeilzahlung{tznr}' if tznr < 6 else 'BEndauszahlung']
                # Amounts are stored as integers scaled by WGMASTER_PRECISION decimal places
                val = round(val * pow(10, WGMASTER_PRECISION))
                f_del_pay.row(y, did, dpnr, avnr, val)
|
|
|
|
|
|
def migrate_parameters(in_dir: str, out_dir: str) -> None:
    """Load ``TParameter.csv`` into ``PARAMETERS`` and write ``client_parameter.csv``.

    :param in_dir: directory containing the exported csv files
    :param out_dir: directory the migrated csv files are written to
    :raises KeyError: if a mandatory parameter is missing or the legal form in
        ``MANDANTENNAME2`` is unknown
    """
    global PARAMETERS
    PARAMETERS = {p['Bezeichnung']: p['Wert'] for p in utils.csv_parse_dict(f'{in_dir}/TParameter.csv')}
    # Expand the abbreviations that .title() leaves in the client name
    name = PARAMETERS['MANDANTENNAME1'].title().replace('F.', 'für').replace('U.', 'und').replace(' Im ', ' im ')
    # Normalized legal form, used as key into `types`
    suffix = PARAMETERS['MANDANTENNAME2'].lower().replace(' ', '').replace('.', '')
    types = {
        'reggenmbh': 'reg. Gen.m.b.H.'
    }
    # (token, short name) of the client; both None for unknown clients
    tokens = {
        WG.MATZEN: ('WGM', 'WG Matzen'),
        WG.WINZERKELLER: ('WKW', 'Winzerkeller')
    }.get(CLIENT, (None, None))

    # The delivery note text is optional: only modernize its wording if it is present
    deliverynote_text: Optional[str] = PARAMETERS.get('LIEFERSCHEINTEXT', None)
    if deliverynote_text is not None:
        deliverynote_text = deliverynote_text.replace(' daß ', ' dass ')\
            .replace('obige Angaben maßgeblicher Veränderungen', 'maßgeblichen Veränderungen obiger Angaben')

    ort = PARAMETERS['MANDANTENORT'].title()
    new_params: Dict[str, Optional[str]] = {
        'CLIENT_NAME_TOKEN': tokens[0],
        'CLIENT_NAME_SHORT': tokens[1],
        'CLIENT_NAME': name,
        'CLIENT_NAME_SUFFIX': None,
        'CLIENT_NAME_TYPE': types[suffix],
        'CLIENT_PLZ': PARAMETERS['MANDANTENPLZ'],
        'CLIENT_ORT': ort,
        'CLIENT_ADDRESS': PARAMETERS['MANDANTENSTRASSE'],
        'CLIENT_IBAN': None,
        'CLIENT_BIC': None,
        'CLIENT_USTIDNR': PARAMETERS['MANDANTENUID'].replace(' ', ''),
        'CLIENT_LFBISNR': PARAMETERS['MANDANTENBETRIEBSNUMMER'],
        'CLIENT_PHONE': normalize_phone_nr(PARAMETERS['MANDANTENTELEFON'], ort),
        'CLIENT_FAX': normalize_phone_nr(PARAMETERS['MANDANTENTELEFAX'], ort),
        'CLIENT_EMAIL': PARAMETERS['MANDANTENEMAIL'],
        'CLIENT_WEBSITE': PARAMETERS.get('MANDANTENHOMEPAGE', None),
        'DOCUMENT_SENDER': PARAMETERS.get('ABSENDERTEXT2', None),
        'TEXT_DELIVERYNOTE': deliverynote_text,
        'TEXT_DELIVERYCONFIRMATION': PARAMETERS.get('ANLIEFTEXT', None),
    }

    with utils.csv_open(f'{out_dir}/client_parameter.csv') as f:
        f.header('param', 'value')
        for param, value in new_params.items():
            f.row(param, value)
|
|
|
|
|
|
def main() -> None:
    """Parse the command line and run all migration steps in order.

    Sets the globals ``QUIET``, ``CLIENT`` and ``DB_CNX``.  ``CLIENT`` stays
    ``None`` if no cooperative is given on the command line.
    """
    global DB_CNX, QUIET, CLIENT

    parser = argparse.ArgumentParser()
    parser.add_argument('in_dir', type=str,
                        help='The input directory where the exported csv files are stored')
    parser.add_argument('out_dir', type=str,
                        help='The output directory where the migrated csv file should be stored')
    parser.add_argument('-q', '--quiet', action='store_true', default=False,
                        help='Be less verbose')
    parser.add_argument('-d', '--database', metavar='DB', required=True,
                        help='The sqlite database file to look up information')
    parser.add_argument('-g', '--genossenschaft', metavar='WG', required=False, type=str,
                        choices=[wg.name for wg in WG])
    args = parser.parse_args()

    os.makedirs(args.out_dir, exist_ok=True)

    QUIET = args.quiet
    # The option is not required, and WG.from_str() cannot handle None
    CLIENT = WG.from_str(args.genossenschaft) if args.genossenschaft is not None else None

    DB_CNX = sqlite3.connect(args.database)
    try:
        # Later steps rely on the lookup tables filled by earlier ones
        migrate_parameters(args.in_dir, args.out_dir)
        migrate_gradation(args.in_dir, args.out_dir)
        migrate_branches(args.in_dir, args.out_dir)
        migrate_grosslagen(args.in_dir, args.out_dir)
        migrate_gemeinden(args.in_dir, args.out_dir)
        migrate_reeds(args.in_dir, args.out_dir)
        migrate_attributes(args.in_dir, args.out_dir)
        migrate_cultivations(args.in_dir, args.out_dir)
        migrate_area_commitment_types(args.in_dir, args.out_dir)
        migrate_members(args.in_dir, args.out_dir)
        migrate_area_commitments(args.in_dir, args.out_dir)
        migrate_deliveries(args.in_dir, args.out_dir)
        migrate_payments(args.in_dir, args.out_dir)
    finally:
        # Close the database even if a migration step fails
        DB_CNX.close()
|
|
|
|
|
|
# Run the migration only when executed as a script, not when imported as a module
if __name__ == '__main__':
    main()
|