Gemeinden

This commit is contained in:
2022-11-29 22:46:52 +01:00
parent f520c64052
commit 6e991ce019
2 changed files with 312 additions and 22 deletions

302
data/gemeinden.py Executable file
View File

@ -0,0 +1,302 @@
#!/bin/env python3
import argparse
import csv
import re
import tempfile
import zipfile
from typing import List, Tuple

import PyPDF2
import requests
# Ortsverzeichnis PDF line that starts with a Gemeinde code printed as "NNN NN" followed by " X";
# group 1 is the code including the blank.
GEM = re.compile(r'^([1-9][0-9]{2} [0-9]{2}) X')
# Ortsverzeichnis PDF line that starts with a five-digit code followed by one or more name words.
# Group 4 is the number following the name, or '' when the match ran to the end of the line
# (download_ov_land then joins the next line before parsing the entry).
ORT = re.compile(r'^([0-9]{5}) (([A-Za-zÄÖÜäöü][A-Za-z0-9äöüÄÖÜßẞ:.-]* |[0-9]+\..*?)+)( ?[0-9]+ |$)')
# Trailing run of 'X', digits, dots, blanks and hyphens.
STRIP_NUM = re.compile(r'[X0-9. -]+$')
# Trailing abbreviation code from the list below, optionally followed by a parenthesised number.
STRIP_CODE = re.compile(r' *(Agh|Alm|Alpe|B|Bd|Bwg|Burg|Camp|D|E|Fbk|Fhei|Gh|Hgr|Hot|Indz|Jh|Jhtt|Ki|Kl|Krwk|Ks|M|Mh|'
                        r'Mü|R|Ru|Sa|Sä|Sb|Schh|Schih|Schl|Sdlg|Sgr|St|Stbr|Stt|V|W|We|Ek|Z|Zgl|ZH) *(\([0-9]+\) *)?$')
# Page requested by get_kvg_zip_url to locate the zip file of the Katastralgemeindenverzeichnis.
KGV_URL = 'https://www.bev.gv.at/portal/page?_pageid=713,2601283&_dad=portal&_schema=PORTAL'
# Captures the href of an <a> tag that carries no other attribute; get_kvg_zip_url uses the first match.
KGV_LINK = re.compile(r'<a\s+href="(.*?)"\s*>')
# URL template of the Ortsverzeichnis PDFs; '{}' is filled with an entry of OV_NAMES.
OV_URL = 'https://statistik.at/fileadmin/publications/Ortsverzeichnis_2001__{}.pdf'
# Names inserted into OV_URL, one PDF each (see download_ov).
OV_NAMES = ['Wien', 'Niederoesterreich', 'Oberoesterreich', 'Kaernten',
            'Steiermark', 'Vorarlberg', 'Burgenland', 'Tirol', 'Salzburg']
