diff --git a/wgmaster/migrate.py b/wgmaster/migrate.py index 0d2452b..36df526 100755 --- a/wgmaster/migrate.py +++ b/wgmaster/migrate.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- from typing import Dict, Any, Tuple, Optional, List, Iterable +from enum import Enum import argparse import os import re @@ -15,9 +16,18 @@ import string import utils +class WG(Enum): + MATZEN = 1 + WKW = 2 + + @classmethod + def from_str(cls, name: str): + return cls({wg.name: wg.value for wg in WG}[name]) + + DB_CNX: Optional[sqlite3.Connection] = None QUIET: bool = False -WG: Optional[str] = None +CLIENT: Optional[WG] = None USTID_NR_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}') BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?') @@ -230,7 +240,7 @@ def normalize_phone_nr(nr: Optional[str], ort: str = None) -> Optional[str]: nr = nr.replace('-', '') if nr[0] == '0': nr = '+43 ' + nr[1:] - elif WG == 'WKW' and ort: + elif CLIENT == WG.WKW and ort: ort = ort.upper().strip() if ort in ('PILLICHSDORF', 'OBERSDORF', 'WOLKERSDORF', 'WOLFPASSING', 'PUTZING', 'GROSSENGERSDORF', 'EIBESBRUNN'): @@ -348,7 +358,7 @@ def lookup_kgnr(okz: Optional[int]) -> Optional[int]: def lookup_gem_name(name: str) -> List[Tuple[int, int]]: gem_name, hkid = None, None - if WG == 'MATZEN': + if CLIENT == WG.MATZEN: hkid = "'WLWV'" if name.lower() == 'dörfles': gem_name = 'Weikendorf' @@ -356,7 +366,7 @@ def lookup_gem_name(name: str) -> List[Tuple[int, int]]: return [(6027, 30859), (6007, 30859)] elif name.lower() == 'grub': name = 'Grub an der March' - elif WG == 'WKW': + elif CLIENT == WG.WKW: hkid = "'WLWV', 'WIEN', 'WLWG'" if name.endswith('*'): name = name[:-1].strip() @@ -456,7 +466,7 @@ def lookup_hkid(kgnr: Optional[int], qualid: str) -> str: if qualid in ('WEI', 'RSW'): return 'OEST' elif kgnr is None: - if WG in ('MATZEN', 'WKW'): + if CLIENT in (WG.MATZEN, WG.WKW): hkid = 'WLNO' else: cur = DB_CNX.cursor() @@ -526,7 +536,7 @@ def migrate_grosslagen(in_dir: str, out_dir: str) -> None: f.header('glnr', 'name') for gl in utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv'): glnr += 1 - if WG == 'WKW' and gl['GLNR'] == 8: + if CLIENT == WG.WKW and gl['GLNR'] == 8: GROSSLAGE_MAP[8] = 6 continue GROSSLAGE_MAP[gl['GLNR']] = glnr @@ -584,7 +594,7 @@ def migrate_attributes(in_dir: str, out_dir: str) -> None: if a['SANR'] is None: continue f.row(a['SANR'], a['Attribut'], int(a['KgProHa']) if a['KgProHa'] is not None else None, True) - if WG == 'MATZEN': + if CLIENT == WG.MATZEN: f.row('M', 'Matzen', None, False) f.row('HU', 'Huber', None, False) @@ -618,10 +628,10 @@ def migrate_area_commitment_types(in_dir: str, out_dir: str) -> None: f.row(sortid + (t['SANR'] or ''), sortid[:2], t['SANR'] or sortid[2:] or None, None, None, menge, menge, None) bio = [] - if WG == 'MATZEN': + if CLIENT == WG.MATZEN: bio = ['GV', 'ZW', 'MT'] f.row('BM', 'BM', None, None, None, None, None, None) - elif WG == 'WKW': + elif CLIENT == WG.WKW: bio = ['GV', 'ZW', 'WR', 'MT', 'RR', 'WB', 'CH', 'MU'] for sortid in bio: f.row(f'{sortid}B', sortid, 'B', None, None, None, None, None) @@ -634,7 +644,7 @@ def normalize_name(family_name: str, given_name: str) -> Tuple[Optional[str], Op def is_alpha(s: str) -> bool: return all(c in letters for c in s) if s.lower() not in double_names else True - if WG == 'WKW': + if CLIENT == WG.WKW: if 'BEZIRKSBAUERNKAMMER' == family_name: return None, None, None, None, None, 'Bezirksbauernkammer Mistelbach' elif (given_name, family_name) == ('LANDW.', 'FACHSCHULE'): @@ -760,9 +770,9 @@ def migrate_members(in_dir: str, out_dir: str) -> None: continue given_name = given_name or '' - if WG == 'MATZEN' and given_name.startswith(' '): + if CLIENT == WG.MATZEN and given_name.startswith(' '): funktionaer = True - if WG == 'WKW' and ('*' in family_name or '*' in given_name or '(+)' in family_name or '(+)' in given_name): + if CLIENT == WG.WKW and ('*' in family_name or '*' in given_name or '(+)' in family_name or '(+)' in given_name): deceased = True family_name = family_name.replace('*', '').replace('(+)', '') given_name = given_name.replace('*', '').replace('(+)', '') @@ -853,7 +863,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None: print(address, new_address) raise RuntimeError() address = parts[-1] - if WG == 'WKW' and ort == 'JETZELDORF': + if CLIENT == WG.WKW and ort == 'JETZELDORF': ort = 'JETZELSDORF' if ort: ort = ort.upper().strip() @@ -894,7 +904,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None: email = f'{parts[0]}@{parts[1].lower()}' zwstid = m['ZNR'] and BRANCH_MAP[m['ZNR']] or len(BRANCH_MAP) == 1 and list(BRANCH_MAP.values())[0] - if WG == 'WKW' and plz == 1228: + if CLIENT == WG.WKW and plz == 1228: plz = 1020 postal_dest = lookup_plz(plz, ort, address) @@ -937,7 +947,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None: phone_3: Optional[str] = m['Mobiltelefon'] numbers = [] - if WG == 'WKW': + if CLIENT == WG.WKW: # Telefax (phone_2) not used numbers = {} @@ -1215,7 +1225,7 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None: new('Reed', (kgnr, rdnr), rname) area = int(fb['Flaeche']) - if WG == 'MATZEN': + if CLIENT == WG.MATZEN: gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR']) else: gstnrs = [] @@ -1225,7 +1235,7 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None: invalid(mgnr, 'GstNr.', f'{lookup_kg_name(kgnr)} {kgnr or 0:05}-{parz}', None) gstnrs = [] gstnr = '-' - if WG == 'MATZEN' and len(gstnrs) == 0: + if CLIENT == WG.MATZEN and len(gstnrs) == 0: comment = f'KG {kgnr or 0:05}: {parz}' gstnr = format_gstnr(gstnrs) or gstnr or parz if parz != gstnr.replace('+', '/'): @@ -1312,9 +1322,9 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None: name: str = mod['Bezeichnung'].replace('ausser', 'außer') nr: int = mod['ASNR'] MODIFIER_MAP[name] = mod - if WG == 'MATZEN': + if CLIENT == WG.MATZEN: mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'PZS' - elif WG == 'WKW': + elif CLIENT == WG.WKW: mod['id'] = { 1: 'KA', 2: 'LG', 3: 'MG', 4: 'SG', 5: 'VT', 6: 'MV', 7: 'LW', 8: 'VL', @@ -1385,7 +1395,7 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None: attributes.add(sortid[2:]) sortid = sortid[:2] - if WG == 'MATZEN': + if CLIENT == WG.MATZEN: if sortid == 'HU': # Gr.Veltliner (Huber) sortid = 'GV' @@ -1400,7 +1410,7 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None: attributes.add('HK') if 'W' in attributes: attributes.remove('W') - elif WG == 'WKW': + elif CLIENT == WG.WKW: if 'F' in attributes: attributes.remove('F') @@ -1463,7 +1473,7 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None: if comment: comments.append(comment) - gerebelt = True if WG == 'MATZEN' else d['Gerebelt'] or False + gerebelt = True if CLIENT == WG.MATZEN else d['Gerebelt'] or False f_part.row( date.year, snr, dpnr, sortid, int(d['Gewicht']), kmw, qualid, hkid, kgnr, rdnr, gerebelt, handwiegung, d['Spaetlese-Ueberpruefung'] or False, @@ -1679,9 +1689,9 @@ def migrate_parameters(in_dir: str, out_dir: str) -> None: 'reggenmbh': 'reg. Gen.m.b.H.' } tokens = { - 'MATZEN': ('WGM', 'WG Matzen'), - 'WKW': ('WKW', 'WKW') - }.get(WG, (None, None)) + WG.MATZEN: ('WGM', 'WG Matzen'), + WG.WKW: ('WKW', 'WKW') + }.get(CLIENT, (None, None)) ort = params['MANDANTENORT'].title() new_params: Dict[str, Optional[str]] = { @@ -1717,7 +1727,7 @@ def migrate_parameters(in_dir: str, out_dir: str) -> None: def main() -> None: - global DB_CNX, QUIET, WG + global DB_CNX, QUIET, CLIENT parser = argparse.ArgumentParser() parser.add_argument('in_dir', type=str, @@ -1729,13 +1739,13 @@ def main() -> None: parser.add_argument('-d', '--database', metavar='DB', required=True, help='The sqlite database file to look up information') parser.add_argument('-g', '--genossenschaft', metavar='WG', required=False, type=str, - choices=('MATZEN', 'WKW')) + choices=[wg.name for wg in WG]) args = parser.parse_args() os.makedirs(args.out_dir, exist_ok=True) QUIET = args.quiet - WG = args.genossenschaft + CLIENT = WG.from_str(args.genossenschaft) DB_CNX = sqlite3.connect(args.database)