Files
elwig-misc/wgmaster/export.py
2023-05-05 00:03:25 +02:00

155 lines
4.9 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Tuple, List, Generator, Dict, Any
import argparse
import os
import string
import datetime
import ctypes
import ctypes.wintypes
import hashlib
import pypyodbc
import utils
# Entry names that search_accdb() skips outright: an entry with one of these
# exact names is neither yielded nor descended into.
IGNORED_NAMES = ['Windows', 'Program Files', 'Program Files (x86)', 'AppData']
def get_drives() -> List[str]:
    """Return the drive letters of all local (non-network) drives.

    Every letter A-Z is probed. A letter is kept when ``<letter>:`` exists
    and ``WNetGetConnectionW`` from the Windows ``mpr`` library does not
    return 0xEA for it.

    Returns:
        Upper-case drive letters without the trailing colon, in alphabetical
        order.
    """
    # One-element DWORD array used as the in/out buffer-length argument.
    buffer_length = (ctypes.wintypes.DWORD * 1)()
    mpr = ctypes.WinDLL('mpr')

    local_letters = []
    for letter in string.ascii_uppercase:
        root = f'{letter}:'
        if not os.path.exists(root):
            continue
        # 0xEA is ERROR_MORE_DATA: with no output buffer supplied, this is
        # presumably what the API reports for a mapped network drive, so
        # such drives are skipped to keep only local file systems.
        if mpr.WNetGetConnectionW(root, None, buffer_length) == 0xEA:
            continue
        local_letters.append(letter)
    return local_letters
def search_accdb(base_path: str = None) -> Generator[str, None, None]:
    """Recursively yield the paths of all ``.accdb`` files below a directory.

    Args:
        base_path: Directory to search, without a trailing backslash. When
            ``None``, every drive returned by ``get_drives()`` is searched.

    Yields:
        Backslash-separated paths of files whose name ends in ``.accdb``
        (case-insensitive).

    Entries named in ``IGNORED_NAMES`` are skipped. Sub-directories are only
    descended into when the directory being listed holds at most 100 entries,
    and symlinked directories are never followed. A ``PermissionError``
    raised while processing a directory ends the search of that directory
    silently.
    """
    if base_path is None:
        for letter in get_drives():
            yield from search_accdb(f'{letter}:')
        return

    try:
        children = list(os.scandir(f'{base_path}\\'))
        # Only recurse out of directories that are not overly crowded.
        may_descend = len(children) <= 100
        for child in children:
            child_path = f'{base_path}\\{child.name}'
            if child.name in IGNORED_NAMES:
                continue
            if child.name.lower().endswith('.accdb') and child.is_file():
                yield child_path
                continue
            if may_descend and child.is_dir() and not child.is_symlink():
                yield from search_accdb(child_path)
    except PermissionError:
        # Best effort: directories that cannot be read are simply left out.
        pass
def get_accdb_info(filename: str) -> Tuple[str, datetime.datetime, datetime.datetime, int]:
stat = os.stat(filename)
with open(filename, 'rb', buffering=0) as f:
filehash = hashlib.file_digest(f, 'md5').hexdigest()
return filehash,\
datetime.datetime.fromtimestamp(stat.st_ctime),\
datetime.datetime.fromtimestamp(stat.st_mtime),\
stat.st_size
def get_accdb_files() -> Dict[str, Dict[str, Any]]:
    """Find all ``.accdb`` files and group identical copies by content hash.

    Returns:
        A dict keyed by MD5 hex digest. Each value is a dict with the keys
        ``hash``, ``size``, ``locations`` (list of paths in discovery order),
        ``names`` (set of lower-cased file names), ``ctime`` (earliest ctime
        among the copies) and ``mtime`` (latest mtime among the copies).
    """
    by_hash = {}
    for location in search_accdb():
        digest, created, modified, size = get_accdb_info(location)
        # The first copy seen seeds the record; later copies only extend it.
        record = by_hash.setdefault(digest, {
            'hash': digest, 'size': size, 'locations': [], 'names': set(),
            'ctime': created, 'mtime': modified,
        })
        record['locations'].append(location)
        record['names'].add(location.rsplit('\\', 1)[-1].lower())
        record['mtime'] = max(record['mtime'], modified)
        record['ctime'] = min(record['ctime'], created)
    return by_hash
