export.py: Extract export function
This commit is contained in:
@ -48,9 +48,9 @@ def get_accdb_info(filename: str) -> Tuple[str, datetime.datetime, datetime.date
|
|||||||
stat = os.stat(filename)
|
stat = os.stat(filename)
|
||||||
with open(filename, 'rb', buffering=0) as f:
|
with open(filename, 'rb', buffering=0) as f:
|
||||||
filehash = hashlib.file_digest(f, 'md5').hexdigest()
|
filehash = hashlib.file_digest(f, 'md5').hexdigest()
|
||||||
return filehash,\
|
return filehash, \
|
||||||
datetime.datetime.fromtimestamp(stat.st_ctime),\
|
datetime.datetime.fromtimestamp(stat.st_ctime), \
|
||||||
datetime.datetime.fromtimestamp(stat.st_mtime),\
|
datetime.datetime.fromtimestamp(stat.st_mtime), \
|
||||||
stat.st_size
|
stat.st_size
|
||||||
|
|
||||||
|
|
||||||
@ -103,23 +103,13 @@ def prompt() -> str:
|
|||||||
return file_map[choice]['locations'][0]
|
return file_map[choice]['locations'][0]
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
def export(file: str, output: str) -> None:
|
||||||
parser = argparse.ArgumentParser()
|
print(f'Opening {file}...', flush=True)
|
||||||
parser.add_argument('-o', '--output', default='tables')
|
|
||||||
parser.add_argument('-f', '--file', metavar='WGDATEN', required=False)
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
os.makedirs(args.output, exist_ok=True)
|
|
||||||
|
|
||||||
if not args.file:
|
|
||||||
args.file = prompt()
|
|
||||||
|
|
||||||
print(f'Opening {args.file}...', flush=True)
|
|
||||||
|
|
||||||
pypyodbc.lowercase = False
|
pypyodbc.lowercase = False
|
||||||
cnx = pypyodbc.connect(f"Driver={{Microsoft Access Driver (*.mdb, *.accdb)}};Dbq={args.file};")
|
cnx = pypyodbc.connect(f"Driver={{Microsoft Access Driver (*.mdb, *.accdb)}};Dbq={file};")
|
||||||
cur = cnx.cursor()
|
cur = cnx.cursor()
|
||||||
print(f'Opened {args.file}!', flush=True)
|
print(f'Opened {file}!', flush=True)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
print(f'Fetching tables...', flush=True)
|
print(f'Fetching tables...', flush=True)
|
||||||
@ -138,7 +128,7 @@ def main() -> None:
|
|||||||
cur.execute(f"SELECT * FROM {t_name} ORDER BY `{desc[0][0]}`;")
|
cur.execute(f"SELECT * FROM {t_name} ORDER BY `{desc[0][0]}`;")
|
||||||
cols = [t[0] for t in cur.description]
|
cols = [t[0] for t in cur.description]
|
||||||
|
|
||||||
with utils.csv_open(f'{args.output}/{t_name}.csv') as f:
|
with utils.csv_open(f'{output}/{t_name}.csv') as f:
|
||||||
f.header(*cols)
|
f.header(*cols)
|
||||||
for row in cur:
|
for row in cur:
|
||||||
values = (utils.convert_value(val, table=t_name, column=col) for col, val in zip(cols, row))
|
values = (utils.convert_value(val, table=t_name, column=col) for col, val in zip(cols, row))
|
||||||
@ -150,5 +140,17 @@ def main() -> None:
|
|||||||
cnx.close()
|
cnx.close()
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
parser = argparse.ArgumentParser()
|
||||||
|
parser.add_argument('-o', '--output', default='tables')
|
||||||
|
parser.add_argument('-f', '--file', metavar='WGDATEN', required=False)
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
os.makedirs(args.output, exist_ok=True)
|
||||||
|
if not args.file:
|
||||||
|
args.file = prompt()
|
||||||
|
export(args.file, args.output)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
||||||
|
Reference in New Issue
Block a user