Migrate Flächenbindungen

This commit is contained in:
2023-02-24 15:34:55 +01:00
parent 5bf4585fb5
commit 25f5c3efcc
2 changed files with 167 additions and 8 deletions

View File

@ -7,16 +7,17 @@ import os
import re
import sys
import sqlite3
import requests
DB_CNX: Optional[sqlite3.Connection] = None
USTID_RE = re.compile('[A-Z]{2}[A-Z0-9]{2,12}')
BIC_RE = re.compile('[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
IBAN_RE = re.compile('[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')
EMAIL_RE = re.compile('[^@\s]+@([a-z0-9_äöüß-]+\.)+[a-z]{2,}')
CULTIDS = {1: 'N', 2: 'KIP', 3: 'BIO'}
STREET_NAMES = {
'Hans-Wagnerstraße': 'Hans-Wagner-Straße',
@ -56,6 +57,29 @@ STREET_NAMES = {
def parse_csv(filename: str) -> Iterator[Dict[str, Any]]:
def parse_line(l: str) -> Iterator[str]:
w = None
s = False
for ch in l:
if w is None:
if ch == ';':
yield ''
continue
elif ch in (' ', '\t'):
continue
w = ch
s = ch == '"'
continue
elif not s and ch in (';', '\n'):
yield w.strip()
w = None
continue
elif s and ch == '"':
s = False
w += ch
if w is not None:
yield w.strip()
with open(filename, 'r') as f:
header: Optional[Tuple[str]] = None
for line in f:
@ -63,9 +87,8 @@ def parse_csv(filename: str) -> Iterator[Dict[str, Any]]:
header = tuple([e.strip() for e in line.strip().split(';')])
continue
parts = [e.strip() for e in line.strip().split(';')]
obj = {}
for i, part in enumerate(parts):
for i, part in enumerate(parse_line(line)):
if part == '':
part = None
elif part[0] == '"' and part[-1] == '"':
@ -177,10 +200,32 @@ def normalize_phone_nr(nr: str) -> str:
return nr
def parse_branches(in_dir: str) -> Dict[str, Any]:
def get_bev_gst_size(kgnr: int, gstnr: str) -> Optional[int]:
r = requests.get(f'https://kataster.bev.gv.at/api/gst/{kgnr:05}/{gstnr}/')
if r.status_code != 200:
return None
data = r.json()
return sum([n['fl'] for n in data['properties']['nutzungen']])
def parse_branches(in_dir: str) -> Dict[str, Dict[str, Any]]:
return {b['ZNR']: b for b in parse_csv(f'{in_dir}/TZweigstellen.csv')}
def parse_gemeinden(in_dir: str) -> Dict[int, Dict[str, Any]]:
return {g['GNR']: g for g in parse_csv(f'{in_dir}/TGemeinden.csv')}
def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]:
fbs = parse_csv(f'{in_dir}/TFlaechenbindungen.csv')
members = {}
for f in fbs:
if f['MGNR'] not in members:
members[f['MGNR']] = {}
members[f['MGNR']][f['FBNR']] = f
return members
def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] = None) -> Optional[int]:
if plz is None or ort is None:
return None
@ -227,10 +272,31 @@ def lookup_kgnr(okz: Optional[int]) -> Optional[int]:
return None
def lookup_kgnr_name(name: str) -> Optional[int]:
cur = DB_CNX.cursor()
cur.execute("SELECT k.kgnr FROM AT_kg k JOIN AT_gem g ON g.gkz = k.gkz JOIN wb_gem wg ON wg.gkz = g.gkz "
"WHERE LOWER(k.name) = LOWER(?) AND wg.hkid = 'WLWV'",
(name.replace('Gr.', 'Groß ').replace('-', '').replace(' ', ''),))
rows: List[Tuple[int]] = cur.fetchall()
cur.close()
if len(rows) == 1:
return rows[0][0]
if name == 'Velm-Götzendorf':
return None
print(name, rows)
raise RuntimeError()
def migrate_members(in_dir: str, out_dir: str) -> None:
members = parse_csv(f'{in_dir}/TMitglieder.csv')
branches = parse_branches(in_dir)
with open(f'{out_dir}/member.csv', 'w+') as f_m, open(f'{out_dir}/member_billing_address.csv', 'w+') as f_mba:
gemeinden = parse_gemeinden(in_dir)
fbs = parse_flaechenbindungen(in_dir)
with open(f'{out_dir}/member.csv', 'w+') as f_m,open(f'{out_dir}/member_billing_address.csv', 'w+') as f_mba:
f_m.write('mgnr;predecessor_mgnr;prefix;given_name;middle_names;family_name;suffix;'
'birthday;entry_date;exit_date;business_shares;accounting_nr;zwstid;'
'lfbis_nr;ustid;volllieferant;buchführend;funktionär;active;iban;bic;'
@ -238,6 +304,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
'email;phone_landline;phone_mobile_1;phone_mobile_2;'
'default_kgnr;comment\n')
f_mba.write('mgr;name;country;postal_dest;address\n')
for m in members:
mgnr: int = m['MGNR']
family_name: str = m['Nachname']
@ -421,10 +488,15 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
zwstid = m['ZNR'] and branches[m['ZNR']]['Kennbst'] or len(branches) == 1 and list(branches.values())[0]['Kennbst']
postal_dest = lookup_plz(int(m['PLZ']) if m['PLZ'] else None, m['Ort'], address)
#if mgnr in fbs:
# gems = {v['GNR'] for k, v in fbs[mgnr].items() if v['Bis'] and int(v['Bis']) >= 2020}
# if len(gems) == 1:
# print(gemeinden[list(gems)[0]])
okz = postal_dest % 100000 if postal_dest else None
kgnr = lookup_kgnr(okz)
if kgnr is None:
invalid(mgnr, 'KgNr', ort)
invalid(mgnr, 'KgNr.', ort)
f_m.write(format_row(
mgnr, m['MGNR-Vorgänger'], prefix, given_name, middle_names, family_name, suffix,
@ -438,6 +510,92 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
if billing_name:
f_mba.write(format_row(mgnr, billing_name, 'AT', None, None))
def migrate_contracts(in_dir: str, out_dir: str) -> None:
gemeinden = parse_gemeinden(in_dir)
def parse_gstnrs(nr_str: str, kgnr: int, mgnr: int) -> List[str]:
if nr_str is None:
return []
elif nr_str.isdigit() and len(nr_str) <= 6:
return [nr_str]
elif nr_str.count('/') == 1:
parts = nr_str.split('/')
if len(parts[0]) == len(parts[1]) and parts[0].isdigit() and parts[1].isdigit():
return [parts[0], parts[1]]
elif parts[0].isdigit() and len(parts[0]) <= 6 and parts[1].isdigit() and len(parts[1]) <= 3:
return [nr_str]
if nr_str.count('/') > 1:
parts = nr_str.split('/')
if all([p.isdigit() for p in parts]):
if all([len(p) <= 3 for p in parts[1:]]):
return [f'{parts[0]}/{p}' for p in parts[1:]]
elif all([len(p) == len(parts[0]) for p in parts]):
return parts
if nr_str.startswith(f'{kgnr:05}'):
return parse_gstnrs(nr_str[5:].strip(), kgnr, mgnr)
if nr_str.endswith(' 2000'):
return parse_gstnrs(nr_str[:-5].strip(), kgnr, mgnr)
parts = re.split(r' *[,;+&] *', nr_str)
if len(parts) == 1:
parts = nr_str.split(' / ')
if len(parts) == 1 and ' ' not in nr_str:
parts = nr_str.split(' ')
if len(parts) == 1 and ' ' not in nr_str and '-' not in nr_str:
parts = nr_str.split(' ')
if len(parts) > 1:
return [gst for p in parts for gst in parse_gstnrs(p, kgnr, mgnr)]
m = re.fullmatch(r'([0-9]+/)?([0-9]+) *- *([0-9]+)', nr_str)
if m is not None:
b = m.group(1)
f = int(m.group(2))
t = int(m.group(3))
if t < f:
t += f - (f % pow(10, len(m.group(3))))
if t - f < 50:
return [
gst
for i in range(f, t + 1)
for p in [f'{b or ""}{i}']
for gst in parse_gstnrs(p, kgnr, mgnr)
]
invalid(mgnr, 'GstNr.', f'{kgnr:05}-{nr_str}')
return [nr_str]
with open(f'{out_dir}/contract.csv', 'w+') as f_c, open(f'{out_dir}/area_commitment.csv', 'w+') as f_fb:
f_c.write('vnr;mgnr;year_from;year_to\n')
f_fb.write('vnr;kgnr;gstnr;rdnr;area;sortid;attrid;cultid\n')
for fb in parse_csv(f'{in_dir}/TFlaechenbindungen.csv'):
if fb['Von'] is None and fb['Bis'] is None:
continue
parz: str = fb['Parzellennummer']
vnr: int = fb['FBNR']
gem = gemeinden[fb['GNR']]
kgnr = lookup_kgnr_name(gem['Bezeichnung'])
if kgnr is None:
# Götzendorf
kgnr = 6007
f_c.write(format_row(vnr, fb['MGNR'], fb['Von'], fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None))
gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR'])
area = int(fb['Flaeche'])
gst_area = int(area / (len(gstnrs) or 1))
if parz is None or parz == '0000':
invalid(fb['MGNR'], 'GstNr.', f'{kgnr or 0:05}-{parz}')
elif len(gstnrs) > 1 or (len(gstnrs) == 1 and gstnrs[0] != parz):
convert(fb['MGNR'], 'GstNr.', f'{kgnr or 0:05}-{parz or ""}', ', '.join(gstnrs))
for i, gstnr in enumerate(gstnrs or ['0000']):
a = area - gst_area * (len(gstnrs) - 1) if i == 0 else gst_area
# TODO reed nr
f_fb.write(format_row(vnr, kgnr, gstnr, None, a, fb['SNR'], fb['SANR'], CULTIDS[fb['BANR']]))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('in_dir')
@ -451,5 +609,6 @@ if __name__ == '__main__':
DB_CNX = sqlite3.connect(args.database)
migrate_members(args.in_dir, args.out_dir)
migrate_contracts(args.in_dir, args.out_dir)
DB_CNX.close()