# Replacement Gemeinde codes for codes in the range 32400-32499 (former district
# Wien Umgebung): code found in the PDF -> code recorded by download_ov_land.
WIEN_UMGEBUNG = {
    32401: 30729,
    32402: 30730,
    32403: 31949,
    32404: 31235,
    32405: 30731,
    32406: 30732,
    32407: 30733,
    32408: 32144,
    32409: 30734,
    32410: 30735,
    32411: 30736,
    32412: 31950,
    32413: 30737,
    32415: 31951,
    32416: 31952,
    32417: 30738,
    32418: 30739,
    32419: 30740,
    32421: 31953,
    32423: 31954,
    32424: 30741,
}
# Replacement Gemeinde codes applied by download_ov_land for municipal reforms:
# code found in the PDF -> code recorded instead. Several old codes may map to
# the same new code.
GEMEINDEN = {
    # Upper Austria (Oberösterreich)
    40803: 40835,
    40819: 40835,
    41308: 41344,
    41330: 41344,
    41519: 41344,
    41625: 41628,
    # Styria (Steiermark)
    60204: 62139,
    60207: 62142,
    60209: 62140,
    60211: 62139,
    60213: 62125,
    60216: 62128,
    60217: 62146,
    60219: 62147,
    60301: 60345,
    60307: 60344,
    60309: 60350,
    60310: 60348,
    60313: 60345,
    60315: 60349,
    60327: 60347,
    60330: 60348,
    60331: 60349,
    60336: 60347,
    60339: 60346,
    60407: 62311,
    60410: 62378,
    60416: 62380,
    60426: 61057,
    60435: 62382,
    61504: 62383,
    61508: 62390,
    61510: 62383,
    61514: 62377,
    61517: 62380,
    61009: 61054,
    61011: 61012,
    61015: 61052,
    60512: 62273,
    61034: 61052,
    61036: 61061,
    61041: 61050,
    61602: 61631,
    61609: 61630,
    61104: 61107,
    61620: 61628,
    60601: 60665,
    60603: 60666,
    60607: 60664,
    60614: 60664,
    60620: 60661,
    60634: 60619,
    60658: 60660,
    61706: 61711,
    61712: 61759,
    61713: 62266,
    61720: 61763,
    61721: 61727,
    60703: 62272,
    60706: 62211,
    60708: 62270,
    61218: 61222,
    61223: 61260,
    61734: 61763,
    61737: 61762,
    61226: 61254,
    60723: 62275,
    60726: 62277,
    61238: 61266,
    60729: 62266,
    61244: 61256,
    60736: 62265,
    60738: 62275,
    60741: 62268,
    61301: 62144,
    61306: 62115,
    61308: 62144,
    61311: 62143,
    60801: 62044,
    61314: 62132,
    60804: 62010,
    60809: 62048,
    60810: 62044,
    60814: 62044,
    60821: 62042,
    60824: 62039,
    61411: 61439,
    60907: 62034,
}
# Row returned by download_kgv: (kgnr, kg_name, gkz, gem_name), as unpacked in write_sql.
KgvRow = Tuple[int, str, int, str]
# Row returned by download_ov_land: (gkz, okz, name), as unpacked in write_sql.
OvRow = Tuple[int, int, str]
def get_kvg_zip_url() -> str:
    """Return the target of the first link matched by KGV_LINK on the KGV_URL page.

    Raises:
        RuntimeError: if the page does not answer with status 200 or no link
            matching KGV_LINK is present.
    """
    response = requests.get(KGV_URL)
    if response.status_code != 200:
        raise RuntimeError(f'Unexpected response: {response.status_code} {response.reason}')
    # The leftmost match is the same one findall() would list first.
    first_link = KGV_LINK.search(response.text)
    if first_link is None:
        raise RuntimeError('Unable to find url of zip file')
    return first_link.group(1)
def download_kgv() -> List[KgvRow]:
    """Download the Katastralgemeindenverzeichnis zip and parse its first csv member.

    The archive is streamed into a temporary file. The first line of the csv
    is treated as a header and skipped; every other non-empty line is split
    on ';' (honouring double-quoted fields) and columns 0, 1, 3 and 4 are kept.

    Returns:
        One ``(kgnr, kg_name, gkz, gem_name)`` tuple per data line.

    Raises:
        RuntimeError: if the download does not answer with status 200 or the
            archive contains no ``.csv`` file.
        ValueError: if column 0 or 3 of a data line is not an integer.
        IndexError: if a data line has fewer than five columns.
    """
    with tempfile.NamedTemporaryFile() as f:
        with requests.get(get_kvg_zip_url(), stream=True) as r:
            if r.status_code != 200:
                raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}')
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
        rows = []
        with zipfile.ZipFile(f, 'r') as zip_file:
            files = [name for name in zip_file.namelist() if name.endswith('.csv')]
            if not files:
                raise RuntimeError('Unable to find csv file in zip')
            with zip_file.open(files[0], 'r') as csv_file:
                next(csv_file, None)  # skip the header line
                for raw_line in csv_file:
                    line = raw_line.decode('utf8').rstrip()
                    if not line:
                        # A blank (e.g. trailing) line carries no record.
                        continue
                    # csv.reader copes with empty fields and with ';' inside
                    # quoted fields; only the columns that are used get converted,
                    # so unrelated columns cannot abort the import.
                    fields = next(csv.reader([line], delimiter=';'))
                    rows.append((int(fields[0]), fields[1], int(fields[3]), fields[4]))
    return rows
