Search for accdb files in export.py
This commit is contained in:
@ -1,27 +1,120 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from typing import Tuple, List, Generator, Dict, Any
|
||||||
import argparse
|
import argparse
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
|
import string
|
||||||
|
import datetime
|
||||||
|
import hashlib
|
||||||
import pypyodbc
|
import pypyodbc
|
||||||
|
|
||||||
import csv
|
import csv
|
||||||
|
|
||||||
|
|
||||||
|
IGNORED_PATHS = ['C:\\Windows']
|
||||||
|
IGNORED_NAMES = ['Program Files', 'Program Files (x86)', 'AppData']
|
||||||
|
|
||||||
|
|
||||||
|
def get_drives() -> List[str]:
|
||||||
|
return [d for d in string.ascii_uppercase if os.path.exists(f'{d}:')]
|
||||||
|
|
||||||
|
|
||||||
|
def search_accdb(base_path: str = None) -> Generator[str, None, None]:
|
||||||
|
if base_path is None:
|
||||||
|
for drive in get_drives():
|
||||||
|
yield from search_accdb(f'{drive}:')
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
for entry in os.scandir(f'{base_path}\\'):
|
||||||
|
path = f'{base_path}\\{entry.name}'
|
||||||
|
if path in IGNORED_PATHS or entry.name in IGNORED_NAMES:
|
||||||
|
continue
|
||||||
|
if entry.is_file() and entry.name.lower().endswith('.accdb'):
|
||||||
|
yield path
|
||||||
|
elif entry.is_dir() and not entry.is_symlink():
|
||||||
|
yield from search_accdb(path)
|
||||||
|
except PermissionError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def get_accdb_info(filename: str) -> Tuple[str, datetime.datetime, datetime.datetime, int]:
|
||||||
|
stat = os.stat(filename)
|
||||||
|
with open(filename, 'rb', buffering=0) as f:
|
||||||
|
filehash = hashlib.file_digest(f, 'md5').hexdigest()
|
||||||
|
return filehash,\
|
||||||
|
datetime.datetime.fromtimestamp(stat.st_ctime),\
|
||||||
|
datetime.datetime.fromtimestamp(stat.st_mtime),\
|
||||||
|
stat.st_size
|
||||||
|
|
||||||
|
|
||||||
|
def get_accdb_files() -> Dict[str, Dict[str, Any]]:
|
||||||
|
files = {}
|
||||||
|
for filename in search_accdb():
|
||||||
|
filehash, ctime, mtime, size = get_accdb_info(filename)
|
||||||
|
if filehash not in files:
|
||||||
|
files[filehash] = {'hash': filehash, 'size': size, 'locations': [], 'names': set(),
|
||||||
|
'ctime': ctime, 'mtime': mtime}
|
||||||
|
fh = files[filehash]
|
||||||
|
fh['locations'].append(filename)
|
||||||
|
fh['names'].add(filename.split('\\')[-1].lower())
|
||||||
|
if mtime > fh['mtime']:
|
||||||
|
fh['mtime'] = mtime
|
||||||
|
if ctime < fh['ctime']:
|
||||||
|
fh['ctime'] = ctime
|
||||||
|
return files
|
||||||
|
|
||||||
|
|
||||||
|
def prompt() -> str:
|
||||||
|
print('Searching for accdb files...', flush=True)
|
||||||
|
files = get_accdb_files()
|
||||||
|
file_map = {}
|
||||||
|
n = 0
|
||||||
|
for info in sorted([f for f in files.values()], key=lambda f: f['mtime'], reverse=True):
|
||||||
|
name = tuple(info['names'])[0][:-6]
|
||||||
|
if name != 'wgdaten':
|
||||||
|
continue
|
||||||
|
n += 1
|
||||||
|
file_map[n] = info
|
||||||
|
locs = info['locations']
|
||||||
|
print(f'{n:>2}: {name:<24} {locs[0]}')
|
||||||
|
print(f' Created: {info["ctime"].date().isoformat()} {locs[1] if len(locs) > 1 else ""}')
|
||||||
|
print(f' Modified: {info["mtime"].date().isoformat()} {locs[2] if len(locs) > 2 else ""}')
|
||||||
|
for loc in locs[3:]:
|
||||||
|
print(' ' * 29 + loc)
|
||||||
|
print()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
text = input('Choose accdb file (default=1): ')
|
||||||
|
if text == '':
|
||||||
|
choice = 1
|
||||||
|
else:
|
||||||
|
choice = int(text)
|
||||||
|
if 1 <= choice <= n:
|
||||||
|
break
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
return file_map[choice]['locations'][0]
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument('-o', '--output', default='tables')
|
parser.add_argument('-o', '--output', default='tables')
|
||||||
parser.add_argument('wgdaten', metavar='WGDATEN')
|
parser.add_argument('-f', '--file', metavar='WGDATEN', required=False)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
os.makedirs(args.output, exist_ok=True)
|
os.makedirs(args.output, exist_ok=True)
|
||||||
|
|
||||||
print(f'Opening {args.wgdaten}...', flush=True)
|
if not args.file:
|
||||||
|
args.file = prompt()
|
||||||
|
|
||||||
|
print(f'Opening {args.file}...', flush=True)
|
||||||
|
|
||||||
pypyodbc.lowercase = False
|
pypyodbc.lowercase = False
|
||||||
cnx = pypyodbc.connect(f"Driver={{Microsoft Access Driver (*.mdb, *.accdb)}};Dbq={args.wgdaten};")
|
cnx = pypyodbc.connect(f"Driver={{Microsoft Access Driver (*.mdb, *.accdb)}};Dbq={args.file};")
|
||||||
cur = cnx.cursor()
|
cur = cnx.cursor()
|
||||||
print(f'Opened {args.wgdaten}!', flush=True)
|
print(f'Opened {args.file}!', flush=True)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
print(f'Fetching tables...', flush=True)
|
print(f'Fetching tables...', flush=True)
|
||||||
|
Reference in New Issue
Block a user