#!/bin/env python3
# -*- coding: utf-8 -*-
"""Generate ``90.plz.sql`` from public Austrian address data.

The script downloads the PLZ tables from www.post.at, the
Katastralgemeindenverzeichnis from www.bev.gv.at and the Ortsverzeichnis 2001
PDFs from statistik.gv.at, and writes ``INSERT`` statements for the tables
``AT_gem``, ``AT_kg``, ``AT_ort``, ``AT_plz`` and ``AT_plz_dest``.
"""

from typing import BinaryIO, Callable, Dict, Iterable, List, Optional, Set, Tuple
import argparse
import csv
import io
import os
import re
import shutil
import tempfile
import zipfile

import PyPDF2
import requests
import xlrd


# Seconds to wait for a connection / for each chunk of a response before giving up.
REQUEST_TIMEOUT = 60

PLZ_URL = 'https://www.post.at/g/c/postlexikon'
PLZ_BUTTON = re.compile(r'title="PLZ Verzeichnis" href="(.*?)"')
PLZ_DEST_BUTTON = re.compile(r'title="PLZ Bestimmungsort" href="(.*?)"')
KGV_URL = 'https://www.bev.gv.at/Services/Downloads/Produktbezogene-Downloads/Unentgeltliche-Produkte/Kataster-Verzeichnisse/Katastralgemeindenverzeichnis.html'
# NOTE(review): this pattern has no capture group, so findall() yields the literal text
# 'Katastral' rather than a URL. It presumably needs to capture the href of the zip
# download link - confirm against the current markup of KGV_URL.
KGV_LINK = re.compile(r'Katastral')
OV_URL = 'https://statistik.gv.at/fileadmin/publications/Ortsverzeichnis_2001__{}.pdf'
OV_NAMES = ['Burgenland', 'Niederoesterreich', 'Wien']

# Line patterns for the text extracted from the Ortsverzeichnis PDFs.
GEM = re.compile(r'^([1-9][0-9]{2} [0-9]{2}) [X0-9]+')
ORT = re.compile(r'^([0-9]{5}) (([A-Za-zÄÖÜäöü(][A-Za-z0-9äöüÄÖÜßẞ():,.-]* ?|[0-9]+\..*?)+)'
                 r'(( [()X0-9.-]+)*)?([A-Za-zÄÖÜäöüßẞ ]+([0-9]*))?$')
STRIP_NUM = re.compile(r'[X0-9. -]+$')
STRIP_INV = re.compile(r' [A-Za-z]+[0-9]+$')
CODES = re.compile(r'Agh|Alm|Alpe|B|Bd|Bwg|Burg|Camp|D|E|Fbk|Fhei|Gh|Hgr|Hot|Indz|Jh|Jhtt|Ki|Kl|Krwk|Ks|M|Mh|'
                   r'Mü|R|Ru|Sa|Sä|Sb|Schh|Schih|Schl|Sdlg|Sgr|St|Stbr|Stt|V|W|We|Ek|Z|Zgl|ZH')
STRIP_CODE = re.compile(r' *(' + CODES.pattern + r')[()X0-9. -]*$')
STRIP_CODE_ALL = re.compile(r' +\b(' + CODES.pattern + r')\b.*$')
# Zeros that directly precede another digit and do not follow one ("01" -> "1", "10" stays).
LEADING_ZEROS = re.compile(r'(?<![0-9])0+(?=[0-9])')

# (plz, ort, bundesland number, type, internal, addressable, po box)
PlzRow = Tuple[int, str, int, str, bool, bool, bool]
# Columns 0-5 of the PLZ destination sheet; write_sql() reads them as
# (plz, destination, okz, ort name, gkz, <unused>).
PlzDestRow = Tuple[int, str, int, str, int, str]
# (kgnr, kg name, gkz, gemeinde name)
KgvRow = Tuple[int, str, int, str]
# (gkz, okz, ort name)
OvRow = Tuple[int, int, str]

# Maps the Bundesland abbreviation used in the PLZ sheet to the number stored in AT_plz.
PLZ_BUNDESLAND = {
    'B': 1,
    'K': 2,
    'N': 3,
    'O': 4,
    'Sa': 5,
    'St': 6,
    'T': 7,
    'V': 8,
    'W': 9,
}

