Small fixes in migrate.py and do not generate IBANs automatically (ISO 13616)
This commit is contained in:
@ -12,10 +12,10 @@ import requests
|
|||||||
|
|
||||||
DB_CNX: Optional[sqlite3.Connection] = None
|
DB_CNX: Optional[sqlite3.Connection] = None
|
||||||
|
|
||||||
USTID_RE = re.compile('[A-Z]{2}[A-Z0-9]{2,12}')
|
USTID_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}')
|
||||||
BIC_RE = re.compile('[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
|
BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
|
||||||
IBAN_RE = re.compile('[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')
|
IBAN_RE = re.compile(r'[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')
|
||||||
EMAIL_RE = re.compile('[^@\s]+@([a-z0-9_äöüß-]+\.)+[a-z]{2,}')
|
EMAIL_RE = re.compile(r'[^@\s]+@([a-z0-9_äöüß-]+\.)+[a-z]{2,}')
|
||||||
|
|
||||||
CULTIVATION_MAP: Optional[Dict[int, str]] = {1: 'N', 2: 'KIP', 3: 'BIO'}
|
CULTIVATION_MAP: Optional[Dict[int, str]] = {1: 'N', 2: 'KIP', 3: 'BIO'}
|
||||||
|
|
||||||
@ -61,10 +61,10 @@ STREET_NAMES: Dict[str, str] = {
|
|||||||
|
|
||||||
|
|
||||||
def parse_csv(filename: str) -> Iterator[Dict[str, Any]]:
|
def parse_csv(filename: str) -> Iterator[Dict[str, Any]]:
|
||||||
def parse_line(l: str) -> Iterator[str]:
|
def parse_line(line_str: str) -> Iterator[str]:
|
||||||
w = None
|
w = None
|
||||||
s = False
|
s = False
|
||||||
for ch in l:
|
for ch in line_str:
|
||||||
if w is None:
|
if w is None:
|
||||||
if ch == ';':
|
if ch == ';':
|
||||||
yield ''
|
yield ''
|
||||||
@ -103,7 +103,7 @@ def parse_csv(filename: str) -> Iterator[Dict[str, Any]]:
|
|||||||
part = False
|
part = False
|
||||||
elif part.isdigit():
|
elif part.isdigit():
|
||||||
part = int(part)
|
part = int(part)
|
||||||
elif re.match('[0-9]+\.[0-9]+', part):
|
elif re.match(r'[0-9]+\.[0-9]+', part):
|
||||||
part = float(part)
|
part = float(part)
|
||||||
elif len(part) == 10 and part[4] == '-' and part[7] == '-':
|
elif len(part) == 10 and part[4] == '-' and part[7] == '-':
|
||||||
part = datetime.datetime.strptime(part, '%Y-%m-%d').date()
|
part = datetime.datetime.strptime(part, '%Y-%m-%d').date()
|
||||||
@ -113,17 +113,17 @@ def parse_csv(filename: str) -> Iterator[Dict[str, Any]]:
|
|||||||
yield obj
|
yield obj
|
||||||
|
|
||||||
|
|
||||||
def format_row(*args) -> str:
|
def format_row(*values) -> str:
|
||||||
row = ''
|
row = ''
|
||||||
for arg in args:
|
for val in values:
|
||||||
if arg is None:
|
if val is None:
|
||||||
pass
|
pass
|
||||||
elif type(arg) == str:
|
elif type(val) == str:
|
||||||
row += f'"{arg}"'
|
row += f'"{val}"'
|
||||||
elif type(arg) == bool:
|
elif type(val) == bool:
|
||||||
row += 'T' if arg else 'F'
|
row += 'T' if val else 'F'
|
||||||
else:
|
else:
|
||||||
row += str(arg)
|
row += str(val)
|
||||||
row += ';'
|
row += ';'
|
||||||
return f'{row[:-1]}\n'
|
return f'{row[:-1]}\n'
|
||||||
|
|
||||||
@ -146,7 +146,8 @@ def convert(mgnr: int, key: str, old_value: str, new_value: str) -> None:
|
|||||||
print(f'\x1B[1m{mgnr:>6}: {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr)
|
print(f'\x1B[1m{mgnr:>6}: {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr)
|
||||||
|
|
||||||
|
|
||||||
def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str], billing: Optional[str] = None) -> None:
|
def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str],
|
||||||
|
billing: Optional[str] = None) -> None:
|
||||||
if not args.quiet:
|
if not args.quiet:
|
||||||
print(f'\x1B[1m{mgnr:>6}: '
|
print(f'\x1B[1m{mgnr:>6}: '
|
||||||
f'{" / ".join([e or "" for e in old_name])} -> '
|
f'{" / ".join([e or "" for e in old_name])} -> '
|
||||||
@ -164,6 +165,7 @@ def check_lfbis_nr(nr: str) -> bool:
|
|||||||
v = (11 - (s % 11)) % 10
|
v = (11 - (s % 11)) % 10
|
||||||
return v == int(nr[-1])
|
return v == int(nr[-1])
|
||||||
|
|
||||||
|
|
||||||
def check_ustid_at(nr: str) -> bool:
|
def check_ustid_at(nr: str) -> bool:
|
||||||
# http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
|
# http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
|
||||||
if not nr.startswith('ATU') or len(nr) != 11 or not nr[3:].isdigit():
|
if not nr.startswith('ATU') or len(nr) != 11 or not nr[3:].isdigit():
|
||||||
@ -175,26 +177,18 @@ def check_ustid_at(nr: str) -> bool:
|
|||||||
return v == int(nr[-1])
|
return v == int(nr[-1])
|
||||||
|
|
||||||
|
|
||||||
def iban_checksum(iban: str) -> int:
|
def modulo(a: str, b: int) -> int:
|
||||||
if not IBAN_RE.fullmatch(iban):
|
s = 0
|
||||||
raise RuntimeError()
|
for ch in a:
|
||||||
s = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), (iban[4:] + iban[:4]))
|
s = (s * 10 + int(ch)) % b
|
||||||
v = 98 - (int(s) % 97)
|
return s
|
||||||
return v
|
|
||||||
|
|
||||||
|
|
||||||
def check_iban(iban: str) -> bool:
|
def check_iban(iban: str) -> bool:
|
||||||
if not IBAN_RE.fullmatch(iban):
|
if not IBAN_RE.fullmatch(iban):
|
||||||
return False
|
return False
|
||||||
return iban_checksum(iban) == 97
|
s = re.sub('[A-Z]', lambda ch: str(ord(ch.group(0)) - ord('A') + 10), (iban[4:] + iban[:4]))
|
||||||
|
return modulo(s, 97) == 1
|
||||||
|
|
||||||
def generate_iban_at(blz: int, ktonr: str) -> str:
|
|
||||||
if blz > 99999 or len(ktonr) > 11:
|
|
||||||
raise RuntimeError()
|
|
||||||
iban = f'AT00{blz:>05}{ktonr:>011}'
|
|
||||||
s = iban_checksum(iban)
|
|
||||||
return iban.replace('00', f'{s:02}', 1)
|
|
||||||
|
|
||||||
|
|
||||||
def normalize_phone_nr(nr: str) -> str:
|
def normalize_phone_nr(nr: str) -> str:
|
||||||
@ -277,7 +271,10 @@ def lookup_gem_name(name: str) -> List[Tuple[int, int]]:
|
|||||||
name = 'Grub an der March'
|
name = 'Grub an der March'
|
||||||
|
|
||||||
cur = DB_CNX.cursor()
|
cur = DB_CNX.cursor()
|
||||||
cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name FROM AT_kg k JOIN AT_gem g ON g.gkz = k.gkz JOIN wb_gem wg ON wg.gkz = g.gkz "
|
cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name "
|
||||||
|
"FROM AT_kg k "
|
||||||
|
"JOIN AT_gem g ON g.gkz = k.gkz "
|
||||||
|
"JOIN wb_gem wg ON wg.gkz = g.gkz "
|
||||||
"WHERE LOWER(k.name) LIKE (LOWER(?) || '%') AND wg.hkid = 'WLWV'",
|
"WHERE LOWER(k.name) LIKE (LOWER(?) || '%') AND wg.hkid = 'WLWV'",
|
||||||
(name.replace('Gr.', 'Groß ').replace('Groß ', 'Groß').replace('-', ''),))
|
(name.replace('Gr.', 'Groß ').replace('Groß ', 'Groß').replace('-', ''),))
|
||||||
rows: List[Tuple[int, str, int, str]] = cur.fetchall()
|
rows: List[Tuple[int, str, int, str]] = cur.fetchall()
|
||||||
@ -322,7 +319,7 @@ def migrate_reeds(in_dir: str, out_dir: str) -> None:
|
|||||||
if name.isupper():
|
if name.isupper():
|
||||||
name = name.title()
|
name = name.title()
|
||||||
|
|
||||||
gem = GEM_MAP[r['GNR']]
|
gem = GEM_MAP[r['GNR']]
|
||||||
kgnr = gem[0][0]
|
kgnr = gem[0][0]
|
||||||
if len(gem) != 1:
|
if len(gem) != 1:
|
||||||
print(gem, name, '->', gem[0])
|
print(gem, name, '->', gem[0])
|
||||||
@ -336,7 +333,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
|
|||||||
members = parse_csv(f'{in_dir}/TMitglieder.csv')
|
members = parse_csv(f'{in_dir}/TMitglieder.csv')
|
||||||
fbs = parse_flaechenbindungen(in_dir)
|
fbs = parse_flaechenbindungen(in_dir)
|
||||||
|
|
||||||
with open(f'{out_dir}/member.csv', 'w+') as f_m,open(f'{out_dir}/member_billing_address.csv', 'w+') as f_mba:
|
with open(f'{out_dir}/member.csv', 'w+') as f_m, open(f'{out_dir}/member_billing_address.csv', 'w+') as f_mba:
|
||||||
f_m.write('mgnr;predecessor_mgnr;prefix;given_name;middle_names;family_name;suffix;'
|
f_m.write('mgnr;predecessor_mgnr;prefix;given_name;middle_names;family_name;suffix;'
|
||||||
'birthday;entry_date;exit_date;business_shares;accounting_nr;zwstid;'
|
'birthday;entry_date;exit_date;business_shares;accounting_nr;zwstid;'
|
||||||
'lfbis_nr;ustid;volllieferant;buchführend;funktionär;active;iban;bic;'
|
'lfbis_nr;ustid;volllieferant;buchführend;funktionär;active;iban;bic;'
|
||||||
@ -358,8 +355,8 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
given_name = given_name or ''
|
given_name = given_name or ''
|
||||||
family_name = re.sub('\s+', ' ', family_name).strip()
|
family_name = re.sub(r'\s+', ' ', family_name).strip()
|
||||||
given_name = re.sub('\s+', ' ', given_name).strip().replace(', ', ',')
|
given_name = re.sub(r'\s+', ' ', given_name).strip().replace(', ', ',')
|
||||||
|
|
||||||
if ' ' in family_name or '.' in family_name or ',' in family_name:
|
if ' ' in family_name or '.' in family_name or ',' in family_name:
|
||||||
if family_name.endswith(' KG'):
|
if family_name.endswith(' KG'):
|
||||||
@ -367,9 +364,10 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
|
|||||||
family_name = parts[0].title()
|
family_name = parts[0].title()
|
||||||
billing_name = f'{family_name} KG'
|
billing_name = f'{family_name} KG'
|
||||||
|
|
||||||
convert_name(mgnr, (m['Nachname'], m['Vorname']), (prefix, given_name, middle_names, family_name, suffix), billing_name)
|
convert_name(mgnr, (m['Nachname'], m['Vorname']),
|
||||||
|
(prefix, given_name, middle_names, family_name, suffix), billing_name)
|
||||||
elif (' ' in given_name or '.' in given_name or ',' in given_name) and given_name != 'EVA MARIA':
|
elif (' ' in given_name or '.' in given_name or ',' in given_name) and given_name != 'EVA MARIA':
|
||||||
if ' u. ' in given_name.lower() or ' u ' in given_name.lower() or ' und ' in given_name.lower():
|
if ' u. ' in given_name.lower() or ' u ' in given_name.lower() or ' und ' in given_name.lower():
|
||||||
parts = given_name.split(' ')
|
parts = given_name.split(' ')
|
||||||
family_name = family_name.title()
|
family_name = family_name.title()
|
||||||
billing_name = f'{family_name} {parts[0].title()} und {parts[-1].title()}'
|
billing_name = f'{family_name} {parts[0].title()} und {parts[-1].title()}'
|
||||||
@ -382,7 +380,8 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
|
|||||||
family_name = family_name.title()
|
family_name = family_name.title()
|
||||||
given_name = given_name.split(' ')[0].title()
|
given_name = given_name.split(' ')[0].title()
|
||||||
billing_name = f'{family_name} {given_name} KEG'
|
billing_name = f'{family_name} {given_name} KEG'
|
||||||
elif given_name.lower().endswith(' jun') or given_name.lower().endswith(' jun.') or given_name.lower().endswith(' sen') or given_name.lower().endswith(' sen.'):
|
elif given_name.lower().endswith(' jun') or given_name.lower().endswith(' jun.') or \
|
||||||
|
given_name.lower().endswith(' sen') or given_name.lower().endswith(' sen.'):
|
||||||
family_name = family_name.title()
|
family_name = family_name.title()
|
||||||
parts = given_name.split(' ')
|
parts = given_name.split(' ')
|
||||||
suffix = parts[-1].lower()
|
suffix = parts[-1].lower()
|
||||||
@ -398,7 +397,8 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
|
|||||||
family_name = family_name.title()
|
family_name = family_name.title()
|
||||||
given_name = given_name.split(' ')[0].title()
|
given_name = given_name.split(' ')[0].title()
|
||||||
prefix = 'Dipl.-Ing.'
|
prefix = 'Dipl.-Ing.'
|
||||||
elif given_name.lower().endswith(' ing') or given_name.lower().endswith(' ing.') or given_name.lower().endswith(' dr') or given_name.lower().endswith(' dr.'):
|
elif given_name.lower().endswith(' ing') or given_name.lower().endswith(' ing.') or \
|
||||||
|
given_name.lower().endswith(' dr') or given_name.lower().endswith(' dr.'):
|
||||||
family_name = family_name.title()
|
family_name = family_name.title()
|
||||||
parts = given_name.split(' ')
|
parts = given_name.split(' ')
|
||||||
given_name = parts[0].title()
|
given_name = parts[0].title()
|
||||||
@ -406,7 +406,8 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
|
|||||||
if prefix[-1] != '.':
|
if prefix[-1] != '.':
|
||||||
prefix += '.'
|
prefix += '.'
|
||||||
|
|
||||||
convert_name(mgnr, (m['Nachname'], m['Vorname']), (prefix, given_name, middle_names, family_name, suffix), billing_name)
|
convert_name(mgnr, (m['Nachname'], m['Vorname']),
|
||||||
|
(prefix, given_name, middle_names, family_name, suffix), billing_name)
|
||||||
else:
|
else:
|
||||||
family_name = family_name.title()
|
family_name = family_name.title()
|
||||||
given_name = given_name.title()
|
given_name = given_name.title()
|
||||||
@ -425,7 +426,6 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
|
|||||||
invalid(mgnr, 'BetriebsNr.', bnr)
|
invalid(mgnr, 'BetriebsNr.', bnr)
|
||||||
bnr = None
|
bnr = None
|
||||||
|
|
||||||
|
|
||||||
ustid: Optional[str] = m['UID']
|
ustid: Optional[str] = m['UID']
|
||||||
if ustid is not None:
|
if ustid is not None:
|
||||||
ustid = ustid.replace(' ', '')
|
ustid = ustid.replace(' ', '')
|
||||||
@ -444,7 +444,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
|
|||||||
iban: Optional[str] = m['IBAN']
|
iban: Optional[str] = m['IBAN']
|
||||||
bic: Optional[str] = m['BIC']
|
bic: Optional[str] = m['BIC']
|
||||||
blz: Optional[int] = m['BLZ']
|
blz: Optional[int] = m['BLZ']
|
||||||
ktonr: Optional[str] = m['KontoNr']
|
kto_nr: Optional[str] = m['KontoNr']
|
||||||
|
|
||||||
if iban is None:
|
if iban is None:
|
||||||
pass
|
pass
|
||||||
@ -455,10 +455,6 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
|
|||||||
invalid(mgnr, 'IBAN', iban)
|
invalid(mgnr, 'IBAN', iban)
|
||||||
iban = None
|
iban = None
|
||||||
|
|
||||||
if iban is None and blz and ktonr:
|
|
||||||
iban = generate_iban_at(blz, re.sub('[. -]', '', ktonr))
|
|
||||||
success(mgnr, 'IBAN', f'{iban} ({blz}, {ktonr})')
|
|
||||||
|
|
||||||
if bic is not None:
|
if bic is not None:
|
||||||
bic = bic.upper()
|
bic = bic.upper()
|
||||||
if bic == 'RLNWATAUE':
|
if bic == 'RLNWATAUE':
|
||||||
@ -474,7 +470,8 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
|
|||||||
address: Optional[str] = m['Straße']
|
address: Optional[str] = m['Straße']
|
||||||
if address is not None:
|
if address is not None:
|
||||||
address_old = address
|
address_old = address
|
||||||
address = re.sub('([0-9])([A-Z])', lambda a: a.group(1) + a.group(2).lower(), re.sub('\s+', ' ', address).strip().title())
|
address = re.sub('([0-9])([A-Z])', lambda a: a.group(1) + a.group(2).lower(),
|
||||||
|
re.sub(r'\s+', ' ', address).strip().title())
|
||||||
address = address.replace('strasse', 'straße').replace('strassse', 'straße')\
|
address = address.replace('strasse', 'straße').replace('strassse', 'straße')\
|
||||||
.replace('Strasse', 'Straße').replace('Str.', 'Straße')\
|
.replace('Strasse', 'Straße').replace('Str.', 'Straße')\
|
||||||
.replace('str.', 'straße').replace('ster.', 'straße').replace('g. ', 'gasse ')\
|
.replace('str.', 'straße').replace('ster.', 'straße').replace('g. ', 'gasse ')\
|
||||||
@ -487,7 +484,8 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
|
|||||||
elif address.startswith('Ob. '):
|
elif address.startswith('Ob. '):
|
||||||
address = address.replace('Ob. ', 'Obere ', 1)
|
address = address.replace('Ob. ', 'Obere ', 1)
|
||||||
address = address.replace(' Nr. ', ' ')
|
address = address.replace(' Nr. ', ' ')
|
||||||
address = re.sub(r'([^0-9]+?)( [0-9])', lambda a: STREET_NAMES.get(a.group(1), a.group(1)) + a.group(2), address)
|
address = re.sub(r'([^0-9]+?)( [0-9])',
|
||||||
|
lambda a: STREET_NAMES.get(a.group(1), a.group(1)) + a.group(2), address)
|
||||||
if address_old != address:
|
if address_old != address:
|
||||||
convert(mgnr, 'Adresse', address_old, address)
|
convert(mgnr, 'Adresse', address_old, address)
|
||||||
|
|
||||||
@ -594,8 +592,8 @@ def migrate_contracts(in_dir: str, out_dir: str) -> None:
|
|||||||
if t - f < 50:
|
if t - f < 50:
|
||||||
return [
|
return [
|
||||||
gst
|
gst
|
||||||
for i in range(f, t + 1)
|
for counter in range(f, t + 1)
|
||||||
for p in [f'{b or ""}{i}']
|
for p in [f'{b or ""}{counter}']
|
||||||
for gst in parse_gstnrs(p, kgnr, mgnr)
|
for gst in parse_gstnrs(p, kgnr, mgnr)
|
||||||
]
|
]
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user