def normalize_name(family_name: str, given_name: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str]]:
    """Split a raw family/given name pair into structured name parts.

    The inputs are expected to be whitespace-normalized already (the caller
    runs them through ``utils.remove_spaces``).

    :param family_name: raw content of the family name column
    :param given_name: raw content of the given name column
    :return: ``(prefix, given_name, middle_names, family_name, suffix,
        billing_name)``. ``prefix`` collects recognized academic/honorary
        titles, ``suffix`` is ``'jun.'``/``'sen.'``, ``billing_name`` is set
        for companies and for entries naming several persons.
        ``middle_names`` is never filled by this function. ``given_name`` or
        ``family_name`` may come back empty or ``None``; the caller is
        expected to check for that.
    """
    letters = string.ascii_letters + 'äöüßÄÖÜẞ-'
    double_names = ['eva maria', 'maria theresia']
    # Trailing tokens that mark the entry as a company rather than a person.
    company_forms = ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr')
    # Normalized title token (lowercase, no spaces/dots/hyphens) -> canonical form.
    title_map = {
        'dr': 'Dr.',
        'mag': 'Mag.',
        'ing': 'Ing.',
        'dipling': 'Dipl.-Ing.',
        'di': 'Dipl.-Ing.',
        'dkfm': 'Dipl.-Kfm.',
        'ökrat': 'ÖkR',
        'lkr': 'ÖkR',
    }
    suffix_map = {'jun': 'jun.', 'sen': 'sen.'}

    def is_alpha(s: str) -> bool:
        # Known double given names count as "plain" although they contain a space.
        return s.lower() in double_names or all(c in letters for c in s)

    def legal_form(token: str) -> str:
        # Canonical spelling of the (lowercased) company marker.
        if token == 'kg':
            return 'KG'
        if token == 'keg.':
            return 'KEG'
        return 'GesbR'

    # Hard-coded institutions that only occur in the GWK data set.
    if WG == 'GWK':
        if 'BEZIRKSBAUERNKAMMER' == family_name:
            return None, None, None, None, None, 'Bezirksbauernkammer Mistelbach'
        elif (given_name, family_name) == ('LANDW.', 'FACHSCHULE'):
            return None, None, None, None, None, 'Landwirtschaftliche Fachschule Mistelbach'
        elif 'lagerhaus' in family_name.lower() and given_name == 'HOLLABRUNN-HORN':
            return None, None, None, None, None, 'Raiffeisen-Lagerhaus Hollabrunn-Horn eGen'

    # Fast path: both parts are plain names, only the capitalization is fixed.
    if given_name.lower() not in ('kg', 'gesbr', 'gesnbr') and \
            family_name and given_name and is_alpha(family_name) and is_alpha(given_name):
        return None, given_name.title(), None, family_name.title(), None, None

    prefix: Optional[str] = None
    middle_names: Optional[str] = None
    suffix: Optional[str] = None
    billing_name: Optional[str] = None

    if given_name.startswith('z.H. '):
        # "z.H. <given> <family>": the family column holds the company name and
        # the given column names the contact person.
        billing_name = family_name.replace('AGRAR', 'Agrar').replace('GESBR', 'GesbR')
        contact = given_name.split(' ')[1:]
        given_name = contact[0]
        # Only one name after "z.H.": return an empty family name instead of
        # raising IndexError, so the caller can report the entry as invalid.
        family_name = contact[1] if len(contact) > 1 else ''

    # Bring every way of listing several persons ("A+B", "A (B, C)", "A u. B",
    # "A und B") into the uniform "A + B" form.
    given_name = given_name.replace('+', ' + ').replace('JOS ', 'JOS. ')
    given_name = re.sub(r' ?\((.+?)(, ?(.*?))?\)',
                        lambda m: f' + {m.group(1)}{" + " + m.group(3) if m.group(2) else ""}', given_name)
    given_name = re.sub(r' u\. ?| und ', ' + ', given_name, flags=re.IGNORECASE)

    titles = []

    def repl_title(m: re.Match) -> str:
        nonlocal suffix
        # Hyphens have to be removed as well, otherwise "Dipl.-Ing" becomes
        # "dipl-ing", matches nothing and the title is silently dropped.
        key = m.group(1).lower().replace(' ', '').replace('.', '').replace('-', '')
        if key in suffix_map:
            suffix = suffix_map[key]
        elif key in title_map:
            titles.append(title_map[key])
        return ' '

    title_re = re.compile(r',?\b(dr|ing|mag|jun|sen|dkfm|dipl\. ?-?ing|di|ök\.rat|lkr)\b\.?', re.IGNORECASE)
    given_name = utils.remove_spaces(title_re.sub(repl_title, given_name))
    family_name = utils.remove_spaces(title_re.sub(repl_title, family_name))
    if titles:
        prefix = ' '.join(titles)

    # Company marker at the end of the family name ("HUBER KG").
    family_parts = family_name.split(' ')
    last = family_parts[-1].lower()
    if last in company_forms:
        family_name = ' '.join(family_parts[:-1])
        if ' ' not in family_name and len(family_name) > 4:
            family_name = family_name.title()
        billing_name = family_name + ' ' + legal_form(last)
        if is_alpha(given_name):
            return prefix, given_name.title(), middle_names, family_name, suffix, billing_name

    # Company marker at the end of the given name ("FRANZ GESBR").
    given_parts = given_name.split(' ')
    last = given_parts[-1].lower()
    if last in company_forms:
        given_name = ' '.join(given_parts[:-1]).title()
        family_name = family_name.title()
        billing_name = f'{family_name} {legal_form(last)}'
        return prefix, given_name, middle_names, family_name, suffix, billing_name

    if ' ' in family_name or '.' in family_name:
        if family_name.lower().startswith(('weingut', 'weinbau')):
            billing_name = family_name.title()
            family_name = ' '.join(family_name.split(' ')[1:]).title()
        elif family_name.lower().endswith('veltlinerhof'):
            billing_name = ' '.join(family_name.split(' ')[::-1]).title()
            family_name = ' '.join(family_name.split(' ')[:-1]).title()
        elif 'u.' in family_name:
            # NOTE(review): this test is case-sensitive, an upper-case "U."
            # falls through to the generic branch below - confirm intended.
            billing_name = utils.remove_spaces(family_name.title().replace('U.', ' und '))
            family_name = family_name.split(' ')[0].title()
        else:
            billing_name = family_name
            family_name = family_name.split(' ')[-1].title()

    if ' + ' in given_name:
        parts = given_name.split(' + ')
        family_name = family_name.title()
        # "A, B, C" -> "A, B und C": only the last comma is replaced.
        head, _, tail = ', '.join(parts).title().rpartition(',')
        billing_name = f'{head} und{tail} {billing_name or family_name}'
        given_name = parts[0].title()
    else:
        family_name = family_name.title()
        given_name = given_name.title()
    return prefix, given_name, middle_names, family_name, suffix, billing_name
