diff --git a/wgmaster/csv.py b/wgmaster/csv.py deleted file mode 100644 index 151c37f..0000000 --- a/wgmaster/csv.py +++ /dev/null @@ -1,101 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -from typing import Iterator, Dict, Any, Tuple -import re -import datetime - -RE_INT = re.compile(r'-?[0-9]+') -RE_FLOAT = re.compile(r'-?[0-9]+\.[0-9]+') -RE_STR_START = re.compile(r'.*;"[^"]*$') -RE_STR_END = re.compile(r'^[^"]*";.*') - - -def cast_value(value: str) -> Any: - if value == '': - return None - elif value[0] == '"' and value[-1] == '"': - return value[1:-1] - elif value == 'T': - return True - elif value == 'F': - return False - elif RE_INT.fullmatch(value): - return int(value) - elif RE_FLOAT.fullmatch(value): - return float(value) - elif len(value) == 10 and value[4] == '-' and value[7] == '-': - return datetime.datetime.strptime(value, '%Y-%m-%d').date() - elif len(value) == 8 and value[2] == ':' and value[5] == ':': - return datetime.time.fromisoformat(value) - else: - raise RuntimeError(f'unable to infer type of value "{value}"') - - -def convert_value(value: Any, table: str = None, column: str = None) -> str: - if value is None: - return '' - if type(value) == str: - return f'"{value}"' - elif type(value) == bool: - return 'T' if value else 'F' - elif type(value) == datetime.datetime and table is not None and column is not None: - if value.year == 1899 and value.month == 12 and value.day == 30: - return value.strftime('%H:%M:%S') - elif value.hour == 0 and value.minute == 0 and value.second == 0: - return value.strftime('%Y-%m-%d') - return str(value) - - -def parse_line(line_str: str) -> Iterator[str]: - w = None - s = False - for ch in line_str: - if w is None: - if ch == ';': - yield '' - continue - elif ch in (' ', '\t'): - continue - w = ch - s = ch == '"' - continue - elif not s and ch in (';', '\n'): - yield w.strip() - w = None - continue - elif s and ch == '"': - s = False - w += ch - if w is not None: - yield w.strip() - - -def 
parse(filename: str) -> Iterator[Tuple]: - with open(filename, 'r', encoding='utf-8') as f: - lines = f.__iter__() - yield tuple([part.strip() for part in next(lines).split(';')]) - in_str = False - for cur_line in lines: - if in_str: - line += cur_line - if not RE_STR_END.match(cur_line): - continue - in_str = False - else: - line = cur_line - if RE_STR_START.match(cur_line): - in_str = True - continue - yield tuple([cast_value(part) for part in parse_line(line)]) - - -def parse_dict(filename: str) -> Iterator[Dict[str, Any]]: - rows = parse(filename) - header = next(rows) - for row in rows: - yield {header[i]: part for i, part in enumerate(row)} - - -def format_row(*values) -> str: - return ';'.join([convert_value(v) for v in values]) + '\n' diff --git a/wgmaster/export.py b/wgmaster/export.py index 19ee1cf..a66fdd2 100644 --- a/wgmaster/export.py +++ b/wgmaster/export.py @@ -11,7 +11,7 @@ import ctypes.wintypes import hashlib import pypyodbc -import csv +import utils IGNORED_NAMES = ['Windows', 'Program Files', 'Program Files (x86)', 'AppData'] @@ -138,11 +138,10 @@ def main() -> None: cur.execute(f"SELECT * FROM {t_name} ORDER BY `{desc[0][0]}`;") cols = [t[0] for t in cur.description] - with open(f'{args.output}/{t_name}.csv', 'wb+') as f: - f.write((';'.join(cols) + '\n').encode('utf-8')) + with utils.csv_open(f'{args.output}/{t_name}.csv') as f: + f.header(*cols) for row in cur: - values = [csv.convert_value(val, table=t_name, column=col) for col, val in zip(cols, row)] - f.write((';'.join(values) + '\n').encode('utf-8')) + f.row(*(utils.convert_value(val, table=t_name, column=col) for col, val in zip(cols, row)), raw=True) print(f'Exported {t_name} successfully!', flush=True) finally: diff --git a/wgmaster/import.py b/wgmaster/import.py index 149e9dc..48ad95e 100755 --- a/wgmaster/import.py +++ b/wgmaster/import.py @@ -8,7 +8,7 @@ import os import re import datetime -import csv +import utils DIR: str @@ -46,7 +46,7 @@ def sqlite_regexp(pattern: str, value:
Optional[str]) -> Optional[bool]: def import_csv(cur: sqlite3.Cursor, table_name: str) -> None: - rows = csv.parse(f'{DIR}/{table_name}.csv') + rows = utils.csv_parse(f'{DIR}/{table_name}.csv') names = next(rows) sql = f'INSERT INTO {table_name} ({", ".join(names)}) VALUES ({", ".join(["?"] * len(names))})' diff --git a/wgmaster/migrate.py b/wgmaster/migrate.py index d097a75..ca2bc4f 100755 --- a/wgmaster/migrate.py +++ b/wgmaster/migrate.py @@ -10,7 +10,7 @@ import sqlite3 import requests import datetime -import csv +import utils DB_CNX: Optional[sqlite3.Connection] = None @@ -170,9 +170,8 @@ def get_bev_gst_size(kgnr: int, gstnr: str) -> Optional[int]: def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]: - fbs = csv.parse_dict(f'{in_dir}/TFlaechenbindungen.csv') members = {} - for f in fbs: + for f in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'): if f['MGNR'] not in members: members[f['MGNR']] = {} members[f['MGNR']][f['FBNR']] = f @@ -260,7 +259,7 @@ def lookup_kg_name(kgnr: int) -> str: def migrate_gradation(in_dir: str, out_dir: str) -> None: global GRADATION_MAP GRADATION_MAP = {} - for g in csv.parse_dict(f'{in_dir}/TUmrechnung.csv'): + for g in utils.csv_parse_dict(f'{in_dir}/TUmrechnung.csv'): GRADATION_MAP[g['Oechsle']] = g['KW'] @@ -268,13 +267,13 @@ def migrate_branches(in_dir: str, out_dir: str) -> None: global BRANCH_MAP BRANCH_MAP = {} - with open(f'{out_dir}/branch.csv', 'w+', encoding='utf-8') as f: - f.write('zwstid;name;country;postal_dest;address;phone_nr\n') - for b in csv.parse_dict(f'{in_dir}/TZweigstellen.csv'): + with utils.csv_open(f'{out_dir}/branch.csv') as f: + f.header('zwstid', 'name', 'country', 'postal_dest', 'address', 'phone_nr') + for b in utils.csv_parse_dict(f'{in_dir}/TZweigstellen.csv'): BRANCH_MAP[b['ZNR']] = b['Kennbst'] address = b['Straße'] postal_dest = lookup_plz(int(b['PLZ']) if b['PLZ'] else None, b['Ort'], address) - f.write(csv.format_row(b['Kennbst'], 
b['Name'].strip().title(), 'AT', postal_dest, address, b['Telefon'])) + f.row(b['Kennbst'], b['Name'].strip().title(), 'AT', postal_dest, address, b['Telefon']) def migrate_grosslagen(in_dir: str, out_dir: str) -> None: @@ -282,34 +281,34 @@ def migrate_grosslagen(in_dir: str, out_dir: str) -> None: GROSSLAGE_MAP = {} glnr = 0 - with open(f'{out_dir}/wb_gl.csv', 'w+', encoding='utf-8') as f: - f.write('glnr;name\n') - for gl in csv.parse_dict(f'{in_dir}/TGrosslagen.csv'): + with utils.csv_open(f'{out_dir}/wb_gl.csv') as f: + f.header('glnr', 'name') + for gl in utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv'): glnr += 1 GROSSLAGE_MAP[gl['GLNR']] = glnr - f.write(csv.format_row(glnr, gl['Bezeichnung'])) + f.row(glnr, gl['Bezeichnung']) def migrate_gemeinden(in_dir: str, out_dir: str) -> None: global GEM_MAP GEM_MAP = {} - with open(f'{out_dir}/wb_kg.csv', 'w+', encoding='utf-8') as f: - f.write('kgnr;glnr\n') - for g in csv.parse_dict(f'{in_dir}/TGemeinden.csv'): + with utils.csv_open(f'{out_dir}/wb_kg.csv') as f: + f.header('kgnr', 'glnr') + for g in utils.csv_parse_dict(f'{in_dir}/TGemeinden.csv'): gems = lookup_gem_name(g['Bezeichnung']) GEM_MAP[g['GNR']] = gems for kgnr, gkz in gems: - f.write(csv.format_row(kgnr, GROSSLAGE_MAP[g['GLNR']])) + f.row(kgnr, GROSSLAGE_MAP[g['GLNR']]) def migrate_reeds(in_dir: str, out_dir: str) -> None: global REED_MAP REED_MAP = {} - with open(f'{out_dir}/wb_rd.csv', 'w+', encoding='utf-8') as f: - f.write('kgnr;rdnr;name\n') - for r in csv.parse_dict(f'{in_dir}/TRiede.csv'): + with utils.csv_open(f'{out_dir}/wb_rd.csv') as f: + f.header('kgnr', 'rdnr', 'name') + for r in utils.csv_parse_dict(f'{in_dir}/TRiede.csv'): name: str = r['Bezeichnung'].strip() if name.isupper(): name = name.title() @@ -321,26 +320,26 @@ def migrate_reeds(in_dir: str, out_dir: str) -> None: rdnr = max([n for k, n in REED_MAP.values() if k == kgnr] or [0]) + 1 REED_MAP[r['RNR']] = (kgnr, rdnr) - f.write(csv.format_row(kgnr, rdnr, name)) + f.row(kgnr, rdnr, 
name) def migrate_attributes(in_dir: str, out_dir: str) -> None: - with open(f'{out_dir}/wine_attribute.csv', 'w+', encoding='utf-8') as f: - f.write('attrid;name;kg_per_ha\n') - for a in csv.parse_dict(f'{in_dir}/TSortenAttribute.csv'): - f.write(csv.format_row(a['SANR'], a['Attribut'], int(a['KgProHa']))) + with utils.csv_open(f'{out_dir}/wine_attribute.csv') as f: + f.header('attrid', 'name', 'kg_per_ha') + for a in utils.csv_parse_dict(f'{in_dir}/TSortenAttribute.csv'): + f.row(a['SANR'], a['Attribut'], int(a['KgProHa'])) if WG == 'MATZEN': - f.write(csv.format_row('M', 'Matzen', 10000)) - f.write(csv.format_row('HU', 'Huber', 10000)) + f.row('M', 'Matzen', 10000) + f.row('HU', 'Huber', 10000) def migrate_cultivations(in_dir: str, out_dir: str) -> None: global CULTIVATION_MAP CULTIVATION_MAP = {} - with open(f'{out_dir}/wine_cultivation.csv', 'w+', encoding='utf-8') as f: - f.write('cultid;name\n') - for c in csv.parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'): + with utils.csv_open(f'{out_dir}/wine_cultivation.csv') as f: + f.header('cultid', 'name') + for c in utils.csv_parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'): name: str = c['Bezeichnung'] cultid = name[0].upper() if name.isupper(): @@ -348,27 +347,28 @@ def migrate_cultivations(in_dir: str, out_dir: str) -> None: elif 'biolog' in name.lower(): cultid = 'BIO' CULTIVATION_MAP[c['BANR']] = cultid - f.write(csv.format_row(cultid, name)) + f.row(cultid, name) def migrate_members(in_dir: str, out_dir: str) -> None: global MEMBER_MAP MEMBER_MAP = {} - members = [m for m in csv.parse_dict(f'{in_dir}/TMitglieder.csv')] + members = [m for m in utils.csv_parse_dict(f'{in_dir}/TMitglieder.csv')] mgnrs = [m['MGNR'] for m in members] fbs = parse_flaechenbindungen(in_dir) - with open(f'{out_dir}/member.csv', 'w+', encoding='utf-8') as f_m,\ - open(f'{out_dir}/member_billing_address.csv', 'w+', encoding='utf-8') as f_mba,\ - open(f'{out_dir}/wb_kg.csv', 'a', encoding='utf-8') as f_kg: - 
f_m.write('mgnr;predecessor_mgnr;prefix;given_name;middle_names;family_name;suffix;' - 'birthday;entry_date;exit_date;business_shares;accounting_nr;zwstid;' - 'lfbis_nr;ustid;volllieferant;buchführend;funktionär;active;iban;bic;' - 'country;postal_dest;address;' - 'email;phone_landline;phone_mobile_1;phone_mobile_2;' - 'default_kgnr;comment\n') - f_mba.write('mgnr;name;country;postal_dest;address\n') + with utils.csv_open(f'{out_dir}/member.csv') as f_m,\ + utils.csv_open(f'{out_dir}/member_billing_address.csv') as f_mba,\ + utils.csv_open(f'{out_dir}/wb_kg.csv', 'a') as f_kg: + f_m.header( + 'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix', + 'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid', + 'lfbis_nr', 'ustid', 'volllieferant', 'buchführend', 'funktionär', 'active', 'iban', 'bic', + 'country', 'postal_dest', 'address', + 'email', 'phone_landline', 'phone_mobile_1', 'phone_mobile_2', + 'default_kgnr', 'comment') + f_mba.header('mgnr', 'name', 'country', 'postal_dest', 'address') for m in members: mgnr: int = m['MGNR'] @@ -573,7 +573,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None: elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]: glnr = list(GROSSLAGE_MAP.values())[0] print(f'New KG: {lookup_kg_name(kgnr)} ({kgnr}, GL {glnr})') - f_kg.write(csv.format_row(kgnr, glnr)) + f_kg.row(kgnr, glnr) if 9999 not in GEM_MAP: GEM_MAP[9999] = [] GEM_MAP[9999].append((kgnr, 0)) @@ -583,7 +583,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None: continue pred = m['MGNR-Vorgänger'] if m['MGNR-Vorgänger'] in mgnrs else None - f_m.write(csv.format_row( + f_m.row( mgnr, pred, prefix, given_name, middle_names, family_name, suffix, m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0, m['BHKontonummer'], zwstid, bnr, ustid, @@ -591,12 +591,12 @@ def migrate_members(in_dir: str, out_dir: str) -> None: iban, bic, 'AT', postal_dest, 
address or '-', email, phone_landline, phone_mobile[0] if len(phone_mobile) > 0 else None, phone_mobile[1] if len(phone_mobile) > 1 else None, kgnr, m['Anmerkung'] - )) + ) MEMBER_MAP[mgnr] = { 'default_kgnr': kgnr } if billing_name: - f_mba.write(csv.format_row(mgnr, billing_name, 'AT', postal_dest, address or '-')) + f_mba.row(mgnr, billing_name, 'AT', postal_dest, address or '-') def migrate_area_commitments(in_dir: str, out_dir: str) -> None: @@ -692,12 +692,13 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None: text = re.sub(r'([0-9]+(, |$)){3,}', lambda m: replace_nrs(m, ', '), text) return text - with open(f'{out_dir}/area_commitment.csv', 'w+', encoding='utf-8') as f_fb, \ - open(f'{out_dir}/area_commitment_attribute.csv', 'w+', encoding='utf-8') as f_attr: - f_fb.write('fbnr;mgnr;sortid;cultid;area;kgnr;gstnr;rdnr;year_from;year_to;comment\n') - f_attr.write('fbnr;attrid\n') + with utils.csv_open(f'{out_dir}/area_commitment.csv') as f_fb, \ + utils.csv_open(f'{out_dir}/area_commitment_attribute.csv',) as f_attr: + f_fb.header('fbnr', 'mgnr', 'sortid', 'cultid', 'area', 'kgnr', 'gstnr', 'rdnr', + 'year_from', 'year_to', 'comment') + f_attr.header('fbnr', 'attrid') - for fb in csv.parse_dict(f'{in_dir}/TFlaechenbindungen.csv'): + for fb in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'): if fb['Von'] is None and fb['Bis'] is None: continue parz: str = fb['Parzellennummer'] @@ -723,10 +724,10 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None: rdnr = REED_MAP[fb['RNR']][1] if fb['RNR'] else None to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None - f_fb.write(csv.format_row(fbnr, mgnr, fb['SNR'], CULTIVATION_MAP[fb['BANR']], area, - kgnr, gstnr, rdnr, fb['Von'], to, comment)) + f_fb.row(fbnr, mgnr, fb['SNR'], CULTIVATION_MAP[fb['BANR']], area, + kgnr, gstnr, rdnr, fb['Von'], to, comment) if fb['SANR']: - f_attr.write(csv.format_row(fbnr, fb['SANR'])) + f_attr.row(fbnr, fb['SANR']) def fix_deliveries(deliveries: 
Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, List[int], datetime.date]]: @@ -782,7 +783,7 @@ def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, def migrate_deliveries(in_dir: str, out_dir: str) -> None: - modifiers = {m['ASNR']: m for m in csv.parse_dict(f'{in_dir}/TAbschlaege.csv') if m['Bezeichnung']} + modifiers = {m['ASNR']: m for m in utils.csv_parse_dict(f'{in_dir}/TAbschlaege.csv') if m['Bezeichnung']} delivery_map = {} seasons = {} branches = {} @@ -796,17 +797,18 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None: else: raise NotImplementedError() - deliveries = list(csv.parse_dict(f'{in_dir}/TLieferungen.csv')) + deliveries = list(utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv')) delivery_dict = {d['LINR']: d for d in deliveries} fixed = fix_deliveries(deliveries) - with open(f'{out_dir}/delivery.csv', 'w+', encoding='utf-8') as f_delivery, \ - open(f'{out_dir}/delivery_part.csv', 'w+', encoding='utf-8') as f_part, \ - open(f'{out_dir}/delivery_part_attribute.csv', 'w+', encoding='utf-8') as f_attr: - f_delivery.write('year;did;date;time;zwstid;lnr;lsnr;mgnr;comment\n') - f_part.write('year;did;dpnr;sortid;weight;kmw;qualid;hkid;kgnr;rdnr;gerebelt;manual_weighing;spl_check;' - 'hand_picked;lesewagen;temperature;acid;scale_id;weighing_id;comment\n') - f_attr.write('year;did;dpnr;attrid\n') + with utils.csv_open(f'{out_dir}/delivery.csv') as f_delivery, \ + utils.csv_open(f'{out_dir}/delivery_part.csv') as f_part, \ + utils.csv_open(f'{out_dir}/delivery_part_attribute.csv') as f_attr: + f_delivery.header('year', 'did', 'date', 'time', 'zwstid', 'lnr', 'lsnr', 'mgnr', 'comment') + f_part.header('year', 'did', 'dpnr', 'sortid', 'weight', 'kmw', 'qualid', 'hkid', 'kgnr', 'rdnr', + 'gerebelt', 'manual_weighing', 'spl_check', 'hand_picked', 'lesewagen', + 'temperature', 'acid', 'scale_id', 'weighing_id', 'comment') + f_attr.header('year', 'did', 'dpnr', 'attrid') for lsnr, linrs, date in fixed: if date.year not in 
seasons: @@ -908,37 +910,33 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None: if comment: comments.append(comment) - f_part.write(csv.format_row( + f_part.row( date.year, snr, dpnr, sortid, int(d['Gewicht']), kmw, QUAL_MAP[d['QSNR']], HKID, kgnr, rdnr, d['Gerebelt'] or False, d['Handwiegung'] or False, d['Spaetlese-Ueberpruefung'] or False, hand, lesemaschine, d['Temperatur'], acid, scale_id, weighing_id, comment - )) + ) for attrid in attributes: - f_attr.write(csv.format_row(date.year, snr, dpnr, attrid)) - f_delivery.write(csv.format_row( - date.year, snr, date, d['Uhrzeit'], BRANCH_MAP[d['ZNR']], lnr, lsnr, d['MGNR'], - '; '.join(comments) or None - )) + f_attr.row(date.year, snr, dpnr, attrid) + f_delivery.row(date.year, snr, date, d['Uhrzeit'], BRANCH_MAP[d['ZNR']], lnr, lsnr, d['MGNR'], + '; '.join(comments) or None) - with open(f'{out_dir}/delivery_part_modifier.csv', 'w+', encoding='utf-8') as f_part_mod: - f_part_mod.write('year;did;dpnr;modid\n') - for m in csv.parse_dict(f'{in_dir}/TLieferungAbschlag.csv'): + with utils.csv_open(f'{out_dir}/delivery_part_modifier.csv') as f_part_mod: + f_part_mod.header('year', 'did', 'dpnr', 'modid') + for m in utils.csv_parse_dict(f'{in_dir}/TLieferungAbschlag.csv'): if m['LINR'] not in delivery_map: continue nid = delivery_map[m['LINR']] - f_part_mod.write(csv.format_row(nid[0], nid[1], nid[2], modifiers[m['ASNR']]['id'])) + f_part_mod.row(nid[0], nid[1], nid[2], modifiers[m['ASNR']]['id']) - with open(f'{out_dir}/season.csv', 'w+', encoding='utf-8') as f_season, \ - open(f'{out_dir}/modifier.csv', 'w+', encoding='utf-8') as f_mod: - f_season.write('year;currency;precision;start_date;end_date\n') - f_mod.write('year;modid;name;abs;rel;standard;quick_select\n') + with utils.csv_open(f'{out_dir}/season.csv') as f_season, \ + utils.csv_open(f'{out_dir}/modifier.csv') as f_mod: + f_season.header('year', 'currency', 'precision', 'start_date', 'end_date') + f_mod.header('year', 'modid', 'name', 'abs', 'rel', 
'standard', 'quick_select') for y, s in seasons.items(): - f_season.write(csv.format_row(y, s['currency'], s['precision'], s['start'], s['end'])) + f_season.row(y, s['currency'], s['precision'], s['start'], s['end']) for m in modifiers.values(): abs_v = int(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None - f_mod.write(csv.format_row( - y, m['id'], m['Bezeichnung'], abs_v, m['AZASProzent'], m['Standard'], m['Schnellauswahl'] - )) + f_mod.row(y, m['id'], m['Bezeichnung'], abs_v, m['AZASProzent'], m['Standard'], m['Schnellauswahl']) def migrate_payments(in_dir: str, out_dir: str) -> None: @@ -946,7 +944,7 @@ def migrate_payments(in_dir: str, out_dir: str) -> None: def migrate_parameters(in_dir: str, out_dir: str) -> None: - params: Dict[str, str] = {p['Bezeichnung']: p['Wert'] for p in csv.parse_dict(f'{in_dir}/TParameter.csv')} + params: Dict[str, str] = {p['Bezeichnung']: p['Wert'] for p in utils.csv_parse_dict(f'{in_dir}/TParameter.csv')} name = params['MANDANTENNAME1'].title().replace('F.', 'für').replace('U.', 'und') shortened = name.replace(' für ', ' f. ').replace(' und ', ' u. 
') suffix = params['MANDANTENNAME2'].lower().replace(' ', '').replace('.', '') @@ -974,10 +972,10 @@ def migrate_parameters(in_dir: str, out_dir: str) -> None: 'WEBSITE': params['MANDANTENHOMEPAGE'], } - with open(f'{out_dir}/client_parameter.csv', 'w+', encoding='utf-8') as f: - f.write('param;value\n') + with utils.csv_open(f'{out_dir}/client_parameter.csv') as f: + f.header('param', 'value') for param, value in new_params.items(): - f.write(csv.format_row(param, value)) + f.row(param, value) def main() -> None: diff --git a/wgmaster/utils.py b/wgmaster/utils.py new file mode 100644 index 0000000..572c1d4 --- /dev/null +++ b/wgmaster/utils.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +from __future__ import annotations +from typing import Iterator, Dict, Any, Tuple, TextIO, List +import re +import datetime +import csv + +RE_INT = re.compile(r'-?[0-9]+') +RE_FLOAT = re.compile(r'-?[0-9]+\.[0-9]+') +RE_STR_START = re.compile(r'.*,"[^"]*$') +RE_STR_END = re.compile(r'^[^"]*",.*') + + +def cast_value(value: str) -> Any: + if value == '': + return None + elif value[0] == '"' and value[-1] == '"': + return value[1:-1] + elif value == 'T': + return True + elif value == 'F': + return False + elif RE_INT.fullmatch(value): + return int(value) + elif RE_FLOAT.fullmatch(value): + return float(value) + elif len(value) == 10 and value[4] == '-' and value[7] == '-': + return datetime.datetime.strptime(value, '%Y-%m-%d').date() + elif len(value) == 8 and value[2] == ':' and value[5] == ':': + return datetime.time.fromisoformat(value) + else: + raise RuntimeError(f'unable to infer type of value "{value}"') + + +def convert_value(value: Any, table: str = None, column: str = None) -> str: + if value is None: + return '' + if type(value) == str: + return '"' + value.replace('"', '""') + '"' + elif type(value) == bool: + return 'T' if value else 'F' + elif type(value) == datetime.datetime and table is not None and column is not None: + if value.year == 1899 
and value.month == 12 and value.day == 30: + return value.strftime('%H:%M:%S') + elif value.hour == 0 and value.minute == 0 and value.second == 0: + return value.strftime('%Y-%m-%d') + return str(value) + + +class CsvFile: + file: TextIO + reader: csv.reader + writer: csv.writer + + def __init__(self, file: TextIO): + self.file = file + self.writer = csv.writer(self.file, doublequote=False, quoting=csv.QUOTE_NONE, escapechar=None) + self.reader = csv.reader(self.file, doublequote=False, quoting=csv.QUOTE_NONE, escapechar=None) + + def __enter__(self) -> CsvFile: + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + return self.file.__exit__(exc_type, exc_val, exc_tb) + + def __iter__(self) -> Iterator[List[str]]: + return self.reader.__iter__() + + def row(self, *fields, raw: bool = False): + if not raw: + fields = (convert_value(field) for field in fields) + self.writer.writerow(fields) + + def header(self, *headers): + self.writer.writerow(str(h) for h in headers) + + +def csv_open(filename: str, mode: str = 'w+') -> CsvFile: + return CsvFile(open(filename, mode, encoding='utf-8', newline='')) + + +def csv_parse(filename: str) -> Iterator[Tuple]: + with csv_open(filename, 'r') as f: + rows = f.__iter__() + yield tuple([field.strip() for field in next(rows)]) + yield from (tuple(cast_value(field) for field in row) for row in rows) + + +def csv_parse_dict(filename: str) -> Iterator[Dict[str, Any]]: + rows = csv_parse(filename) + header = next(rows) + return ({header[i]: part for i, part in enumerate(row)} for row in rows)