From 12f6c01c5237c31a11438f6bf1ff1054d9db6480 Mon Sep 17 00:00:00 2001 From: Lorenz Stechauner Date: Mon, 27 Feb 2023 19:23:15 +0100 Subject: [PATCH] Small fixes in migrate.py and do not generate IBANs automatically (ISO 13616) --- wgmaster/migrate.py | 100 ++++++++++++++++++++++---------------------- 1 file changed, 49 insertions(+), 51 deletions(-) diff --git a/wgmaster/migrate.py b/wgmaster/migrate.py index cfd1613..aeb7acc 100755 --- a/wgmaster/migrate.py +++ b/wgmaster/migrate.py @@ -12,10 +12,10 @@ import requests DB_CNX: Optional[sqlite3.Connection] = None -USTID_RE = re.compile('[A-Z]{2}[A-Z0-9]{2,12}') -BIC_RE = re.compile('[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?') -IBAN_RE = re.compile('[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}') -EMAIL_RE = re.compile('[^@\s]+@([a-z0-9_äöüß-]+\.)+[a-z]{2,}') +USTID_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}') +BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?') +IBAN_RE = re.compile(r'[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}') +EMAIL_RE = re.compile(r'[^@\s]+@([a-z0-9_äöüß-]+\.)+[a-z]{2,}') CULTIVATION_MAP: Optional[Dict[int, str]] = {1: 'N', 2: 'KIP', 3: 'BIO'} @@ -61,10 +61,10 @@ STREET_NAMES: Dict[str, str] = { def parse_csv(filename: str) -> Iterator[Dict[str, Any]]: - def parse_line(l: str) -> Iterator[str]: + def parse_line(line_str: str) -> Iterator[str]: w = None s = False - for ch in l: + for ch in line_str: if w is None: if ch == ';': yield '' @@ -103,7 +103,7 @@ def parse_csv(filename: str) -> Iterator[Dict[str, Any]]: part = False elif part.isdigit(): part = int(part) - elif re.match('[0-9]+\.[0-9]+', part): + elif re.match(r'[0-9]+\.[0-9]+', part): part = float(part) elif len(part) == 10 and part[4] == '-' and part[7] == '-': part = datetime.datetime.strptime(part, '%Y-%m-%d').date() @@ -113,17 +113,17 @@ def parse_csv(filename: str) -> Iterator[Dict[str, Any]]: yield obj -def format_row(*args) -> str: +def format_row(*values) -> str: row = '' - for arg in args: - if arg is None: + for val in values: + if val is None: pass - elif type(arg) == str: - row += f'"{arg}"' - elif type(arg) == bool: - row += 'T' if arg else 'F' + elif type(val) == str: + row += f'"{val}"' + elif type(val) == bool: + row += 'T' if val else 'F' else: - row += str(arg) + row += str(val) row += ';' return f'{row[:-1]}\n' @@ -146,7 +146,8 @@ def convert(mgnr: int, key: str, old_value: str, new_value: str) -> None: print(f'\x1B[1m{mgnr:>6}: {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr) -def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str], billing: Optional[str] = None) -> None: +def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str], + billing: Optional[str] = None) -> None: if not args.quiet: print(f'\x1B[1m{mgnr:>6}: ' f'{" / ".join([e or "" for e in old_name])} -> ' @@ -164,6 +165,7 @@ def check_lfbis_nr(nr: str) -> bool: v = (11 - (s % 11)) % 10 return v == int(nr[-1]) + def check_ustid_at(nr: str) -> bool: # http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml if not nr.startswith('ATU') or len(nr) != 11 or not nr[3:].isdigit(): @@ -175,26 +177,18 @@ def check_ustid_at(nr: str) -> bool: return v == int(nr[-1]) -def iban_checksum(iban: str) -> int: - if not IBAN_RE.fullmatch(iban): - raise RuntimeError() - s = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), (iban[4:] + iban[:4])) - v = 98 - (int(s) % 97) - return v +def modulo(a: str, b: int) -> int: + s = 0 + for ch in a: + s = (s * 10 + int(ch)) % b + return s def check_iban(iban: str) -> bool: if not IBAN_RE.fullmatch(iban): return False - return iban_checksum(iban) == 97 - - -def generate_iban_at(blz: int, ktonr: str) -> str: - if blz > 99999 or len(ktonr) > 11: - raise RuntimeError() - iban = f'AT00{blz:>05}{ktonr:>011}' - s = iban_checksum(iban) - return iban.replace('00', f'{s:02}', 1) + s = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), (iban[4:] + iban[:4])) + return modulo(s, 97) == 1 def normalize_phone_nr(nr: str) -> str: @@ -277,7 +271,10 @@ def lookup_gem_name(name: str) -> List[Tuple[int, int]]: name = 'Grub an der March' cur = DB_CNX.cursor() - cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name FROM AT_kg k JOIN AT_gem g ON g.gkz = k.gkz JOIN wb_gem wg ON wg.gkz = g.gkz " + cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name " + "FROM AT_kg k " + "JOIN AT_gem g ON g.gkz = k.gkz " + "JOIN wb_gem wg ON wg.gkz = g.gkz " "WHERE LOWER(k.name) LIKE (LOWER(?) || '%') AND wg.hkid = 'WLWV'", (name.replace('Gr.', 'Groß ').replace('Groß ', 'Groß').replace('-', ''),)) rows: List[Tuple[int, str, int, str]] = cur.fetchall() @@ -322,7 +319,7 @@ def migrate_reeds(in_dir: str, out_dir: str) -> None: if name.isupper(): name = name.title() - gem = GEM_MAP[r['GNR']] + gem = GEM_MAP[r['GNR']] kgnr = gem[0][0] if len(gem) != 1: print(gem, name, '->', gem[0]) @@ -336,7 +333,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None: members = parse_csv(f'{in_dir}/TMitglieder.csv') fbs = parse_flaechenbindungen(in_dir) - with open(f'{out_dir}/member.csv', 'w+') as f_m,open(f'{out_dir}/member_billing_address.csv', 'w+') as f_mba: + with open(f'{out_dir}/member.csv', 'w+') as f_m, open(f'{out_dir}/member_billing_address.csv', 'w+') as f_mba: f_m.write('mgnr;predecessor_mgnr;prefix;given_name;middle_names;family_name;suffix;' 'birthday;entry_date;exit_date;business_shares;accounting_nr;zwstid;' 'lfbis_nr;ustid;volllieferant;buchführend;funktionär;active;iban;bic;' @@ -358,8 +355,8 @@ def migrate_members(in_dir: str, out_dir: str) -> None: continue given_name = given_name or '' - family_name = re.sub('\s+', ' ', family_name).strip() - given_name = re.sub('\s+', ' ', given_name).strip().replace(', ', ',') + family_name = re.sub(r'\s+', ' ', family_name).strip() + given_name = re.sub(r'\s+', ' ', given_name).strip().replace(', ', ',') if ' ' in family_name or '.' in family_name or ',' in family_name: if family_name.endswith(' KG'): @@ -367,9 +364,10 @@ def migrate_members(in_dir: str, out_dir: str) -> None: family_name = parts[0].title() billing_name = f'{family_name} KG' - convert_name(mgnr, (m['Nachname'], m['Vorname']), (prefix, given_name, middle_names, family_name, suffix), billing_name) + convert_name(mgnr, (m['Nachname'], m['Vorname']), + (prefix, given_name, middle_names, family_name, suffix), billing_name) elif (' ' in given_name or '.' in given_name or ',' in given_name) and given_name != 'EVA MARIA': - if ' u. ' in given_name.lower() or ' u ' in given_name.lower() or ' und ' in given_name.lower(): + if ' u. ' in given_name.lower() or ' u ' in given_name.lower() or ' und ' in given_name.lower(): parts = given_name.split(' ') family_name = family_name.title() billing_name = f'{family_name} {parts[0].title()} und {parts[-1].title()}' @@ -382,7 +380,8 @@ def migrate_members(in_dir: str, out_dir: str) -> None: family_name = family_name.title() given_name = given_name.split(' ')[0].title() billing_name = f'{family_name} {given_name} KEG' - elif given_name.lower().endswith(' jun') or given_name.lower().endswith(' jun.') or given_name.lower().endswith(' sen') or given_name.lower().endswith(' sen.'): + elif given_name.lower().endswith(' jun') or given_name.lower().endswith(' jun.') or \ + given_name.lower().endswith(' sen') or given_name.lower().endswith(' sen.'): family_name = family_name.title() parts = given_name.split(' ') suffix = parts[-1].lower() @@ -398,7 +397,8 @@ def migrate_members(in_dir: str, out_dir: str) -> None: family_name = family_name.title() given_name = given_name.split(' ')[0].title() prefix = 'Dipl.-Ing.' - elif given_name.lower().endswith(' ing') or given_name.lower().endswith(' ing.') or given_name.lower().endswith(' dr') or given_name.lower().endswith(' dr.'): + elif given_name.lower().endswith(' ing') or given_name.lower().endswith(' ing.') or \ + given_name.lower().endswith(' dr') or given_name.lower().endswith(' dr.'): family_name = family_name.title() parts = given_name.split(' ') given_name = parts[0].title() @@ -406,7 +406,8 @@ def migrate_members(in_dir: str, out_dir: str) -> None: if prefix[-1] != '.': prefix += '.' - convert_name(mgnr, (m['Nachname'], m['Vorname']), (prefix, given_name, middle_names, family_name, suffix), billing_name) + convert_name(mgnr, (m['Nachname'], m['Vorname']), + (prefix, given_name, middle_names, family_name, suffix), billing_name) else: family_name = family_name.title() given_name = given_name.title() @@ -425,7 +426,6 @@ def migrate_members(in_dir: str, out_dir: str) -> None: invalid(mgnr, 'BetriebsNr.', bnr) bnr = None - ustid: Optional[str] = m['UID'] if ustid is not None: ustid = ustid.replace(' ', '') @@ -444,7 +444,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None: iban: Optional[str] = m['IBAN'] bic: Optional[str] = m['BIC'] blz: Optional[int] = m['BLZ'] - ktonr: Optional[str] = m['KontoNr'] + kto_nr: Optional[str] = m['KontoNr'] if iban is None: pass @@ -455,10 +455,6 @@ def migrate_members(in_dir: str, out_dir: str) -> None: invalid(mgnr, 'IBAN', iban) iban = None - if iban is None and blz and ktonr: - iban = generate_iban_at(blz, re.sub('[. -]', '', ktonr)) - success(mgnr, 'IBAN', f'{iban} ({blz}, {ktonr})') - if bic is not None: bic = bic.upper() if bic == 'RLNWATAUE': @@ -474,7 +470,8 @@ def migrate_members(in_dir: str, out_dir: str) -> None: address: Optional[str] = m['Straße'] if address is not None: address_old = address - address = re.sub('([0-9])([A-Z])', lambda a: a.group(1) + a.group(2).lower(), re.sub('\s+', ' ', address).strip().title()) + address = re.sub('([0-9])([A-Z])', lambda a: a.group(1) + a.group(2).lower(), + re.sub(r'\s+', ' ', address).strip().title()) address = address.replace('strasse', 'straße').replace('strassse', 'straße')\ .replace('Strasse', 'Straße').replace('Str.', 'Straße')\ .replace('str.', 'straße').replace('ster.', 'straße').replace('g. ', 'gasse ')\ @@ -487,7 +484,8 @@ def migrate_members(in_dir: str, out_dir: str) -> None: elif address.startswith('Ob. '): address = address.replace('Ob. ', 'Obere ', 1) address = address.replace(' Nr. ', ' ') - address = re.sub(r'([^0-9]+?)( [0-9])', lambda a: STREET_NAMES.get(a.group(1), a.group(1)) + a.group(2), address) + address = re.sub(r'([^0-9]+?)( [0-9])', + lambda a: STREET_NAMES.get(a.group(1), a.group(1)) + a.group(2), address) if address_old != address: convert(mgnr, 'Adresse', address_old, address) @@ -594,8 +592,8 @@ def migrate_contracts(in_dir: str, out_dir: str) -> None: if t - f < 50: return [ gst - for i in range(f, t + 1) - for p in [f'{b or ""}{i}'] + for counter in range(f, t + 1) + for p in [f'{b or ""}{counter}'] for gst in parse_gstnrs(p, kgnr, mgnr) ]