Files
elwig-misc/data/plz.py

376 lines
13 KiB
Python
Executable File

#!/bin/env python3
# -*- coding: utf-8 -*-
from typing import List, Tuple, Callable
import argparse
import requests
import re
import openpyxl
import tempfile
import os
import zipfile
import PyPDF2
# Page on www.post.at whose HTML get_plz_url() scans for download links.
PLZ_URL = 'https://www.post.at/g/c/postlexikon'
# Each pattern captures the href of the link carrying the given title attribute.
PLZ_BUTTON = re.compile(r'title="PLZ Verzeichnis" href="(.*?)"')
PLZ_DEST_BUTTON = re.compile(r'title="PLZ Bestimmungsort" href="(.*?)"')
# Page on www.bev.gv.at fetched by get_kvg_zip_url(); KGV_LINK captures the href
# of an anchor whose text starts with "Katastral".
KGV_URL = 'https://www.bev.gv.at/Services/Downloads/Produktbezogene-Downloads/Unentgeltliche-Produkte/Kataster-Verzeichnisse/Katastralgemeindenverzeichnis.html'
KGV_LINK = re.compile(r'<a\s+href="(.*?)".*?>Katastral')
# URL template for the per-state PDF; '{}' is filled with an entry of OV_NAMES.
OV_URL = 'https://statistik.gv.at/fileadmin/publications/Ortsverzeichnis_2001__{}.pdf'
OV_NAMES = ['Burgenland', 'Niederoesterreich', 'Wien']
# Line patterns used by parse_ov() on the extracted PDF text.
# GEM: group 1 is a code written as "NNN NN"; parse_ov() removes the space and
# converts it to the integer gkz.
GEM = re.compile(r'^([1-9][0-9]{2} [0-9]{2}) [X0-9]+')
# ORT: group 1 is a five-digit number (used as okz), group 2 the name part,
# group 4 an optional run of " <digits/X/()/./->" tokens, group 6 an optional
# trailing text with optional digits. parse_ov() reads groups 0, 1, 2, 4 and 6.
ORT = re.compile(r'^([0-9]{5}) (([A-Za-zÄÖÜäöü(][A-Za-z0-9äöüÄÖÜßẞ():,.-]* ?|[0-9]+\..*?)+)'
r'(( [()X0-9.-]+)*)?([A-Za-zÄÖÜäöüßẞ ]+([0-9]*))?$')
# Strips a trailing run of digits, 'X', dots, spaces and dashes.
STRIP_NUM = re.compile(r'[X0-9. -]+$')
# Strips a trailing " <letters><digits>" token; applied to names in write_sql().
STRIP_INV = re.compile(r' [A-Za-z]+[0-9]+$')
# Alternation of short codes that may follow a place name.
# NOTE(review): presumably the settlement-type abbreviations of the
# Ortsverzeichnis - confirm against the PDF legend.
CODES = re.compile(r'Agh|Alm|Alpe|B|Bd|Bwg|Burg|Camp|D|E|Fbk|Fhei|Gh|Hgr|Hot|Indz|Jh|Jhtt|Ki|Kl|Krwk|Ks|M|Mh|'
r'Mü|R|Ru|Sa|Sä|Sb|Schh|Schih|Schl|Sdlg|Sgr|St|Stbr|Stt|V|W|We|Ek|Z|Zgl|ZH')
# STRIP_CODE removes one code plus trailing digits/punctuation at the end of the
# text; STRIP_CODE_ALL removes everything from the first whole-word code onwards.
STRIP_CODE = re.compile(r' *(' + CODES.pattern + r')[()X0-9. -]*$')
STRIP_CODE_ALL = re.compile(r' +\b(' + CODES.pattern + r')\b.*$')
# Row shapes, named after how write_sql() unpacks them:
# PlzRow:     (plz, ort, blnr, plz_type, internal, addr, po_box)
# PlzDestRow: (plz, dest, okz, o_name, gkz, <unused>)
# KgvRow:     (kgnr, kg_name, gkz, gem_name)
# OvRow:      (gkz, okz, name)
PlzRow = Tuple[int, str, int, str, bool, bool, bool]
PlzDestRow = Tuple[int, str, int, str, int, str]
KgvRow = Tuple[int, str, int, str]
OvRow = Tuple[int, int, str]
# Maps the abbreviation found in column index 2 of the PLZ sheet to the number
# stored as blnr (see download_plz()).
# NOTE(review): presumably the Austrian federal states - confirm.
PLZ_BUNDESLAND = {
'B': 1,
'K': 2,
'N': 3,
'O': 4,
'Sa': 5,
'St': 6,
'T': 7,
'V': 8,
'W': 9,
}
# Replacement gkz for codes in the range 32400-32499, which parse_ov() treats as
# the former district "Wien Umgebung". Codes missing here (e.g. 32414) raise
# KeyError in parse_ov().
WIEN_UMGEBUNG = {
32401: 30729,
32402: 30730,
32403: 31949,
32404: 31235,
32405: 30731,
32406: 30732,
32407: 30733,
32408: 32144,
32409: 30734,
32410: 30735,
32411: 30736,
32412: 31950,
32413: 30737,
32415: 31951,
32416: 31952,
32417: 30738,
32418: 30739,
32419: 30740,
32421: 31953,
32423: 31954,
32424: 30741,
}
# Filled by parse_ov(): gkz -> list of names found after 'Katastralgemeinden:'.
# Read by write_sql() when it prints suggestions for unknown gkz values.
GKZ = {}
def get_plz_url(button: re.Pattern) -> str:
    """Return the first link on the PLZ_URL page that matches *button*.

    The page is requested with a browser-like User-Agent header.

    Raises:
        RuntimeError: if the response status is not 200 or *button* does not
            match anywhere in the page.
    """
    r = requests.get(PLZ_URL, headers={'User-Agent': 'Mozilla/5.0'})
    if r.status_code != 200:
        raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}')

    hrefs = button.findall(r.text)
    if not hrefs:
        raise RuntimeError('Unable to find url of file')
    return hrefs[0]