def download_ov_land(bundesland: str) -> List[OvRow]:
    """Download one Ortsverzeichnis PDF and extract its entries.

    Pages are ignored until a short page (fewer than 100 characters) whose
    text, blanks removed, is 'Ortsverzeichnis'; extraction stops at a short
    page reading 'ALPHABETISCHEVERZEICHNISSE'. In between, lines matching GEM
    set the current Gemeinde code and lines matching ORT yield one entry each.
    An ORT line whose match runs to the end of the line is joined with the
    following line before it is parsed.

    Args:
        bundesland: Name inserted into OV_URL (see OV_NAMES).

    Returns:
        ``(gkz, okz, name)`` tuples in document order. ``gkz`` is the most
        recently seen Gemeinde code after remapping (``None`` if an entry
        precedes the first Gemeinde line).

    Raises:
        RuntimeError: if the download does not answer with status 200, or if
            a joined continuation line can no longer be parsed as an entry.
        KeyError: if a code in the range 32400-32499 is missing from WIEN_UMGEBUNG.
    """
    def parse_entry(matched: str) -> Tuple[int, str]:
        # Drop the trailing numbers, then split into five-digit code and name.
        val = STRIP_NUM.sub('', matched)
        return int(val[:5]), STRIP_CODE.sub('', val[6:])

    rows = []
    with tempfile.NamedTemporaryFile() as f:
        # Streamed and closed the same way download_kgv handles its download.
        with requests.get(OV_URL.format(bundesland), stream=True) as r:
            if r.status_code != 200:
                raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}')
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
        # NOTE(review): PdfFileReader/extractText are the legacy PyPDF2 names;
        # confirm they exist in the installed PyPDF2 version.
        pdf = PyPDF2.PdfFileReader(f)
        gkz = None
        pending = None  # first half of an entry that continues on the next line
        valid = False
        for page in pdf.pages:
            text = page.extractText()
            if len(text) < 100:
                heading = text.strip().replace(' ', '')
                if heading == 'Ortsverzeichnis':
                    valid = True
                    continue
                elif valid and heading == 'ALPHABETISCHEVERZEICHNISSE':
                    break
            if not valid:
                continue
            for line in text.splitlines():
                if pending is not None:
                    joined = pending + ' ' + line
                    pending = None
                    m = ORT.match(joined)
                    if m is None:
                        # Previously an AttributeError on None; report the text instead.
                        raise RuntimeError(f'Unable to parse Ortsverzeichnis entry: {joined!r}')
                    okz, name = parse_entry(m.group(0))
                    rows.append((gkz, okz, name))
                    continue
                m1 = ORT.match(line)
                if m1:
                    if m1.group(4) == '':
                        # Match ended at the end of the line: the entry continues.
                        pending = STRIP_NUM.sub('', m1.group(0))
                    else:
                        okz, name = parse_entry(m1.group(0))
                        rows.append((gkz, okz, name))
                    continue
                m2 = GEM.match(line)
                if m2:
                    gkz = int(m2.group(1).replace(' ', ''))
                    if gkz > 90000:
                        # municipality of Vienna
                        gkz = 90001
                    elif gkz >= 32400 and gkz <= 32499:
                        # former district Wien Umgebung
                        gkz = WIEN_UMGEBUNG[gkz]
                    elif gkz in GEMEINDEN:
                        # municipal reforms (Upper Austria, Styria)
                        gkz = GEMEINDEN[gkz]
    return rows
def download_ov() -> List[OvRow]:
    """Collect the rows of download_ov_land for every entry of OV_NAMES, in that order."""
    return [row for land in OV_NAMES for row in download_ov_land(land)]
def write_sql(kgv_rows: List[KgvRow], ov_rows: List[OvRow]) -> None:
    """Write ``gemeinden.sql`` with INSERT statements for AT_gem, AT_kg and AT_ort.

    AT_gem gets one ``(gkz, name)`` tuple per distinct gkz in ``kgv_rows``,
    AT_kg one ``(kgnr, gkz, name)`` tuple per row of ``kgv_rows`` and AT_ort
    one ``(okz, kgnr-or-NULL, name)`` tuple per row of ``ov_rows`` whose gkz
    occurs in ``kgv_rows``. The kgnr of an AT_ort tuple is the last
    Katastralgemeinde of the same gkz whose name contains, or is contained
    in, the name of the entry.

    Single quotes in names are doubled so every name remains one SQL string
    literal, and a statement without any tuple is omitted instead of being
    written as a truncated ``INSERT``.

    The set of gkz values from ``ov_rows`` that do not occur in ``kgv_rows``
    is printed to stdout.

    Args:
        kgv_rows: ``(kgnr, kg_name, gkz, gem_name)`` tuples.
        ov_rows: ``(gkz, okz, name)`` tuples.
    """
    def quote(text: str) -> str:
        # Standard SQL escaping: an embedded ' is written as ''.
        return "'" + text.replace("'", "''") + "'"

    def statement(table: str, values: List[str]) -> bytes:
        # "INSERT ... VALUES" without a tuple is not valid SQL, so emit nothing.
        if not values:
            return b''
        body = ',\n'.join(values)
        return f"\nINSERT INTO {table} VALUES\n{body};\n".encode('utf8')

    kg_names = {kgnr: kg_name for kgnr, kg_name, _, _ in kgv_rows}
    gem_names = {gkz: gem_name for _, _, gkz, gem_name in kgv_rows}
    kgs_by_gkz = {gkz: [] for gkz in gem_names}
    for kgnr, _, gkz, _ in kgv_rows:
        kgs_by_gkz[gkz].append(kgnr)

    gem_values = [f"({gkz:5}, {quote(name)})" for gkz, name in gem_names.items()]
    kg_values = [f"({kgnr:5}, {gkz:5}, {quote(name)})" for kgnr, name, gkz, _ in kgv_rows]

    ort_values = []
    unknown_gkz = set()
    for gkz, okz, name in ov_rows:
        if gkz not in kgs_by_gkz:
            unknown_gkz.add(gkz)
            continue
        kgnr_o = None
        for kgnr in kgs_by_gkz[gkz]:
            kg_name = kg_names[kgnr]
            # No break: the last matching Katastralgemeinde wins.
            if kg_name in name or name in kg_name:
                kgnr_o = kgnr
        kg_ref = kgnr_o if kgnr_o is not None else 'NULL'
        ort_values.append(f"({okz:5}, {kg_ref:5}, {quote(name)})")

    with open('gemeinden.sql', 'wb') as f:
        f.write(statement('AT_gem', gem_values))
        f.write(statement('AT_kg', kg_values))
        f.write(statement('AT_ort', ort_values))
    print(unknown_gkz)
