Files
elwig-misc/wgmaster/migrate.py

1761 lines
73 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Dict, Any, Tuple, Optional, List, Iterable
import argparse
import os
import re
import sys
import sqlite3
import requests
import datetime
import json
import string
import utils
# SQLite connection used by the lookup_* helpers; it is not assigned in this part of the file.
DB_CNX: Optional[sqlite3.Connection] = None
# When True, the purely informational log helpers (success, convert, convert_name,
# renumber_delivery) print nothing; warnings and errors are always printed.
QUIET: bool = False
# Identifier of the data set being migrated; the code special-cases 'MATZEN' and 'WKW'.
WG: Optional[str] = None
# Syntactic patterns, applied with fullmatch() when validating member data.
USTID_NR_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}')
BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
IBAN_RE = re.compile(r'[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')
EMAIL_RE = re.compile(r'[^@\s]+@([A-Za-z0-9_äöüß-]+\.)+[A-Za-z]{2,}')
# Lookup tables filled by the migrate_* functions; each stays None until its function has run.
# 'Oechsle' -> 'KW' column of TUmrechnung.csv (migrate_gradation)
GRADATION_MAP: Optional[Dict[float, float]] = None
# BANR -> cultid (migrate_cultivations)
CULTIVATION_MAP: Optional[Dict[int, str]] = None
# ZNR -> 'Kennbst' (migrate_branches)
BRANCH_MAP: Optional[Dict[int, str]] = None
# GNR -> list of (kgnr, gkz) pairs (migrate_gemeinden)
GEM_MAP: Optional[Dict[int, List[Tuple[int, int]]]] = None
# RNR -> (kgnr, rdnr, name) (migrate_reeds)
REED_MAP: Optional[Dict[int, Tuple[int, int, str]]] = None
# legacy GLNR -> new glnr (migrate_grosslagen)
GROSSLAGE_MAP: Optional[Dict[int, int]] = None
# Reset to {} by migrate_members; its contents are not visible in this part of the file.
MEMBER_MAP: Optional[Dict[int, Dict[str, Any]]] = None
# kgnr -> glnr (migrate_gemeinden)
GROSSLAGE_KG_MAP: Optional[Dict[int, int]] = None
# NOTE(review): DELIVERY_MAP and MODIFIER_MAP are populated outside the code visible here.
DELIVERY_MAP: Optional[Dict[int, Tuple[int, int, int]]] = None
MODIFIER_MAP: Optional[Dict[str, Dict]] = None
# Value written to the 'country' column of the generated CSV files.
AUSTRIA = 40
# NOTE(review): not referenced in the visible part of the file; presumably a number of decimal places.
WGMASTER_PRECISION = 4
# Maps an integer code to a three-letter quality id. lookup_hkid() receives such
# ids ('WEI', 'RSW', 'LDW', ...) as its `qualid` argument.
# NOTE(review): presumably the keys are the legacy numeric quality levels - confirm.
QUAL_MAP: Dict[int, str] = {
    0: 'WEI',
    1: 'RSW',
    2: 'LDW',
    3: 'QUW',
    4: 'KAB',
    5: 'SPL',
}
# Place-name stems used by fix_street_name(): a street name is matched if it
# starts with `key + 'er'`. The value is the adjective to use in the corrected
# street name; None means "use `key + 'er'` unchanged".
ORT_NAMES: Dict[str, Optional[str]] = {
    'Pirawarth': None,
    'Raggendorf': None,
    'Matzen': 'Matzner',
    'Matzn': None,
    'Stillfried': None,
    'Harras': None,
    'Gänserndorf': None,
    'Sulz': None,
    'Brünn': None,
    'Wien': None,
    'Angern': None,
    'Schweinbarth': None,
    'Hohenruppersdorf': None,
    'Grub': None,
    'Auersthal': None,
    'Ollersdorf': None,
    'Spannberg': None,
    'Ebenthal': None,
    'Bockfließ': None,
    'Dörfless': 'Dörfleser',
    'Dörfles': None,
    'Ableiding': None,
    'Absberg': None,
    'Eibesbrunn': None,
    'Engersdorf': None,
    'Enzersfeld': None,
    'Großebersdorf': None,
    'Hollabrunn': None,
    'Korneuburg': None,
    'Königsbrunn': None,
    'Laa': None,
    'Leopoldau': None,
    'Manhartsbrunn': None,
    'Mannhartsbrunn': 'Manhartsbrunner',
    'Münichsthal': None,
    'Pernau': None,
    'Pillichsdorf': None,
    'Retz': None,
    'Russbach': None,
    'Schleinbach': None,
    'Seefeld': None,
    'Seyring': None,
    'Stammersdorf': None,
    'Stelzendorf': None,
    'Traunfeld': None,
    'Tresdorf': None,
    'Trumau': None,
    'Wolkersdorf': None,
    'Znaim': None,
    'Obersdorf': None,
    'Sechshaus': None,
}
# Exact-match corrections applied by fix_street_name(): a street (or place) name
# equal to a key is replaced by the corresponding value.
STREET_NAMES: Dict[str, str] = {
    'Hans-Wagnerstraße': 'Hans-Wagner-Straße',
    'J.Seitzstraße': 'Josef-Seitz-Straße',
    'Kurhaus-Str.': 'Kurhausstraße',
    'Kurhaus-Straße': 'Kurhausstraße',
    'Hofrat Döltlstraße': 'Hofrat-Döltl-Straße',
    'Flustraße': 'Flurstraße',
    'St.Laurentstraße': 'St.-Laurentstraße',
    'Josef Seitzstraße': 'Josef-Seitz-Straße',
    'Ritter Zoppelstraße': 'Ritter-Zoppel-Straße',
    'Ritter Zoppel Straße': 'Ritter-Zoppel-Straße',
    'R. Virchow-Straße': 'Rudolf-Virchow-Straße',
    'Grubere Hauptstraße': 'Gruber Hauptstraße',
    'Groß Inzersdorf': 'Großinzersdorf',
    'Erdpress': 'Erdpreß',
    'Hochleitengasse': 'Hochleithengasse',
    'Bei Der Gösselmühle': 'Bei der Gösslmühle',
    'Dr. Peschlstraße': 'Dr.-Peschl-Straße',
    'Dr.Peschlstraße': 'Dr.-Peschl-Straße',
    'Dr. Salzbornstraße': 'Dr.-Salzborn-Straße',
    'Elsa Brandström-Straße': 'Elsa-Brandström-Straße',
    'Franz Ecker Siedlung': 'Franz-Ecker-Siedlung',
    'Franz-Ecker Siedlung': 'Franz-Ecker-Siedlung',
    'Franz Gillygasse': 'Franz-Gilly-Gasse',
    'Franz V. Zülowstraße': 'Franz-von-Zülow-Straße',
    'Gr. Nondorf': 'Großnondorf',
    'In Der Trift': 'In der Trift',
    'Johann Degengasse': 'Johann-Degen-Gasse',
    'Josef Fürnkranz Siedlung': 'Josef-Fürnkranz-Siedlung',
    'Kaiser Franz Josef Platz': 'Kaiser-Franz-Josef-Platz',
    'Klein Haugsdorf': 'Kleinhaugsdorf',
    'Leopold Leuthnerstraße': 'Leopold-Leuthner-Straße',
    'Lh.-Mayer-Platz': 'Landeshauptmann-Mayer-Platz',
    'Manhartsbr.Straße': 'Manhartsbrunner Straße',
    'Maria Lourd Weg': 'Maria-Lourd-Weg',
    'U. Weißgasse Straße': 'Untere Weißgerberstraße',
}
def new(t: str, ids: Any, name: str, comment: str = None) -> None:
    """Print a bold-green "New <type>: <ids> (<name>[, <comment>])" line to stderr.

    :param t: short type label, right-aligned to 6 characters.
    :param ids: identifier(s) of the new record, stringified and right-aligned to 10.
    :param name: name shown in parentheses.
    :param comment: optional text appended after the name.
    """
    details = name + (", " + comment if comment else "")
    line = f'New {t:>6}: {str(ids):>10} ({details})'
    print(f'\x1B[1;32m{line}\x1B[0m', file=sys.stderr)
def success(mgnr: int, key: str, value) -> None:
    """Print a bold-green "<mgnr> : <key> <value>" line to stderr unless QUIET is set."""
    if QUIET:
        return
    print(f'\x1B[1;32m{mgnr:>6} : {key:<12} {value}\x1B[0m', file=sys.stderr)
def warning(mgnr: int, key: str, value, active: bool) -> None:
    """Print a bold-yellow warning line for a member to stderr.

    The marker in parentheses is 'A' for a truthy ``active``, '?' when
    ``active`` is None and a blank otherwise.
    """
    if active:
        act = 'A'
    elif active is None:
        act = '?'
    else:
        act = ' '
    print(f'\x1B[1;33m{mgnr:>6} ({act}): {key:<12} {value}\x1B[0m', file=sys.stderr)
def invalid(mgnr: int, key: str, value, active: Optional[bool]) -> None:
    """Print a bold-red "invalid value" line for a member to stderr.

    The marker in parentheses is 'A' for a truthy ``active``, '?' when
    ``active`` is None and a blank otherwise.
    """
    if active:
        act = 'A'
    elif active is None:
        act = '?'
    else:
        act = ' '
    print(f'\x1B[1;31m{mgnr:>6} ({act}): {key:<12} {value}\x1B[0m', file=sys.stderr)
def renumber_delivery(lsnr_1: str, lsnr_2: str) -> None:
    """Print a bold "<old lsnr> -> <new lsnr>" line to stderr unless QUIET is set."""
    if QUIET:
        return
    print(f'\x1B[1m{lsnr_1:<15} -> {lsnr_2:<15}\x1B[0m', file=sys.stderr)
def warning_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Print a bold-yellow warning line for a delivery (lsnr, mgnr) to stderr."""
    text = f'{lsnr:<15} ({mgnr:>6}): {key:<12} {value}'
    print(f'\x1B[1;33m{text}\x1B[0m', file=sys.stderr)
def invalid_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Print a bold-red "invalid value" line for a delivery (lsnr, mgnr) to stderr."""
    text = f'{lsnr:<15} ({mgnr:>6}): {key:<12} {value}'
    print(f'\x1B[1;31m{text}\x1B[0m', file=sys.stderr)
def convert(mgnr: int, key: str, old_value: str, new_value) -> None:
    """Print a bold '<mgnr> : <key> "<old>" -> "<new>"' line to stderr unless QUIET is set."""
    if QUIET:
        return
    print(f'\x1B[1m{mgnr:>6} : {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr)
def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str],
                 billing: Optional[str] = None) -> None:
    """Print how a member's name was rewritten to stderr unless QUIET is set.

    The parts of each name are joined with " / " (missing parts become empty
    strings); a non-empty ``billing`` name is appended in parentheses.
    """
    if QUIET:
        return
    before = " / ".join([part or "" for part in old_name])
    after = " / ".join([part or "" for part in new_name])
    tail = "(" + billing + ")" if billing else ""
    print(f'\x1B[1m{mgnr:>6} : {before} -> {after}{tail}\x1B[0m', file=sys.stderr)
def check_lfbis_nr(nr: str) -> bool:
    """Return True if *nr* is a 7-digit number whose last digit matches its check digit.

    The first six digits are weighted 7, 6, 5, 4, 3, 2; the check digit is
    ``(11 - (sum % 11)) % 10``.
    """
    # https://statistik.at/fileadmin/shared/QM/Standarddokumentationen/RW/std_r_land-forstw_register.pdf#page=41
    if len(nr) == 7 and nr.isdigit():
        weighted = sum(int(digit) * weight for digit, weight in zip(nr[:-1], range(7, 1, -1)))
        return (11 - (weighted % 11)) % 10 == int(nr[-1])
    return False
def check_ustid_nr_at(nr: str) -> bool:
    """Return True if *nr* has the form 'ATU' + 8 digits and its check digit is correct.

    Digits 1-7 are multiplied alternately by 1 and 2, the digit sums of the
    products are added up, and the check digit is ``(96 - sum) % 10``.
    """
    # http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
    well_formed = nr.startswith('ATU') and len(nr) == 11 and nr[3:].isdigit()
    if not well_formed:
        return False
    total = 0
    for pos, digit in enumerate(map(int, nr[3:-1])):
        product = digit * (pos % 2 + 1)
        # product is at most 18, so its digit sum is tens + ones
        total += product // 10 + product % 10
    return (96 - total) % 10 == int(nr[-1])
