From a04bd162a5ee7af6f9023e30d1b9b8b3d86dbf4b Mon Sep 17 00:00:00 2001 From: Lorenz Stechauner Date: Mon, 10 Apr 2023 01:10:19 +0200 Subject: [PATCH] Refactor python files --- wgmaster/csv.py | 28 ++++++++++++++++------------ wgmaster/export.py | 29 ++++++++--------------------- wgmaster/import.py | 38 ++++++++++++++++++++++++-------------- wgmaster/migrate.py | 18 +++++++++++++----- 4 files changed, 61 insertions(+), 52 deletions(-) diff --git a/wgmaster/csv.py b/wgmaster/csv.py index cea487e..ba2307a 100644 --- a/wgmaster/csv.py +++ b/wgmaster/csv.py @@ -30,6 +30,21 @@ def cast_value(value: str) -> Any: raise RuntimeError(f'unable to infer type of value "{value}"') +def convert_value(value: Any, table: str = None, column: str = None) -> str: + if value is None: + return '' + if type(value) == str: + return f'"{value}"' + elif type(value) == bool: + return 'T' if value else 'F' + elif type(value) == datetime.datetime and table is not None and column is not None: + if value.year == 1899 and value.month == 12 and value.day == 30: + return value.strftime('%H:%M:%S') + elif value.hour == 0 and value.minute == 0 and value.second == 0: + return value.strftime('%Y-%m-%d') + return str(value) + + def parse_line(line_str: str) -> Iterator[str]: w = None s = False @@ -70,15 +85,4 @@ def parse_dict(filename: str) -> Iterator[Dict[str, Any]]: def format_row(*values) -> str: - row = '' - for val in values: - if val is None: - pass - elif type(val) == str: - row += f'"{val}"' - elif type(val) == bool: - row += 'T' if val else 'F' - else: - row += str(val) - row += ';' - return f'{row[:-1]}\n' + return ';'.join([convert_value(v) for v in values]) + '\n' diff --git a/wgmaster/export.py b/wgmaster/export.py index 9146423..1129114 100644 --- a/wgmaster/export.py +++ b/wgmaster/export.py @@ -1,32 +1,14 @@ #!/bin/env python3 # -*- coding: utf-8 -*- -from typing import Any import argparse -import datetime import os import pypyodbc - -def convert(tbl: str, name: str, a: Any) -> str: - if type(a) == str: - return f'"{a}"' - elif type(a) == bool: - return "T" if a else "F" - elif a is None: - return "" - elif type(a) == datetime.datetime: - if a.year == 1899 and a.month == 12 and a.day == 30: - return a.strftime('%H:%M:%S') - elif a.hour == 0 and a.minute == 0 and a.second == 0: - return a.strftime('%Y-%m-%d') - else: - return str(a) - else: - return str(a) +import csv -if __name__ == '__main__': +def main() -> None: parser = argparse.ArgumentParser() parser.add_argument('-o', '--output', default='tables') parser.add_argument('wgdaten', metavar='WGDATEN') @@ -61,9 +43,14 @@ if __name__ == '__main__': with open(f'{args.output}/{t_name}.csv', 'wb+') as f: f.write((';'.join(cols) + '\n').encode('utf-8')) for row in cur: - f.write((';'.join([convert(t_name, n, a) for n, a in zip(cols, row)]) + '\n').encode('utf-8')) + values = [csv.convert_value(val, table=t_name, column=col) for col, val in zip(cols, row)] + f.write((';'.join(values) + '\n').encode('utf-8')) print(f'Exported {t_name} successfully!', flush=True) finally: cur.close() cnx.close() + + +if __name__ == '__main__': + main() diff --git a/wgmaster/import.py b/wgmaster/import.py index bb92a0d..8431a19 100755 --- a/wgmaster/import.py +++ b/wgmaster/import.py @@ -11,6 +11,8 @@ import datetime import csv +DIR: str + TABLES = ['branch', 'wb_gl', 'wb_kg', 'wb_rd', 'wine_attribute', 'wine_cultivation', 'member', 'member_billing_address', 'contract', 'area_commitment', 'season', 'modifier', 'delivery', 'delivery_part', 'delivery_part_modifier', ] @@ -44,7 +46,7 @@ def sqlite_regexp(pattern: str, value: Optional[str]) -> Optional[bool]: def import_csv(cur: sqlite3.Cursor, table_name: str) -> None: - rows = csv.parse(f'{args.dir}/{table_name}.csv') + rows = csv.parse(f'{DIR}/{table_name}.csv') names = next(rows) sql = f'INSERT INTO {table_name} ({", ".join(names)}) VALUES ({", ".join(["?"] * len(names))})' @@ -81,7 +83,9 @@ def check_foreign_keys(cur: sqlite3.Cursor) -> bool: return len(rows) == 0 -if __name__ == '__main__': +def main() -> None: + global DIR + parser = argparse.ArgumentParser() parser.add_argument('dir', type=str, metavar='DIR', help='The directory where the migrated csv files are stored') @@ -91,6 +95,8 @@ if __name__ == '__main__': help='Whether the database file should be overwritten or kept') args = parser.parse_args() + DIR = args.dir + if not args.keep: try: os.remove(args.db) @@ -100,28 +106,32 @@ if __name__ == '__main__': sqlite3.register_adapter(datetime.date, lambda d: d.strftime('%Y-%m-%d')) sqlite3.register_adapter(datetime.time, lambda t: t.strftime('%H:%M:%S')) - DB_CNX = sqlite3.connect(args.db) - DB_CNX.create_function('REGEXP', 2, sqlite_regexp) + cnx = sqlite3.connect(args.db) + cnx.create_function('REGEXP', 2, sqlite_regexp) if not args.keep: for file_name in get_sql_files(): with open(file_name, encoding='utf-8') as sql_file: print(f'Executing {file_name}') - DB_CNX.executescript(sql_file.read()) + cnx.executescript(sql_file.read()) try: - DB_CNX.isolation_level = None + cnx.isolation_level = None # Member predecessors may refer to a higher MgNr - DB_CNX.execute("PRAGMA foreign_keys = OFF") - DB_CNX.execute("BEGIN") + cnx.execute("PRAGMA foreign_keys = OFF") + cnx.execute("BEGIN") for table in TABLES: - import_csv(DB_CNX.cursor(), table) - if not check_foreign_keys(DB_CNX.cursor()): + import_csv(cnx.cursor(), table) + if not check_foreign_keys(cnx.cursor()): raise RuntimeError('foreign key constraint failed') - DB_CNX.execute("COMMIT") + cnx.execute("COMMIT") except Exception as err: - DB_CNX.execute("ROLLBACK") + cnx.execute("ROLLBACK") raise err finally: - DB_CNX.execute("PRAGMA foreign_keys = ON") - DB_CNX.close() + cnx.execute("PRAGMA foreign_keys = ON") + cnx.close() + + +if __name__ == '__main__': + main() diff --git a/wgmaster/migrate.py b/wgmaster/migrate.py index db4c8e3..6692c63 100755 --- a/wgmaster/migrate.py +++ b/wgmaster/migrate.py @@ -14,6 +14,7 @@ import csv DB_CNX: Optional[sqlite3.Connection] = None +QUIET: bool = False HKID: Optional[str] = None WG: Optional[str] = None @@ -78,7 +79,7 @@ STREET_NAMES: Dict[str, str] = { def success(mgnr: int, key: str, value) -> None: - if not args.quiet: + if not QUIET: print(f'\x1B[1;32m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr) @@ -91,7 +92,7 @@ def invalid(mgnr: int, key: str, value) -> None: def renumber_delivery(lsnr_1: str, lsnr_2: str) -> None: - if not args.quiet: + if not QUIET: print(f'\x1B[1m{lsnr_1:<14} -> {lsnr_2:<14}\x1B[0m') @@ -104,13 +105,13 @@ def invalid_delivery(lsnr: str, mgnr: int, key: str, value) -> None: def convert(mgnr: int, key: str, old_value: str, new_value) -> None: - if not args.quiet: + if not QUIET: print(f'\x1B[1m{mgnr:>6}: {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr) def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str], billing: Optional[str] = None) -> None: - if not args.quiet: + if not QUIET: print(f'\x1B[1m{mgnr:>6}: ' f'{" / ".join([e or "" for e in old_name])} -> ' f'{" / ".join([e or "" for e in new_name])}' @@ -883,7 +884,9 @@ def migrate_payments(in_dir: str, out_dir: str) -> None: pass # TODO migrate payments -if __name__ == '__main__': +def main() -> None: + global DB_CNX, QUIET, HKID, WG + parser = argparse.ArgumentParser() parser.add_argument('in_dir', type=str, help='The input directory where the exported csv files are stored') @@ -902,6 +905,7 @@ if __name__ == '__main__': os.makedirs(args.out_dir, exist_ok=True) + QUIET = args.quiet HKID = args.origin WG = args.genossenschaft @@ -920,3 +924,7 @@ if __name__ == '__main__': migrate_payments(args.in_dir, args.out_dir) DB_CNX.close() + + +if __name__ == '__main__': + main()