Add AT_plz_dest table

This commit is contained in:
2023-03-14 11:25:18 +01:00
parent 783396a0b0
commit 827fcb517b
6 changed files with 92 additions and 32 deletions

View File

@ -34,12 +34,23 @@ python3 plz.py
cd .. cd ..
```` ````
## Create temporary database
This removes any existing `wgtemp.sqlite3` and creates a fresh one.
```shell
rm wgtemp.sqlite3
echo ".q" | ./sqlite.sh wgtemp.sqlite3
```
## Migrate ## Migrate
This will create `wgprod.sqlite3` and migrate all WG Master data into it.
```shell ```shell
cd wgmaster cd wgmaster
python3 migrate.py -q TABLES_DIR MIGRATE_DIR -d DB.sqlite3 python3 migrate.py -q TABLES_DIR MIGRATE_DIR -d ../wgtemp.sqlite3
python3 import.py MIGRATE_DIR NEW.sqlite3 python3 import.py MIGRATE_DIR ../wgprod.sqlite3
cd .. cd ..
``` ```

View File

@ -1,6 +1,6 @@
#!/bin/env python3 #!/bin/env python3
from typing import List, Tuple from typing import List, Tuple, Callable
import argparse import argparse
import requests import requests
import re import re
@ -12,7 +12,8 @@ import PyPDF2
PLZ_URL = 'https://www.post.at/g/c/postlexikon' PLZ_URL = 'https://www.post.at/g/c/postlexikon'
PLZ_BUTTON = re.compile(r'title="PLZ Bestimmungsort" href="(.*?)"') PLZ_BUTTON = re.compile(r'title="PLZ Verzeichnis" href="(.*?)"')
PLZ_DEST_BUTTON = re.compile(r'title="PLZ Bestimmungsort" href="(.*?)"')
KGV_URL = 'https://www.bev.gv.at/Services/Downloads/Produktbezogene-Downloads/Unentgeltliche-Produkte/Kataster-Verzeichnisse/Katastralgemeindenverzeichnis.html' KGV_URL = 'https://www.bev.gv.at/Services/Downloads/Produktbezogene-Downloads/Unentgeltliche-Produkte/Kataster-Verzeichnisse/Katastralgemeindenverzeichnis.html'
KGV_LINK = re.compile(r'<a\s+href="(.*?)".*?>Katastral') KGV_LINK = re.compile(r'<a\s+href="(.*?)".*?>Katastral')
@ -31,10 +32,23 @@ CODES = re.compile(r'Agh|Alm|Alpe|B|Bd|Bwg|Burg|Camp|D|E|Fbk|Fhei|Gh|Hgr|Hot|Ind
STRIP_CODE = re.compile(r' *(' + CODES.pattern + r')[()X0-9. -]*$') STRIP_CODE = re.compile(r' *(' + CODES.pattern + r')[()X0-9. -]*$')
STRIP_CODE_ALL = re.compile(r' +\b(' + CODES.pattern + r')\b.*$') STRIP_CODE_ALL = re.compile(r' +\b(' + CODES.pattern + r')\b.*$')
PlzRow = Tuple[int, str, int, str, int, str] PlzRow = Tuple[int, str, int, str, bool, bool, bool]
PlzDestRow = Tuple[int, str, int, str, int, str]
KgvRow = Tuple[int, str, int, str] KgvRow = Tuple[int, str, int, str]
OvRow = Tuple[int, int, str] OvRow = Tuple[int, int, str]
# Lookup used by download_plz(): translates the abbreviation found in the
# third column of the PLZ spreadsheet into the number stored as AT_plz.blnr.
# The abbreviations are numbered consecutively from 1 in the order listed.
# NOTE(review): presumably these are the Austrian federal states
# (B = Burgenland ... W = Wien) -- confirm against the AT_bundesland table.
PLZ_BUNDESLAND = {
    abbr: blnr
    for blnr, abbr in enumerate(
        ('B', 'K', 'N', 'O', 'Sa', 'St', 'T', 'V', 'W'), start=1)
}
WIEN_UMGEBUNG = { WIEN_UMGEBUNG = {
32401: 30729, 32401: 30729,
32402: 30730, 32402: 30730,
@ -62,23 +76,23 @@ WIEN_UMGEBUNG = {
GKZ = {} GKZ = {}
def get_plz_url() -> str: def get_plz_url(button: re.Pattern) -> str:
r = requests.get(PLZ_URL, headers={'User-Agent': 'Mozilla/5.0'}) r = requests.get(PLZ_URL, headers={'User-Agent': 'Mozilla/5.0'})
if r.status_code != 200: if r.status_code != 200:
raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}') raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}')
matches = PLZ_BUTTON.findall(r.text) matches = button.findall(r.text)
if len(matches) == 0: if len(matches) == 0:
raise RuntimeError('Unable to find url of file') raise RuntimeError('Unable to find url of file')
return matches[0] return matches[0]
def download_plz() -> List[PlzRow]: def download_excel(url: str, transform: Callable[[List[str]], Tuple]) -> List:
f_name = None f_name = None
try: try:
f = tempfile.NamedTemporaryFile(delete=False) f = tempfile.NamedTemporaryFile(delete=False)
with requests.get(get_plz_url(), stream=True, headers={'User-Agent': 'Mozilla/5.0'}) as r: with requests.get(url, stream=True, headers={'User-Agent': 'Mozilla/5.0'}) as r:
if r.status_code != 200: if r.status_code != 200:
raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}') raise RuntimeError(f'Unexpected response: {r.status_code} {r.reason}')
for chunk in r.iter_content(chunk_size=8192): for chunk in r.iter_content(chunk_size=8192):
@ -91,13 +105,24 @@ def download_plz() -> List[PlzRow]:
sheet = wb.sheet_by_index(0) sheet = wb.sheet_by_index(0)
for r in range(1, sheet.nrows): for r in range(1, sheet.nrows):
row = sheet.row_values(r) row = sheet.row_values(r)
rows.append((int(row[0]), row[1], int(row[2]), row[3], int(row[4]), row[5])) rows.append(transform(row))
return rows return rows
finally: finally:
if f_name is not None: if f_name is not None:
os.remove(f_name) os.remove(f_name)
def download_plz() -> List[PlzRow]:
    """Download the PLZ spreadsheet and convert each row to a PlzRow.

    The file URL is looked up with PLZ_BUTTON; downloading and sheet
    iteration are delegated to download_excel().

    Returns:
        One tuple per spreadsheet row, in the order
        (plz, ort, blnr, plz_type, internal, addressable, po_box).

    Raises:
        KeyError: if column 2 holds an abbreviation missing from
            PLZ_BUNDESLAND.
    """
    def to_plz_row(cells: List) -> PlzRow:
        # Columns 3 and 4 of the sheet are not used.
        plz = int(cells[0])
        ort = cells[1]
        blnr = PLZ_BUNDESLAND[cells[2]]
        plz_type = cells[5]
        # The three flag columns are text; anything else counts as False.
        is_internal = cells[6] == 'intern'
        is_addressable = cells[7] == 'Ja'
        has_po_box = cells[8] == 'Ja'
        return (plz, ort, blnr, plz_type,
                is_internal, is_addressable, has_po_box)

    return download_excel(get_plz_url(PLZ_BUTTON), to_plz_row)
def download_plz_dest() -> List[PlzDestRow]:
    """Download the PLZ destination spreadsheet as a list of PlzDestRow.

    The file URL is looked up with PLZ_DEST_BUTTON; downloading and sheet
    iteration are delegated to download_excel().

    Returns:
        One six-element tuple per spreadsheet row. Columns 0, 2 and 4 are
        converted to int; columns 1, 3 and 5 are passed through unchanged.
        write_sql() reads them as (plz, dest, okz, ort name, gkz, <unused>).
    """
    def to_dest_row(cells: List) -> PlzDestRow:
        plz = int(cells[0])
        okz = int(cells[2])
        gkz = int(cells[4])
        return (plz, cells[1], okz, cells[3], gkz, cells[5])

    return download_excel(get_plz_url(PLZ_DEST_BUTTON), to_dest_row)
def get_kvg_zip_url() -> str: def get_kvg_zip_url() -> str:
r = requests.get(KGV_URL) r = requests.get(KGV_URL)
if r.status_code != 200: if r.status_code != 200:
@ -244,9 +269,9 @@ def parse_ov() -> List[OvRow]:
return rows return rows
def write_sql(plz_rows: List[PlzRow], kgv_rows: List[KgvRow], ov_rows: List[OvRow]) -> None: def write_sql(plz_rows: List[PlzRow], plz_dest_rows: List[PlzDestRow], kgv_rows: List[KgvRow], ov_rows: List[OvRow]) -> None:
kgv = {kgnr: (kg_name, gkz, gem_name) for kgnr, kg_name, gkz, gem_name in kgv_rows} kgv = {kgnr: (kg_name, gkz, gem_name) for kgnr, kg_name, gkz, gem_name in kgv_rows}
ov = {okz: (o_name, gkz) for plz, _, okz, o_name, gkz, _ in plz_rows} ov = {okz: (o_name, gkz) for plz, _, okz, o_name, gkz, _ in plz_dest_rows}
ov.update({okz: (name, gkz) for gkz, okz, name in ov_rows}) ov.update({okz: (name, gkz) for gkz, okz, name in ov_rows})
gemeinden = {gkz: (gem_name, [], []) for kgnr, kg_name, gkz, gem_name in kgv_rows} gemeinden = {gkz: (gem_name, [], []) for kgnr, kg_name, gkz, gem_name in kgv_rows}
@ -297,7 +322,14 @@ def write_sql(plz_rows: List[PlzRow], kgv_rows: List[KgvRow], ov_rows: List[OvRo
f.write(b';\n') f.write(b';\n')
f.write(b"\nINSERT INTO AT_plz VALUES\n") f.write(b"\nINSERT INTO AT_plz VALUES\n")
for plz, dest, okz, _, _, _ in plz_rows: for plz, ort, blnr, plz_type, internal, addr, po_box in plz_rows:
f.write(f"({plz:4}, '{ort}', {blnr}, '{plz_type}', {internal and 'TRUE' or 'FALSE'}, "
f"{addr and 'TRUE' or 'FALSE'}, {po_box and 'TRUE' or 'FALSE'}),\n".encode('utf8'))
f.seek(-2, 1)
f.write(b';\n')
f.write(b"\nINSERT INTO AT_plz_dest VALUES\n")
for plz, dest, okz, _, _, _ in plz_dest_rows:
f.write(f"({plz:4}, {okz:5}, '{dest}'),\n".encode('utf8')) f.write(f"({plz:4}, {okz:5}, '{dest}'),\n".encode('utf8'))
f.seek(-2, 1) f.seek(-2, 1)
f.write(b';\n') f.write(b';\n')
@ -330,8 +362,10 @@ if __name__ == '__main__':
ov_data = parse_ov() ov_data = parse_ov()
print('Downloading PLZ data from www.post.at') print('Downloading PLZ data from www.post.at')
plz_data = download_plz() plz_data = download_plz()
print('Downloading PLZ destination data from www.post.at')
plz_dest_data = download_plz_dest()
print('Downloading Katastralgemeindenverzeichnis from www.bev.gv.at') print('Downloading Katastralgemeindenverzeichnis from www.bev.gv.at')
kgv_data = download_kgv() kgv_data = download_kgv()
print('Generating 90.plz.sql') print('Generating 90.plz.sql')
write_sql(plz_data, kgv_data, ov_data) write_sql(plz_data, plz_dest_data, kgv_data, ov_data)
print('Successfully created 90.plz.sql!') print('Successfully created 90.plz.sql!')

View File

@ -1,9 +1,9 @@
INSERT INTO AT_plz (plz, okz, dest) INSERT INTO AT_plz_dest (plz, okz, dest)
VALUES (2241, 3560, 'Schönkirchen-Reyersdorf'), VALUES (2241, 3560, 'Schönkirchen-Reyersdorf'),
(2165, 5013, 'Drasenhofen'), (2165, 5013, 'Drasenhofen'),
(2134, 5115, 'Staatz-Kautendorf'); (2134, 5115, 'Staatz-Kautendorf');
UPDATE AT_ort SET name = 'Etzmannsdorf am Kamp' WHERE okz = 3938; UPDATE AT_ort SET name = 'Etzmannsdorf am Kamp' WHERE okz = 3938;
DELETE FROM AT_plz WHERE (plz, okz) = (2231, 5011); DELETE FROM AT_plz_dest WHERE (plz, okz) = (2231, 5011);

View File

@ -91,6 +91,22 @@ CREATE TABLE AT_ort (
CREATE TABLE AT_plz ( CREATE TABLE AT_plz (
plz INTEGER NOT NULL CHECK (plz >= 1000 AND plz <= 9999), plz INTEGER NOT NULL CHECK (plz >= 1000 AND plz <= 9999),
ort TEXT NOT NULL,
blnr INTEGER NOT NULL,
type TEXT NOT NULL,
internal INTEGER NOT NULL CHECK (internal IN (TRUE, FALSE)),
addressable INTEGER NOT NULL CHECK (addressable IN (TRUE, FALSE)),
po_box INTEGER NOT NULL CHECK (po_box IN (TRUE, FALSE)),
CONSTRAINT pk_AT_plz PRIMARY KEY (plz),
CONSTRAINT fk_AT_plz_AT_bundesland FOREIGN KEY (blnr) REFERENCES AT_bundesland (blnr)
ON UPDATE CASCADE
ON DELETE RESTRICT
);
CREATE TABLE AT_plz_dest (
plz INTEGER NOT NULL CHECK (plz >= 1000 AND plz <= 9999),
okz INTEGER NOT NULL, okz INTEGER NOT NULL,
country TEXT NOT NULL GENERATED ALWAYS AS ('AT') VIRTUAL, country TEXT NOT NULL GENERATED ALWAYS AS ('AT') VIRTUAL,
@ -98,30 +114,30 @@ CREATE TABLE AT_plz (
dest TEXT NOT NULL, dest TEXT NOT NULL,
CONSTRAINT pk_AT_plz PRIMARY KEY (plz, okz), CONSTRAINT pk_AT_plz_dest PRIMARY KEY (plz, okz),
CONSTRAINT sk_AT_plz_id UNIQUE (id), CONSTRAINT sk_AT_plz_dest_id UNIQUE (id),
CONSTRAINT fk_AT_plz_AT_ort FOREIGN KEY (okz) REFERENCES AT_ort (okz) CONSTRAINT fk_AT_plz_dest_AT_ort FOREIGN KEY (okz) REFERENCES AT_ort (okz)
ON UPDATE CASCADE ON UPDATE CASCADE
ON DELETE RESTRICT, ON DELETE RESTRICT,
CONSTRAINT fk_AT_plz_postal_dest FOREIGN KEY (country, id) REFERENCES postal_dest (country, id) CONSTRAINT fk_AT_plz_dest_postal_dest FOREIGN KEY (country, id) REFERENCES postal_dest (country, id)
ON UPDATE CASCADE ON UPDATE CASCADE
ON DELETE CASCADE ON DELETE CASCADE
) STRICT; ) STRICT;
CREATE TRIGGER t_AT_plz_i CREATE TRIGGER t_AT_plz_dest_i
AFTER INSERT ON AT_plz FOR EACH ROW AFTER INSERT ON AT_plz_dest FOR EACH ROW
BEGIN BEGIN
INSERT INTO postal_dest (country, id) VALUES (NEW.country, NEW.id); INSERT INTO postal_dest (country, id) VALUES (NEW.country, NEW.id);
END; END;
CREATE TRIGGER t_AT_plz_u CREATE TRIGGER t_AT_plz_dest_u
AFTER UPDATE OF id ON AT_plz FOR EACH ROW AFTER UPDATE OF id ON AT_plz_dest FOR EACH ROW
BEGIN BEGIN
UPDATE postal_dest SET country = NEW.country, id = NEW.id WHERE (country, id) = (OLD.country, OLD.id); UPDATE postal_dest SET country = NEW.country, id = NEW.id WHERE (country, id) = (OLD.country, OLD.id);
END; END;
CREATE TRIGGER t_AT_plz_d CREATE TRIGGER t_AT_plz_dest_d
AFTER DELETE ON AT_plz FOR EACH ROW AFTER DELETE ON AT_plz_dest FOR EACH ROW
BEGIN BEGIN
DELETE FROM postal_dest WHERE (country, id) = (OLD.country, OLD.id); DELETE FROM postal_dest WHERE (country, id) = (OLD.country, OLD.id);
END; END;

View File

@ -3,4 +3,4 @@ CREATE VIEW v_plz AS
SELECT plz, p.dest AS bestimmungsort, g.name AS gemeinde, g.gkz, o.name AS ort, o.okz SELECT plz, p.dest AS bestimmungsort, g.name AS gemeinde, g.gkz, o.name AS ort, o.okz
FROM AT_gem g FROM AT_gem g
JOIN AT_ort o ON o.gkz = g.gkz JOIN AT_ort o ON o.gkz = g.gkz
JOIN AT_plz p ON p.okz = o.okz; JOIN AT_plz_dest p ON p.okz = o.okz;

View File

@ -153,7 +153,7 @@ def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] =
return None return None
cur = DB_CNX.cursor() cur = DB_CNX.cursor()
cur.execute("SELECT o.okz, p.dest, o.name FROM AT_plz p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?", (plz,)) cur.execute("SELECT o.okz, p.dest, o.name FROM AT_plz_dest p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?", (plz,))
rows: List[Tuple[int, str, str]] = cur.fetchall() rows: List[Tuple[int, str, str]] = cur.fetchall()
cur.close() cur.close()
@ -176,8 +176,7 @@ def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] =
# Götzendorf # Götzendorf
return plz * 100000 + 3571 return plz * 100000 + 3571
print(ort, address) raise RuntimeError(f'PLZ not found ({plz} {ort}, {address})')
raise RuntimeError()
def lookup_kgnr(okz: Optional[int]) -> Optional[int]: def lookup_kgnr(okz: Optional[int]) -> Optional[int]:
@ -226,10 +225,10 @@ def migrate_branches(in_dir: str, out_dir: str) -> None:
with open(f'{out_dir}/branch.csv', 'w+') as f: with open(f'{out_dir}/branch.csv', 'w+') as f:
f.write('zwstid;name;country;postal_dest;address;phone_nr\n') f.write('zwstid;name;country;postal_dest;address;phone_nr\n')
for b in csv.parse(f'{in_dir}/TZweigstellen.csv'): for b in csv.parse(f'{in_dir}/TZweigstellen.csv'):
BRANCH_MAP[b['ZNR']] = b['Kennbst'].strip().title() BRANCH_MAP[b['ZNR']] = b['Kennbst']
address = b['Straße'] address = b['Straße']
postal_dest = lookup_plz(int(b['PLZ']) if b['PLZ'] else None, b['Ort'], address) postal_dest = lookup_plz(int(b['PLZ']) if b['PLZ'] else None, b['Ort'], address)
f.write(csv.format_row(b['Kennbst'], b['Name'], 'AT', postal_dest, address, b['Telefon'])) f.write(csv.format_row(b['Kennbst'], b['Name'].strip().title(), 'AT', postal_dest, address, b['Telefon']))
def migrate_grosslagen(in_dir: str, out_dir: str) -> None: def migrate_grosslagen(in_dir: str, out_dir: str) -> None: