Update migrate.py for better name migrations

This commit is contained in:
2023-08-18 20:54:23 +02:00
parent af94afaa62
commit 495ba4d7d2
3 changed files with 133 additions and 69 deletions

View File

@ -303,6 +303,7 @@ CREATE TABLE member (
buchführend INTEGER NOT NULL CHECK (buchführend IN (TRUE, FALSE)) DEFAULT FALSE, buchführend INTEGER NOT NULL CHECK (buchführend IN (TRUE, FALSE)) DEFAULT FALSE,
funktionär INTEGER NOT NULL CHECK (funktionär IN (TRUE, FALSE)) DEFAULT FALSE, funktionär INTEGER NOT NULL CHECK (funktionär IN (TRUE, FALSE)) DEFAULT FALSE,
active INTEGER NOT NULL CHECK (active IN (TRUE, FALSE)) DEFAULT TRUE, active INTEGER NOT NULL CHECK (active IN (TRUE, FALSE)) DEFAULT TRUE,
deceased INTEGER NOT NULL CHECK (deceased IN (TRUE, FALSE)) DEFAULT FALSE,
iban TEXT CHECK (iban REGEXP '^[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}$') DEFAULT NULL, iban TEXT CHECK (iban REGEXP '^[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}$') DEFAULT NULL,
bic TEXT CHECK (bic REGEXP '^[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?$') DEFAULT NULL, bic TEXT CHECK (bic REGEXP '^[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?$') DEFAULT NULL,

View File

@ -10,6 +10,7 @@ import sqlite3
import requests import requests
import datetime import datetime
import json import json
import string
import utils import utils
@ -97,6 +98,7 @@ ORT_NAMES: Dict[str, Optional[str]] = {
'Wolkersdorf': None, 'Wolkersdorf': None,
'Znaim': None, 'Znaim': None,
'Obersdorf': None, 'Obersdorf': None,
'Sechshaus': None,
} }
STREET_NAMES: Dict[str, str] = { STREET_NAMES: Dict[str, str] = {
@ -606,6 +608,108 @@ def migrate_cultivations(in_dir: str, out_dir: str) -> None:
f.row(cultid, name) f.row(cultid, name)
def normalize_name(family_name: str, given_name: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str]]:
letters = string.ascii_letters + 'äöüßÄÖÜẞ-'
double_names = ['eva maria', 'maria theresia']
def is_alpha(s: str) -> bool:
return all(c in letters for c in s) if s.lower() not in double_names else True
if WG == 'GWK':
if 'BEZIRKSBAUERNKAMMER' == family_name:
return None, None, None, None, None, 'Bezirksbauernkammer Mistelbach'
elif (given_name, family_name) == ('LANDW.', 'FACHSCHULE'):
return None, None, None, None, None, 'Landwirtschaftliche Fachschule Mistelbach'
elif 'lagerhaus' in family_name.lower() and given_name == 'HOLLABRUNN-HORN':
return None, None, None, None, None, 'Raiffeisen-Lagerhaus Hollabrunn-Horn eGen'
if given_name.lower() not in ('kg', 'gesbr', 'gesnbr') and \
len(family_name) > 0 and len(given_name) > 0 and is_alpha(family_name) and is_alpha(given_name):
return None, given_name.title(), None, family_name.title(), None, None
prefix: Optional[str] = None
middle_names: Optional[str] = None
suffix: Optional[str] = None
billing_name: Optional[str] = None
if given_name.startswith('z.H. '):
billing_name = family_name.replace('AGRAR', 'Agrar').replace('GESBR', 'GesbR')
parts = given_name.split(' ')
given_name = parts[1]
family_name = parts[2]
given_name = given_name.replace('+', ' + ').replace('JOS ', 'JOS. ')
given_name = re.sub(r' ?\((.+?)(, ?(.*?))?\)',
lambda m: f' + {m.group(1)}{" + " + m.group(3) if m.group(2) else ""}', given_name)
given_name = re.sub(r' u\. ?| und ', ' + ', given_name, flags=re.IGNORECASE)
titles = ''
def repl_title(m: re.Match) -> str:
nonlocal titles, suffix
t = m.group(1).lower().replace(' ', '').replace('.', '')
match t:
case 'jun': suffix = 'jun.'
case 'sen': suffix = 'sen.'
case 'dr': titles += 'Dr. '
case 'mag': titles += 'Mag. '
case 'ing': titles += 'Ing. '
case 'dipling': titles += 'Dipl.-Ing. '
case 'di': titles += 'Dipl.-Ing. '
case 'dkfm': titles += 'Dipl.-Kfm. '
case 'ökrat': titles += 'ÖkR '
case 'lkr': titles += 'ÖkR '
return ' '
title_re = re.compile(r',?\b(dr|ing|mag|jun|sen|dkfm|dipl\. ?-?ing|di|ök\.rat|lkr)\b\.?', re.IGNORECASE)
given_name = utils.remove_spaces(re.sub(title_re, repl_title, given_name))
family_name = utils.remove_spaces(re.sub(title_re, repl_title, family_name))
if titles:
prefix = titles.strip()
family_parts = family_name.split(' ')
last = family_parts[-1].lower()
if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr'):
family_name = ' '.join(family_parts[:-1])
if ' ' not in family_name and len(family_name) > 4:
family_name = family_name.title()
billing_name = family_name + ' ' + ('KG' if last == 'kg' else 'KEG' if last == 'keg.' else 'GesbR')
if is_alpha(given_name):
return prefix, given_name.title(), middle_names, family_name, suffix, billing_name
given_parts = given_name.split(' ')
last = given_parts[-1].lower()
if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr'):
given_name = ' '.join(given_parts[:-1]).title()
family_name = family_name.title()
billing_name = f'{family_name} {"KG" if last == "kg" else "KEG" if last == "keg." else "GesbR"}'
return prefix, given_name, middle_names, family_name, suffix, billing_name
if ' ' in family_name or '.' in family_name:
if family_name.lower().startswith('weingut') or family_name.lower().startswith('weinbau'):
billing_name = family_name.title()
family_name = ' '.join(family_name.split(' ')[1:]).title()
elif family_name.lower().endswith('veltlinerhof'):
billing_name = ' '.join(family_name.split(' ')[::-1]).title()
family_name = ' '.join(family_name.split(' ')[:-1]).title()
elif 'u.' in family_name:
billing_name = utils.remove_spaces(family_name.title().replace('U.', ' und '))
family_name = family_name.split(' ')[0].title()
else:
billing_name = family_name
family_name = family_name.split(' ')[-1].title()
if ' + ' in given_name:
parts = given_name.split(' + ')
family_name = family_name.title()
billing_name = (', '.join(parts).title()[::-1].replace(',', ' und'[::-1], 1)[::-1] +
f' {billing_name or family_name}')
given_name = parts[0].title()
else:
family_name = family_name.title()
given_name = given_name.title()
return prefix, given_name, middle_names, family_name, suffix, billing_name
def migrate_members(in_dir: str, out_dir: str) -> None: def migrate_members(in_dir: str, out_dir: str) -> None:
global MEMBER_MAP global MEMBER_MAP
MEMBER_MAP = {} MEMBER_MAP = {}
@ -621,22 +725,17 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
f_m.header( f_m.header(
'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix', 'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix',
'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid', 'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid',
'lfbis_nr', 'ustid_nr', 'volllieferant', 'buchführend', 'funktionär', 'active', 'iban', 'bic', 'lfbis_nr', 'ustid_nr', 'volllieferant', 'buchführend', 'funktionär', 'active', 'deceased',
'country', 'postal_dest', 'address', 'iban', 'bic', 'country', 'postal_dest', 'address',
'email', 'default_kgnr', 'comment') 'email', 'default_kgnr', 'comment')
f_mba.header('mgnr', 'name', 'country', 'postal_dest', 'address') f_mba.header('mgnr', 'name', 'country', 'postal_dest', 'address')
f_tel.header('mgnr', 'nr', 'type', 'number', 'comment') f_tel.header('mgnr', 'nr', 'type', 'number', 'comment')
for m in members: for m in members:
# TODO GWK handle * in member name
mgnr: int = m['MGNR'] mgnr: int = m['MGNR']
family_name: str = m['Nachname'] family_name: str = m['Nachname']
given_name: str = m['Vorname'] given_name: str = m['Vorname']
prefix: Optional[str] = None funktionaer, deceased = False, False
middle_names: Optional[str] = None
suffix: Optional[str] = None
billing_name: Optional[str] = None
funktionaer = False
if family_name is None and given_name is None: if family_name is None and given_name is None:
continue continue
@ -644,63 +743,25 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
given_name = given_name or '' given_name = given_name or ''
if WG == 'MATZEN' and given_name.startswith(' '): if WG == 'MATZEN' and given_name.startswith(' '):
funktionaer = True funktionaer = True
if WG == 'GWK' and ('*' in family_name or '*' in given_name or '(+)' in family_name or '(+)' in given_name):
deceased = True
family_name = family_name.replace('*', '').replace('(+)', '')
given_name = given_name.replace('*', '').replace('(+)', '')
family_name = re.sub(r'\s+', ' ', family_name).strip() family_name = utils.remove_spaces(family_name)
given_name = re.sub(r'\s+', ' ', given_name).strip().replace(', ', ',') given_name = utils.remove_spaces(given_name).replace(', ', ',')
if ' ' in family_name or '.' in family_name or ',' in family_name:
if family_name.endswith(' KG'):
parts = family_name.split(' ')
family_name = parts[0].title()
billing_name = f'{family_name} KG'
ret = normalize_name(family_name, given_name)
prefix, given_name, middle_names, family_name, suffix, billing_name = ret
n1 = utils.remove_spaces(' '.join(r or '' for r in ret))
n2 = utils.remove_spaces((m['Vorname'] or '') + ' ' + (m['Nachname'] or ''))
if billing_name or n1.lower() != n2.lower():
convert_name(mgnr, (m['Nachname'], m['Vorname']), convert_name(mgnr, (m['Nachname'], m['Vorname']),
(prefix, given_name, middle_names, family_name, suffix), billing_name) (prefix, given_name, middle_names, family_name, suffix), billing_name)
elif (' ' in given_name or '.' in given_name or ',' in given_name) and given_name != 'EVA MARIA': if not given_name or not family_name:
if ' u. ' in given_name.lower() or ' u ' in given_name.lower() or ' und ' in given_name.lower(): given_name = given_name or ''
parts = given_name.split(' ') family_name = family_name or ''
family_name = family_name.title() invalid(mgnr, 'Name', n1)
billing_name = f'{family_name} {parts[0].title()} und {parts[-1].title()}'
given_name = parts[0].title()
elif given_name.lower().endswith(' gesbr'):
family_name = family_name.title()
given_name = given_name.split(' ')[0].title()
billing_name = f'{family_name} {given_name} GesbR'
elif given_name.endswith(' KeG.'):
family_name = family_name.title()
given_name = given_name.split(' ')[0].title()
billing_name = f'{family_name} {given_name} KEG'
elif given_name.lower().endswith(' jun') or given_name.lower().endswith(' jun.') or \
given_name.lower().endswith(' sen') or given_name.lower().endswith(' sen.'):
family_name = family_name.title()
parts = given_name.split(' ')
suffix = parts[-1].lower()
if suffix[-1] != '.':
suffix += '.'
given_name = parts[0].title()
elif ',' in given_name:
family_name = family_name.title()
parts = given_name.split(',')
given_name = parts[0].title()
prefix = ' '.join([p.title() for p in parts[1:]])
elif given_name.endswith(' DI'):
family_name = family_name.title()
given_name = given_name.split(' ')[0].title()
prefix = 'Dipl.-Ing.'
elif given_name.lower().endswith(' ing') or given_name.lower().endswith(' ing.') or \
given_name.lower().endswith(' dr') or given_name.lower().endswith(' dr.'):
family_name = family_name.title()
parts = given_name.split(' ')
given_name = parts[0].title()
prefix = parts[-1].title()
if prefix[-1] != '.':
prefix += '.'
convert_name(mgnr, (m['Nachname'], m['Vorname']),
(prefix, given_name, middle_names, family_name, suffix), billing_name)
else:
family_name = family_name.title()
given_name = given_name.title()
bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None
if bnr is not None: if bnr is not None:
@ -738,9 +799,6 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
blz: Optional[int] = m['BLZ'] blz: Optional[int] = m['BLZ']
kto_nr: Optional[str] = m['KontoNr'] kto_nr: Optional[str] = m['KontoNr']
if iban is None:
pass
if iban is not None: if iban is not None:
iban = iban.replace(' ', '') iban = iban.replace(' ', '')
if not check_iban(iban): if not check_iban(iban):
@ -784,11 +842,11 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
if address is not None: if address is not None:
address_old = address address_old = address
address = re.sub(r'([0-9]) ?([A-Z])\b', lambda a: a.group(1) + a.group(2).lower(), address = re.sub(r'([0-9]) ?([A-Z])\b', lambda a: a.group(1) + a.group(2).lower(),
re.sub(r'\s+', ' ', address).strip().title()) utils.remove_spaces(address).title())
if address.startswith('Haus Nr.') or \ if address.startswith('Haus Nr.') or \
address.startswith('Nr. ') or \ address.startswith('Nr. ') or \
address.startswith('Nr ') or \ address.startswith('Nr ') or \
address.isdigit(): address.isdigit() or (len(address) > 1 and address[:-1].isdigit()):
address = ort.title() + ' ' + address.split(' ')[-1] address = ort.title() + ' ' + address.split(' ')[-1]
address = address.replace('strasse', 'straße').replace('strassse', 'straße')\ address = address.replace('strasse', 'straße').replace('strassse', 'straße')\
.replace('Strasse', 'Straße').replace('Str.', 'Straße').replace('stasse', 'straße')\ .replace('Strasse', 'Straße').replace('Str.', 'Straße').replace('stasse', 'straße')\
@ -801,7 +859,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
address = address.replace('Ob. ', 'Obere ', 1) address = address.replace('Ob. ', 'Obere ', 1)
address = address.replace(' Nr. ', ' ') address = address.replace(' Nr. ', ' ')
address = re.sub(r'([^0-9]+?)( [0-9])', lambda a: fix_street_name(a.group(1)) + a.group(2), address) address = re.sub(r'([^0-9]+?)( [0-9])', lambda a: fix_street_name(a.group(1)) + a.group(2), address)
address = re.sub(r'\s+', ' ', address).strip() address = utils.remove_spaces(address)
if address_old != address: if address_old != address:
convert(mgnr, 'Adresse', address_old, address) convert(mgnr, 'Adresse', address_old, address)
@ -851,7 +909,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
mgnr, pred, prefix, given_name, middle_names, family_name, suffix, mgnr, pred, prefix, given_name, middle_names, family_name, suffix,
m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0, m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0,
m['BHKontonummer'], zwstid, bnr, ustid_nr, m['BHKontonummer'], zwstid, bnr, ustid_nr,
m['Volllieferant'] or False, m['Buchführend'] or False, funktionaer, active, m['Volllieferant'] or False, m['Buchführend'] or False, funktionaer, active, deceased,
iban, bic, AUSTRIA, postal_dest, address or '-', email, kgnr, m['Anmerkung'] iban, bic, AUSTRIA, postal_dest, address or '-', email, kgnr, m['Anmerkung']
) )
@ -871,7 +929,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
if phone_1: if phone_1:
phone_1 = phone_1.lower().replace('und', 'u.').replace('auch', 'u.').replace('u.', ' u. ')\ phone_1 = phone_1.lower().replace('und', 'u.').replace('auch', 'u.').replace('u.', ' u. ')\
.replace('oder', 'od.').replace(';', 'od.').replace('od.', ' od. ') .replace('oder', 'od.').replace(';', 'od.').replace('od.', ' od. ')
phone_1 = re.sub(r'\s+', ' ', phone_1).strip() phone_1 = utils.remove_spaces(phone_1)
fax = False fax = False
if phone_1.endswith(' u. fax'): if phone_1.endswith(' u. fax'):
fax = True fax = True

View File

@ -7,6 +7,7 @@ import re
import datetime import datetime
import csv import csv
RE_SPACES = re.compile(r'\s+')
RE_INT = re.compile(r'-?[0-9]+') RE_INT = re.compile(r'-?[0-9]+')
RE_FLOAT = re.compile(r'-?[0-9]+\.[0-9]+') RE_FLOAT = re.compile(r'-?[0-9]+\.[0-9]+')
RE_STR_START = re.compile(r'.*,"[^"]*$') RE_STR_START = re.compile(r'.*,"[^"]*$')
@ -17,6 +18,10 @@ def sqlite_regexp(pattern: str, value: Optional[str]) -> Optional[bool]:
return re.match(pattern, value) is not None if value is not None else None return re.match(pattern, value) is not None if value is not None else None
def remove_spaces(s: str) -> str:
    """Collapse every whitespace run in *s* to one space and trim both ends."""
    collapsed = RE_SPACES.sub(' ', s)
    return collapsed.strip()
def cast_value(value: str) -> Any: def cast_value(value: str) -> Any:
if value == '': if value == '':
return None return None