From 6311b1917786d3a8fbbdc2d8eab2272cb249a194 Mon Sep 17 00:00:00 2001 From: Lorenz Stechauner Date: Sat, 8 Apr 2023 18:02:15 +0200 Subject: [PATCH] Check foreign key constraints on inport in import.py --- wgmaster/import.py | 28 ++++++++++++++++++++++++++++ wgmaster/migrate.py | 10 ++++++++-- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/wgmaster/import.py b/wgmaster/import.py index 0298f63..775039d 100755 --- a/wgmaster/import.py +++ b/wgmaster/import.py @@ -59,6 +59,32 @@ def import_csv(cur: sqlite3.Cursor, table_name: str) -> None: cur.close() +def check_foreign_keys(cur: sqlite3.Cursor) -> bool: + cur.execute("PRAGMA foreign_key_check") + rows = cur.fetchall() + table_names = {r[0] for r in rows} + tables = {} + for n in table_names: + cur.execute(f"PRAGMA foreign_key_list({n})") + keys = cur.fetchall() + tables[n] = {k[0]: k for k in keys} + + cases = {} + for row in rows: + fk = tables[row[0]][row[3]] + cur.execute(f"SELECT {fk[3]} FROM {row[0]} WHERE _ROWID_ = ?", (row[1],)) + value = cur.fetchall() + string = f'{row[0]}({fk[3]}) -> {fk[2]}({fk[4]}) - {value[0][0]}' + if string not in cases: + cases[string] = 0 + cases[string] += 1 + for case, n in cases.items(): + print(case + (f' ({n} times)' if n > 1 else '')) + + cur.close() + return len(rows) == 0 + + if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('dir', type=str, metavar='DIR', @@ -94,6 +120,8 @@ if __name__ == '__main__': DB_CNX.execute("BEGIN") for table in TABLES: import_csv(DB_CNX.cursor(), table) + if not check_foreign_keys(DB_CNX.cursor()): + raise RuntimeError('foreign key constraint failed') DB_CNX.execute("COMMIT") except Exception as err: DB_CNX.execute("ROLLBACK") diff --git a/wgmaster/migrate.py b/wgmaster/migrate.py index e270221..34ed31c 100755 --- a/wgmaster/migrate.py +++ b/wgmaster/migrate.py @@ -89,6 +89,11 @@ def invalid(mgnr: int, key: str, value) -> None: print(f'\x1B[1;31m{mgnr:>6}: {key:<12} {value}\x1B[0m', file=sys.stderr) +def renumber_delivery(lsnr_1: str, lsnr_2: str) -> None: + if not args.quiet: + print(f'\x1B[1m{lsnr_1:<14} -> {lsnr_2:<14}\x1B[0m') + + def warning_delivery(lsnr: str, mgnr: int, key: str, value) -> None: print(f'\x1B[1;33m{lsnr:<13} ({mgnr:>6}): {key:<12} {value}\x1B[0m', file=sys.stderr) @@ -704,7 +709,6 @@ def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, if date.year == lsdate.year: lsnr_n = get_lsnr(date, lsnr) if lsnr_n not in lsnrs: - print(f'{lsnr} -> {lsnr_n}') lsnr = lsnr_n else: warning_delivery(lsnr, mgnr, 'date', date) @@ -764,10 +768,12 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None: for dpnr, linr in enumerate(linrs, start=1): d = delivery_dict[linr] delivery_map[linr] = (date.year, snr, dpnr) + if lsnr != d['Lieferscheinnummer']: + renumber_delivery(d['Lieferscheinnummer'], lsnr) oe = d['OechsleOriginal'] or d['Oechsle'] kmw = GRADATION_MAP[oe] - sortid, attrid = d['SNR'], d['SANR'] + sortid, attrid = d['SNR'].upper(), d['SANR'] or None if len(sortid) != 2: attrid = sortid[-1] sortid = sortid[:2]