# Replacement gkz for the codes 32401-32424 (former district Wien-Umgebung), see parse_ov().
WIEN_UMGEBUNG = {
    32401: 30729,
    32402: 30730,
    32403: 31949,
    32404: 31235,
    32405: 30731,
    32406: 30732,
    32407: 30733,
    32408: 32144,
    32409: 30734,
    32410: 30735,
    32411: 30736,
    32412: 31950,
    32413: 30737,
    32415: 31951,
    32416: 31952,
    32417: 30738,
    32418: 30739,
    32419: 30740,
    32421: 31953,
    32423: 31954,
    32424: 30741,
}

# gkz -> names taken from the 'Katastralgemeinden:' lines; filled by parse_ov() and read by
# the diagnostics at the end of write_sql().
GKZ: Dict[Optional[int], List[str]] = {}


def _download(url: str, f: BinaryIO, headers: Optional[Dict[str, str]] = None) -> None:
    """Stream the body of ``GET url`` into the open binary file *f*.

    Raises:
        RuntimeError: if the server does not answer with status 200.
    """
    with requests.get(url, stream=True, headers=headers, timeout=REQUEST_TIMEOUT) as r:
        if r.status_code != 200:
            raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}')
        for chunk in r.iter_content(chunk_size=8192):
            f.write(chunk)
    f.flush()


def get_plz_url(button: re.Pattern) -> str:
    """Return the first link captured by *button* on the Post lexicon page.

    Raises:
        RuntimeError: if the page cannot be fetched or *button* does not match.
    """
    r = requests.get(PLZ_URL, headers={'User-Agent': 'Mozilla/5.0'}, timeout=REQUEST_TIMEOUT)
    if r.status_code != 200:
        raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}')
    matches = button.findall(r.text)
    if not matches:
        raise RuntimeError('Unable to find url of file')
    return matches[0]


def download_excel(url: str, transform: Callable[[List[str]], Tuple]) -> List:
    """Download the workbook at *url* and return its first sheet as transformed rows.

    The first row of the sheet is skipped; every other row is passed to *transform*.
    The temporary file is always removed again, even when the download or parsing fails.

    Raises:
        RuntimeError: if the server does not answer with status 200.
    """
    # delete=False because the file is closed before xlrd re-opens it by name.
    f = tempfile.NamedTemporaryFile(delete=False)
    try:
        with f:
            _download(url, f, headers={'User-Agent': 'Mozilla/5.0'})
        sheet = xlrd.open_workbook(f.name).sheet_by_index(0)
        return [transform(sheet.row_values(r)) for r in range(1, sheet.nrows)]
    finally:
        os.remove(f.name)


def download_plz() -> List[PlzRow]:
    """Download the PLZ sheet and convert every row into a `PlzRow`."""
    return download_excel(get_plz_url(PLZ_BUTTON),
                          lambda r: (int(r[0]), r[1], PLZ_BUNDESLAND[r[2]], r[5],
                                     r[6] == 'intern', r[7] == 'Ja', r[8] == 'Ja'))


def download_plz_dest() -> List[PlzDestRow]:
    """Download the PLZ destination sheet and convert every row into a `PlzDestRow`."""
    return download_excel(get_plz_url(PLZ_DEST_BUTTON),
                          lambda r: (int(r[0]), r[1], int(r[2]), r[3], int(r[4]), r[5]))


def get_kvg_zip_url() -> str:
    """Return the first match of `KGV_LINK` on the BEV download page.

    Raises:
        RuntimeError: if the page cannot be fetched or `KGV_LINK` does not match.
    """
    r = requests.get(KGV_URL, timeout=REQUEST_TIMEOUT)
    if r.status_code != 200:
        raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}')
    matches = KGV_LINK.findall(r.text)
    if not matches:
        raise RuntimeError('Unable to find url of zip file')
    return matches[0]


def download_kgv() -> List[KgvRow]:
    """Download the Katastralgemeinden zip and parse the first ``.csv`` file it contains.

    The csv is read as UTF-8 with ``;`` as delimiter; its first line is skipped.
    Columns 0, 1, 3 and 4 become (kgnr, kg name, gkz, gemeinde name).

    Raises:
        RuntimeError: if the download fails or the archive contains no csv file.
    """
    with tempfile.NamedTemporaryFile() as f:
        _download(get_kvg_zip_url(), f)
        with zipfile.ZipFile(f, 'r') as zip_file:
            files = [name for name in zip_file.namelist() if name.endswith('.csv')]
            if not files:
                raise RuntimeError('Unable to find csv file in zip')
            with zip_file.open(files[0], 'r') as raw:
                reader = csv.reader(io.TextIOWrapper(raw, encoding='utf-8', newline=''), delimiter=';')
                next(reader, None)  # skip the first (header) line
                # `if row` skips blank lines, which the csv reader yields as empty lists
                return [(int(row[0]), row[1], int(row[3]), row[4]) for row in reader if row]