def modulo(a: str, b: int) -> int:
    """Return the decimal number written in string *a* modulo *b*.

    The remainder is accumulated digit by digit, so the full number is never
    built as an int. An empty string yields 0.
    """
    remainder = 0
    for digit in map(int, a):
        remainder = (remainder * 10 + digit) % b
    return remainder
def check_iban(iban: str) -> bool:
    """Return True if *iban* matches IBAN_RE and passes the mod-97 check.

    The first four characters are moved to the end, every letter A-Z is
    replaced by the number 10-35, and the resulting digit string must be
    congruent to 1 modulo 97.
    """
    if IBAN_RE.fullmatch(iban) is None:
        return False
    rearranged = iban[4:] + iban[:4]
    # After the fullmatch above only 'A'-'Z' and '0'-'9' can occur.
    digits = ''.join(str(ord(ch) - ord('A') + 10) if 'A' <= ch <= 'Z' else ch for ch in rearranged)
    return modulo(digits, 97) == 1
def normalize_phone_nr(nr: Optional[str], ort: str = None) -> Optional[str]:
    """Bring a legacy phone number into the form '+43 <area code> <number>'.

    Slashes become spaces. Hyphens are removed unless there is exactly one
    hyphen followed by at most three characters (kept as an extension).
    A leading '0' is replaced by '+43 '. Numbers without a leading '0' get an
    area code derived from *ort*, but only when WG == 'WKW' and *ort* is given.
    Numbers starting with '+43' are then regrouped depending on the first
    digit of the area code (6: three digits, 1: one digit, 2: four digits).

    :param nr: raw phone number or None.
    :param ort: place name used to pick an area code for numbers without one.
    :return: the normalized number; None if *nr* is None or contains no number.
    :raises RuntimeError: if an area code is needed but *ort* is not known.
    """
    if nr is None:
        return None
    nr = nr.replace('/', ' ').strip()
    if nr.count('-') > 1 or len(nr.split('-')[-1]) > 3:
        nr = nr.replace('-', '')
    if not nr:
        # nothing left to normalize (previously an IndexError on nr[0])
        return None
    if nr[0] == '0':
        nr = '+43 ' + nr[1:]
    elif WG == 'WKW' and ort:
        ort = ort.upper().strip()
        if ort in ('PILLICHSDORF', 'OBERSDORF', 'WOLKERSDORF', 'WOLFPASSING', 'PUTZING', 'GROSSENGERSDORF',
                   'EIBESBRUNN'):
            nr = f'+43 2245 {nr}'
        elif ort in ('ALBERNDORF', 'HAUGSDORF', 'AUGGENTHAL'):
            nr = f'+43 2944 {nr}'
        elif ort == 'HADRES':
            # was `ort in ('HADRES')`, i.e. a substring test against the string 'HADRES'
            nr = f'+43 2943 {nr}'
        else:
            print(nr, ort)
            raise RuntimeError(f'no area code known for {ort!r} (number {nr!r})')
    # nr[4] is the first digit after '+43 '; guard against numbers too short to have one
    if nr.startswith('+43') and len(nr) > 4:
        first = nr[4]
        if first in ('6', '1', '2'):
            digits = nr.replace(' ', '')
            if first == '6':
                nr = f'{digits[:3]} {digits[3:6]} {digits[6:]}'
            elif first == '1':
                # the one-digit area code must not be repeated in the subscriber number
                nr = f'{digits[:3]} {digits[3]} {digits[4:]}'
            else:
                nr = f'{digits[:3]} {digits[3:7]} {digits[7:]}'
    return nr.strip()
def fix_street_name(name: str) -> str:
    """Return a corrected spelling of a street name.

    Exact matches in STREET_NAMES are replaced directly. Otherwise, if the name
    ends with 'straße' or 'platz' and starts with exactly one place adjective
    derived from ORT_NAMES (``key + 'er'``), the adjective and the remainder
    are split into two words and the remainder is title-cased. Any other name
    is returned unchanged.

    :param name: street name without house number.
    :return: the corrected street name.
    """
    if name in STREET_NAMES:
        return STREET_NAMES[name]
    if not name.endswith(('straße', 'platz')):
        return name
    orte = [(k, v) for k, v in ORT_NAMES.items() if name.startswith(k + 'er')]
    if len(orte) != 1:
        return name
    ort, adjective = orte[0]
    rest = name[len(ort) + 2:].title()
    # `rest` starts with a space if the input already separated the two words;
    # collapse whitespace so the result never contains a double space
    # (the former `.replace(' ', ' ')` had no effect).
    return ' '.join(f'{adjective or ort + "er"} {rest}'.split())
def get_bev_gst_size(kgnr: int, gstnr: str) -> Optional[int]:
    """Query kataster.bev.gv.at for a parcel and return the sum of its 'fl' values.

    :param kgnr: KG number, zero-padded to five digits in the URL.
    :param gstnr: parcel number within the KG.
    :return: sum of ``fl`` over ``properties.nutzungen``; None if the HTTP
        status is not 200.
    """
    response = requests.get(f'https://kataster.bev.gv.at/api/gst/{kgnr:05}/{gstnr}/')
    if response.status_code == 200:
        usages = response.json()['properties']['nutzungen']
        return sum(usage['fl'] for usage in usages)
    return None
def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]:
    """Read TFlaechenbindungen.csv and group its rows by member.

    :param in_dir: directory containing the exported CSV files.
    :return: ``{MGNR: {FBNR: row}}``; a later row with the same MGNR/FBNR
        replaces an earlier one.
    """
    by_member: Dict[int, Dict[int, Dict[str, Any]]] = {}
    for row in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
        by_member.setdefault(row['MGNR'], {})[row['FBNR']] = row
    return by_member
def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] = None) -> Optional[int]:
    """Resolve a postal code and place name to a combined postal destination id.

    The result is ``plz * 100000 + okz``, where ``okz`` is taken from the
    ``AT_ort`` table (joined with ``AT_plz_dest``) of the database behind DB_CNX.

    :param plz: postal code; None yields None.
    :param ort: place name as found in the legacy data; None yields None.
    :param address: street address; only consulted for 'VELM-GÖTZENDORF'.
    :return: the combined id, or None if *plz* or *ort* is None.
    :raises RuntimeError: if no unique place could be determined.
    """
    if plz is None or ort is None:
        return None
    # Replace digit zeros by the letter O and 'SZ' by 'SS' in the place name.
    ort = ort.replace('0', 'O').replace('SZ', 'SS')
    # Hard-coded corrections for specific (place, postal code) combinations:
    # either the postal code or the place name is replaced.
    if ort.upper() == 'PILLICHSDORF' and plz == 2212:
        plz = 2211
    elif ort.upper() == 'ENZERSFELD' and plz == 2203:
        plz = 2202
    elif ort.upper() == 'GROSSEBERSDORF' and plz == 2212:
        ort = 'GROSSENGERSDORF'
    elif ort.upper() == 'MÜNICHSTHAL' and plz == 2123:
        plz = 2122
    elif ort.upper() == 'FRAUENDORF' and plz == 3710:
        plz = 3714
    elif ort.upper() == 'MAISSAU' and plz == 3721:
        ort = 'UNTERDÜRNBACH'
    elif ort.upper() in ('KLEINRIEDENTHAL', 'KLEINHÖFLEIN', 'KLEIN HÖFLEIN') and plz == 2074:
        plz = 2070
    elif ort.upper() == 'DROSENDORF' and plz == 2095:
        ort = 'DROSENDORF ALTSTADT'
    elif ort.upper() == 'KLEINWEIKERSDORF' and plz == 2033:
        plz = 2023
    # Fetch all places registered for this postal code: (okz, dest, name).
    cur = DB_CNX.cursor()
    cur.execute("SELECT o.okz, p.dest, o.name FROM AT_plz_dest p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?", (plz,))
    rows: List[Tuple[int, str, str]] = cur.fetchall()
    cur.close()
    # Normalize for comparison: drop digits, lower-case, expand 'gr.', remove spaces and hyphens, ß -> ss.
    ort_m = re.sub(r'\d+', '', ort).lower().replace('gr.', 'groß').replace(' ', '').replace('-', '').replace('ß', 'ss')
    # First try a substring match against the (likewise normalized) place names ...
    rows_m = [r[0] for r in rows if ort_m in r[2].lower().replace(' ', '').replace('-', '').replace('ß', 'ss')]
    if len(rows_m) > 1:
        # ... and fall back to an exact match if the substring match is ambiguous.
        rows_m = [r[0] for r in rows if ort_m == r[2].lower().replace(' ', '').replace('-', '').replace('ß', 'ss')]
    if len(rows_m) == 1:
        return plz * 100000 + rows_m[0]
    if ort == 'VELM-GÖTZENDORF':
        # Decide between the two places by street and house number.
        # NOTE(review): this branch requires `address` to be a string ending in a house number - confirm callers.
        parts = address.split(' ')
        street = ' '.join(parts[:-1])
        nr = int(parts[-1].split('-')[0])
        if street == 'Landstraße' and nr <= 48 \
                or street == 'Winterzeile' and (nr <= 49 or nr in (52, 54, 56)) \
                or street == 'Hauptstraße' and (nr <= 106 or nr in (117, 115, 113, 111, 109, 107)):
            # Velm
            return plz * 100000 + 3572
        else:
            # Götzendorf
            return plz * 100000 + 3571
    raise RuntimeError(f'PLZ not found ({plz} {ort}, {address})')
def lookup_kgnr(okz: Optional[int]) -> Optional[int]:
    """Return the ``kgnr`` stored in AT_ort for *okz*.

    :param okz: place id; None yields None.
    :return: the kgnr if exactly one row matches, otherwise None.
    """
    if okz is None:
        return None
    cur = DB_CNX.cursor()
    cur.execute("SELECT kgnr FROM AT_ort WHERE okz = ?", (okz,))
    matches: List[Tuple[int]] = cur.fetchall()
    cur.close()
    return matches[0][0] if len(matches) == 1 else None
