#!/usr/bin/env python3
"""Build ``plz.sql`` from public Austrian postal-code and cadastral data.

The script downloads three sources (the Ortsverzeichnis PDFs from
statistik.gv.at, the PLZ spreadsheet from www.post.at and the
Katastralgemeindenverzeichnis zip from www.bev.gv.at), parses them and
writes INSERT statements for the tables ``AT_gem``, ``AT_kg``, ``AT_ort``
and ``AT_plz`` to ``plz.sql`` in the current working directory.
"""

from typing import Dict, List, Optional, Tuple
import argparse
import csv
import io
import requests
import re
import shutil
import xlrd
import tempfile
import os
import zipfile
import PyPDF2

PLZ_URL = 'https://www.post.at/g/c/postlexikon'
PLZ_BUTTON = re.compile(r'title="PLZ Bestimmungsort" href="(.*?)"')
KGV_URL = 'https://www.bev.gv.at/portal/page?_pageid=713,2601283&_dad=portal&_schema=PORTAL'
# NOTE(review): this pattern is empty in this copy of the file, so it can never
# capture a download link; presumably the original expression was lost.
# get_kvg_zip_url() rejects empty matches until the pattern is restored.
KGV_LINK = re.compile(r'')
OV_URL = 'https://statistik.gv.at/fileadmin/publications/Ortsverzeichnis_2001__{}.pdf'
OV_NAMES = ['Burgenland', 'Niederoesterreich', 'Wien']

# Line starting with a five-digit code written as "NNN NN" (municipality line).
GEM = re.compile(r'^([1-9][0-9]{2} [0-9]{2}) [X0-9]+')
# Line starting with a five-digit code followed by a name (locality line).
# Group 4 is empty when the name runs to the end of the line.
ORT = re.compile(r'^([0-9]{5}) (([A-Za-zÄÖÜäöü(][A-Za-z0-9äöüÄÖÜßẞ():,.-]* ?|[0-9]+\..*?)+) ?'
                 r'([()X0-9.]+ ?|$)')
# Trailing run of digits, 'X', dots, spaces and dashes.
STRIP_NUM = re.compile(r'[X0-9. -]+$')
# Trailing abbreviation code, optionally followed by a number in parentheses.
STRIP_CODE = re.compile(r' *(Agh|Alm|Alpe|B|Bd|Bwg|Burg|Camp|D|E|Fbk|Fhei|Gh|Hgr|Hot|Indz|Jh|Jhtt|Ki|Kl|Krwk|Ks|M|Mh|'
                        r'Mü|R|Ru|Sa|Sä|Sb|Schh|Schih|Schl|Sdlg|Sgr|St|Stbr|Stt|V|W|We|Ek|Z|Zgl|ZH) *(\([0-9]+\) *)?$')
# Leading zeros of a number ("01" -> "1") without touching "10" or "20".
LEADING_ZEROS = re.compile(r'(?<![0-9])0+(?=[0-9])')

# (plz, destination, okz, locality name, gkz, <unused sixth column>)
PlzRow = Tuple[int, str, int, str, int, str]
# (kgnr, cadastral municipality name, gkz, municipality name)
KgvRow = Tuple[int, str, int, str]
# (gkz, okz, locality name)
OvRow = Tuple[int, int, str]

# Replacement municipality codes for codes in the range 32400-32499.
WIEN_UMGEBUNG = {
    32401: 30729,
    32402: 30730,
    32403: 31949,
    32404: 31235,
    32405: 30731,
    32406: 30732,
    32407: 30733,
    32408: 32144,
    32409: 30734,
    32410: 30735,
    32411: 30736,
    32412: 31950,
    32413: 30737,
    32415: 31951,
    32416: 31952,
    32417: 30738,
    32418: 30739,
    32419: 30740,
    32421: 31953,
    32423: 31954,
    32424: 30741,
}

# Filled by parse_ov(): municipality code -> names taken from the
# "Katastralgemeinden:" line that followed it. Read by write_sql().
GKZ: Dict[Optional[int], List[str]] = {}


def _check_response(r) -> None:
    """Raise RuntimeError unless the HTTP response has status 200."""
    if r.status_code != 200:
        raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}')


def get_plz_url() -> str:
    """Return the spreadsheet URL found on the PLZ page.

    Raises:
        RuntimeError: on a non-200 response or when no link is found.
    """
    r = requests.get(PLZ_URL, headers={'User-Agent': 'Mozilla/5.0'})
    _check_response(r)
    matches = PLZ_BUTTON.findall(r.text)
    if not matches:
        raise RuntimeError('Unable to find url of file')
    return matches[0]


