Refactor Python files

2023-04-10 01:10:19 +02:00
parent 9ce344763b
commit a04bd162a5
4 changed files with 61 additions and 52 deletions

View File

@@ -30,6 +30,21 @@ def cast_value(value: str) -> Any:
    raise RuntimeError(f'unable to infer type of value "{value}"')


def convert_value(value: Any, table: str = None, column: str = None) -> str:
    if value is None:
        return ''
    if type(value) == str:
        return f'"{value}"'
    elif type(value) == bool:
        return 'T' if value else 'F'
    elif type(value) == datetime.datetime and table is not None and column is not None:
        if value.year == 1899 and value.month == 12 and value.day == 30:
            return value.strftime('%H:%M:%S')
        elif value.hour == 0 and value.minute == 0 and value.second == 0:
            return value.strftime('%Y-%m-%d')
    return str(value)


def parse_line(line_str: str) -> Iterator[str]:
    w = None
    s = False
@@ -70,15 +85,4 @@ def parse_dict(filename: str) -> Iterator[Dict[str, Any]]:
def format_row(*values) -> str:
    row = ''
    for val in values:
        if val is None:
            pass
        elif type(val) == str:
            row += f'"{val}"'
        elif type(val) == bool:
            row += 'T' if val else 'F'
        else:
            row += str(val)
        row += ';'
    return f'{row[:-1]}\n'
    return ';'.join([convert_value(v) for v in values]) + '\n'
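
The rewritten format_row delegates every field to the new convert_value helper, so the export and migration code share one formatting rule: None becomes an empty field, strings are quoted, booleans become T/F, and, when a table and column are passed, 1899-12-30 timestamps are written as times and midnight timestamps as plain dates. A minimal sketch of the expected behaviour, assuming this module is the project's own csv helper (the sample values are illustrative, not taken from the repository):

import datetime

assert convert_value(None) == ''
assert convert_value('Wein') == '"Wein"'
assert convert_value(True) == 'T'
assert convert_value(datetime.datetime(2023, 4, 10), table='delivery', column='date') == '2023-04-10'
assert format_row(1, 'Wein', None, True) == '1;"Wein";;T\n'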

View File

@@ -1,32 +1,14 @@
#!/bin/env python3
# -*- coding: utf-8 -*-

from typing import Any
import argparse
import datetime
import os
import pypyodbc


def convert(tbl: str, name: str, a: Any) -> str:
    if type(a) == str:
        return f'"{a}"'
    elif type(a) == bool:
        return "T" if a else "F"
    elif a is None:
        return ""
    elif type(a) == datetime.datetime:
        if a.year == 1899 and a.month == 12 and a.day == 30:
            return a.strftime('%H:%M:%S')
        elif a.hour == 0 and a.minute == 0 and a.second == 0:
            return a.strftime('%Y-%m-%d')
        else:
            return str(a)
    else:
        return str(a)
import csv


if __name__ == '__main__':
def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument('-o', '--output', default='tables')
    parser.add_argument('wgdaten', metavar='WGDATEN')
@@ -61,9 +43,14 @@ if __name__ == '__main__':
            with open(f'{args.output}/{t_name}.csv', 'wb+') as f:
                f.write((';'.join(cols) + '\n').encode('utf-8'))
                for row in cur:
                    f.write((';'.join([convert(t_name, n, a) for n, a in zip(cols, row)]) + '\n').encode('utf-8'))
                    values = [csv.convert_value(val, table=t_name, column=col) for col, val in zip(cols, row)]
                    f.write((';'.join(values) + '\n').encode('utf-8'))
            print(f'Exported {t_name} successfully!', flush=True)
    finally:
        cur.close()
        cnx.close()


if __name__ == '__main__':
    main()
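
The export loop no longer uses the local convert helper; every value of a result row goes through csv.convert_value with the table and column name passed along (csv here being the project's helper module shown above, not the standard library). A sketch of what one written line looks like, with a made-up row standing in for a pyodbc result (the column names are hypothetical):

import csv  # the project's csv helper, not the standard library module

t_name = 'member'
cols = ['mgnr', 'name', 'active']
row = (123, 'Huber', True)  # hypothetical sample row
values = [csv.convert_value(val, table=t_name, column=col) for col, val in zip(cols, row)]
line = (';'.join(values) + '\n').encode('utf-8')  # b'123;"Huber";T\n'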

View File

@@ -11,6 +11,8 @@ import datetime
import csv

DIR: str

TABLES = ['branch', 'wb_gl', 'wb_kg', 'wb_rd', 'wine_attribute', 'wine_cultivation',
          'member', 'member_billing_address', 'contract', 'area_commitment',
          'season', 'modifier', 'delivery', 'delivery_part', 'delivery_part_modifier', ]
@@ -44,7 +46,7 @@ def sqlite_regexp(pattern: str, value: Optional[str]) -> Optional[bool]:
def import_csv(cur: sqlite3.Cursor, table_name: str) -> None:
    rows = csv.parse(f'{args.dir}/{table_name}.csv')
    rows = csv.parse(f'{DIR}/{table_name}.csv')
    names = next(rows)
    sql = f'INSERT INTO {table_name} ({", ".join(names)}) VALUES ({", ".join(["?"] * len(names))})'
@@ -81,7 +83,9 @@ def check_foreign_keys(cur: sqlite3.Cursor) -> bool:
    return len(rows) == 0


if __name__ == '__main__':
def main() -> None:
    global DIR
    parser = argparse.ArgumentParser()
    parser.add_argument('dir', type=str, metavar='DIR',
                        help='The directory where the migrated csv files are stored')
@@ -91,6 +95,8 @@ if __name__ == '__main__':
                        help='Whether the database file should be overwritten or kept')
    args = parser.parse_args()

    DIR = args.dir

    if not args.keep:
        try:
            os.remove(args.db)
@@ -100,28 +106,32 @@ if __name__ == '__main__':
    sqlite3.register_adapter(datetime.date, lambda d: d.strftime('%Y-%m-%d'))
    sqlite3.register_adapter(datetime.time, lambda t: t.strftime('%H:%M:%S'))

    DB_CNX = sqlite3.connect(args.db)
    DB_CNX.create_function('REGEXP', 2, sqlite_regexp)
    cnx = sqlite3.connect(args.db)
    cnx.create_function('REGEXP', 2, sqlite_regexp)

    if not args.keep:
        for file_name in get_sql_files():
            with open(file_name, encoding='utf-8') as sql_file:
                print(f'Executing {file_name}')
                DB_CNX.executescript(sql_file.read())
                cnx.executescript(sql_file.read())

    try:
        DB_CNX.isolation_level = None
        cnx.isolation_level = None
        # Member predecessors may refer to a higher MgNr
        DB_CNX.execute("PRAGMA foreign_keys = OFF")
        DB_CNX.execute("BEGIN")
        cnx.execute("PRAGMA foreign_keys = OFF")
        cnx.execute("BEGIN")
        for table in TABLES:
            import_csv(DB_CNX.cursor(), table)
        if not check_foreign_keys(DB_CNX.cursor()):
            import_csv(cnx.cursor(), table)
        if not check_foreign_keys(cnx.cursor()):
            raise RuntimeError('foreign key constraint failed')
        DB_CNX.execute("COMMIT")
        cnx.execute("COMMIT")
    except Exception as err:
        DB_CNX.execute("ROLLBACK")
        cnx.execute("ROLLBACK")
        raise err
    finally:
        DB_CNX.execute("PRAGMA foreign_keys = ON")
        DB_CNX.close()
        cnx.execute("PRAGMA foreign_keys = ON")
        cnx.close()


if __name__ == '__main__':
    main()
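
Besides renaming DB_CNX to a local cnx inside main(), the import keeps its transaction shape: isolation_level = None puts the sqlite3 connection into autocommit, so BEGIN/COMMIT/ROLLBACK are issued by hand, foreign keys stay off for the whole bulk insert, and the constraints are verified once before committing. A condensed sketch of that pattern against an in-memory database with a placeholder table (not from the repository):

import sqlite3

cnx = sqlite3.connect(':memory:')
cnx.isolation_level = None  # manual transaction control
cnx.execute('CREATE TABLE t (id INTEGER PRIMARY KEY, parent INTEGER REFERENCES t (id))')
cnx.execute('PRAGMA foreign_keys = OFF')
cnx.execute('BEGIN')
cnx.execute('INSERT INTO t VALUES (1, 2), (2, NULL)')  # forward reference, allowed while FKs are off
if cnx.execute('PRAGMA foreign_key_check').fetchall():  # same idea as check_foreign_keys above
    cnx.execute('ROLLBACK')
else:
    cnx.execute('COMMIT')
cnx.execute('PRAGMA foreign_keys = ON')
cnx.close()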

View File

@@ -14,6 +14,7 @@ import csv
DB_CNX: Optional[sqlite3.Connection] = None
QUIET: bool = False
HKID: Optional[str] = None
WG: Optional[str] = None
@@ -78,7 +79,7 @@ STREET_NAMES: Dict[str, str] = {
def success(mgnr: int, key: str, value) -> None:
    if not args.quiet:
    if not QUIET:
        print(f'\x1B[1;32m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr)
@@ -91,7 +92,7 @@ def invalid(mgnr: int, key: str, value) -> None:
def renumber_delivery(lsnr_1: str, lsnr_2: str) -> None:
    if not args.quiet:
    if not QUIET:
        print(f'\x1B[1m{lsnr_1:<14} -> {lsnr_2:<14}\x1B[0m')
@@ -104,13 +105,13 @@ def invalid_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
def convert(mgnr: int, key: str, old_value: str, new_value) -> None:
    if not args.quiet:
    if not QUIET:
        print(f'\x1B[1m{mgnr:>6}: {key:<12} "{old_value}" -> "{new_value}"\x1B[0m', file=sys.stderr)


def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str],
                 billing: Optional[str] = None) -> None:
    if not args.quiet:
    if not QUIET:
        print(f'\x1B[1m{mgnr:>6}: '
              f'{" / ".join([e or "" for e in old_name])} -> '
              f'{" / ".join([e or "" for e in new_name])}'
@@ -883,7 +884,9 @@ def migrate_payments(in_dir: str, out_dir: str) -> None:
    pass  # TODO migrate payments


if __name__ == '__main__':
def main() -> None:
    global DB_CNX, QUIET, HKID, WG
    parser = argparse.ArgumentParser()
    parser.add_argument('in_dir', type=str,
                        help='The input directory where the exported csv files are stored')
@@ -902,6 +905,7 @@ if __name__ == '__main__':
    os.makedirs(args.out_dir, exist_ok=True)

    QUIET = args.quiet
    HKID = args.origin
    WG = args.genossenschaft
@@ -920,3 +924,7 @@ if __name__ == '__main__':
    migrate_payments(args.in_dir, args.out_dir)

    DB_CNX.close()


if __name__ == '__main__':
    main()
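
The pattern is the same across the three scripts: helper functions stop reading a global argparse args object, and main() copies the few values they need into module-level globals (DIR, DB_CNX, QUIET, HKID, WG) before doing the work. A stripped-down sketch of that structure; the log helper and its flag are illustrative, only the shape mirrors the diff:

import argparse
import sys

QUIET: bool = False  # set once in main(), read by the helpers


def log(msg: str) -> None:
    if not QUIET:
        print(msg, file=sys.stderr)


def main() -> None:
    global QUIET
    parser = argparse.ArgumentParser()
    parser.add_argument('-q', '--quiet', action='store_true')
    args = parser.parse_args()
    QUIET = args.quiet
    log('starting migration')


if __name__ == '__main__':
    main()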