def lookup_gem_name(name: str) -> List[Tuple[int, int]]:
    """Map a legacy municipality/place name to a list of (kgnr, gkz) pairs.

    Depending on WG, a number of names are handled specially: some return a
    hard-coded list right away, some rewrite ``name`` before the database
    search, and some set ``gem_name`` to restrict the search result to rows of
    one municipality. All remaining names are searched as a prefix of
    ``AT_kg.name`` (case-insensitive), limited to the ``hkid`` values of the
    current WG.

    :param name: name as found in the legacy data.
    :return: list of (kgnr, gkz) pairs.
    :raises RuntimeError: if the database search does not yield exactly one row.
    """
    gem_name, hkid = None, None
    if WG == 'MATZEN':
        hkid = "'WLWV'"
        if name.lower() == 'dörfles':
            gem_name = 'Weikendorf'
        elif name.lower() == 'velm-götzendorf':
            return [(6027, 30859), (6007, 30859)]
        elif name.lower() == 'grub':
            name = 'Grub an der March'
    elif WG == 'WKW':
        hkid = "'WLWV', 'WIEN', 'WLWG'"
        # A trailing '*' is dropped before the name is compared.
        if name.endswith('*'):
            name = name[:-1].strip()
        if name.lower() == 'joching':
            return [(12185, 31351)]
        elif name.lower() == 'kreuttal':
            return [(15206, 31627), (15221, 31627), (15226, 31627)]
        elif name.lower() == 'hochleithen':
            return [(15219, 31622), (15223, 31622), (15202, 31622)]
        elif name.lower() == 'wolfpassing':
            gem_name = 'Hochleithen'
        elif name.lower() == 'seebarn':
            gem_name = 'Harmannsdorf'
        elif name.lower() == 'königsbrunn':
            gem_name = 'Enzersfeld im Weinviertel'
        elif name.lower() == 'wien':
            return [(1616, 90001), (1617, 90001)]
        elif name.lower() in ('sitzendorf', 'roseldorf', 'frauendorf'):
            gem_name = 'Sitzendorf an der Schmida'
        elif name.lower() == 'dietersdorf':
            gem_name = 'Hollabrunn'
        elif name.lower() == 'altenmarkt':
            name = 'Altenmarkt im Thale'
        elif name.lower() == 'eitzerstal':
            name = 'Eitzersthal'
        elif name.lower() == 'gross':
            gem_name = 'Hollabrunn'
        elif name.lower() == 'auggenthal':
            name = 'Augenthal'
        elif name.lower() == 'karlsdorf':
            name = 'Pfaffendorf'
        elif name.lower() == 'kleinhaugsdorf':
            name = 'Augenthal'
        elif name.lower() == 'merkersdorf':
            gem_name = 'Hardegg'
        elif name.lower() == 'retz':
            name = 'Retz Altstadt'
        elif name.lower() == 'heldenberg':
            return [(9112, 31019), (9132, 31019), (9131, 31019), (9141, 31019), (9140, 31019)]
        elif name.lower() == 'retzbach':
            return [(18129, 31038), (18112, 31038), (18117, 31038)]
        elif name.lower() == 'dietmannsdorf':
            gem_name = 'Zellerndorf'
        elif name.lower() == 'sierndorf':
            gem_name = 'Sierndorf'
        elif name.lower() == 'waltersdorf':
            gem_name = 'Staatz'
        elif name.lower() == 'viendorf':
            name = 'Viendorf Weingebirge'
        elif name.lower() == 'stoitzendorf':
            return [(10137, 31105)]
        elif name.lower() in ('klein reinprechtsdorf', 'unter nalb', 'klein stelzendorf', 'klein kirchberg'):
            name = name.replace(' ', '')
        elif name.lower() == 'drosendorf':
            name = 'Drosendorf Stadt'
        elif name.lower() == 'etzmannsdorf':
            name = 'Etzmannsdorf bei Straning'
        elif name.lower() == 'roggendorf':
            gem_name = 'Röschitz'
        elif name.lower() == 'wilhelmsdorf':
            gem_name = 'Poysdorf'
    # `hkid` is one of the literal lists assigned above and is inserted into the
    # SQL text; the searched name itself is passed as a bound parameter.
    # NOTE(review): for any other WG, `hkid` is still None here and the query reads `IN (None)`.
    cur = DB_CNX.cursor()
    cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name "
                "FROM AT_kg k "
                "JOIN AT_gem g ON g.gkz = k.gkz "
                "JOIN wb_gem wg ON wg.gkz = g.gkz "
                f"WHERE LOWER(k.name) LIKE (LOWER(?) || '%') AND wg.hkid IN ({hkid})",
                (name.replace('fliess', 'fließ').replace('ross', 'roß').replace('Gr.', 'Groß ')
                 .replace('Groß ', 'Groß').replace('-', ''),))
    rows: List[Tuple[int, str, int, str]] = cur.fetchall()
    cur.close()
    if gem_name:
        # Keep only rows whose municipality name (g.name) matches.
        rows = [row for row in rows if row[3] == gem_name]
    if len(rows) == 1:
        return [(k, g) for k, _, g, _ in rows]
    # No match or ambiguous match: show what was found and abort.
    print(name, rows)
    raise RuntimeError()
def lookup_kg_name(kgnr: int) -> str:
    """Return the name stored in AT_kg for *kgnr*, or None if there is no such row."""
    cur = DB_CNX.cursor()
    cur.execute("SELECT name FROM AT_kg WHERE kgnr = ?", (kgnr,))
    found = cur.fetchall()
    cur.close()
    if not found:
        return None
    return found[0][0]
def lookup_rnr_name(rnr: int) -> str:
    """Return the name (third tuple element) recorded in REED_MAP for *rnr*.

    :raises KeyError: if *rnr* is not in REED_MAP.
    """
    entry = REED_MAP[rnr]
    return entry[2]
def lookup_hkid(kgnr: Optional[int], qualid: str) -> str:
    """Determine the ``hkid`` for a KG and quality id.

    * 'WEI' and 'RSW' always yield 'OEST'.
    * Without a kgnr the result is 'WLNO' for WG 'MATZEN'/'WKW', else None.
    * Otherwise the hkid is read from wb_gem via AT_kg/AT_gem.
    * For 'LDW' the hkid is widened: 'WIEN' becomes 'WLXX', and ids starting
      with 'WL', 'BL' or 'SL' keep their first two letters followed by 'XX'.

    :param kgnr: KG number or None.
    :param qualid: three-letter quality id.
    :return: the resulting hkid.
    """
    if qualid in ('WEI', 'RSW'):
        return 'OEST'

    if kgnr is not None:
        cur = DB_CNX.cursor()
        cur.execute("SELECT wb.hkid FROM AT_kg kg JOIN AT_gem g ON g.gkz = kg.gkz JOIN wb_gem wb ON wb.gkz = g.gkz "
                    "WHERE kg.kgnr = ?", (kgnr,))
        found = cur.fetchall()
        cur.close()
        hkid = found[0][0]
    elif WG in ('MATZEN', 'WKW'):
        hkid = 'WLNO'
    else:
        hkid = None

    if qualid != 'LDW':
        return hkid
    if hkid == 'WIEN':
        return 'WLXX'
    region = hkid[:2]
    if region in ('WL', 'BL', 'SL'):
        return region + 'XX'
    return hkid
def guess_glnr(kgnr: int) -> Optional[int]:
    """Guess a glnr for a KG that has none, based on KGs already in GROSSLAGE_KG_MAP.

    Two candidate sets are fetched from AT_kg: all KGs sharing ``gkz / 100``
    with *kgnr*, and all KGs sharing the exact ``gkz`` (rows with
    ``gkz / 100 = 900`` are excluded from both). If the first set maps to no
    known glnr the result is None; if it maps to exactly one, that one is
    returned. Otherwise a glnr of the second, narrower set is returned, or
    None if that set has none.
    """
    cur = DB_CNX.cursor()
    cur.execute("SELECT kgnr FROM AT_kg "
                "WHERE gkz / 100 != 900 AND gkz / 100 = (SELECT gkz / 100 FROM AT_kg WHERE kgnr = ?)", (kgnr,))
    wide_rows = cur.fetchall()
    cur.execute("SELECT kgnr FROM AT_kg "
                "WHERE gkz / 100 != 900 AND gkz = (SELECT gkz FROM AT_kg WHERE kgnr = ?)", (kgnr,))
    narrow_rows = cur.fetchall()
    cur.close()

    def known_glnrs(rows) -> List[int]:
        # distinct glnrs of those KGs that already have one
        return list(set([GROSSLAGE_KG_MAP[k] for k, in rows if k in GROSSLAGE_KG_MAP]))

    candidates = known_glnrs(wide_rows)
    if len(candidates) <= 1:
        return candidates[0] if candidates else None
    candidates = known_glnrs(narrow_rows)
    return candidates[0] if candidates else None
def migrate_gradation(in_dir: str, out_dir: str) -> None:
    """Fill GRADATION_MAP with 'Oechsle' -> 'KW' from TUmrechnung.csv.

    :param in_dir: directory containing the exported CSV files.
    :param out_dir: not used by this function.
    """
    global GRADATION_MAP
    GRADATION_MAP = {}
    rows = utils.csv_parse_dict(f'{in_dir}/TUmrechnung.csv')
    GRADATION_MAP.update((row['Oechsle'], row['KW']) for row in rows)
def migrate_branches(in_dir: str, out_dir: str) -> None:
    """Convert TZweigstellen.csv into branch.csv and fill BRANCH_MAP (ZNR -> 'Kennbst').

    A telephone number whose area code starts with '6' is written to the
    mobile column instead of the phone column.

    :param in_dir: directory containing the exported CSV files.
    :param out_dir: directory the new CSV files are written to.
    """
    global BRANCH_MAP
    BRANCH_MAP = {}
    with utils.csv_open(f'{out_dir}/branch.csv') as f:
        f.header('zwstid', 'name', 'country', 'postal_dest', 'address', 'phone_nr', 'fax_nr', 'mobile_nr')
        for b in utils.csv_parse_dict(f'{in_dir}/TZweigstellen.csv'):
            BRANCH_MAP[b['ZNR']] = b['Kennbst']
            address = b['Straße']
            plz = int(b['PLZ']) if b['PLZ'] else None
            postal_dest = lookup_plz(plz, b['Ort'], address)
            tel = normalize_phone_nr(b['Telefon'])
            mob = None
            if tel and tel[4] == '6':
                mob = tel
                tel = None
            f.row(b['Kennbst'], b['Name'].strip().title(), AUSTRIA, postal_dest, address,
                  tel, normalize_phone_nr(b['Telefax']), mob)
def migrate_grosslagen(in_dir: str, out_dir: str) -> None:
    """Convert TGrosslagen.csv into wb_gl.csv and fill GROSSLAGE_MAP.

    Rows are renumbered 1, 2, 3, ... in file order. For WG 'WKW' the legacy
    GLNR 8 is not written; it is mapped to glnr 6 instead (its running number
    is still consumed).

    :param in_dir: directory containing the exported CSV files.
    :param out_dir: directory the new CSV files are written to.
    """
    global GROSSLAGE_MAP
    GROSSLAGE_MAP = {}
    with utils.csv_open(f'{out_dir}/wb_gl.csv') as f:
        f.header('glnr', 'name')
        rows = utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv')
        for glnr, gl in enumerate(rows, start=1):
            if WG == 'WKW' and gl['GLNR'] == 8:
                GROSSLAGE_MAP[8] = 6
                continue
            GROSSLAGE_MAP[gl['GLNR']] = glnr
            f.row(glnr, gl['Bezeichnung'])
def migrate_gemeinden(in_dir: str, out_dir: str) -> None:
    """Convert TGemeinden.csv into wb_kg.csv and fill GEM_MAP and GROSSLAGE_KG_MAP.

    Every legacy GNR is mapped to the (kgnr, gkz) pairs found by
    lookup_gem_name(). Each kgnr is written once, with the glnr of the first
    row that refers to it.

    :param in_dir: directory containing the exported CSV files.
    :param out_dir: directory the new CSV files are written to.
    """
    global GEM_MAP, GROSSLAGE_KG_MAP
    GEM_MAP, GROSSLAGE_KG_MAP = {}, {}
    written = set()
    with utils.csv_open(f'{out_dir}/wb_kg.csv') as f:
        f.header('kgnr', 'glnr')
        for g in utils.csv_parse_dict(f'{in_dir}/TGemeinden.csv'):
            pairs = lookup_gem_name(g['Bezeichnung'])
            GEM_MAP[g['GNR']] = pairs
            for kgnr, _gkz in pairs:
                if kgnr not in written:
                    written.add(kgnr)
                    glnr = GROSSLAGE_MAP[g['GLNR']]
                    GROSSLAGE_KG_MAP[kgnr] = glnr
                    f.row(kgnr, glnr)
def migrate_reeds(in_dir: str, out_dir: str) -> None:
    """Convert TRiede.csv into wb_rd.csv and fill REED_MAP (RNR -> (kgnr, rdnr, name)).

    Names written entirely in upper or lower case are title-cased. Each row is
    assigned to the first KG of its GNR (a notice is printed if the GNR has
    more than one); rows with an unknown GNR are reported and skipped. ``rdnr``
    is the next free number within the KG, starting at 1.

    :param in_dir: directory containing the exported CSV files.
    :param out_dir: directory the new CSV files are written to.
    """
    global REED_MAP
    REED_MAP = {}
    with utils.csv_open(f'{out_dir}/wb_rd.csv') as f:
        f.header('kgnr', 'rdnr', 'name')
        for r in utils.csv_parse_dict(f'{in_dir}/TRiede.csv'):
            name: str = r['Bezeichnung'].strip()
            if name.isupper() or name.islower():
                name = name.title()

            try:
                gem = GEM_MAP[r['GNR']]
            except KeyError:
                print(f'Invalid GNR {r["GNR"]} for reed {name}')
                continue
            kgnr = gem[0][0]
            if len(gem) != 1:
                print(gem, name, '->', gem[0])

            highest = max((n for k, n, _ in REED_MAP.values() if k == kgnr), default=0)
            rdnr = highest + 1
            REED_MAP[r['RNR']] = (kgnr, rdnr, name)
            f.row(kgnr, rdnr, name)
def migrate_attributes(in_dir: str, out_dir: str) -> None:
    """Convert TSortenAttribute.csv into wine_attribute.csv.

    Rows without SANR are skipped; all others are written as active. For WG
    'MATZEN' two additional inactive attributes ('M', 'HU') are appended.

    :param in_dir: directory containing the exported CSV files.
    :param out_dir: directory the new CSV files are written to.
    """
    with utils.csv_open(f'{out_dir}/wine_attribute.csv') as f:
        f.header('attrid', 'name', 'max_kg_per_ha', 'active')
        for a in utils.csv_parse_dict(f'{in_dir}/TSortenAttribute.csv'):
            if a['SANR'] is not None:
                max_kg = None if a['KgProHa'] is None else int(a['KgProHa'])
                f.row(a['SANR'], a['Attribut'], max_kg, True)
        if WG == 'MATZEN':
            for attrid, attr_name in (('M', 'Matzen'), ('HU', 'Huber')):
                f.row(attrid, attr_name, None, False)
