Add csv.py

This commit is contained in:
2023-03-13 21:45:01 +01:00
parent 6c314c5954
commit 7f76088be3
2 changed files with 98 additions and 86 deletions

View File

@ -1,13 +1,13 @@
#!/bin/env python3
from typing import Dict, Any, Tuple, Optional, Iterator, List
from typing import Dict, Any, Tuple, Optional, List
import argparse
import datetime
import os
import re
import sys
import sqlite3
import requests
import csv
DB_CNX: Optional[sqlite3.Connection] = None
@ -60,74 +60,6 @@ STREET_NAMES: Dict[str, str] = {
}
# Prefix pattern for decimal numbers; compiled once instead of on every field.
_CSV_FLOAT_PREFIX = re.compile(r'[0-9]+\.[0-9]+')


def _split_csv_line(line_str: str) -> Iterator[str]:
    """Yield the whitespace-stripped raw fields of one ';'-separated line.

    A field that starts with '"' is in "quoted" mode until the next '"';
    while quoted, ';' and newline characters are kept as part of the field.
    The surrounding quotes are left in place so the caller can tell strings
    apart from other literals.
    """
    field: Optional[str] = None  # None while we are between two fields
    quoted = False
    for ch in line_str:
        if field is None:
            if ch == ';':
                # Two separators in a row (or a leading one): empty field.
                yield ''
            elif ch not in (' ', '\t'):
                # First significant character opens a new field.
                field = ch
                quoted = ch == '"'
            continue
        if not quoted and ch in (';', '\n'):
            yield field.strip()
            field = None
            continue
        if quoted and ch == '"':
            # Closing quote: keep it in the field, but leave quoted mode.
            quoted = False
        field += ch
    if field is not None:
        # Last field of a line (also covers the newline after a trailing ';',
        # which strips down to an empty field).
        yield field.strip()


def _convert_csv_value(part: str) -> Any:
    """Convert one raw field into None, str, bool, int, float or date.

    Raises:
        RuntimeError: if the field matches none of the known literal forms,
            or looks like a number/date but cannot be converted. The
            offending text is the exception argument.
    """
    if part == '':
        return None
    if part[0] == '"' and part[-1] == '"':
        return part[1:-1]
    if part == 'T':
        return True
    if part == 'F':
        return False
    try:
        if part.isdigit():
            return int(part)
        if _CSV_FLOAT_PREFIX.match(part):
            # Only the prefix is checked, so float() has the final say.
            return float(part)
        if len(part) == 10 and part[4] == '-' and part[7] == '-':
            return datetime.datetime.strptime(part, '%Y-%m-%d').date()
    except ValueError as err:
        # Report conversion failures the same way as unknown tokens.
        raise RuntimeError(part) from err
    raise RuntimeError(part)


def parse_csv(filename: str) -> Iterator[Dict[str, Any]]:
    """Lazily parse a ';'-separated file into one dict per data row.

    The first line is the header; its column names (stripped) become the
    dict keys. Every following line is split into fields and each field is
    converted: empty -> None, "..." -> str, T/F -> bool, digits -> int,
    decimal -> float, YYYY-MM-DD -> datetime.date.

    Args:
        filename: Path of the file to read (opened in text mode with the
            platform default encoding).

    Yields:
        A dict mapping header names to the converted values of one row.

    Raises:
        RuntimeError: if a field cannot be interpreted; the raw field text
            is the exception argument.
        IndexError: if a row has more fields than the header has columns.
    """
    with open(filename, 'r') as f:
        header: Optional[Tuple[str, ...]] = None
        for line in f:
            if header is None:
                header = tuple(e.strip() for e in line.strip().split(';'))
                continue

            obj = {}
            for i, part in enumerate(_split_csv_line(line)):
                obj[header[i]] = _convert_csv_value(part)
            yield obj
def format_row(*values) -> str:
    """Serialize values into one ';'-separated line terminated by a newline.

    Each value is rendered as follows: None becomes an empty field, strings
    (including str subclasses) are wrapped in double quotes, booleans become
    'T' or 'F', and anything else is written via str().

    Note: embedded double quotes in strings are written as-is, not escaped.

    Args:
        *values: The cell values of the row, in column order.

    Returns:
        The formatted line including the trailing newline. Calling with no
        values returns just the newline.
    """
    def render(val) -> str:
        if val is None:
            return ''
        if isinstance(val, str):
            return f'"{val}"'
        if isinstance(val, bool):
            return 'T' if val else 'F'
        return str(val)

    return ';'.join(render(val) for val in values) + '\n'
