From 7f76088be38e567b3da111ac42d584ae26988afc Mon Sep 17 00:00:00 2001 From: Lorenz Stechauner Date: Mon, 13 Mar 2023 21:45:01 +0100 Subject: [PATCH] Add csv.py --- wgmaster/csv.py | 72 ++++++++++++++++++++++++++++ wgmaster/migrate.py | 112 ++++++++++---------------------------------- 2 files changed, 98 insertions(+), 86 deletions(-) create mode 100644 wgmaster/csv.py diff --git a/wgmaster/csv.py b/wgmaster/csv.py new file mode 100644 index 0000000..9063a22 --- /dev/null +++ b/wgmaster/csv.py @@ -0,0 +1,72 @@ + +from typing import Iterator, Dict, Any, Optional, Tuple +import re +import datetime + + +def parse(filename: str) -> Iterator[Dict[str, Any]]: + def parse_line(line_str: str) -> Iterator[str]: + w = None + s = False + for ch in line_str: + if w is None: + if ch == ';': + yield '' + continue + elif ch in (' ', '\t'): + continue + w = ch + s = ch == '"' + continue + elif not s and ch in (';', '\n'): + yield w.strip() + w = None + continue + elif s and ch == '"': + s = False + w += ch + if w is not None: + yield w.strip() + + with open(filename, 'r') as f: + header: Optional[Tuple[str]] = None + for line in f: + if header is None: + header = tuple([e.strip() for e in line.strip().split(';')]) + continue + + obj = {} + for i, part in enumerate(parse_line(line)): + if part == '': + part = None + elif part[0] == '"' and part[-1] == '"': + part = part[1:-1] + elif part == 'T': + part = True + elif part == 'F': + part = False + elif part.isdigit(): + part = int(part) + elif re.match(r'[0-9]+\.[0-9]+', part): + part = float(part) + elif len(part) == 10 and part[4] == '-' and part[7] == '-': + part = datetime.datetime.strptime(part, '%Y-%m-%d').date() + else: + raise RuntimeError(part) + obj[header[i]] = part + yield obj + + +def format_row(*values) -> str: + row = '' + for val in values: + if val is None: + pass + elif type(val) == str: + row += f'"{val}"' + elif type(val) == bool: + row += 'T' if val else 'F' + else: + row += str(val) + row += ';' + return f'{row[:-1]}\n' diff --git a/wgmaster/migrate.py b/wgmaster/migrate.py index aeb7acc..6958631 100755 --- a/wgmaster/migrate.py +++ b/wgmaster/migrate.py @@ -1,13 +1,13 @@ #!/bin/env python3 -from typing import Dict, Any, Tuple, Optional, Iterator, List +from typing import Dict, Any, Tuple, Optional, List import argparse -import datetime import os import re import sys import sqlite3 import requests +import csv DB_CNX: Optional[sqlite3.Connection] = None @@ -60,74 +60,6 @@ STREET_NAMES: Dict[str, str] = { } -def parse_csv(filename: str) -> Iterator[Dict[str, Any]]: - def parse_line(line_str: str) -> Iterator[str]: - w = None - s = False - for ch in line_str: - if w is None: - if ch == ';': - yield '' - continue - elif ch in (' ', '\t'): - continue - w = ch - s = ch == '"' - continue - elif not s and ch in (';', '\n'): - yield w.strip() - w = None - continue - elif s and ch == '"': - s = False - w += ch - if w is not None: - yield w.strip() - - with open(filename, 'r') as f: - header: Optional[Tuple[str]] = None - for line in f: - if header is None: - header = tuple([e.strip() for e in line.strip().split(';')]) - continue - - obj = {} - for i, part in enumerate(parse_line(line)): - if part == '': - part = None - elif part[0] == '"' and part[-1] == '"': - part = part[1:-1] - elif part == 'T': - part = True - elif part == 'F': - part = False - elif part.isdigit(): - part = int(part) - elif re.match(r'[0-9]+\.[0-9]+', part): - part = float(part) - elif len(part) == 10 and part[4] == '-' and part[7] == '-': - part = datetime.datetime.strptime(part, '%Y-%m-%d').date() - else: - raise RuntimeError(part) - obj[header[i]] = part - yield obj - - -def format_row(*values) -> str: - row = '' - for val in values: - if val is None: - pass - elif type(val) == str: - row += f'"{val}"' - elif type(val) == bool: - row += 'T' if val else 'F' - else: - row += str(val) - row += ';' - return f'{row[:-1]}\n' - - def success(mgnr: int, key: str, value: str) -> None: if not args.quiet: print(f'\x1B[1;32m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr) @@ -207,7 +139,7 @@ def get_bev_gst_size(kgnr: int, gstnr: str) -> Optional[int]: def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]: - fbs = parse_csv(f'{in_dir}/TFlaechenbindungen.csv') + fbs = csv.parse(f'{in_dir}/TFlaechenbindungen.csv') members = {} for f in fbs: if f['MGNR'] not in members: @@ -293,18 +225,18 @@ def migrate_branches(in_dir: str, out_dir: str) -> None: with open(f'{out_dir}/branch.csv', 'w+') as f: f.write('zwstid;name;country;postal_dest;address;phone_nr\n') - for b in parse_csv(f'{in_dir}/TZweigstellen.csv'): + for b in csv.parse(f'{in_dir}/TZweigstellen.csv'): BRANCH_MAP[b['ZNR']] = b['Kennbst'] address = b['Straße'] postal_dest = lookup_plz(int(b['PLZ']) if b['PLZ'] else None, b['Ort'], address) - f.write(format_row(b['Kennbst'], b['Name'], 'AT', postal_dest, address, b['Telefon'])) + f.write(csv.format_row(b['Kennbst'], b['Name'], 'AT', postal_dest, address, b['Telefon'])) def migrate_gemeinden(in_dir: str, out_dir: str) -> None: global GEM_MAP GEM_MAP = {} - for g in parse_csv(f'{in_dir}/TGemeinden.csv'): + for g in csv.parse(f'{in_dir}/TGemeinden.csv'): GEM_MAP[g['GNR']] = lookup_gem_name(g['Bezeichnung']) @@ -314,7 +246,7 @@ def migrate_reeds(in_dir: str, out_dir: str) -> None: with open(f'{out_dir}/wb_rd.csv', 'w+') as f: f.write('kgnr;rdnr;name\n') - for r in parse_csv(f'{in_dir}/TRiede.csv'): + for r in csv.parse(f'{in_dir}/TRiede.csv'): name: str = r['Bezeichnung'].strip() if name.isupper(): name = name.title() @@ -326,11 +258,11 @@ def migrate_reeds(in_dir: str, out_dir: str) -> None: rdnr = max([n for k, n in REED_MAP.values() if k == kgnr] or [0]) + 1 REED_MAP[r['RNR']] = (kgnr, rdnr) - f.write(format_row(kgnr, rdnr, name)) + f.write(csv.format_row(kgnr, rdnr, name)) def migrate_members(in_dir: str, out_dir: str) -> None: - members = parse_csv(f'{in_dir}/TMitglieder.csv') + members = csv.parse(f'{in_dir}/TMitglieder.csv') fbs = parse_flaechenbindungen(in_dir) with open(f'{out_dir}/member.csv', 'w+') as f_m, open(f'{out_dir}/member_billing_address.csv', 'w+') as f_mba: @@ -536,7 +468,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None: if kgnr is None: invalid(mgnr, 'KgNr.', ort) - f_m.write(format_row( + f_m.write(csv.format_row( mgnr, m['MGNR-Vorgänger'], prefix, given_name, middle_names, family_name, suffix, m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0, m['BHKontonummer'], zwstid, bnr, ustid, @@ -546,7 +478,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None: kgnr, m['Anmerkung'] )) if billing_name: - f_mba.write(format_row(mgnr, billing_name, 'AT', None, None)) + f_mba.write(csv.format_row(mgnr, billing_name, 'AT', None, None)) def migrate_contracts(in_dir: str, out_dir: str) -> None: @@ -604,7 +536,7 @@ def migrate_contracts(in_dir: str, out_dir: str) -> None: f_c.write('vnr;mgnr;year_from;year_to\n') f_fb.write('vnr;kgnr;gstnr;rdnr;area;sortid;attrid;cultid\n') - for fb in parse_csv(f'{in_dir}/TFlaechenbindungen.csv'): + for fb in csv.parse(f'{in_dir}/TFlaechenbindungen.csv'): if fb['Von'] is None and fb['Bis'] is None: continue parz: str = fb['Parzellennummer'] @@ -612,7 +544,9 @@ def migrate_contracts(in_dir: str, out_dir: str) -> None: gem = GEM_MAP[fb['GNR']] kgnr = gem[0][0] - f_c.write(format_row(vnr, fb['MGNR'], fb['Von'], fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None)) + f_c.write(csv.format_row( + vnr, fb['MGNR'], fb['Von'], fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None + )) gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR']) area = int(fb['Flaeche']) @@ -626,15 +560,21 @@ def migrate_contracts(in_dir: str, out_dir: str) -> None: for i, gstnr in enumerate(gstnrs or ['0000']): a = area - gst_area * (len(gstnrs) - 1) if i == 0 else gst_area rdnr = REED_MAP[fb['RNR']][1] if fb['RNR'] else None - f_fb.write(format_row(vnr, kgnr, gstnr, rdnr, a, fb['SNR'], fb['SANR'], CULTIVATION_MAP[fb['BANR']])) + f_fb.write(csv.format_row( + vnr, kgnr, gstnr, rdnr, a, fb['SNR'], fb['SANR'], CULTIVATION_MAP[fb['BANR']] + )) if __name__ == '__main__': parser = argparse.ArgumentParser() - parser.add_argument('in_dir') - parser.add_argument('out_dir') - parser.add_argument('-q', '--quiet', action='store_true', default=False) - parser.add_argument('-d', '--database', required=True) + parser.add_argument('in_dir', type=str, + help='The input directory where the exported csv files are stored') + parser.add_argument('out_dir', type=str, + help='The output directory where the migrated csv file should be stored') + parser.add_argument('-q', '--quiet', action='store_true', default=False, + help='Be less verbose') + parser.add_argument('-d', '--database', metavar='DB', required=True, + help='The sqlite database file to look up information') args = parser.parse_args() os.makedirs(args.out_dir, exist_ok=True)