def download_excel(url: str, transform: Callable[[List[str]], Tuple]) -> List:
    """Download an Excel workbook and convert the rows of its first sheet.

    The workbook is streamed into a temporary ``.xlsx`` file, opened with
    openpyxl, and every row after the header row is passed to *transform* as
    a list of cell values.

    Args:
        url: Address of the workbook to download.
        transform: Converts one list of cell values into a result row.

    Returns:
        The transformed rows in sheet order, without the header row.

    Raises:
        RuntimeError: if the response status is not 200.
    """
    f = tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False)
    # Remember the name right away: the file is created with delete=False, so
    # it has to be removed explicitly even when the download itself fails.
    f_name = f.name
    try:
        # Leaving this block closes the file before openpyxl opens it by name.
        with f:
            with requests.get(url, stream=True, headers={'User-Agent': 'Mozilla/5.0'}) as r:
                if r.status_code != 200:
                    raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}')
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)

        sheet = openpyxl.open(f_name).worksheets[0]
        sheet_rows = iter(sheet.rows)
        next(sheet_rows, None)  # skip the header row
        return [transform([cell.value for cell in row]) for row in sheet_rows]
    finally:
        os.remove(f_name)
def download_plz() -> List[PlzRow]:
    """Download the workbook linked by PLZ_BUTTON and return its rows as PlzRow tuples."""

    def to_plz_row(cells):
        # Columns 0, 1, 2 and 5-8 are used; the state abbreviation in column 2
        # is translated through PLZ_BUNDESLAND and the last three columns are
        # turned into booleans.
        return (
            int(cells[0]),
            cells[1],
            PLZ_BUNDESLAND[cells[2]],
            cells[5],
            cells[6] == 'intern',
            cells[7] == 'Ja',
            cells[8] == 'Ja',
        )

    workbook_url = get_plz_url(PLZ_BUTTON)
    return download_excel(workbook_url, to_plz_row)