def migrate_cultivations(in_dir: str, out_dir: str) -> None:
    """Convert TBewirtschaftungsarten.csv into wine_cultivation.csv and fill CULTIVATION_MAP.

    The cultid is the name itself if it is all upper case, 'BIO' if the name
    contains 'biolog' (case-insensitive), and otherwise the upper-cased first
    letter of the name.

    :param in_dir: directory containing the exported CSV files.
    :param out_dir: directory the new CSV files are written to.
    """
    global CULTIVATION_MAP
    CULTIVATION_MAP = {}
    with utils.csv_open(f'{out_dir}/wine_cultivation.csv') as f:
        f.header('cultid', 'name')
        for c in utils.csv_parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'):
            name: str = c['Bezeichnung']
            initial = name[0].upper()
            if name.isupper():
                cultid = name
            elif 'biolog' in name.lower():
                cultid = 'BIO'
            else:
                cultid = initial
            CULTIVATION_MAP[c['BANR']] = cultid
            f.row(cultid, name)
def migrate_area_commitment_types(in_dir: str, out_dir: str) -> None:
    """Convert TLiefermengen.csv into area_commitment_type.csv.

    Rows without SNR or with SNR 'SV' are skipped. The expected quantity per
    hectare is written as both minimum and maximum. Afterwards WG-specific
    rows are appended: 'BM' for 'MATZEN', and a '<sortid>B' row with attribute
    'B' for each variety in the WG's list.

    :param in_dir: directory containing the exported CSV files.
    :param out_dir: directory the new CSV files are written to.
    """
    with utils.csv_open(f'{out_dir}/area_commitment_type.csv') as f:
        f.header('vtrgid', 'sortid', 'attrid_1', 'attrid_2', 'disc',
                 'min_kg_per_ha', 'max_kg_per_ha', 'penalty_amount')
        for t in utils.csv_parse_dict(f'{in_dir}/TLiefermengen.csv'):
            sortid: str = t['SNR']
            if not sortid or sortid == 'SV':
                continue
            menge = int(t['ErwarteteLiefermengeProHa'])
            vtrgid = sortid + (t['SANR'] or '')
            attrid = t['SANR'] or sortid[2:] or None
            f.row(vtrgid, sortid[:2], attrid, None, None, menge, menge, None)

        if WG == 'MATZEN':
            bio = ['GV', 'ZW', 'MT']
            f.row('BM', 'BM', None, None, None, None, None, None)
        elif WG == 'WKW':
            bio = ['GV', 'ZW', 'WR', 'MT', 'RR', 'WB', 'CH', 'MU']
        else:
            bio = []
        for sortid in bio:
            f.row(f'{sortid}B', sortid, 'B', None, None, None, None, None)
def normalize_name(family_name: str, given_name: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str]]:
    """Split a legacy family/given name pair into structured name parts.

    :param family_name: content of the legacy family-name field.
    :param given_name: content of the legacy given-name field.
    :return: ``(prefix, given_name, middle_names, family_name, suffix,
        billing_name)``. ``prefix`` collects recognized titles, ``suffix`` is
        'jun.' or 'sen.', ``billing_name`` is set for companies, partnerships
        and multiple persons. ``middle_names`` is always None here.
    """
    letters = string.ascii_letters + 'äöüßÄÖÜẞ-'
    double_names = ['eva maria', 'maria theresia']

    def is_alpha(s: str) -> bool:
        # True if s consists only of letters/hyphens, or is one of the known double names.
        return all(c in letters for c in s) if s.lower() not in double_names else True

    # Hard-coded organizations (WKW only): everything but the billing name stays None.
    if WG == 'WKW':
        if 'BEZIRKSBAUERNKAMMER' == family_name:
            return None, None, None, None, None, 'Bezirksbauernkammer Mistelbach'
        elif (given_name, family_name) == ('LANDW.', 'FACHSCHULE'):
            return None, None, None, None, None, 'Landwirtschaftliche Fachschule Mistelbach'
        elif 'lagerhaus' in family_name.lower() and given_name == 'HOLLABRUNN-HORN':
            return None, None, None, None, None, 'Raiffeisen-Lagerhaus Hollabrunn-Horn eGen'

    # Fast path: two plain, non-empty names and no legal-form marker in the given-name field.
    if given_name.lower() not in ('kg', 'gesbr', 'gesnbr') and \
            len(family_name) > 0 and len(given_name) > 0 and is_alpha(family_name) and is_alpha(given_name):
        return None, given_name.title(), None, family_name.title(), None, None

    prefix: Optional[str] = None
    middle_names: Optional[str] = None
    suffix: Optional[str] = None
    billing_name: Optional[str] = None

    # 'z.H. <given> <family>': the family-name field becomes the billing name and
    # the person named after 'z.H.' becomes the member.
    # NOTE(review): requires at least two words after 'z.H.', otherwise parts[2] raises IndexError.
    if given_name.startswith('z.H. '):
        billing_name = family_name.replace('AGRAR', 'Agrar').replace('GESBR', 'GesbR')
        parts = given_name.split(' ')
        given_name = parts[1]
        family_name = parts[2]

    # Normalize the separators between several given names to ' + ':
    # '+', '(A)' / '(A, B)', 'u.' and 'und'.
    given_name = given_name.replace('+', ' + ').replace('JOS ', 'JOS. ')
    given_name = re.sub(r' ?\((.+?)(, ?(.*?))?\)',
                        lambda m: f' + {m.group(1)}{" + " + m.group(3) if m.group(2) else ""}', given_name)
    given_name = re.sub(r' u\. ?| und ', ' + ', given_name, flags=re.IGNORECASE)

    titles = ''

    def repl_title(m: re.Match) -> str:
        # Record a matched title or suffix and replace it by a single space.
        nonlocal titles, suffix
        t = m.group(1).lower().replace(' ', '').replace('.', '')
        match t:
            case 'jun': suffix = 'jun.'
            case 'sen': suffix = 'sen.'
            case 'dr': titles += 'Dr. '
            case 'mag': titles += 'Mag. '
            case 'ing': titles += 'Ing. '
            case 'dipling': titles += 'Dipl.-Ing. '
            case 'di': titles += 'Dipl.-Ing. '
            case 'dkfm': titles += 'Dipl.-Kfm. '
            case 'ökrat': titles += 'ÖkR '
            case 'lkr': titles += 'ÖkR '
        return ' '

    # Titles are stripped from both fields (given name first) and collected in `titles`.
    title_re = re.compile(r',?\b(dr|ing|mag|jun|sen|dkfm|dipl\. ?-?ing|di|ök\.rat|lkr)\b\.?', re.IGNORECASE)
    given_name = utils.remove_spaces(re.sub(title_re, repl_title, given_name))
    family_name = utils.remove_spaces(re.sub(title_re, repl_title, family_name))
    if titles:
        prefix = titles.strip()

    # Legal form at the end of the family name -> billing name '<family> KG/KEG/GesbR'.
    family_parts = family_name.split(' ')
    last = family_parts[-1].lower()
    if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr'):
        family_name = ' '.join(family_parts[:-1])
        if ' ' not in family_name and len(family_name) > 4:
            family_name = family_name.title()
        billing_name = family_name + ' ' + ('KG' if last == 'kg' else 'KEG' if last == 'keg.' else 'GesbR')
        if is_alpha(given_name):
            return prefix, given_name.title(), middle_names, family_name, suffix, billing_name

    # Legal form at the end of the given name -> same treatment.
    given_parts = given_name.split(' ')
    last = given_parts[-1].lower()
    if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr'):
        given_name = ' '.join(given_parts[:-1]).title()
        family_name = family_name.title()
        billing_name = f'{family_name} {"KG" if last == "kg" else "KEG" if last == "keg." else "GesbR"}'
        return prefix, given_name, middle_names, family_name, suffix, billing_name

    # Family-name field with several words: derive a billing name and reduce the
    # family name to a single part, depending on the pattern found.
    if ' ' in family_name or '.' in family_name:
        if family_name.lower().startswith('weingut') or family_name.lower().startswith('weinbau'):
            billing_name = family_name.title()
            family_name = ' '.join(family_name.split(' ')[1:]).title()
        elif family_name.lower().endswith('veltlinerhof'):
            billing_name = ' '.join(family_name.split(' ')[::-1]).title()
            family_name = ' '.join(family_name.split(' ')[:-1]).title()
        elif 'u.' in family_name:
            billing_name = utils.remove_spaces(family_name.title().replace('U.', ' und '))
            family_name = family_name.split(' ')[0].title()
        else:
            billing_name = family_name
            family_name = family_name.split(' ')[-1].title()

    if ' + ' in given_name:
        # Several persons: the first one becomes the member, all of them go into the billing name.
        parts = given_name.split(' + ')
        family_name = family_name.title()
        # The string is reversed so that replace(..., 1) turns only the LAST comma into ' und'.
        billing_name = (', '.join(parts).title()[::-1].replace(',', ' und'[::-1], 1)[::-1] +
                        f' {billing_name or family_name}')
        given_name = parts[0].title()
    else:
        family_name = family_name.title()
        given_name = given_name.title()
    return prefix, given_name, middle_names, family_name, suffix, billing_name