def download_ov_land(bundesland: str) -> List[OvRow]:
    """Download one Ortsverzeichnis PDF and dump its pages to ``out/<bundesland>/NNN.txt``.

    Pages are written starting after the short page whose text is 'Ortsverzeichnis' and
    stopping at the short page whose text is 'ALPHABETISCHE VERZEICHNISSE' (spaces ignored).
    NNN is the 1-based page number. The directory ``out/<bundesland>`` must already exist.

    Returns:
        Always an empty list; the extracted text is only written to disk.

    Raises:
        RuntimeError: if the server does not answer with status 200.
    """
    rows = []
    with tempfile.NamedTemporaryFile() as f:
        _download(OV_URL.format(bundesland), f)
        f.seek(0)
        pdf = PyPDF2.PdfReader(f)
        valid = False
        for page_num, page in enumerate(pdf.pages):
            text = page.extract_text()
            if len(text) < 100:
                # Only short pages can be the section title pages that delimit the listing.
                title = text.strip().replace(' ', '')
                if title == 'Ortsverzeichnis':
                    valid = True
                    continue
                elif valid and title == 'ALPHABETISCHEVERZEICHNISSE':
                    break
            if not valid:
                continue
            with open(f'out/{bundesland}/{page_num + 1:03}.txt', 'w+', encoding='utf-8') as o:
                o.write(text)
    return rows


def download_ov() -> None:
    """Populate the ``out`` page cache for every name in `OV_NAMES`.

    If ``out`` already exists it is treated as a complete cache and nothing is downloaded.
    If a download fails, ``out`` is removed again so that a later run does not mistake the
    partial result for a complete cache.
    """
    try:
        os.mkdir('out')
    except FileExistsError:
        print('Using cache')
        return
    try:
        for name in OV_NAMES:
            os.mkdir(f'out/{name}')
            download_ov_land(name)
    except BaseException:
        shutil.rmtree('out', ignore_errors=True)
        raise