def success(mgnr: int, key: str, value: str) -> None:
if not args.quiet:
print(f'\x1B[1;32m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)
@ -207,7 +139,7 @@ def get_bev_gst_size(kgnr: int, gstnr: str) -> Optional[int]:
def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]:
fbs = parse_csv(f'{in_dir}/TFlaechenbindungen.csv')
fbs = csv.parse(f'{in_dir}/TFlaechenbindungen.csv')
members = {}
for f in fbs:
if f['MGNR'] not in members:
@ -293,18 +225,18 @@ def migrate_branches(in_dir: str, out_dir: str) -> None:
with open(f'{out_dir}/branch.csv', 'w+') as f:
f.write('zwstid;name;country;postal_dest;address;phone_nr\n')
for b in parse_csv(f'{in_dir}/TZweigstellen.csv'):
for b in csv.parse(f'{in_dir}/TZweigstellen.csv'):
BRANCH_MAP[b['ZNR']] = b['Kennbst']
address = b['Straße']
postal_dest = lookup_plz(int(b['PLZ']) if b['PLZ'] else None, b['Ort'], address)
f.write(format_row(b['Kennbst'], b['Name'], 'AT', postal_dest, address, b['Telefon']))
f.write(csv.format_row(b['Kennbst'], b['Name'], 'AT', postal_dest, address, b['Telefon']))
def migrate_gemeinden(in_dir: str, out_dir: str) -> None:
global GEM_MAP
GEM_MAP = {}
for g in parse_csv(f'{in_dir}/TGemeinden.csv'):
for g in csv.parse(f'{in_dir}/TGemeinden.csv'):
GEM_MAP[g['GNR']] = lookup_gem_name(g['Bezeichnung'])
@ -314,7 +246,7 @@ def migrate_reeds(in_dir: str, out_dir: str) -> None:
with open(f'{out_dir}/wb_rd.csv', 'w+') as f:
f.write('kgnr;rdnr;name\n')
for r in parse_csv(f'{in_dir}/TRiede.csv'):
for r in csv.parse(f'{in_dir}/TRiede.csv'):
name: str = r['Bezeichnung'].strip()
if name.isupper():
name = name.title()
@ -326,11 +258,11 @@ def migrate_reeds(in_dir: str, out_dir: str) -> None:
rdnr = max([n for k, n in REED_MAP.values() if k == kgnr] or [0]) + 1
REED_MAP[r['RNR']] = (kgnr, rdnr)
f.write(format_row(kgnr, rdnr, name))
f.write(csv.format_row(kgnr, rdnr, name))
def migrate_members(in_dir: str, out_dir: str) -> None:
members = parse_csv(f'{in_dir}/TMitglieder.csv')
members = csv.parse(f'{in_dir}/TMitglieder.csv')
fbs = parse_flaechenbindungen(in_dir)
with open(f'{out_dir}/member.csv', 'w+') as f_m, open(f'{out_dir}/member_billing_address.csv', 'w+') as f_mba:
@ -536,7 +468,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
if kgnr is None:
invalid(mgnr, 'KgNr.', ort)
f_m.write(format_row(
f_m.write(csv.format_row(
mgnr, m['MGNR-Vorgänger'], prefix, given_name, middle_names, family_name, suffix,
m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0,
m['BHKontonummer'], zwstid, bnr, ustid,
@ -546,7 +478,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
kgnr, m['Anmerkung']
))
if billing_name:
f_mba.write(format_row(mgnr, billing_name, 'AT', None, None))
f_mba.write(csv.format_row(mgnr, billing_name, 'AT', None, None))
def migrate_contracts(in_dir: str, out_dir: str) -> None:
@ -604,7 +536,7 @@ def migrate_contracts(in_dir: str, out_dir: str) -> None:
f_c.write('vnr;mgnr;year_from;year_to\n')
f_fb.write('vnr;kgnr;gstnr;rdnr;area;sortid;attrid;cultid\n')
for fb in parse_csv(f'{in_dir}/TFlaechenbindungen.csv'):
for fb in csv.parse(f'{in_dir}/TFlaechenbindungen.csv'):
if fb['Von'] is None and fb['Bis'] is None:
continue
parz: str = fb['Parzellennummer']
@ -612,7 +544,9 @@ def migrate_contracts(in_dir: str, out_dir: str) -> None:
gem = GEM_MAP[fb['GNR']]
kgnr = gem[0][0]
f_c.write(format_row(vnr, fb['MGNR'], fb['Von'], fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None))
f_c.write(csv.format_row(
vnr, fb['MGNR'], fb['Von'], fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None
))
gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR'])
area = int(fb['Flaeche'])
@ -626,15 +560,21 @@ def migrate_contracts(in_dir: str, out_dir: str) -> None:
for i, gstnr in enumerate(gstnrs or ['0000']):
a = area - gst_area * (len(gstnrs) - 1) if i == 0 else gst_area
rdnr = REED_MAP[fb['RNR']][1] if fb['RNR'] else None
f_fb.write(format_row(vnr, kgnr, gstnr, rdnr, a, fb['SNR'], fb['SANR'], CULTIVATION_MAP[fb['BANR']]))
f_fb.write(csv.format_row(
vnr, kgnr, gstnr, rdnr, a, fb['SNR'], fb['SANR'], CULTIVATION_MAP[fb['BANR']]
))
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('in_dir')
parser.add_argument('out_dir')
parser.add_argument('-q', '--quiet', action='store_true', default=False)
parser.add_argument('-d', '--database', required=True)
parser.add_argument('in_dir', type=str,
help='The input directory where the exported csv files are stored')
parser.add_argument('out_dir', type=str,
help='The output directory where the migrated csv file should be stored')
parser.add_argument('-q', '--quiet', action='store_true', default=False,
help='Be less verbose')
parser.add_argument('-d', '--database', metavar='DB', required=True,
help='The sqlite database file to look up information')
args = parser.parse_args()
os.makedirs(args.out_dir, exist_ok=True)