def migrate_members(in_dir: str, out_dir: str) -> None:
global MEMBER_MAP
MEMBER_MAP = {}
members = [m for m in utils.csv_parse_dict(f'{in_dir}/TMitglieder.csv')]
mgnrs = [m['MGNR'] for m in members]
fbs = parse_flaechenbindungen(in_dir)
with utils.csv_open(f'{out_dir}/member.csv') as f_m,\
utils.csv_open(f'{out_dir}/member_billing_address.csv') as f_mba, \
utils.csv_open(f'{out_dir}/member_telephone_number.csv') as f_tel,\
utils.csv_open(f'{out_dir}/wb_kg.csv', 'a') as f_kg:
f_m.header(
'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix',
'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid',
'lfbis_nr', 'ustid_nr', 'volllieferant', 'buchführend', 'funktionär', 'active', 'deceased',
'iban', 'bic', 'country', 'postal_dest', 'address',
'email', 'default_kgnr', 'comment')
f_mba.header('mgnr', 'name', 'country', 'postal_dest', 'address')
f_tel.header('mgnr', 'nr', 'type', 'number', 'comment')
for m in members:
mgnr: int = m['MGNR']
family_name: str = m['Nachname']
given_name: str = m['Vorname']
funktionaer, deceased = False, False
if family_name is None and given_name is None:
continue
given_name = given_name or ''
if WG == 'MATZEN' and given_name.startswith(' '):
funktionaer = True
if WG == 'WKW' and ('*' in family_name or '*' in given_name or '(+)' in family_name or '(+)' in given_name):
deceased = True
family_name = family_name.replace('*', '').replace('(+)', '')
given_name = given_name.replace('*', '').replace('(+)', '')
family_name = utils.remove_spaces(family_name)
given_name = utils.remove_spaces(given_name).replace(', ', ',')
ret = normalize_name(family_name, given_name)
prefix, given_name, middle_names, family_name, suffix, billing_name = ret
n1 = utils.remove_spaces(' '.join(r or '' for r in ret))
n2 = utils.remove_spaces((m['Vorname'] or '') + ' ' + (m['Nachname'] or ''))
if billing_name or n1.lower() != n2.lower():
convert_name(mgnr, (m['Nachname'], m['Vorname']),
(prefix, given_name, middle_names, family_name, suffix), billing_name)
if not given_name or not family_name:
given_name = given_name or ''
family_name = family_name or ''
invalid(mgnr, 'Name', n1, active)
bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None
if bnr is not None:
bnr = bnr.replace('.', '')
if len(bnr) == 10:
bnr = bnr.removesuffix('000')
elif len(bnr) == 6:
bnr = '0' + bnr
elif bnr.endswith(' inaktiv'):
bnr = bnr.split(' ')[0]
if not check_lfbis_nr(bnr):
if bnr in ('0', '1234567'):
warning(mgnr, 'BetriebsNr.', bnr, active)
else:
invalid(mgnr, 'BetriebsNr.', bnr, active)
bnr = None
ustid_nr: Optional[str] = m['UID']
if ustid_nr is not None:
ustid_nr = ustid_nr.replace(' ', '')
if len(ustid_nr) == 8 and ustid_nr.isdigit():
ustid_nr = 'ATU' + ustid_nr
elif not USTID_NR_RE.fullmatch(ustid_nr):
invalid(mgnr, 'UID', ustid_nr, active)
ustid_nr = None
if ustid_nr and not check_ustid_nr_at(ustid_nr):
if ustid_nr == 'ATU11111111':
warning(mgnr, 'UID', ustid_nr, active)
else:
invalid(mgnr, 'UID', ustid_nr, active)
ustid_nr = None
iban: Optional[str] = m['IBAN']
bic: Optional[str] = m['BIC']
blz: Optional[int] = m['BLZ']
kto_nr: Optional[str] = m['KontoNr']
if iban is not None:
iban = iban.replace(' ', '')
if not check_iban(iban):
invalid(mgnr, 'IBAN', iban, active)
iban = None
if bic is not None:
bic = bic.upper()
if bic == 'RLNWATAUE':
bic = 'RLNWATWWAUE'
elif bic == 'RLNWATWMIB':
bic = 'RLNWATWWMIB'
if not BIC_RE.fullmatch(bic):
invalid(mgnr, 'BIC', bic, active)
bic = None
if bic is not None:
if len(bic) == 11 and bic.endswith('XXX'):
bic = bic[:-3]
plz = int(m['PLZ']) if m['PLZ'] else None
ort: Optional[str] = m['Ort']
address: Optional[str] = m['Straße']
parts = ort.split(' ') if ort else ['']
if parts[-1].isdigit() or (len(parts) > 1 and parts[-2].isdigit()):
if len(parts) > 1 and parts[-2].isdigit():
ort = ' '.join(parts[:-2])
new_address = parts[-2] + parts[-1]
else:
ort = ' '.join(parts[:-1])
new_address = parts[-1]
if address is not None and address != ' ' and address != new_address:
print(address, new_address)
raise RuntimeError()
address = parts[-1]
if WG == 'WKW' and ort == 'JETZELDORF':
ort = 'JETZELSDORF'
if ort:
ort = ort.upper().strip()
if address is not None:
address_old = address
address = re.sub(r'([0-9]) ?([A-Z])\b', lambda a: a.group(1) + a.group(2).lower(),
utils.remove_spaces(address).title())
if address.startswith('Haus Nr.') or \
address.startswith('Nr. ') or \
address.startswith('Nr ') or \
address.isdigit() or (len(address) > 1 and address[:-1].isdigit()):
address = ort.title() + ' ' + address.split(' ')[-1]
address = address.replace('strasse', 'straße').replace('strassse', 'straße')\
.replace('Strasse', 'Straße').replace('Str.', 'Straße').replace('stasse', 'straße')\
.replace('str.', 'straße').replace('ster.', 'straße').replace('g. ', 'gasse ')\
.replace('Gross', 'Groß').replace('Bockfliess', 'Bockfließ').replace('Weiss', 'Weiß')\
.replace('Preussen', 'Preußen').replace('Schloss', 'Schloß').replace('luss', 'luß')\
.replace('Haupstraße', 'Hauptstraße').replace('Russ', 'Ruß').replace('Ross', 'Roß')
address = re.sub('([a-z])([0-9])', lambda a: a.group(1) + ' ' + a.group(2), address)
if address.startswith('Ob. '):
address = address.replace('Ob. ', 'Obere ', 1)
address = address.replace(' Nr. ', ' ')
address = re.sub(r'([^0-9]+?)( [0-9])', lambda a: fix_street_name(a.group(1)) + a.group(2), address)
address = utils.remove_spaces(address)
if address_old != address:
convert(mgnr, 'Adresse', address_old, address)
email: Optional[str] = m['EMail']
if email is not None:
if email.isupper():
email = email.lower()
if not EMAIL_RE.fullmatch(email):
invalid(mgnr, 'E-Mail', m['EMail'], active)
email = None
else:
parts = email.split('@')
email = f'{parts[0]}@{parts[1].lower()}'
zwstid = m['ZNR'] and BRANCH_MAP[m['ZNR']] or len(BRANCH_MAP) == 1 and list(BRANCH_MAP.values())[0]
if WG == 'WKW' and plz == 1228:
plz = 1020
postal_dest = lookup_plz(plz, ort, address)
#if mgnr in fbs:
# gems = {v['GNR'] for k, v in fbs[mgnr].items() if v['Bis'] and int(v['Bis']) >= 2020}
# if len(gems) == 1:
# print(GEM_MAP[list(gems)[0]])
okz = postal_dest % 100000 if postal_dest else None
kgnr = lookup_kgnr(okz)
active = m['Aktives Mitglied'] or False
if kgnr is None:
invalid(mgnr, 'KGNr.', ort, active)
elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]:
glnr = guess_glnr(kgnr)
if glnr:
new('KG', kgnr, lookup_kg_name(kgnr), f'GL {glnr}')
f_kg.row(kgnr, glnr)
if 9999 not in GEM_MAP:
GEM_MAP[9999] = []
GEM_MAP[9999].append((kgnr, 0))
else:
kgnr = None
if postal_dest is None:
invalid(mgnr, 'PLZ', None, active)
continue
pred = m['MGNR-Vorgänger'] if m['MGNR-Vorgänger'] in mgnrs else None
f_m.row(
mgnr, pred, prefix, given_name, middle_names, family_name, suffix,
m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0,
m['BHKontonummer'], zwstid, bnr, ustid_nr,
m['Volllieferant'] or False, m['Buchführend'] or False, funktionaer, active, deceased,
iban, bic, AUSTRIA, postal_dest, address or '-', email, kgnr, m['Anmerkung']
)
phone_1: Optional[str] = m['Telefon']
phone_2: Optional[str] = m['Telefax']
phone_3: Optional[str] = m['Mobiltelefon']
numbers = []
if WG == 'WKW':
# Telefax (phone_2) not used
numbers = {}
def add_number(nr: str, fax: bool = False, comment: str = None, fax_only: bool = False) -> None:
mob = nr[4] == '6'
numbers[nr] = {'mobile': mob, 'landline': not mob and not fax_only, 'fax': fax, 'comment': None}
if phone_1:
phone_1 = phone_1.lower().replace('und', 'u.').replace('auch', 'u.').replace('u.', ' u. ')\
.replace('oder', 'od.').replace(';', 'od.').replace('od.', ' od. ')
phone_1 = utils.remove_spaces(phone_1)
fax = False
if phone_1.endswith(' u. fax'):
fax = True
phone_1 = ' '.join(phone_1.split(' ')[:-2])
if phone_1.replace(' ', '').replace('/', '').replace('-', '').isdigit() and len(phone_1) <= 20:
if phone_1[0] != '0' and '/' in phone_1:
for nr in phone_1.split('/'):
add_number(normalize_phone_nr(nr, ort), fax)
else:
add_number(normalize_phone_nr(phone_1, ort), fax)
elif re.fullmatch(r'0[0-9/ -]+ od\. 0[0-9/ -]+', phone_1):
parts = phone_1.split(' od. ')
add_number(normalize_phone_nr(parts[0], ort), False)
add_number(normalize_phone_nr(parts[1], ort), fax)
elif re.fullmatch(r'0[0-9/ -]+ od\. [1-9][0-9/ -]+', phone_1):
parts = phone_1.split(' od. ')
add_number(normalize_phone_nr(parts[0], ort), False)
if parts[0][1] == '6':
add_number(normalize_phone_nr(parts[1], ort), fax)
else:
add_number(normalize_phone_nr(parts[0][:5] + parts[1], ort), fax)
elif re.fullmatch(r'0[0-9/ -]+ fax 0[0-9/ -]+', phone_1):
parts = phone_1.split(' fax ')
add_number(normalize_phone_nr(parts[0], ort), False)
add_number(normalize_phone_nr(parts[1], ort), True, fax_only=True)
elif re.fullmatch(r'0[0-9/ -]+ fax [1-9][0-9/ -]+', phone_1):
parts = phone_1.split(' fax ')
add_number(normalize_phone_nr(parts[0], ort), False)
add_number(normalize_phone_nr(parts[0][:5] + parts[1], ort), True, fax_only=True)
elif '-' in phone_1 and phone_1.endswith('fax'):
nr = re.sub(r'-+ ', '-', phone_1)
nr = ' '.join(nr.split(' ')[:-1])
add_number(normalize_phone_nr(nr.split('-')[0], ort), False)
add_number(normalize_phone_nr(nr, ort), True, fax_only=True)
elif 'fax -' in phone_1:
parts = phone_1.split('fax')
add_number(normalize_phone_nr(parts[0], ort), False)
add_number(normalize_phone_nr(parts[0].strip() + parts[1].strip(), ort), True, fax_only=True)
elif phone_1.endswith('fax'):
nr = phone_1[:-3].strip()
add_number(normalize_phone_nr(nr), False)
add_number(normalize_phone_nr(nr), True, fax_only=True)
elif re.fullmatch(r'0[0-9/ -]+ u\. fax (od\. |u\. )?[0-9/ -]+', phone_1):
parts = phone_1.split(' ')
add_number(normalize_phone_nr(parts[0], ort), True)
nr = parts[-1]
if nr[0] == '0':
add_number(normalize_phone_nr(nr, ort))
else:
add_number(normalize_phone_nr(parts[0][:5] + nr, ort))
else:
parts = phone_1.split(' ')
if parts[-1].isalpha():
add_number(normalize_phone_nr(parts[0], ort), comment=parts[-1])
else:
for nr in parts:
add_number(normalize_phone_nr(nr, ort), fax)
if phone_3:
for nr in phone_3.split(','):
nr = nr.strip()
parts = nr.split(' ')
comment = None
if parts[-1].startswith('(') and parts[-1].endswith(')'):
nr = nr[:nr.rindex(' ')].strip()
comment = parts[-1][1:-1].strip()
elif parts[-1].isalpha():
nr = nr[:nr.rindex(' ')].strip()
comment = parts[-1].strip()
add_number(normalize_phone_nr(nr, ort), comment=comment)
count = 0
for nr, data in numbers.items():
if data['mobile']:
count += 1
f_tel.row(mgnr, count, 'mobile', nr, data['comment'])
if data['landline']:
count += 1
f_tel.row(mgnr, count, 'landline', nr, data['comment'])
if data['fax']:
count += 1
f_tel.row(mgnr, count, 'fax', nr, data['comment'])
else:
if phone_1:
phone_1 = normalize_phone_nr(phone_1)
if len(phone_1) <= 10 or phone_1[0] != '+':
invalid(mgnr, 'Tel.Nr.', m['Telefon'], active)
else:
numbers.append(phone_1)
if phone_1[4] == '6':
f_tel.row(mgnr, len(numbers), 'mobile', phone_1, None)
else:
f_tel.row(mgnr, len(numbers), 'landline', phone_1, None)
if phone_2:
phone_2 = normalize_phone_nr(phone_2)
if len(phone_2) <= 8 or phone_2[0] != '+':
invalid(mgnr, 'Fax.Nr.', m['Telefax'], active)
else:
numbers.append(phone_2)
if phone_2[4] == '6':
f_tel.row(mgnr, len(numbers), 'mobile', phone_2, None)
else:
f_tel.row(mgnr, len(numbers), 'fax', phone_2, None)
if phone_3:
phone_3 = normalize_phone_nr(phone_3)
if len(phone_3) <= 10 or phone_3[0] != '+':
invalid(mgnr, 'Tel.Nr.', m['Mobiltelefon'], active)
elif phone_3 not in numbers:
numbers.append(phone_3)
if phone_3[4] == '6':
f_tel.row(mgnr, len(numbers), 'mobile', phone_3, None)
else:
f_tel.row(mgnr, len(numbers), 'landline', phone_3, None)
MEMBER_MAP[mgnr] = {
'default_kgnr': kgnr
}
if billing_name:
f_mba.row(mgnr, billing_name, AUSTRIA, postal_dest, address or '-')
def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
    """Migrate the area commitments from ``TFlaechenbindungen.csv``.

    Writes one row per commitment to ``{out_dir}/area_commitment.csv``. Reeds
    that are referenced by a commitment but cannot be matched to a reed of one
    of the commitment's KGs are written to ``{out_dir}/wb_rd.csv`` (opened in
    ``'a+'`` mode) with a newly assigned reed number.

    Rows are skipped when both ``Von`` and ``Bis`` are empty, when ``GNR`` is
    empty, or when the member is not in ``MEMBER_MAP``.

    :param in_dir: directory containing the exported csv files
    :param out_dir: directory the migrated csv files are written to
    """

    def parse_gstnrs(nr_str: str, kgnr: int, mgnr: int) -> List[str]:
        """Split a free-text parcel number field into single parcel numbers.

        Returns an empty list for ``None`` and for text that cannot be parsed;
        the latter is additionally reported through ``invalid``.
        """
        if nr_str is None:
            return []
        elif nr_str.isdigit() and len(nr_str) <= 6:
            return [nr_str]
        elif nr_str.count('/') == 1:
            parts = nr_str.split('/')
            if len(parts[0]) == len(parts[1]) and parts[0].isdigit() and parts[1].isdigit() and len(parts[0]) >= 3:
                # two numbers of equal length (>= 3 digits) are treated as two parcels
                return [parts[0], parts[1]]
            elif parts[0].isdigit() and len(parts[0]) <= 6 and parts[1].isdigit() and len(parts[1]) <= 3:
                # otherwise "base/sub" is one parcel
                return [nr_str]
        if nr_str.count('/') > 1:
            parts = nr_str.split('/')
            if all([p.isdigit() for p in parts]):
                if all([len(p) <= 1 for p in parts[1:]]):
                    # "123/1/2/3" -> 123/1, 123/2, 123/3
                    return [f'{parts[0]}/{p}' for p in parts[1:]]
                elif all([len(p) == len(parts[0]) for p in parts]):
                    return parts
        if nr_str.startswith(f'{kgnr:05}'):
            # strip a leading KG number
            return parse_gstnrs(nr_str[5:].strip(), kgnr, mgnr)
        if nr_str.endswith(' 2000'):
            return parse_gstnrs(nr_str[:-5].strip(), kgnr, mgnr)
        parts = re.split(r' *[,;+&] *', nr_str)
        if len(parts) == 1:
            parts = nr_str.split(' / ')
        # NOTE(review): as written the next two splits can never yield more than
        # one part (they only run when no ' ' is present) - the string literals
        # may have lost whitespace; confirm against the original file.
        if len(parts) == 1 and ' ' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) == 1 and ' ' not in nr_str and '-' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) > 1:
            return [gst for p in parts for gst in parse_gstnrs(p, kgnr, mgnr)]
        m = re.fullmatch(r'([0-9]+/)?([0-9]+) *- *([0-9]+)', nr_str)
        if m is not None:
            b = m.group(1)
            f = int(m.group(2))
            t = int(m.group(3))
            if t < f:
                # abbreviated upper bound, e.g. "123-25" means 123 to 125
                t += f - (f % pow(10, len(m.group(3))))
            if t - f < 50:
                return [
                    gst
                    for counter in range(f, t + 1)
                    for p in [f'{b or ""}{counter}']
                    for gst in parse_gstnrs(p, kgnr, mgnr)
                ]
        invalid(mgnr, 'GstNr.', f'{kgnr:05}-{nr_str}', None)
        return []

    def replace_nrs(m: re.Match, sep: str) -> str:
        """Collapse runs of three or more consecutive numbers in a match to ``a-b``."""
        end = m.group(0).endswith(sep)
        # A match ending with the separator yields a trailing empty string when
        # split; skip it instead of failing on int('').
        parts = [int(p) for p in m.group(0).split(sep) if p]
        text = ''
        last = None
        for i, p in enumerate(parts):
            if last is not None:
                if last + 1 == p:
                    last = p
                    continue
                else:
                    text += f'{last}{sep}'
                    last = None
            if len(parts) > i + 2 and p + 1 == parts[i + 1] and p + 2 == parts[i + 2]:
                last = p
                text += f'{p}-'
            else:
                text += f'{p}{sep}'
        if last is not None:
            text += str(last)
        return text.strip().strip(sep) + (sep if end else '')

    def format_gstnr(nrs: List[str]) -> Optional[str]:
        """Render a list of parcel numbers as one compact string (``None`` if empty)."""
        if len(nrs) == 0:
            return None
        # zero-pad every digit group so that the string sort is numeric, then strip the padding again
        nrs = [re.sub(r'\b0+', '', nr)
               for nr in sorted([re.sub(r'[0-9]+', lambda m: m.group(0).rjust(6, '0'), nr)
                                 for nr in nrs])]
        last = None
        text = ''
        for nr in nrs:
            if last is None:
                text += nr
            elif '/' in last and last.split('/')[:-1] == nr.split('/')[:-1]:
                # same base number: only append the sub-number
                text += f'+{nr.split("/")[-1]}'
            else:
                text += f', {nr}'
            last = nr
        text = re.sub(r'[0-9]+\+[0-9]+(\+[0-9]+)+', lambda m: replace_nrs(m, '+'), text)
        text = re.sub(r'([0-9]+(, |$)){3,}', lambda m: replace_nrs(m, ', '), text)
        return text

    # reed names grouped by KG: kgnr -> {rdnr: name}
    reeds: Dict[int, Dict[int, str]] = {k: {r: n
                                            for rk, r, n in REED_MAP.values() if rk == k}
                                        for k in set([k for k, _, _ in REED_MAP.values()])}
    # (kgnr, old RNR) -> rdnr for reeds that were re-assigned to another KG
    new_reeds: Dict[Tuple[int, int], int] = {}

    with utils.csv_open(f'{out_dir}/area_commitment.csv') as f_fb, \
            utils.csv_open(f'{out_dir}/wb_rd.csv', 'a+') as f_rd:
        f_fb.header('fbnr', 'mgnr', 'vtrgid', 'cultid', 'area', 'kgnr', 'gstnr', 'rdnr',
                    'year_from', 'year_to', 'comment')

        for fb in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
            if (fb['Von'] is None and fb['Bis'] is None) or fb['GNR'] is None:
                continue
            parz: str = fb['Parzellennummer']
            fbnr: int = fb['FBNR']
            mgnr: int = fb['MGNR']
            gem = GEM_MAP[fb['GNR']]
            kgnrs = [kgnr for kgnr, gkz in gem]
            rnr = fb['RNR']
            rd_kgnr, rdnr, _ = REED_MAP.get(rnr, (None, None, None)) if rnr else (None, None, None)
            if mgnr not in MEMBER_MAP:
                continue

            kgnr = None
            if rd_kgnr is None:
                kgnr = kgnrs[0]
            elif rd_kgnr in kgnrs:
                kgnr = rd_kgnr
            elif (kgnrs[0], rnr) in new_reeds:
                kgnr = kgnrs[0]
                rdnr = new_reeds[(kgnr, rnr)]
            else:
                # the reed belongs to a KG outside of this Gemeinde:
                # look for a reed with the same name in one of its KGs
                rname = lookup_rnr_name(rnr)
                for k in kgnrs:
                    if k not in reeds:
                        continue
                    try:
                        pos = list(reeds[k].values()).index(rname)
                        r = list(reeds[k].keys())[pos]
                        kgnr = k
                        rdnr = r
                        new_reeds[(kgnr, rnr)] = rdnr
                        break
                    except ValueError:
                        continue
                if kgnr is None:
                    # no reed of that name found: create one in the first KG with
                    # the next free reed number of that KG (1 if it has none yet)
                    kgnr = kgnrs[0]
                    rdnr = max([r for rk, r, _ in REED_MAP.values() if rk == kgnr] +
                               [r for (k, _), r in new_reeds.items() if k == kgnr],
                               default=0) + 1
                    f_rd.row(kgnr, rdnr, rname)
                    new_reeds[(kgnr, rnr)] = rdnr
                    new('Reed', (kgnr, rdnr), rname)

            area = int(fb['Flaeche'])
            if WG == 'MATZEN':
                gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR'])
            else:
                gstnrs = []
            comment, gstnr = None, None
            if parz is None or parz == '0000':
                if parz is not None:
                    invalid(mgnr, 'GstNr.', f'{lookup_kg_name(kgnr)} {kgnr or 0:05}-{parz}', None)
                gstnrs = []
                gstnr = '-'
            if WG == 'MATZEN' and len(gstnrs) == 0:
                # keep the original text when it could not be parsed
                comment = f'KG {kgnr or 0:05}: {parz}'
            gstnr = format_gstnr(gstnrs) or gstnr or parz
            if parz != gstnr.replace('+', '/'):
                convert(mgnr, f'GstNr. ({fbnr})', parz, gstnr)

            # end years >= 3000 are written as "no end"
            to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None
            f_fb.row(fbnr, mgnr, fb['SNR'] + (fb['SANR'] or ''), CULTIVATION_MAP[fb['BANR'] or 1], area,
                     kgnr, gstnr, rdnr, fb['Von'], to, comment)
