migrate.py: Use enum for client

This commit is contained in:
2023-09-05 14:06:01 +02:00
parent 865e49eaae
commit e533773a5e

View File

@ -2,6 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from typing import Dict, Any, Tuple, Optional, List, Iterable from typing import Dict, Any, Tuple, Optional, List, Iterable
from enum import Enum
import argparse import argparse
import os import os
import re import re
@ -15,9 +16,18 @@ import string
import utils import utils
class WG(Enum):
MATZEN = 1
WKW = 2
@classmethod
def from_str(cls, name: str):
return cls({wg.name: wg.value for wg in WG}[name])
DB_CNX: Optional[sqlite3.Connection] = None DB_CNX: Optional[sqlite3.Connection] = None
QUIET: bool = False QUIET: bool = False
WG: Optional[str] = None CLIENT: Optional[WG] = None
USTID_NR_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}') USTID_NR_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}')
BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?') BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
@ -230,7 +240,7 @@ def normalize_phone_nr(nr: Optional[str], ort: str = None) -> Optional[str]:
nr = nr.replace('-', '') nr = nr.replace('-', '')
if nr[0] == '0': if nr[0] == '0':
nr = '+43 ' + nr[1:] nr = '+43 ' + nr[1:]
elif WG == 'WKW' and ort: elif CLIENT == WG.WKW and ort:
ort = ort.upper().strip() ort = ort.upper().strip()
if ort in ('PILLICHSDORF', 'OBERSDORF', 'WOLKERSDORF', 'WOLFPASSING', 'PUTZING', 'GROSSENGERSDORF', if ort in ('PILLICHSDORF', 'OBERSDORF', 'WOLKERSDORF', 'WOLFPASSING', 'PUTZING', 'GROSSENGERSDORF',
'EIBESBRUNN'): 'EIBESBRUNN'):
@ -348,7 +358,7 @@ def lookup_kgnr(okz: Optional[int]) -> Optional[int]:
def lookup_gem_name(name: str) -> List[Tuple[int, int]]: def lookup_gem_name(name: str) -> List[Tuple[int, int]]:
gem_name, hkid = None, None gem_name, hkid = None, None
if WG == 'MATZEN': if CLIENT == WG.MATZEN:
hkid = "'WLWV'" hkid = "'WLWV'"
if name.lower() == 'dörfles': if name.lower() == 'dörfles':
gem_name = 'Weikendorf' gem_name = 'Weikendorf'
@ -356,7 +366,7 @@ def lookup_gem_name(name: str) -> List[Tuple[int, int]]:
return [(6027, 30859), (6007, 30859)] return [(6027, 30859), (6007, 30859)]
elif name.lower() == 'grub': elif name.lower() == 'grub':
name = 'Grub an der March' name = 'Grub an der March'
elif WG == 'WKW': elif CLIENT == WG.WKW:
hkid = "'WLWV', 'WIEN', 'WLWG'" hkid = "'WLWV', 'WIEN', 'WLWG'"
if name.endswith('*'): if name.endswith('*'):
name = name[:-1].strip() name = name[:-1].strip()
@ -456,7 +466,7 @@ def lookup_hkid(kgnr: Optional[int], qualid: str) -> str:
if qualid in ('WEI', 'RSW'): if qualid in ('WEI', 'RSW'):
return 'OEST' return 'OEST'
elif kgnr is None: elif kgnr is None:
if WG in ('MATZEN', 'WKW'): if CLIENT in (WG.MATZEN, WG.WKW):
hkid = 'WLNO' hkid = 'WLNO'
else: else:
cur = DB_CNX.cursor() cur = DB_CNX.cursor()
@ -526,7 +536,7 @@ def migrate_grosslagen(in_dir: str, out_dir: str) -> None:
f.header('glnr', 'name') f.header('glnr', 'name')
for gl in utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv'): for gl in utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv'):
glnr += 1 glnr += 1
if WG == 'WKW' and gl['GLNR'] == 8: if CLIENT == WG.WKW and gl['GLNR'] == 8:
GROSSLAGE_MAP[8] = 6 GROSSLAGE_MAP[8] = 6
continue continue
GROSSLAGE_MAP[gl['GLNR']] = glnr GROSSLAGE_MAP[gl['GLNR']] = glnr
@ -584,7 +594,7 @@ def migrate_attributes(in_dir: str, out_dir: str) -> None:
if a['SANR'] is None: if a['SANR'] is None:
continue continue
f.row(a['SANR'], a['Attribut'], int(a['KgProHa']) if a['KgProHa'] is not None else None, True) f.row(a['SANR'], a['Attribut'], int(a['KgProHa']) if a['KgProHa'] is not None else None, True)
if WG == 'MATZEN': if CLIENT == WG.MATZEN:
f.row('M', 'Matzen', None, False) f.row('M', 'Matzen', None, False)
f.row('HU', 'Huber', None, False) f.row('HU', 'Huber', None, False)
@ -618,10 +628,10 @@ def migrate_area_commitment_types(in_dir: str, out_dir: str) -> None:
f.row(sortid + (t['SANR'] or ''), sortid[:2], t['SANR'] or sortid[2:] or None, None, None, f.row(sortid + (t['SANR'] or ''), sortid[:2], t['SANR'] or sortid[2:] or None, None, None,
menge, menge, None) menge, menge, None)
bio = [] bio = []
if WG == 'MATZEN': if CLIENT == WG.MATZEN:
bio = ['GV', 'ZW', 'MT'] bio = ['GV', 'ZW', 'MT']
f.row('BM', 'BM', None, None, None, None, None, None) f.row('BM', 'BM', None, None, None, None, None, None)
elif WG == 'WKW': elif CLIENT == WG.WKW:
bio = ['GV', 'ZW', 'WR', 'MT', 'RR', 'WB', 'CH', 'MU'] bio = ['GV', 'ZW', 'WR', 'MT', 'RR', 'WB', 'CH', 'MU']
for sortid in bio: for sortid in bio:
f.row(f'{sortid}B', sortid, 'B', None, None, None, None, None) f.row(f'{sortid}B', sortid, 'B', None, None, None, None, None)
@ -634,7 +644,7 @@ def normalize_name(family_name: str, given_name: str) -> Tuple[Optional[str], Op
def is_alpha(s: str) -> bool: def is_alpha(s: str) -> bool:
return all(c in letters for c in s) if s.lower() not in double_names else True return all(c in letters for c in s) if s.lower() not in double_names else True
if WG == 'WKW': if CLIENT == WG.WKW:
if 'BEZIRKSBAUERNKAMMER' == family_name: if 'BEZIRKSBAUERNKAMMER' == family_name:
return None, None, None, None, None, 'Bezirksbauernkammer Mistelbach' return None, None, None, None, None, 'Bezirksbauernkammer Mistelbach'
elif (given_name, family_name) == ('LANDW.', 'FACHSCHULE'): elif (given_name, family_name) == ('LANDW.', 'FACHSCHULE'):
@ -760,9 +770,9 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
continue continue
given_name = given_name or '' given_name = given_name or ''
if WG == 'MATZEN' and given_name.startswith(' '): if CLIENT == WG.MATZEN and given_name.startswith(' '):
funktionaer = True funktionaer = True
if WG == 'WKW' and ('*' in family_name or '*' in given_name or '(+)' in family_name or '(+)' in given_name): if CLIENT == WG.WKW and ('*' in family_name or '*' in given_name or '(+)' in family_name or '(+)' in given_name):
deceased = True deceased = True
family_name = family_name.replace('*', '').replace('(+)', '') family_name = family_name.replace('*', '').replace('(+)', '')
given_name = given_name.replace('*', '').replace('(+)', '') given_name = given_name.replace('*', '').replace('(+)', '')
@ -853,7 +863,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
print(address, new_address) print(address, new_address)
raise RuntimeError() raise RuntimeError()
address = parts[-1] address = parts[-1]
if WG == 'WKW' and ort == 'JETZELDORF': if CLIENT == WG.WKW and ort == 'JETZELDORF':
ort = 'JETZELSDORF' ort = 'JETZELSDORF'
if ort: if ort:
ort = ort.upper().strip() ort = ort.upper().strip()
@ -894,7 +904,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
email = f'{parts[0]}@{parts[1].lower()}' email = f'{parts[0]}@{parts[1].lower()}'
zwstid = m['ZNR'] and BRANCH_MAP[m['ZNR']] or len(BRANCH_MAP) == 1 and list(BRANCH_MAP.values())[0] zwstid = m['ZNR'] and BRANCH_MAP[m['ZNR']] or len(BRANCH_MAP) == 1 and list(BRANCH_MAP.values())[0]
if WG == 'WKW' and plz == 1228: if CLIENT == WG.WKW and plz == 1228:
plz = 1020 plz = 1020
postal_dest = lookup_plz(plz, ort, address) postal_dest = lookup_plz(plz, ort, address)
@ -937,7 +947,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
phone_3: Optional[str] = m['Mobiltelefon'] phone_3: Optional[str] = m['Mobiltelefon']
numbers = [] numbers = []
if WG == 'WKW': if CLIENT == WG.WKW:
# Telefax (phone_2) not used # Telefax (phone_2) not used
numbers = {} numbers = {}
@ -1215,7 +1225,7 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
new('Reed', (kgnr, rdnr), rname) new('Reed', (kgnr, rdnr), rname)
area = int(fb['Flaeche']) area = int(fb['Flaeche'])
if WG == 'MATZEN': if CLIENT == WG.MATZEN:
gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR']) gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR'])
else: else:
gstnrs = [] gstnrs = []
@ -1225,7 +1235,7 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
invalid(mgnr, 'GstNr.', f'{lookup_kg_name(kgnr)} {kgnr or 0:05}-{parz}', None) invalid(mgnr, 'GstNr.', f'{lookup_kg_name(kgnr)} {kgnr or 0:05}-{parz}', None)
gstnrs = [] gstnrs = []
gstnr = '-' gstnr = '-'
if WG == 'MATZEN' and len(gstnrs) == 0: if CLIENT == WG.MATZEN and len(gstnrs) == 0:
comment = f'KG {kgnr or 0:05}: {parz}' comment = f'KG {kgnr or 0:05}: {parz}'
gstnr = format_gstnr(gstnrs) or gstnr or parz gstnr = format_gstnr(gstnrs) or gstnr or parz
if parz != gstnr.replace('+', '/'): if parz != gstnr.replace('+', '/'):
@ -1312,9 +1322,9 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
name: str = mod['Bezeichnung'].replace('ausser', 'außer') name: str = mod['Bezeichnung'].replace('ausser', 'außer')
nr: int = mod['ASNR'] nr: int = mod['ASNR']
MODIFIER_MAP[name] = mod MODIFIER_MAP[name] = mod
if WG == 'MATZEN': if CLIENT == WG.MATZEN:
mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'PZS' mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'PZS'
elif WG == 'WKW': elif CLIENT == WG.WKW:
mod['id'] = { mod['id'] = {
1: 'KA', 2: 'LG', 3: 'MG', 4: 'SG', 1: 'KA', 2: 'LG', 3: 'MG', 4: 'SG',
5: 'VT', 6: 'MV', 7: 'LW', 8: 'VL', 5: 'VT', 6: 'MV', 7: 'LW', 8: 'VL',
@ -1385,7 +1395,7 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
attributes.add(sortid[2:]) attributes.add(sortid[2:])
sortid = sortid[:2] sortid = sortid[:2]
if WG == 'MATZEN': if CLIENT == WG.MATZEN:
if sortid == 'HU': if sortid == 'HU':
# Gr.Veltliner (Huber) # Gr.Veltliner (Huber)
sortid = 'GV' sortid = 'GV'
@ -1400,7 +1410,7 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
attributes.add('HK') attributes.add('HK')
if 'W' in attributes: if 'W' in attributes:
attributes.remove('W') attributes.remove('W')
elif WG == 'WKW': elif CLIENT == WG.WKW:
if 'F' in attributes: if 'F' in attributes:
attributes.remove('F') attributes.remove('F')
@ -1463,7 +1473,7 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
if comment: if comment:
comments.append(comment) comments.append(comment)
gerebelt = True if WG == 'MATZEN' else d['Gerebelt'] or False gerebelt = True if CLIENT == WG.MATZEN else d['Gerebelt'] or False
f_part.row( f_part.row(
date.year, snr, dpnr, sortid, int(d['Gewicht']), kmw, qualid, hkid, kgnr, rdnr, date.year, snr, dpnr, sortid, int(d['Gewicht']), kmw, qualid, hkid, kgnr, rdnr,
gerebelt, handwiegung, d['Spaetlese-Ueberpruefung'] or False, gerebelt, handwiegung, d['Spaetlese-Ueberpruefung'] or False,
@ -1679,9 +1689,9 @@ def migrate_parameters(in_dir: str, out_dir: str) -> None:
'reggenmbh': 'reg. Gen.m.b.H.' 'reggenmbh': 'reg. Gen.m.b.H.'
} }
tokens = { tokens = {
'MATZEN': ('WGM', 'WG Matzen'), WG.MATZEN: ('WGM', 'WG Matzen'),
'WKW': ('WKW', 'WKW') WG.WKW: ('WKW', 'WKW')
}.get(WG, (None, None)) }.get(CLIENT, (None, None))
ort = params['MANDANTENORT'].title() ort = params['MANDANTENORT'].title()
new_params: Dict[str, Optional[str]] = { new_params: Dict[str, Optional[str]] = {
@ -1717,7 +1727,7 @@ def migrate_parameters(in_dir: str, out_dir: str) -> None:
def main() -> None: def main() -> None:
global DB_CNX, QUIET, WG global DB_CNX, QUIET, CLIENT
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('in_dir', type=str, parser.add_argument('in_dir', type=str,
@ -1729,13 +1739,13 @@ def main() -> None:
parser.add_argument('-d', '--database', metavar='DB', required=True, parser.add_argument('-d', '--database', metavar='DB', required=True,
help='The sqlite database file to look up information') help='The sqlite database file to look up information')
parser.add_argument('-g', '--genossenschaft', metavar='WG', required=False, type=str, parser.add_argument('-g', '--genossenschaft', metavar='WG', required=False, type=str,
choices=('MATZEN', 'WKW')) choices=[wg.name for wg in WG])
args = parser.parse_args() args = parser.parse_args()
os.makedirs(args.out_dir, exist_ok=True) os.makedirs(args.out_dir, exist_ok=True)
QUIET = args.quiet QUIET = args.quiet
WG = args.genossenschaft CLIENT = WG.from_str(args.genossenschaft)
DB_CNX = sqlite3.connect(args.database) DB_CNX = sqlite3.connect(args.database)