funktionaer = True + if WG == 'GWK' and ('*' in family_name or '*' in given_name or '(+)' in family_name or '(+)' in given_name): + deceased = True + family_name = family_name.replace('*', '').replace('(+)', '') + given_name = given_name.replace('*', '').replace('(+)', '') - family_name = re.sub(r'\s+', ' ', family_name).strip() - given_name = re.sub(r'\s+', ' ', given_name).strip().replace(', ', ',') - - if ' ' in family_name or '.' in family_name or ',' in family_name: - if family_name.endswith(' KG'): - parts = family_name.split(' ') - family_name = parts[0].title() - billing_name = f'{family_name} KG' + family_name = utils.remove_spaces(family_name) + given_name = utils.remove_spaces(given_name).replace(', ', ',') + ret = normalize_name(family_name, given_name) + prefix, given_name, middle_names, family_name, suffix, billing_name = ret + n1 = utils.remove_spaces(' '.join(r or '' for r in ret)) + n2 = utils.remove_spaces((m['Vorname'] or '') + ' ' + (m['Nachname'] or '')) + if billing_name or n1.lower() != n2.lower(): convert_name(mgnr, (m['Nachname'], m['Vorname']), (prefix, given_name, middle_names, family_name, suffix), billing_name) - elif (' ' in given_name or '.' in given_name or ',' in given_name) and given_name != 'EVA MARIA': - if ' u. 
' in given_name.lower() or ' u ' in given_name.lower() or ' und ' in given_name.lower(): - parts = given_name.split(' ') - family_name = family_name.title() - billing_name = f'{family_name} {parts[0].title()} und {parts[-1].title()}' - given_name = parts[0].title() - elif given_name.lower().endswith(' gesbr'): - family_name = family_name.title() - given_name = given_name.split(' ')[0].title() - billing_name = f'{family_name} {given_name} GesbR' - elif given_name.endswith(' KeG.'): - family_name = family_name.title() - given_name = given_name.split(' ')[0].title() - billing_name = f'{family_name} {given_name} KEG' - elif given_name.lower().endswith(' jun') or given_name.lower().endswith(' jun.') or \ - given_name.lower().endswith(' sen') or given_name.lower().endswith(' sen.'): - family_name = family_name.title() - parts = given_name.split(' ') - suffix = parts[-1].lower() - if suffix[-1] != '.': - suffix += '.' - given_name = parts[0].title() - elif ',' in given_name: - family_name = family_name.title() - parts = given_name.split(',') - given_name = parts[0].title() - prefix = ' '.join([p.title() for p in parts[1:]]) - elif given_name.endswith(' DI'): - family_name = family_name.title() - given_name = given_name.split(' ')[0].title() - prefix = 'Dipl.-Ing.' - elif given_name.lower().endswith(' ing') or given_name.lower().endswith(' ing.') or \ - given_name.lower().endswith(' dr') or given_name.lower().endswith(' dr.'): - family_name = family_name.title() - parts = given_name.split(' ') - given_name = parts[0].title() - prefix = parts[-1].title() - if prefix[-1] != '.': - prefix += '.' 
- - convert_name(mgnr, (m['Nachname'], m['Vorname']), - (prefix, given_name, middle_names, family_name, suffix), billing_name) - else: - family_name = family_name.title() - given_name = given_name.title() + if not given_name or not family_name: + given_name = given_name or '' + family_name = family_name or '' + invalid(mgnr, 'Name', n1) bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None if bnr is not None: @@ -738,9 +799,6 @@ def migrate_members(in_dir: str, out_dir: str) -> None: blz: Optional[int] = m['BLZ'] kto_nr: Optional[str] = m['KontoNr'] - if iban is None: - pass - if iban is not None: iban = iban.replace(' ', '') if not check_iban(iban): @@ -784,11 +842,11 @@ def migrate_members(in_dir: str, out_dir: str) -> None: if address is not None: address_old = address address = re.sub(r'([0-9]) ?([A-Z])\b', lambda a: a.group(1) + a.group(2).lower(), - re.sub(r'\s+', ' ', address).strip().title()) + utils.remove_spaces(address).title()) if address.startswith('Haus Nr.') or \ address.startswith('Nr. ') or \ address.startswith('Nr ') or \ - address.isdigit(): + address.isdigit() or (len(address) > 1 and address[:-1].isdigit()): address = ort.title() + ' ' + address.split(' ')[-1] address = address.replace('strasse', 'straße').replace('strassse', 'straße')\ .replace('Strasse', 'Straße').replace('Str.', 'Straße').replace('stasse', 'straße')\ @@ -801,7 +859,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None: address = address.replace('Ob. ', 'Obere ', 1) address = address.replace(' Nr. 
def remove_spaces(s: str) -> str:
    """Collapse each run of whitespace in *s* to one space and trim both ends."""
    # Splitting without a separator cuts on runs of whitespace and drops
    # leading/trailing empties, so joining the tokens yields the cleaned text.
    tokens = s.split()
    return ' '.join(tokens)