# Script entry point: download both data sets and write gemeinden.sql.
if __name__ == '__main__':
    # No options are defined; parse_args() is still called on the command line.
    parser = argparse.ArgumentParser()
    args = parser.parse_args()
    print('Downloading and parsing Ortsverzeichnis from statistik.at')
    ov_data = download_ov()
    print('Downloading Katastralgemeindenverzeichnis from www.bev.gv.at')
    kgv_data = download_kgv()
    # write_sql expects the Katastralgemeinden rows first, then the Ortsverzeichnis rows.
    write_sql(kgv_data, ov_data)
    print('Successfully created gemeinden.sql!')

View File

@ -12,6 +12,8 @@ import os
# Page requested by get_excel_url.
URL = 'https://www.post.at/g/c/postlexikon'
# Captures the href that follows a title="PLZ Bestimmungsort" attribute.
BUTTON = re.compile(r'title="PLZ Bestimmungsort" href="(.*?)"')
# One data row; write_sql unpacks positions 0-2 as (plz, dest, okz).
# NOTE(review): positions 3-5 look like (ort name, gkz, gemeinde name) — confirm against download_excel.
Row = Tuple[int, str, int, str, int, str]
def get_excel_url() -> str:
r = requests.get(URL, headers={'User-Agent': 'Mozilla/5.0'})
@ -25,7 +27,7 @@ def get_excel_url() -> str:
return matches[0]
def download_excel() -> List[Tuple[int, str, int, str, int, str]]:
def download_excel() -> List[Row]:
f_name = None
try:
f = tempfile.NamedTemporaryFile(delete=False)
@ -49,33 +51,19 @@ def download_excel() -> List[Tuple[int, str, int, str, int, str]]:
os.remove(f_name)
def write_sql(data: List[Tuple[int, str, int, str, int, str]]) -> None:
gemeinden = {gkz: name for _, _, _, _, gkz, name in data}
orte = {okz: (name, gkz) for _, _, okz, name, gkz, _ in data}
def write_sql(data: List[Row]) -> None:
with open('plz.sql', 'wb') as f:
f.write(b'\n')
f.write(b"INSERT INTO AT_gemeinde VALUES\n")
for gem_nr, gem_name in gemeinden.items():
f.write(f"({gem_nr}, {gem_nr // 10000}, '{gem_name}'),\n".encode('utf8'))
f.seek(-2, 1)
f.write(b';\n\n')
f.write(b"INSERT INTO AT_ort VALUES\n")
for okz, (name, gkz) in orte.items():
f.write(f"({okz:5}, {gkz}, '{name.replace(',', ', ').replace('.Bez.:', '. Bez.: ')}'),\n".encode('utf8'))
f.seek(-2, 1)
f.write(b';\n\n')
f.write(b"INSERT INTO AT_plz VALUES\n")
f.write(b"\nINSERT INTO AT_plz VALUES\n")
for plz, dest, okz, _, _, _ in data:
f.write(f"({plz}, {okz:5}, {100000 * plz + okz}, '{dest}'),\n".encode('utf8'))
f.write(f"({plz:4}, {okz:5}, '{dest}'),\n".encode('utf8'))
f.seek(-2, 1)
f.write(b';\n\n')
f.write(b';\n')
# Script entry point: download the PLZ data and write plz.sql.
if __name__ == '__main__':
    # No options are defined; parse_args() is still called on the command line.
    parser = argparse.ArgumentParser()
    args = parser.parse_args()
    print('Downloading PLZ data from www.post.at')
    write_sql(download_excel())
    print('Successfully created plz.sql!')