def download_plz() -> List[PlzRow]:
    """Download the PLZ workbook and return its rows (header row skipped).

    The workbook is stored in a temporary file because xlrd is given a path;
    the file is removed again even when the download or parsing fails.

    Raises:
        RuntimeError: on a non-200 response.
    """
    f_name = None
    try:
        with tempfile.NamedTemporaryFile(delete=False) as f:
            # Remember the name immediately so `finally` can clean up even if
            # the download below raises.
            f_name = f.name
            with requests.get(get_plz_url(), stream=True) as r:
                _check_response(r)
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)

        wb = xlrd.open_workbook(f_name)
        sheet = wb.sheet_by_index(0)
        rows = []
        for idx in range(1, sheet.nrows):
            row = sheet.row_values(idx)
            rows.append((int(row[0]), row[1], int(row[2]), row[3], int(row[4]), row[5]))
        return rows
    finally:
        if f_name is not None:
            os.remove(f_name)


def get_kvg_zip_url() -> str:
    """Return the zip URL found on the KGV page.

    Raises:
        RuntimeError: on a non-200 response or when no usable link is found.
    """
    r = requests.get(KGV_URL)
    _check_response(r)
    # Drop empty matches: an empty string is never a usable URL.
    matches = [m for m in KGV_LINK.findall(r.text) if m]
    if not matches:
        raise RuntimeError('Unable to find url of zip file')
    return matches[0]


def download_kgv() -> List[KgvRow]:
    """Download the KGV zip and parse the first ``.csv`` member.

    The CSV is ';'-separated; the first line is skipped as a header and
    columns 0, 1, 3 and 4 are returned as (int, str, int, str).

    Raises:
        RuntimeError: on a non-200 response or when the zip has no csv file.
    """
    with tempfile.NamedTemporaryFile() as f:
        with requests.get(get_kvg_zip_url(), stream=True) as r:
            _check_response(r)
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)

        rows = []
        with zipfile.ZipFile(f, 'r') as zip_file:
            files = [name for name in zip_file.namelist() if name.endswith('.csv')]
            if not files:
                raise RuntimeError('Unable to find csv file in zip')
            with zip_file.open(files[0], 'r') as raw:
                text = io.TextIOWrapper(raw, encoding='utf8', newline='')
                reader = csv.reader(text, delimiter=';')
                next(reader, None)  # header line
                for row in reader:
                    if not row:
                        # Blank lines carry no record.
                        continue
                    rows.append((int(row[0]), row[1], int(row[3]), row[4]))
        return rows


def download_ov_land(bundesland: str) -> List[OvRow]:
    """Download one Ortsverzeichnis PDF and cache its pages as text files.

    Pages are written to ``out/<bundesland>/<page number>.txt``. Only pages
    after the short page reading "Ortsverzeichnis" and before the short page
    reading "ALPHABETISCHE VERZEICHNISSE" are kept.

    Returns:
        Always an empty list; the extracted text is parsed by parse_ov().

    Raises:
        RuntimeError: on a non-200 response.
    """
    rows = []
    with tempfile.NamedTemporaryFile() as f:
        r = requests.get(OV_URL.format(bundesland))
        _check_response(r)
        for chunk in r.iter_content(chunk_size=8192):
            f.write(chunk)

        pdf = PyPDF2.PdfFileReader(f)
        valid = False
        for page in pdf.pages:
            page_num = pdf.getPageNumber(page)
            text = page.extractText()
            if len(text) < 100:
                # Short pages are checked for the section titles that open
                # and close the part of the document we want.
                title = text.strip().replace(' ', '')
                if title == 'Ortsverzeichnis':
                    valid = True
                    continue
                elif valid and title == 'ALPHABETISCHEVERZEICHNISSE':
                    break
            if not valid:
                continue
            # Explicit encoding so the cache does not depend on the locale;
            # parse_ov() reads it back with the same encoding.
            with open(f'out/{bundesland}/{page_num + 1}.txt', 'w+', encoding='utf-8') as o:
                o.write(text)
    return rows


def download_ov() -> None:
    """Populate the ``out`` cache directory unless it already exists.

    If a download fails, the partially filled directory is removed so that a
    later run does not mistake it for a complete cache.
    """
    if os.path.isdir('out'):
        print('Using cache')
        return

    os.mkdir('out')
    complete = False
    try:
        for name in OV_NAMES:
            os.mkdir(f'out/{name}')
            download_ov_land(name)
        complete = True
    finally:
        if not complete:
            shutil.rmtree('out', ignore_errors=True)