def parse_ov() -> List[OvRow]:
    """Parse the cached page texts below ``out`` into (gkz, okz, name) rows.

    Lines matching `GEM` set the current gkz, lines matching `ORT` describe one Ort.
    An `ORT` line that looks incomplete is kept in ``last`` and resolved by the next line.
    As a side effect, the names found on 'Katastralgemeinden:' lines are stored in `GKZ`.

    NOTE(review): the block nesting of this function was reconstructed from a source whose
    indentation had been lost - confirm it against the original before relying on it.
    """
    rows = []
    for bundesland in sorted(os.listdir('out')):
        # Both survive page boundaries within one Bundesland.
        gkz = None
        last = None
        for page_name in sorted(os.listdir(f'out/{bundesland}')):
            with open(f'out/{bundesland}/{page_name}', 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.rstrip()
                    m1 = ORT.match(line)
                    m2 = GEM.match(line)
                    if last is not None:
                        if line == 'Gemeindename':
                            # Stop reading this page; the pending entry carries over to the next one.
                            break
                        elif m1 is None:
                            # Not a new Ort: treat the line as a continuation unless it contains ',' or ':'.
                            if ',' not in line and ':' not in line:
                                last = f'{last} {line}'
                            m3 = ORT.match(last)
                            okz = int(m3.group(1))
                            name = STRIP_CODE_ALL.sub('', m3.group(2))
                            rows.append((gkz, okz, name))
                            last = None
                            continue
                        else:
                            # A new Ort starts, so the pending entry was already complete.
                            m3 = ORT.match(last)
                            okz = int(m3.group(1))
                            name = STRIP_CODE.sub('', m3.group(2))
                            rows.append((gkz, okz, name))
                            last = None
                    if 'Katastralgemeinden:' in line:
                        p1 = line.find('Katastralgemeinden:')
                        p2 = line.find('Postleitzahl')
                        # Each ', '-separated item loses its last two space-separated tokens.
                        val = [' '.join(a.split(' ')[:-2]) for a in line[p1 + 20:p2].split(', ')]
                        GKZ[gkz] = val
                        continue
                    if m1:
                        val = STRIP_NUM.sub('', m1.group(0))
                        if m1.group(4) == '' or (m1.group(6) is not None and m1.group(6) != ''):
                            last = m1.group(1) + ' ' + m1.group(2)
                        else:
                            okz = int(m1.group(1))
                            name = STRIP_CODE.sub('', m1.group(2))
                            if len(name) == len(m1.group(2)):
                                # No code could be stripped: keep the entry pending.
                                last = val
                            else:
                                rows.append((gkz, okz, name))
                    elif m2:
                        if len(line.split(' ')) <= 9:
                            continue
                        gkz = int(m2.group(1).replace(' ', ''))
                        if gkz > 90000:
                            # Gemeinde Wien
                            gkz = 90001
                        elif 32400 <= gkz <= 32499:
                            # former district Wien-Umgebung
                            gkz = WIEN_UMGEBUNG[gkz]
    return rows


def _sql_str(value: object) -> str:
    """Return *value* as a single-quoted SQL string literal with embedded quotes doubled."""
    return "'" + str(value).replace("'", "''") + "'"


def _sql_bool(value: bool) -> str:
    """Return the SQL keyword for *value*."""
    return 'TRUE' if value else 'FALSE'


def _write_insert(f: BinaryIO, table: str, values: Iterable[str]) -> None:
    """Write one ``INSERT INTO <table> VALUES ...;`` statement, or nothing if *values* is empty."""
    values = list(values)
    if not values:
        return
    f.write(f"\nINSERT INTO {table} VALUES\n".encode('utf-8'))
    f.write(',\n'.join(values).encode('utf-8'))
    f.write(b';\n')


def _normalize_ort_name(name: str) -> str:
    """Bring the district notation of an Ort name into its final form and drop `STRIP_INV` residue."""
    if name.startswith('Wien '):
        name = name.replace('Wien ', 'Wien, ').replace('.', '. ')
    elif 'Bez.' in name:
        name = name.replace(',', ', ', 1).replace('.Bez.:', '. Bezirk: ')
        # Drop leading zeros only; removing every '0' would turn "10.Bez." into "1. Bezirk".
        name = LEADING_ZEROS.sub('', name)
    return STRIP_INV.sub('', name)


def _name_variants(name: str) -> Tuple[str, str]:
    """Return the two lower-cased comparison forms of *name* ('-' removed / replaced by a space)."""
    lowered = name.lower()
    return (lowered.replace('-', '').replace('th', 't'),
            lowered.replace('-', ' ').replace('th', 't'))


def _guess_kgnr(name: str, kgnrs: List[int], kg_names: Dict[int, str]) -> Optional[int]:
    """Pick the Katastralgemeinde of a Gemeinde that belongs to the Ort *name*.

    A Gemeinde with a single KG always yields that KG. Otherwise a KG matches when one of
    the name variants contains the other; the last matching KG wins. Returns None if no
    KG matches.
    """
    if len(kgnrs) == 1:
        return kgnrs[0]
    ort_variants = _name_variants(name)
    found = None
    for kgnr in kgnrs:
        kg_variants = _name_variants(kg_names[kgnr])
        if any(a in b or b in a for a in ort_variants for b in kg_variants):
            found = kgnr
    return found


def _report_unknown_gkz(unknown: Set[int], kgv_rows: List[KgvRow]) -> None:
    """Print replacement candidates for every gkz that has no entry in the KG list.

    For each unknown gkz with exactly one candidate row, a line ``<old>: <new>,`` is printed;
    finally the sorted list of the gkz without a unique candidate is printed.
    """
    resolved = set()
    for e in unknown:
        possible = [row for row in kgv_rows if row[2] // 10000 == e // 10000]
        names = GKZ.get(e, [])
        if any(n != '' for n in names):
            # NOTE(review): the original chained one lazy filter per name using a late-bound
            # lambda, so effectively only the last entry of GKZ[e] was compared. That
            # behaviour is kept here; confirm whether matching any/all names was intended.
            wanted = names[-1]
            possible = [row for row in possible if row[1] == wanted]
        if len(possible) == 1:
            print(f' {e}: {possible[0][2]},')
            resolved.add(e)
    print(sorted(unknown - resolved))


def write_sql(plz_rows: List[PlzRow], plz_dest_rows: List[PlzDestRow], kgv_rows: List[KgvRow],
              ov_rows: List[OvRow]) -> None:
    """Write ``90.plz.sql`` with the INSERT statements for all five tables.

    Ort names from *ov_rows* take precedence over those from *plz_dest_rows* for the same
    okz. An Ort whose gkz does not occur in *kgv_rows* is printed and left out of AT_ort;
    those gkz are summarised on stdout at the end. Tables without rows get no statement.

    Args:
        plz_rows: rows for AT_plz.
        plz_dest_rows: rows for AT_plz_dest; also a source of Ort names.
        kgv_rows: rows for AT_kg; also the source of AT_gem.
        ov_rows: (gkz, okz, name) rows parsed from the Ortsverzeichnis.
    """
    kg_names = {kgnr: kg_name for kgnr, kg_name, _, _ in kgv_rows}
    ov = {okz: (o_name, gkz) for _, _, okz, o_name, gkz, _ in plz_dest_rows}
    ov.update({okz: (name, gkz) for gkz, okz, name in ov_rows})
    gem_names = {gkz: gem_name for _, _, gkz, gem_name in kgv_rows}
    kgv_sorted = sorted(kgv_rows, key=lambda i: i[0])
    kg_by_gem: Dict[int, List[int]] = {gkz: [] for gkz in gem_names}
    for kgnr, _, gkz, _ in kgv_sorted:
        kg_by_gem[gkz].append(kgnr)

    unknown_gkz = set()
    with open('90.plz.sql', 'wb') as f:
        _write_insert(f, 'AT_gem', (
            f"({gkz:5}, {_sql_str(name)})"
            for gkz, name in sorted(gem_names.items(), key=lambda i: i[0])
        ))

        _write_insert(f, 'AT_kg', (
            f"({kgnr:5}, {gkz:5}, {_sql_str(name)})"
            for kgnr, name, gkz, _ in kgv_sorted
        ))

        ort_values = []
        for okz, (name, gkz) in sorted(ov.items(), key=lambda i: i[0]):
            name = _normalize_ort_name(name)
            if gkz not in kg_by_gem:
                print(okz, name, gkz)
                unknown_gkz.add(gkz)
                continue
            kgnr_o = _guess_kgnr(name, kg_by_gem[gkz], kg_names)
            kgnr_sql = kgnr_o if kgnr_o is not None else 'NULL'
            ort_values.append(f"({okz:5}, {gkz:5}, {kgnr_sql:>5}, {_sql_str(name)})")
        _write_insert(f, 'AT_ort', ort_values)

        _write_insert(f, 'AT_plz', (
            f"({plz:4}, {_sql_str(ort)}, {blnr}, {_sql_str(plz_type)}, {_sql_bool(internal)}, "
            f"{_sql_bool(addr)}, {_sql_bool(po_box)})"
            for plz, ort, blnr, plz_type, internal, addr, po_box in sorted(plz_rows, key=lambda i: i[0])
        ))

        _write_insert(f, 'AT_plz_dest', (
            f"({plz:4}, {okz:5}, {_sql_str(dest)})"
            for plz, dest, okz, _, _, _ in sorted(plz_dest_rows, key=lambda i: (i[0], i[2]))
        ))

    _report_unknown_gkz(unknown_gkz, kgv_rows)


def main() -> None:
    """Download all sources and generate ``90.plz.sql`` in the current directory."""
    parser = argparse.ArgumentParser()
    parser.parse_args()  # no options yet; still provides --help and rejects stray arguments

    print('Downloading Ortsverzeichnis from statistik.gv.at')
    download_ov()
    print('Parsing Ortsverzeichnis')
    ov_data = parse_ov()
    print('Downloading PLZ data from www.post.at')
    plz_data = download_plz()
    print('Downloading PLZ destination data from www.post.at')
    plz_dest_data = download_plz_dest()
    print('Downloading Katastralgemeindenverzeichnis from www.bev.gv.at')
    kgv_data = download_kgv()
    print('Generating 90.plz.sql')
    write_sql(plz_data, plz_dest_data, kgv_data, ov_data)
    print('Successfully created 90.plz.sql!')


if __name__ == '__main__':
    main()