def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, List[int], datetime.date]]:
    """Group delivery rows by (corrected) delivery note number.

    Cancelled rows (``Storniert``) and rows without ``Lieferscheinnummer`` are
    ignored, as are numbers shorter than 9 characters (they carry no branch
    character at index 8).

    :param deliveries: rows with the keys ``LINR``, ``Lieferscheinnummer``,
        ``Datum``, ``ZNR``, ``MGNR`` and ``Storniert``
    :return: list of ``(lsnr, [linr, ...], date)`` sorted by ``lsnr``, where
        numbers starting with ``9`` are ordered as if prefixed with ``19``
    """
    dates = {}
    fixed = {}
    last_dates = {}

    def add(lsnr: str, linr: int, date: datetime.date, unique: bool = False) -> None:
        # With unique=True an already used number gets '/2' appended (repeatedly
        # if necessary) instead of being merged into the existing delivery.
        if lsnr not in fixed:
            fixed[lsnr] = []
            dates[lsnr] = date
        elif unique:
            return add(lsnr + '/2', linr, date, unique)
        fixed[lsnr].append(linr)

    def get_lsnr(date: datetime.date, lsnr: str) -> str:
        # rebuild the date prefix of the number, keeping everything from index 8 on
        if date.year < 2000:
            return date.strftime('%y%m%d00') + lsnr[8:]
        else:
            return date.strftime('%Y%m%d') + lsnr[8:]

    deliveries: List[Tuple[int, str, datetime.date, int, int]] = [
        (d['LINR'], d['Lieferscheinnummer'], d['Datum'], d['ZNR'], d['MGNR'])
        for d in deliveries
        if d['Lieferscheinnummer'] and not d['Storniert']
    ]
    lsnrs = {d[1] for d in deliveries}

    for lnr, lsnr, date, zwstid, mgnr in deliveries:
        # index 8 is read below, so at least 9 characters are required
        if len(lsnr) < 9:
            continue
        if lsnr.startswith('22'):
            lsnr = '20' + lsnr[2:]
        lsdate = datetime.date(int(lsnr[:4]), int(lsnr[4:6]), int(lsnr[6:8])) if not lsnr.startswith('9') \
            else datetime.date(1900 + int(lsnr[:2]), int(lsnr[2:4]), int(lsnr[4:6]))
        lsnr_zwstid = lsnr[8]
        if lsnr_zwstid != zwstid and lsnr_zwstid in BRANCH_MAP.values():
            zwstid = lsnr_zwstid

        if len(lsnr) == 12:
            if date != lsdate:
                if date.year == lsdate.year:
                    # same year: renumber to the delivery date if that number is free
                    lsnr_n = get_lsnr(date, lsnr)
                    if lsnr_n not in lsnrs:
                        lsnr = lsnr_n
                    else:
                        warning_delivery(lsnr, mgnr, 'date', date)
                else:
                    # different year: take the year from the number
                    date = datetime.date(lsdate.year, date.month, date.day)
            if zwstid not in last_dates or not date < last_dates[zwstid]:
                last_dates[zwstid] = date
            add(lsnr, lnr, date, unique=True)
        else:
            # other lengths: rows sharing the first 12 characters form one delivery
            add(lsnr[:12], lnr, date)

    return sorted([(f[0], f[1], dates[f[0]]) for f in fixed.items()],
                  key=lambda f: f[0] if not f[0].startswith('9') else '19' + f[0])