def _page_sort_key(file_name: str) -> Tuple[int, int, str]:
    """Sort key placing '<number>.txt' files in numeric order, others last."""
    stem = os.path.splitext(file_name)[0]
    if stem.isdigit():
        return (0, int(stem), file_name)
    return (1, 0, file_name)


def _ov_row(gkz: Optional[int], val: str) -> OvRow:
    """Build an OvRow from '<5-digit okz> <name>[ <code>]' text."""
    okz = int(val[:5])
    name = STRIP_CODE.sub('', val[6:])
    return (gkz, okz, name)


def parse_ov() -> List[OvRow]:
    """Parse the cached page texts under ``out`` into (gkz, okz, name) rows.

    Pages are processed in numeric page order because the current
    municipality code and a pending (line-wrapped) locality carry over from
    one page to the next. As a side effect, GKZ is filled from the
    "Katastralgemeinden:" lines.
    """
    rows = []
    for bundesland in sorted(os.listdir('out')):
        gkz = None
        last = None  # locality text whose name may continue on the next line
        for page_name in sorted(os.listdir(f'out/{bundesland}'), key=_page_sort_key):
            with open(f'out/{bundesland}/{page_name}', 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.rstrip()
                    m1 = ORT.match(line)
                    m2 = GEM.match(line)

                    if last is not None:
                        if line == 'Gemeindename':
                            # Stop reading this page; the pending entry is
                            # resolved with the first line of the next page.
                            break
                        if m1 is None:
                            # Not a new locality line: treat it as the
                            # continuation of the pending name.
                            last = f'{last} {line}'
                            print(last)
                            val = STRIP_NUM.sub('', ORT.match(last).group(0))
                            rows.append(_ov_row(gkz, val))
                            last = None
                            continue
                        # A new locality starts here, so the pending one
                        # was complete as it stood.
                        rows.append(_ov_row(gkz, last))
                        last = None

                    if 'Katastralgemeinden:' in line:
                        p1 = line.find('Katastralgemeinden:')
                        p2 = line.find('Postleitzahl')
                        if p2 == -1:
                            # Marker missing: take the rest of the line rather
                            # than silently cutting off its last character.
                            p2 = len(line)
                        # Skip the 19-character marker plus one separator;
                        # each entry loses its last two space-separated tokens.
                        val = [' '.join(a.split(' ')[:-2]) for a in line[p1 + 20:p2].split(', ')]
                        GKZ[gkz] = val
                        continue

                    if m1:
                        val = STRIP_NUM.sub('', m1.group(0))
                        if m1.group(4) == '':
                            # Name ran to the end of the line; it may wrap.
                            last = val
                        else:
                            rows.append(_ov_row(gkz, val))
                            print(rows[-1])
                    elif m2:
                        if len(line.split(' ')) <= 9:
                            continue
                        gkz = int(m2.group(1).replace(' ', ''))
                        if gkz > 90000:
                            # Municipality of Vienna
                            gkz = 90001
                        elif 32400 <= gkz <= 32499:
                            # former district Wien Umgebung
                            gkz = WIEN_UMGEBUNG[gkz]
    return rows


def _sql_quote(value) -> str:
    """Return value as a single-quoted SQL literal with quotes doubled."""
    return "'" + str(value).replace("'", "''") + "'"


def _write_insert(f, table: str, values: List[str]) -> None:
    """Write one multi-row INSERT statement; write nothing for no rows."""
    if not values:
        return
    f.write(f"\nINSERT INTO {table} VALUES\n".encode('utf8'))
    f.write(',\n'.join(values).encode('utf8'))
    f.write(b';\n')


def _format_ort_name(name: str) -> str:
    """Normalise the spelling of 'Wien ...' and '...Bez.' locality names."""
    if name.startswith('Wien '):
        return name.replace('Wien ', 'Wien, ').replace('.', '. ')
    if 'Bez.' in name:
        name = name.replace(',', ', ', 1).replace('.Bez.:', '. Bezirk: ')
        # Drop only leading zeros so that e.g. "10" is not turned into "1".
        return LEADING_ZEROS.sub('', name)
    return name


def _name_variants(name: str) -> Tuple[str, str]:
    """Return the two lower-cased spellings used for fuzzy name matching."""
    lowered = name.lower()
    return (lowered.replace('-', '').replace('th', 't'),
            lowered.replace('-', ' ').replace('th', 't'))


def _match_kg(name: str, kg_numbers: List[int], kgv: Dict[int, Tuple[str, int, str]]) -> Optional[int]:
    """Pick the cadastral municipality number that belongs to a locality.

    A single candidate is taken as is. Otherwise the last candidate whose
    name contains, or is contained in, the locality name wins; None is
    returned when nothing matches.
    """
    if len(kg_numbers) == 1:
        return kg_numbers[0]
    own = _name_variants(name)
    found = None
    for kgnr in kg_numbers:
        other = _name_variants(kgv[kgnr][0])
        if any(a in b or b in a for a in own for b in other):
            found = kgnr
    return found


def _print_gkz_suggestions(missing: set, kgv_rows: List[KgvRow]) -> None:
    """Print replacement suggestions for municipality codes without KGV rows.

    A suggestion is printed when exactly one KGV row remains after filtering;
    the codes without a suggestion are printed as a sorted list at the end.
    """
    resolved = set()
    for e in missing:
        # Rows whose municipality code shares the leading digit(s).
        candidates = [row for row in kgv_rows if row[2] // 10000 == e // 10000]
        names = GKZ.get(e, [])
        if any(n != '' for n in names):
            # NOTE(review): the original chained lazy filters over a
            # late-bound loop variable, so effectively only the LAST name in
            # the list was compared. That result is kept here; confirm
            # whether matching any of the names was intended.
            wanted = names[-1]
            candidates = [row for row in candidates if row[1] == wanted]
        if len(candidates) == 1:
            print(f' {e}: {candidates[0][2]},')
            resolved.add(e)
    print(sorted(missing - resolved))


def write_sql(plz_rows: List[PlzRow], kgv_rows: List[KgvRow], ov_rows: List[OvRow]) -> None:
    """Write the INSERT statements for all four tables to ``plz.sql``.

    Localities come from plz_rows and are overridden by ov_rows with the same
    okz. Localities whose municipality code has no KGV row are printed and
    left out of ``AT_ort``; suggestions for them are printed at the end.
    A table without rows gets no statement at all.
    """
    kgv = {kgnr: (kg_name, gkz, gem_name) for kgnr, kg_name, gkz, gem_name in kgv_rows}
    ov = {okz: (o_name, gkz) for plz, _, okz, o_name, gkz, _ in plz_rows}
    ov.update({okz: (name, gkz) for gkz, okz, name in ov_rows})
    gem_names = {gkz: gem_name for _, _, gkz, gem_name in kgv_rows}
    gem_kgs: Dict[int, List[int]] = {gkz: [] for gkz in gem_names}
    for kgnr, _, gkz, _ in kgv_rows:
        gem_kgs[gkz].append(kgnr)

    gem_values = [f"({gkz:5}, {_sql_quote(name)})" for gkz, name in gem_names.items()]
    kg_values = [f"({kgnr:5}, {gkz:5}, {_sql_quote(name)})" for kgnr, name, gkz, _ in kgv_rows]

    missing = set()  # municipality codes used by localities but absent from KGV
    ort_values = []
    for okz, (name, gkz) in ov.items():
        name = _format_ort_name(name)
        if gkz not in gem_names:
            print(okz, name, gkz)
            missing.add(gkz)
            continue
        kgnr_o = _match_kg(name, gem_kgs[gkz], kgv)
        kg_sql = 'NULL' if kgnr_o is None else kgnr_o
        ort_values.append(f"({okz:5}, {gkz:5}, {kg_sql:>5}, {_sql_quote(name)})")

    plz_values = [f"({plz:4}, {okz:5}, {_sql_quote(dest)})" for plz, dest, okz, _, _, _ in plz_rows]

    with open('plz.sql', 'wb') as f:
        _write_insert(f, 'AT_gem', gem_values)
        _write_insert(f, 'AT_kg', kg_values)
        _write_insert(f, 'AT_ort', ort_values)
        _write_insert(f, 'AT_plz', plz_values)

    _print_gkz_suggestions(missing, kgv_rows)


def main() -> None:
    """Run all download, parse and export steps in order."""
    parser = argparse.ArgumentParser()
    parser.parse_args()  # no options yet; still provides --help

    print('Downloading Ortsverzeichnis from statistik.gv.at')
    download_ov()
    print('Parsing Ortsverzeichnis')
    ov_data = parse_ov()

    print('Downloading PLZ data from www.post.at')
    plz_data = download_plz()

    print('Downloading Katastralgemeindenverzeichnis from www.bev.gv.at')
    kgv_data = download_kgv()

    print('Generating plz.sql')
    write_sql(plz_data, kgv_data, ov_data)
    print('Successfully created plz.sql!')


if __name__ == '__main__':
    main()