def download_plz_dest() -> List[PlzDestRow]:
    """Download the workbook linked by PLZ_DEST_BUTTON and return its rows as PlzDestRow tuples."""

    def to_dest_row(cells):
        # The first six columns are used; columns 0, 2 and 4 are converted to int.
        return (
            int(cells[0]),
            cells[1],
            int(cells[2]),
            cells[3],
            int(cells[4]),
            cells[5],
        )

    workbook_url = get_plz_url(PLZ_DEST_BUTTON)
    return download_excel(workbook_url, to_dest_row)
def get_kvg_zip_url() -> str:
    """Return the first link on the KGV_URL page that matches KGV_LINK.

    Raises:
        RuntimeError: if the response status is not 200 or no link is found.
    """
    r = requests.get(KGV_URL)
    if r.status_code != 200:
        raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}')

    links = KGV_LINK.findall(r.text)
    if links:
        return links[0]
    raise RuntimeError('Unable to find url of zip file')
def download_kgv() -> List[KgvRow]:
    """Download the zip archive found by get_kvg_zip_url() and parse its first CSV file.

    The archive is streamed into a temporary file. The first member whose name
    ends in '.csv' is read as UTF-8 with ';' as separator; its first line is
    skipped. Columns 0, 1, 3 and 4 of every remaining line form one KgvRow.

    Raises:
        RuntimeError: if the response status is not 200 or the archive
            contains no '.csv' member.
    """

    def parse_field(field):
        # Fields starting with a double quote are text and lose their first and
        # last character; every other field has to be an integer.
        if field[0] == '"':
            return field[1:-1]
        return int(field)

    result = []
    with tempfile.NamedTemporaryFile() as archive_file:
        with requests.get(get_kvg_zip_url(), stream=True) as r:
            if r.status_code != 200:
                raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}')
            for chunk in r.iter_content(chunk_size=8192):
                archive_file.write(chunk)

        with zipfile.ZipFile(archive_file, 'r') as archive:
            csv_names = [member for member in archive.namelist() if member.endswith('.csv')]
            if not csv_names:
                raise RuntimeError('Unable to find csv file in zip')
            with archive.open(csv_names[0], 'r') as csv_file:
                for index, raw_line in enumerate(csv_file):
                    if index == 0:
                        # header line
                        continue
                    text = raw_line.decode('utf-8').rstrip()
                    fields = [parse_field(field) for field in text.split(';')]
                    result.append((int(fields[0]), str(fields[1]), int(fields[3]), str(fields[4])))
    return result
def download_ov_land(bundesland: str) -> List[OvRow]:
    """Download the PDF for *bundesland* and dump its relevant pages as text files.

    Pages are written to 'out/<bundesland>/<NNN>.txt' (NNN being the 1-based
    page number). Writing starts after a short page reading 'Ortsverzeichnis'
    and stops at a short page reading 'ALPHABETISCHE VERZEICHNISSE' (both
    compared with spaces removed). The target directory must already exist.

    Returns:
        Always an empty list; the extracted text is only written to disk.

    Raises:
        RuntimeError: if the response status is not 200.
    """
    rows = []
    with tempfile.NamedTemporaryFile() as pdf_file:
        r = requests.get(OV_URL.format(bundesland))
        if r.status_code != 200:
            raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}')
        for chunk in r.iter_content(chunk_size=8192):
            pdf_file.write(chunk)

        pdf = PyPDF2.PdfReader(pdf_file)
        in_listing = False
        for page in pdf.pages:
            page_num = pdf.get_page_number(page)
            text = page.extract_text()

            # Only short pages can be section title pages.
            if len(text) < 100:
                title = text.strip().replace(' ', '')
                if title == 'Ortsverzeichnis':
                    in_listing = True
                    continue
                if in_listing and title == 'ALPHABETISCHEVERZEICHNISSE':
                    break

            if in_listing:
                with open(f'out/{bundesland}/{page_num + 1:03}.txt', 'w+', encoding='utf-8') as o:
                    o.write(text)
    return rows