def migrate_deliveries(in_dir: str, out_dir: str) -> None:
    """Migrate deliveries, their parts, attributes and modifiers, and the seasons.

    Reads ``TAbschlaege.csv``, ``TLieferungen.csv`` and ``TLieferungAbschlag.csv``
    from ``in_dir`` and writes ``delivery.csv``, ``delivery_part.csv``,
    ``delivery_part_attribute.csv``, ``delivery_part_modifier.csv``,
    ``season.csv`` and ``modifier.csv`` to ``out_dir``.

    Resets and fills the globals ``DELIVERY_MAP`` (LINR -> (year, did, dpnr))
    and ``MODIFIER_MAP`` (modifier name -> modifier row).

    :raises NotImplementedError: when a modifier id has to be assigned and
        ``WG`` is neither ``'MATZEN'`` nor ``'WKW'``
    """
    global DELIVERY_MAP, MODIFIER_MAP
    DELIVERY_MAP, MODIFIER_MAP = {}, {}

    # modifiers without a real name are ignored
    modifiers = {
        m['ASNR']: m
        for m in utils.csv_parse_dict(f'{in_dir}/TAbschlaege.csv')
        if m['Bezeichnung'] and m['Bezeichnung'] != '-'
    }
    seasons = {}
    branches = {}

    # assign the new modifier id to every modifier
    for mod in modifiers.values():
        name: str = mod['Bezeichnung'].replace('ausser', 'außer')
        nr: int = mod['ASNR']
        MODIFIER_MAP[name] = mod
        if WG == 'MATZEN':
            mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'PZS'
        elif WG == 'WKW':
            mod['id'] = {
                1: 'KA', 2: 'LG', 3: 'MG', 4: 'SG',
                5: 'VT', 6: 'MV', 7: 'LW', 8: 'VL',
                9: 'DN', 10: 'SA', 11: 'DA', 12: 'EG',
            }[nr]
        else:
            raise NotImplementedError()

    deliveries = list(utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv'))
    delivery_dict = {d['LINR']: d for d in deliveries}
    fixed = fix_deliveries(deliveries)
    # "old -> new" variety/attribute conversions and how often they occurred
    updated_varieties = {}

    with utils.csv_open(f'{out_dir}/delivery.csv') as f_delivery, \
            utils.csv_open(f'{out_dir}/delivery_part.csv') as f_part, \
            utils.csv_open(f'{out_dir}/delivery_part_attribute.csv') as f_attr:
        f_delivery.header('year', 'did', 'date', 'time', 'zwstid', 'lnr', 'lsnr', 'mgnr', 'comment')
        f_part.header('year', 'did', 'dpnr', 'sortid', 'weight', 'kmw', 'qualid', 'hkid', 'kgnr', 'rdnr',
                      'gerebelt', 'manual_weighing', 'spl_check', 'hand_picked', 'lesewagen',
                      'temperature', 'acid', 'scale_id', 'weighing_id', 'comment')
        f_attr.header('year', 'did', 'dpnr', 'attrid')

        for lsnr, linrs, date in fixed:
            # one season per year; 'nr' counts the deliveries of that year
            if date.year not in seasons:
                seasons[date.year] = {
                    'currency': 'EUR' if date.year >= 2001 else 'ATS',
                    'precision': WGMASTER_PRECISION,
                    'start': date,
                    'end': date,
                    'nr': 0,
                }
            s = seasons[date.year]
            if date > s['end']:
                s['end'] = date
            s['nr'] += 1
            snr = s['nr']

            mgnr = delivery_dict[linrs[0]]['MGNR']
            znr = delivery_dict[linrs[0]]['ZNR']
            glob_waage = set(delivery_dict[linr]['Waagentext'] for linr in linrs if delivery_dict[linr]['Waagentext'])

            # the branch is the character at index 8 of the delivery note number;
            # lnr is a running number per branch and day
            zwstid = lsnr[8]
            if zwstid not in branches:
                branches[zwstid] = {}
            if date not in branches[zwstid]:
                branches[zwstid][date] = 0
            branches[zwstid][date] += 1
            lnr = branches[zwstid][date]
            if BRANCH_MAP[znr] != zwstid:
                # fall back to the branch of the row if the one from the number is unknown
                if zwstid not in BRANCH_MAP.values():
                    zwstid = BRANCH_MAP[znr]

            comments = []
            # NOTE(review): `attributes` is created once per delivery, so attributes
            # added for an earlier part are still in the set when later parts are
            # written - confirm this is intended.
            attributes = set()
            for dpnr, linr in enumerate(linrs, start=1):
                d = delivery_dict[linr]
                DELIVERY_MAP[linr] = (date.year, snr, dpnr)
                if lsnr != d['Lieferscheinnummer']:
                    renumber_delivery(d['Lieferscheinnummer'], lsnr)

                oe = d['OechsleOriginal'] or d['Oechsle']
                kmw = GRADATION_MAP[oe]
                sortid = d['SNR'].upper()
                if d['SANR']:
                    attributes.add(d['SANR'])
                if len(sortid) != 2:
                    # everything after the first two characters is treated as an attribute
                    attributes.add(sortid[2:])
                    sortid = sortid[:2]
                if WG == 'MATZEN':
                    if sortid == 'HU':
                        # Gr.Veltliner (Huber)
                        sortid = 'GV'
                        attributes.add('HU')
                    elif sortid == 'SV':
                        sortid = 'SW'
                    elif sortid == 'WC':
                        # WEIẞBURGUNDER/CHARDONNAY
                        sortid = 'SW'
                    if 'H' in attributes:
                        attributes.remove('H')
                        attributes.add('HK')
                    if 'W' in attributes:
                        attributes.remove('W')
                elif WG == 'WKW':
                    if 'F' in attributes:
                        attributes.remove('F')
                if d['SNR'] != sortid:
                    line = f'{d["SNR"]}/{d["SANR"]} -> {sortid}/{",".join(list(attributes)) or None}'
                    if line not in updated_varieties:
                        updated_varieties[line] = 0
                    updated_varieties[line] += 1

                qualid = QUAL_MAP[d['QSNR']]
                if qualid != 'WEI' and d['Abgewertet']:
                    # only RSW is downgraded to WEI, everything else is just reported
                    if qualid == 'RSW':
                        qualid = 'WEI'
                    else:
                        warning_delivery(lsnr, mgnr, 'qualid', f'{qualid} (abgewertet)')

                # origin: KG of the Gemeinde (if unambiguous), overridden by the reed,
                # falling back to the member's default KG
                kgnr, rdnr = None, None
                if d['GNR']:
                    gem = GEM_MAP.get(d['GNR'], [])
                    if len(gem) == 1:
                        kgnr = gem[0][0]
                if d['RNR']:
                    kgnr, rdnr, _ = REED_MAP[d['RNR']]
                if kgnr is None:
                    m = MEMBER_MAP[mgnr]
                    kgnr = m['default_kgnr']
                if kgnr is None:
                    pass
                elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]:
                    warning_delivery(lsnr, mgnr, 'KGNr.', kgnr)
                    kgnr = None
                hkid = lookup_hkid(kgnr, qualid)

                handwiegung = d['Handwiegung'] or False
                # if all parts with a scale text share the same one, it is used for every part
                waage = list(glob_waage)[0] if len(glob_waage) == 1 else d['Waagentext']
                scale_id, weighing_id = None, None
                if waage:
                    # Waagenr: 1 ID: 19
                    # Waagennummer: 1 Speichernummer: 9166
                    waage = re.split(r' +', waage)
                    scale_id = waage[1]
                    weighing_id = waage[3] if waage[2] == 'Speichernummer:' else f'{date}/{waage[3]}'
                elif len(glob_waage) == 0 and not handwiegung:
                    # no scale text in the whole delivery: treat as weighed manually
                    handwiegung = True

                comment: Optional[str] = d['Anmerkung']
                acid = d['Säure']
                hand, lesemaschine = None, None
                if comment:
                    # some comments carry structured information instead of free text
                    comment = comment.replace('Söure', 'Säure')
                    if comment.startswith('Säure'):
                        acid = float(comment.split(' ')[-1].replace(',', '.'))
                        comment = None
                    elif comment == 'Maschine':
                        hand = False
                        comment = None
                    elif comment == 'Hand':
                        hand = True
                        comment = None
                if comment:
                    comments.append(comment)

                gerebelt = True if WG == 'MATZEN' else d['Gerebelt'] or False
                f_part.row(
                    date.year, snr, dpnr, sortid, int(d['Gewicht']), kmw, qualid, hkid, kgnr, rdnr,
                    gerebelt, handwiegung, d['Spaetlese-Ueberpruefung'] or False,
                    hand, lesemaschine, d['Temperatur'], acid, scale_id, weighing_id, comment
                )
                for attrid in attributes:
                    f_attr.row(date.year, snr, dpnr, attrid)
            # the time is taken from the last part of the delivery
            f_delivery.row(date.year, snr, date, d['Uhrzeit'], zwstid, lnr, lsnr, mgnr, '; '.join(comments) or None)

    for k, v in updated_varieties.items():
        print(k + (f' ({v} times)' if v > 1 else ''))

    with utils.csv_open(f'{out_dir}/delivery_part_modifier.csv') as f_part_mod:
        f_part_mod.header('year', 'did', 'dpnr', 'modid')
        for m in utils.csv_parse_dict(f'{in_dir}/TLieferungAbschlag.csv'):
            # skip modifiers of deliveries or modifier types that were not migrated
            if m['LINR'] not in DELIVERY_MAP or m['ASNR'] not in modifiers:
                continue
            y, did, dpnr = DELIVERY_MAP[m['LINR']]
            f_part_mod.row(y, did, dpnr, modifiers[m['ASNR']]['id'])

    with utils.csv_open(f'{out_dir}/season.csv') as f_season, \
            utils.csv_open(f'{out_dir}/modifier.csv') as f_mod:
        f_season.header('year', 'currency', 'precision', 'start_date', 'end_date')
        f_mod.header('year', 'modid', 'ordering', 'name', 'abs', 'rel', 'standard', 'quick_select')
        for y, s in seasons.items():
            f_season.row(y, s['currency'], s['precision'], s['start'], s['end'])
            # every modifier is written once per season
            for m in modifiers.values():
                # absolute values are scaled to integers using the season's precision
                abs_v = round(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None
                rel_v = m['AZASProzent'] / 100.0 if m['AZASProzent'] is not None else None
                f_mod.row(y, m['id'], m['ASNR'], m['Bezeichnung'], abs_v, rel_v,
                          m.get('Standard', False), m['Schnellauswahl'])
def migrate_payments(in_dir: str, out_dir: str) -> None:
    """Migrate the payment variants and the payments per delivery part.

    Reads ``TAuszahlung.csv``, ``TAuszahlungSorten.csv``,
    ``TAuszahlungSortenQualitätsstufe.csv`` and ``TLieferungen.csv`` from
    ``in_dir`` and writes ``payment_variant.csv`` and
    ``payment_delivery_part.csv`` to ``out_dir``. Uses ``DELIVERY_MAP``, which
    is filled by ``migrate_deliveries``.
    """
    # AZNR -> (year, avnr)
    variant_map: Dict[int, Tuple[int, int]] = {}
    # year -> [(AZNR, avnr, TeilzahlungNr)] of all variants not considered test variants
    variant_year_map: Dict[int, List[Tuple[int, int, int]]] = {}
    # year -> number of variants seen so far (used as avnr)
    year_map = {}
    # AZNR -> json data of the variant
    az_map = {}

    p_sort = list(utils.csv_parse_dict(f'{in_dir}/TAuszahlungSorten.csv'))
    sort_map = {i: [s for s in p_sort if s['AZNR'] == i] for i in set([s['AZNR'] for s in p_sort])}
    p_qual = list(utils.csv_parse_dict(f'{in_dir}/TAuszahlungSortenQualitätsstufe.csv'))
    qual_map = {i: [s for s in p_qual if s['AZNR'] == i] for i in set([s['AZNR'] for s in p_qual])}

    with utils.csv_open(f'{out_dir}/payment_variant.csv') as f_payment:
        f_payment.header('year', 'avnr', 'name', 'date', 'test_variant', 'calc_time',
                         'bucket_1_name', 'bucket_2_name', 'bucket_3_name', 'comment', 'data')
        for p in utils.csv_parse_dict(f'{in_dir}/TAuszahlung.csv'):
            year = p['Lesejahr']
            if year is None:
                continue
            if year not in year_map:
                year_map[year] = 0
            year_map[year] += 1
            variant_map[p['AZNR']] = (year, year_map[year])

            # all columns that are not written to their own csv column end up in the json data
            var = p.copy()
            del var['AZNR']
            del var['Datum']
            del var['Beschreibung']
            del var['Lesejahr']
            del var['Titel']
            del var['TeilzahlungNr']
            data = {
                'mode': 'wgmaster',
                **var,
                'AuszahlungSorten': {},
                'AuszahlungSortenQualitätsstufe': {},
            }

            # prices per variety / attribute / bound or not bound / Oechsle
            azs = data['AuszahlungSorten']
            for s in sort_map.get(p['AZNR'], []):
                del s['AZNR']
                del s['ID']
                if s['Oechsle'] is None:
                    continue
                snr = s['SNR'].upper()
                sanr = s['SANR'] or ''
                azs[snr] = azs.get(snr, {})
                azs[snr][sanr] = azs[snr].get(sanr, {})
                geb = 'Gebunden' if s['gebunden'] else 'NichtGebunden'
                azs[snr][sanr][geb] = azs[snr][sanr].get(geb, {})
                azs[snr][sanr][geb][s['Oechsle']] = s['Betrag']

            # replace every price table by the index of its (deduplicated) curve
            curves = []
            for sortid, d1 in azs.items():
                for attrid, d2 in d1.items():
                    for geb, d3 in d2.items():
                        # dense list indexed by Oechsle, missing values are 0.0
                        oe = [d3.get(n, 0.0) for n in range(max(d3.keys()) + 1)]
                        if oe not in curves:
                            curves.append(oe)
                        azs[sortid][attrid][geb] = curves.index(oe)

            # reduce every curve to the points where its slope changes
            for i, c in enumerate(curves):
                n = {}
                d = 0
                # p0/p1/p2 are the previous, current and next value at index oe
                for oe, p0, p1, p2 in zip(range(0, len(c) + 1), [0] + c, c, c[1:] + [c[len(c) - 1]]):
                    d1, d2 = round(p1 - p0, 4), round(p2 - p1, 4)
                    if d1 == d:
                        continue
                    d = d2
                    if p0 > 0:
                        n[f'{oe - 1}oe'] = p0
                    n[f'{oe}oe'] = p1
                if c[len(c) - 1] > 0:
                    n[f'{len(c) - 1}oe'] = c[len(c) - 1]
                keys = list(n.keys())
                vals = list(n.values())
                if len(n) >= 2 and vals[0] == vals[1]:
                    # the first two points have the same value: keep only the second one (as first entry)
                    del n[keys[0]]
                    del n[keys[1]]
                    n = {keys[1]: vals[1], **n}
                if len(n) == 1:
                    # a single point is always stored under the key '73oe'
                    n = {'73oe': list(n.values())[0]}
                curves[i] = n
            azs['Kurven'] = curves

            # prices per quality level / variety / attribute
            azq = data['AuszahlungSortenQualitätsstufe']
            for q in qual_map.get(p['AZNR'], []):
                del q['AZNR']
                del q['ID']
                qualid = QUAL_MAP[q['QSNR']]
                snr = q['SNR']
                sanr = q['SANR'] or ''
                azq[qualid] = azq.get(qualid, {})
                azq[qualid][snr] = azq[qualid].get(snr, {})
                azq[qualid][snr][sanr] = q['Betrag']
            # collapse levels whose entries all have the same value into that value
            for qualid, d1 in azq.items():
                for sortid, d2 in d1.items():
                    if len(set(d2.values())) == 1:
                        azq[qualid][sortid] = list(d2.values())[0]
            for qualid, d1 in azq.items():
                try:
                    if len(set(d1.values())) == 1:
                        azq[qualid] = list(d1.values())[0]
                except TypeError:
                    # values that are still dicts are not hashable - leave this level as it is
                    pass

            # drop empty (None) and False entries
            for k, v in data.copy().items():
                if v is None or (type(v) == bool and not v):
                    del data[k]
            az_map[p['AZNR']] = data

            # TeilzahlungNr 7 marks the variant as test variant
            test = (p['TeilzahlungNr'] == 7)
            if not test:
                if year not in variant_year_map:
                    variant_year_map[year] = []
                variant_year_map[year].append((p['AZNR'], year_map[year], p['TeilzahlungNr']))
            f_payment.row(year, year_map[year], p['Titel'], p['Datum'], test, None,
                          'Gebunden', 'Nicht gebunden', 'Abgewertet', p['Beschreibung'], json.dumps(data))

    # NOTE(review): get_modifiers and get_prices are only referenced by the
    # commented-out check further down.
    def get_modifiers(modifiers: str) -> Tuple[int, float]:
        """Sum up the absolute (scaled integer) and relative values of ' / '-separated modifier names."""
        if modifiers is None or modifiers == '':
            return 0, 0.0
        a, r = 0, 0.0
        for name in modifiers.split(' / '):
            mod = MODIFIER_MAP[name]
            if mod['AZASProzent']:
                r += mod['AZASProzent'] / 100.0
            if mod['AZAS']:
                a += round(mod['AZAS'] * pow(10, WGMASTER_PRECISION))
        return a, r

    def get_prices(aznr: int, sortid: str, attribute: Optional[str], oe: int) -> Tuple[int, int, int]:
        """Look up the prices (Gebunden, NichtGebunden, WEI) of a variant as scaled integers."""
        az = az_map[aznr]
        qs = az['AuszahlungSortenQualitätsstufe']
        so = az['AuszahlungSorten']
        p1, p2, p3 = 0, 0, 0
        if qs:
            # the WEI price may be a plain value or nested by variety and attribute
            p3 = qs['WEI']
            if type(p3) == dict:
                p3 = p3[sortid]
            if type(p3) == dict:
                p3 = p3[attribute or '']
        if sortid.upper() in so:
            so = so[sortid.upper()][attribute or '']
            p2 = so['NichtGebunden'][oe]
            p1 = so['Gebunden'][oe] if 'Gebunden' in so else p2
        prec = pow(10, WGMASTER_PRECISION)
        return round(p1 * prec), round(p2 * prec), round(p3 * prec)

    with utils.csv_open(f'{out_dir}/payment_delivery_part.csv') as f_del_pay:
        f_del_pay.header('year', 'did', 'dpnr', 'avnr', 'bucket_1', 'bucket_2', 'bucket_3', 'amount')
        deliveries = {d['LINR']: d for d in utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv')}
        for linr, (y, did, dpnr) in DELIVERY_MAP.items():
            p = deliveries[linr]
            if y not in variant_year_map:
                continue
            for aznr, avnr, tznr in variant_year_map[y]:
                # TeilzahlungNr 1-5 have their own column, everything else uses BEndauszahlung
                val = p[f'BTeilzahlung{tznr}' if tznr < 6 else 'BEndauszahlung']
                val = round(val * pow(10, WGMASTER_PRECISION))
                b1, b2, b3 = 0, 0, 0
                # prices = get_prices(aznr, p['SNR'], p['SANR'], int(p['Oechsle']))
                # mod = get_modifiers(p['BAbschlaegeString'])
                # if not az_map[aznr].get('AbschlägeBerücksichtigen', False):
                # mod = 0, 0.0
                gew, geb_gew = int(p['Gewicht']), int(p['BGewichtGebunden'])
                # bucket 1: bound weight, bucket 2: not bound weight, bucket 3: weight of quality WEI
                if QUAL_MAP[p['QSNR']] == 'WEI':
                    b3 += gew
                else:
                    b2 += gew - geb_gew
                    b1 += geb_gew
                # check_val = b1 * (prices[0] + mod[0]) + b2 * (prices[1] + mod[0]) + b3 * (prices[2] + mod[0])
                # check_val *= 1 + mod[1]
                # check_val = round(check_val / 100) * 100
                # if check_val != val:
                # print(p['LINR'], y, did, dpnr, avnr, val, check_val)
                # else:
                # print("GOOD")
                f_del_pay.row(y, did, dpnr, avnr, b1, b2, b3, val)
def migrate_parameters(in_dir: str, out_dir: str) -> None:
    """Migrate the client parameters from ``TParameter.csv`` to ``client_parameter.csv``.

    :param in_dir: directory containing the exported csv files
    :param out_dir: directory the migrated csv file is written to
    :raises KeyError: if one of the mandatory ``MANDANTEN...`` parameters is
        missing or the legal form in ``MANDANTENNAME2`` is not known
    """
    params: Dict[str, str] = {p['Bezeichnung']: p['Wert'] for p in utils.csv_parse_dict(f'{in_dir}/TParameter.csv')}
    name = params['MANDANTENNAME1'].title().replace('F.', 'für').replace('U.', 'und').replace(' Im ', ' im ')
    # normalized legal form, e.g. 'reg. Gen.m.b.H.' -> 'reggenmbh'
    suffix = params['MANDANTENNAME2'].lower().replace(' ', '').replace('.', '')
    types = {
        'reggenmbh': 'reg. Gen.m.b.H.'
    }
    tokens = {
        'MATZEN': ('WGM', 'WG Matzen'),
        'WKW': ('WKW', 'WKW')
    }.get(WG, (None, None))
    ort = params['MANDANTENORT'].title()

    # The delivery note text is optional; only fix its wording when it is present.
    delivery_note_text = params.get('LIEFERSCHEINTEXT', None)
    if delivery_note_text is not None:
        delivery_note_text = delivery_note_text.replace(' daß ', ' dass ').replace('obige Angaben maßgeblicher Veränderungen', 'maßgeblichen Veränderungen obiger Angaben')

    new_params: Dict[str, Optional[str]] = {
        'CLIENT_NAME_TOKEN': tokens[0],
        'CLIENT_NAME_SHORT': tokens[1],
        'CLIENT_NAME': name,
        'CLIENT_NAME_SUFFIX': None,
        'CLIENT_NAME_TYPE': types[suffix],
        'CLIENT_PLZ': params['MANDANTENPLZ'],
        'CLIENT_ORT': ort,
        'CLIENT_ADDRESS': params['MANDANTENSTRASSE'],
        'CLIENT_IBAN': None,
        'CLIENT_BIC': None,
        'CLIENT_USTIDNR': params['MANDANTENUID'].replace(' ', ''),
        'CLIENT_LFBISNR': params['MANDANTENBETRIEBSNUMMER'],
        'CLIENT_PHONE': normalize_phone_nr(params['MANDANTENTELEFON'], ort),
        'CLIENT_FAX': normalize_phone_nr(params['MANDANTENTELEFAX'], ort),
        'CLIENT_EMAIL': params['MANDANTENEMAIL'],
        'CLIENT_WEBSITE': params.get('MANDANTENHOMEPAGE', None),
        'DELIVERY_OBLIGATION': params.get('LIEFERPFLICHT/GA1', None),
        'DELIVERY_RIGHT': params.get('LIEFERRECHT/GA1', None),
        'VAT_NORMAL': '0.20',
        'VAT_REDUCED': '0.10',
        'VAT_FLATRATE': '0.13',
        'DOCUMENT_SENDER': params.get('ABSENDERTEXT2', None),
        'TEXT_DELIVERYNOTE': delivery_note_text,
    }

    with utils.csv_open(f'{out_dir}/client_parameter.csv') as f:
        f.header('param', 'value')
        for param, value in new_params.items():
            f.row(param, value)
def main() -> None:
    """Parse the command line and run all migration steps in order.

    Sets the globals ``QUIET``, ``WG`` and ``DB_CNX`` from the arguments and
    creates the output directory if necessary. The database connection is
    closed again even if one of the migration steps fails.
    """
    global DB_CNX, QUIET, WG

    parser = argparse.ArgumentParser()
    parser.add_argument('in_dir', type=str,
                        help='The input directory where the exported csv files are stored')
    parser.add_argument('out_dir', type=str,
                        help='The output directory where the migrated csv file should be stored')
    parser.add_argument('-q', '--quiet', action='store_true', default=False,
                        help='Be less verbose')
    parser.add_argument('-d', '--database', metavar='DB', required=True,
                        help='The sqlite database file to look up information')
    parser.add_argument('-g', '--genossenschaft', metavar='WG', required=False, type=str,
                        choices=('MATZEN', 'WKW'))
    args = parser.parse_args()

    os.makedirs(args.out_dir, exist_ok=True)

    QUIET = args.quiet
    WG = args.genossenschaft

    # The order matters: later steps use the module-level maps filled by earlier ones.
    steps = (
        migrate_gradation,
        migrate_branches,
        migrate_grosslagen,
        migrate_gemeinden,
        migrate_reeds,
        migrate_attributes,
        migrate_cultivations,
        migrate_area_commitment_types,
        migrate_members,
        migrate_area_commitments,
        migrate_deliveries,
        migrate_payments,
        migrate_parameters,
    )

    DB_CNX = sqlite3.connect(args.database)
    try:
        for step in steps:
            step(args.in_dir, args.out_dir)
    finally:
        DB_CNX.close()
# Run the migration only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
    main()