def prompt() -> str:
    """Let the user pick one of the ``wgdaten.accdb`` files found on disk.

    All ``.accdb`` files are collected via ``get_accdb_files()``. Groups of
    identical files that include a copy named ``wgdaten.accdb`` are listed,
    most recently modified first, together with all their locations. The
    user is then asked for a number until a valid one is entered; empty
    input selects entry 1.

    Returns:
        The first known location of the chosen file.

    Raises:
        SystemExit: If no ``wgdaten.accdb`` file was found. Without this
            check no input could ever be valid and the prompt would loop
            forever.
    """
    print('Searching for accdb files...', flush=True)
    files = get_accdb_files()

    name = 'wgdaten'
    wanted_file = f'{name}.accdb'
    # Test membership in the whole name set rather than inspecting one
    # arbitrary element, so the listing does not depend on set ordering when
    # identical copies exist under several names.
    candidates = [
        info
        for info in sorted(files.values(), key=lambda f: f['mtime'], reverse=True)
        if wanted_file in info['names']
    ]
    if not candidates:
        raise SystemExit(f'No {wanted_file} file found')

    for n, info in enumerate(candidates, start=1):
        locs = info['locations']
        print(f'{n:>2}: {name:<24} {locs[0]}')
        print(f' Created: {info["ctime"].date().isoformat()} {locs[1] if len(locs) > 1 else ""}')
        print(f' Modified: {info["mtime"].date().isoformat()} {locs[2] if len(locs) > 2 else ""}')
        for loc in locs[3:]:
            print(' ' * 29 + loc)
        print()

    while True:
        text = input('Choose accdb file (default=1): ')
        try:
            choice = int(text) if text else 1
        except ValueError:
            # Not a number: ask again.
            continue
        if 1 <= choice <= len(candidates):
            return candidates[choice - 1]['locations'][0]
def main() -> None:
    """Export every table of an Access database to one CSV file per table.

    Command line options:
        -o / --output  Directory the CSV files are written to (default
                       ``tables``); it is created if missing.
        -f / --file    Path of the accdb file; when omitted the user is
                       asked to choose one via ``prompt()``.

    Each table is written to ``<output>/<table>.csv`` with the rows ordered
    by the table's first column. Cell values are passed through
    ``utils.convert_value`` before being written.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-o', '--output', default='tables')
    parser.add_argument('-f', '--file', metavar='WGDATEN', required=False)
    args = parser.parse_args()

    os.makedirs(args.output, exist_ok=True)

    if not args.file:
        args.file = prompt()

    print(f'Opening {args.file}...', flush=True)
    # NOTE(review): presumably keeps column names in their original case -
    # confirm against the pypyodbc documentation.
    pypyodbc.lowercase = False
    cnx = pypyodbc.connect(f"Driver={{Microsoft Access Driver (*.mdb, *.accdb)}};Dbq={args.file};")
    # Nested try/finally so the connection is closed even if creating or
    # closing the cursor fails.
    try:
        cur = cnx.cursor()
        try:
            print(f'Opened {args.file}!', flush=True)

            print('Fetching tables...', flush=True)
            tbls = cur.tables(tableType='TABLE').fetchall()
            print(f'Successfully fetched {len(tbls)} tables!', flush=True)

            # Each row has five fields; only the table name (third) is used.
            for _catalog, _schema, t_name, _type, _remarks in tbls:
                print(f'Exporting {t_name}...', flush=True)

                # Probe query: only used to learn the column names and types.
                # The table name is backtick-quoted like the column name
                # below, so names with spaces or reserved words work too.
                cur.execute(f"SELECT TOP 1 * FROM `{t_name}`;")
                desc = [(t[0], t[1]) for t in cur.description]
                cur.fetchall()
                print(desc, flush=True)

                # Order by the first column to get a stable row order.
                cur.execute(f"SELECT * FROM `{t_name}` ORDER BY `{desc[0][0]}`;")
                cols = [t[0] for t in cur.description]

                with utils.csv_open(f'{args.output}/{t_name}.csv') as f:
                    f.header(*cols)
                    for row in cur:
                        values = (utils.convert_value(val, table=t_name, column=col)
                                  for col, val in zip(cols, row))
                        f.row(*values, raw=True)

                print(f'Exported {t_name} successfully!', flush=True)
        finally:
            cur.close()
    finally:
        cnx.close()
# Run the export only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()