def download_ov() -> None:
    """Populate the 'out' page cache for every entry of OV_NAMES.

    If the 'out' directory already exists it is treated as a complete cache
    and nothing is downloaded. Otherwise the directory is created and filled
    by download_ov_land(). If that fails or is interrupted, the directory
    created by this call is removed again, so that a later run does not
    mistake a partial download for a valid cache.
    """
    import shutil

    try:
        os.mkdir('out')
    except FileExistsError:
        print('Using cache')
        return

    try:
        for name in OV_NAMES:
            os.mkdir(f'out/{name}')
            download_ov_land(name)
    except BaseException:
        # Includes KeyboardInterrupt: an aborted download must not leave a
        # half-filled cache behind. 'out' was created above by this call.
        shutil.rmtree('out', ignore_errors=True)
        raise
# Parse the cached page texts below 'out/<bundesland>/' into (gkz, okz, name) rows.
#
# Directories and page files are visited in sorted order. Besides returning the
# collected rows, the function stores the names listed after
# 'Katastralgemeinden:' in the module-level GKZ dict, keyed by the current gkz.
#
# NOTE(review): the indentation of this block is missing in this copy of the
# file; the comments below describe the individual statements, but the exact
# nesting has to be confirmed against the original source.
def parse_ov() -> List[OvRow]:
rows = []
for bundesland in sorted(os.listdir('out')):
# gkz:  code taken from the most recent line accepted as a GEM line.
# last: text of a place line that matched ORT but has not been emitted yet.
gkz = None
last = None
for page_name in sorted(os.listdir(f'out/{bundesland}')):
with open(f'out/{bundesland}/{page_name}', 'r', encoding='utf-8') as f:
cont = False
for line in f:
line = line.rstrip()
m1 = ORT.match(line)
m2 = GEM.match(line)
# A pending place line is resolved with the help of the current line.
if last is not None:
# NOTE(review): 'Gemeindename' is presumably a table header that ends the
# usable part of the page - confirm. 'last' stays pending across the break.
if line == 'Gemeindename':
cont = True
break
elif m1 is None:
# The current line is not a place line: if it contains neither ',' nor ':'
# it is appended to the pending text.
# NOTE(review): presumably a name wrapped onto a second line - confirm.
if ',' not in line and ':' not in line:
last = f'{last} {line}'
# NOTE(review): ORT.match() returns None when the text does not match, in
# which case m3.group() raises AttributeError.
m3 = ORT.match(last)
okz = int(m3.group(1))
# Everything from the first whole-word code onwards is removed from the name.
name = STRIP_CODE_ALL.sub('', m3.group(2))
rows.append((gkz, okz, name))
last = None
continue
else:
# The current line is itself a place line: emit the pending one, removing
# only a trailing code from its name.
m3 = ORT.match(last)
okz = int(m3.group(1))
name = STRIP_CODE.sub('', m3.group(2))
rows.append((gkz, okz, name))
last = None
if 'Katastralgemeinden:' in line:
p1 = line.find('Katastralgemeinden:')
p2 = line.find('Postleitzahl')
# p1 + 20 skips the 19-character label plus one more character. The text up
# to 'Postleitzahl' is split on ', ' and the last two space-separated tokens
# of every entry are dropped.
# NOTE(review): if 'Postleitzahl' is absent, p2 is -1 and the slice merely
# drops the final character of the line.
val = [' '.join(a.split(' ')[:-2]) for a in line[p1 + 20:p2].split(', ')]
GKZ[gkz] = val
continue
if m1:
# The whole matched line without its trailing digits/'X'/dots/spaces/dashes.
val = STRIP_NUM.sub('', m1.group(0))
# Keep the line pending when group 4 is empty or group 6 is non-empty.
if m1.group(4) == '' or (m1.group(6) is not None and m1.group(6) != ''):
last = m1.group(1) + ' ' + m1.group(2)
else:
okz = int(m1.group(1))
name = STRIP_CODE.sub('', m1.group(2))
# Unchanged length means STRIP_CODE removed nothing; the line is then kept
# pending instead of being emitted.
if len(name) == len(m1.group(2)):
last = val
else:
rows.append((gkz, okz, name))
elif m2:
# GEM lines with at most 9 space-separated fields are ignored.
if len(line.split(' ')) <= 9:
continue
gkz = int(m2.group(1).replace(' ', ''))
if gkz > 90000:
# municipality of Vienna
gkz = 90001
elif gkz >= 32400 and gkz <= 32499:
# former district Wien Umgebung
# (raises KeyError for codes that are not listed in WIEN_UMGEBUNG)
gkz = WIEN_UMGEBUNG[gkz]
# NOTE(review): if this is the last statement of the per-page loop, the
# 'continue' has no effect - confirm the intended nesting.
if cont:
continue
return rows
# Write the file '90.plz.sql' with INSERT statements for the tables AT_gem,
# AT_kg, AT_ort, AT_plz and AT_plz_dest, then print diagnostics for places
# whose gkz is not present in kgv_rows.
#
# Parameters:
#   plz_rows:      PlzRow tuples, written to AT_plz.
#   plz_dest_rows: PlzDestRow tuples, written to AT_plz_dest and used as the
#                  base set of places.
#   kgv_rows:      KgvRow tuples, source of AT_gem and AT_kg.
#   ov_rows:       OvRow tuples; they override the entries from plz_dest_rows
#                  that have the same okz.
#
# NOTE(review): the indentation of this block is missing in this copy of the
# file; the nesting of some statements (e.g. the STRIP_INV line) has to be
# confirmed against the original source.
# NOTE(review): all values are interpolated into the SQL text without
# escaping; a name containing a single quote would yield invalid SQL.
def write_sql(plz_rows: List[PlzRow], plz_dest_rows: List[PlzDestRow], kgv_rows: List[KgvRow], ov_rows: List[OvRow]) -> None:
# kgnr -> (kg_name, gkz, gem_name)
kgv = {kgnr: (kg_name, gkz, gem_name) for kgnr, kg_name, gkz, gem_name in kgv_rows}
# okz -> (name, gkz); entries from ov_rows replace those from plz_dest_rows.
ov = {okz: (o_name, gkz) for plz, _, okz, o_name, gkz, _ in plz_dest_rows}
ov.update({okz: (name, gkz) for gkz, okz, name in ov_rows})
# gkz -> (gem_name, [kgnr, ...], []); the second list is filled while AT_kg is
# written, the third element is not used within this function.
gemeinden = {gkz: (gem_name, [], []) for kgnr, kg_name, gkz, gem_name in kgv_rows}
# gkz values that occur in 'ov' but are unknown to 'gemeinden'.
pr = set()
# The file is opened in binary mode so that seek(-2, 1) can step back two
# bytes from the current position: every row ends with ",\n", and the last
# one is overwritten with ";\n".
# NOTE(review): if a section writes no rows, the seek overwrites the last two
# bytes of the "INSERT INTO ... VALUES\n" header instead.
with open('90.plz.sql', 'wb') as f:
f.write(b"\nINSERT INTO AT_gem VALUES\n")
for gkz, (name, _, _) in sorted(gemeinden.items(), key=lambda i: i[0]):
f.write(f"({gkz:5}, '{name}'),\n".encode('utf-8'))
f.seek(-2, 1)
f.write(b';\n')
f.write(b"\nINSERT INTO AT_kg VALUES\n")
for kgnr, name, gkz, _ in sorted(kgv_rows, key=lambda i: i[0]):
# Collect the kgnr values per gkz for the place matching further below.
gemeinden[gkz][1].append(kgnr)
f.write(f"({kgnr:5}, {gkz:5}, '{name}'),\n".encode('utf-8'))
f.seek(-2, 1)
f.write(b';\n')
f.write(b"\nINSERT INTO AT_ort VALUES\n")
for okz, (name, gkz) in sorted(ov.items(), key=lambda i: i[0]):
kgnr_o = None
# Name clean-up: 'Wien ' becomes 'Wien, ' and every '.' gets a following space.
if name.startswith('Wien '):
name = name.replace('Wien ', 'Wien, ').replace('.', '. ')
elif 'Bez.' in name:
# Adds a space after the first comma, expands '.Bez.:' and removes every '0'
# character from the name.
name = name.replace(',', ', ', 1).replace('.Bez.:', '. Bezirk: ').replace('0', '')
# Removes a trailing " <letters><digits>" token.
name = STRIP_INV.sub('', name)
# Places with an unknown gkz are printed, remembered in 'pr' and not written.
if gkz not in gemeinden:
print(okz, name, gkz)
pr.add(gkz)
continue
# A gkz with exactly one kgnr is unambiguous.
if len(gemeinden[gkz][1]) == 1:
kgnr_o = gemeinden[gkz][1][0]
else:
# Otherwise compare normalised names (lower case, '-' removed or replaced by
# a space, 'th' written as 't') by substring containment in either direction.
# There is no break, so the last matching kgnr is the one that is kept.
for kgnr in gemeinden[gkz][1]:
n11 = name.lower().replace('-', '').replace('th', 't')
n12 = name.lower().replace('-', ' ').replace('th', 't')
n21 = kgv[kgnr][0].lower().replace('-', '').replace('th', 't')
n22 = kgv[kgnr][0].lower().replace('-', ' ').replace('th', 't')
if n11 in n21 or n11 in n22 or n12 in n21 or n12 in n22 or n21 in n11 or n21 in n12 or n22 in n11 or n22 in n12:
kgnr_o = kgnr
# NULL is written when no kgnr could be determined.
f.write(f"({okz:5}, {gkz:5}, {kgnr_o if kgnr_o is not None else 'NULL':>5}, '{name}'),\n".encode('utf-8'))
f.seek(-2, 1)
f.write(b';\n')
f.write(b"\nINSERT INTO AT_plz VALUES\n")
for plz, ort, blnr, plz_type, internal, addr, po_box in sorted(plz_rows, key=lambda i: i[0]):
f.write(f"({plz:4}, '{ort}', {blnr}, '{plz_type}', {internal and 'TRUE' or 'FALSE'}, "
f"{addr and 'TRUE' or 'FALSE'}, {po_box and 'TRUE' or 'FALSE'}),\n".encode('utf-8'))
f.seek(-2, 1)
f.write(b';\n')
f.write(b"\nINSERT INTO AT_plz_dest VALUES\n")
for plz, dest, okz, _, _, _ in sorted(plz_dest_rows, key=lambda i: (i[0], i[2])):
f.write(f"({plz:4}, {okz:5}, '{dest}'),\n".encode('utf-8'))
f.seek(-2, 1)
f.write(b';\n')
# Diagnostics: for every unknown gkz, look for a single candidate among the
# kgv_rows whose gkz has the same value of gkz // 10000, narrowed by the names
# recorded in GKZ. Unique candidates are printed as "<gkz>: <candidate gkz>,".
p = set()
for e in pr:
possible = filter(lambda a: a[2] // 10000 == e // 10000, kgv_rows.copy())
if e in GKZ:
for name in GKZ[e]:
if name == '':
continue
# NOTE(review): filter() is lazy and the lambda looks up 'name' only when
# the filters are consumed, so every chained filter compares against
# whatever value 'name' holds at that time - confirm this is intended.
possible = filter(lambda a: a[1] == name, possible)
possible = list(possible)
if len(possible) == 1:
print(f' {e}: {possible[0][2]},')
p.add(e)
# Print the sorted unknown gkz values for which no unique candidate was found.
u = list(pr - p)
u.sort()
print(u)
if __name__ == '__main__':
    # No options are defined; parsing still provides -h/--help and rejects
    # unexpected command-line arguments.
    argparse.ArgumentParser().parse_args()

    # The Ortsverzeichnis pages are cached on disk and parsed from there.
    print('Downloading Ortsverzeichnis from statistik.gv.at')
    download_ov()
    print('Parsing Ortsverzeichnis')
    ortsverzeichnis = parse_ov()

    print('Downloading PLZ data from www.post.at')
    postal_codes = download_plz()
    print('Downloading PLZ destination data from www.post.at')
    destinations = download_plz_dest()

    print('Downloading Katastralgemeindenverzeichnis from www.bev.gv.at')
    katastralgemeinden = download_kgv()

    print('Generating 90.plz.sql')
    write_sql(
        plz_rows=postal_codes,
        plz_dest_rows=destinations,
        kgv_rows=katastralgemeinden,
        ov_rows=ortsverzeichnis,
    )
    print('Successfully created 90.plz.sql!')