Update csv handling

This commit is contained in:
2023-05-04 23:07:06 +02:00
parent 19b2bccae3
commit 3b7a28cc23
5 changed files with 183 additions and 193 deletions

View File

@@ -1,101 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Iterator, Dict, Any, Tuple
import re
import datetime
# Field looks like a whole number (optional leading minus).
RE_INT = re.compile(r'-?[0-9]+')
# Field looks like a decimal number (optional leading minus).
RE_FLOAT = re.compile(r'-?[0-9]+\.[0-9]+')
# Line opens a double-quoted value that continues on the next line.
RE_STR_START = re.compile(r'.*;"[^"]*$')
# Line closes a double-quoted value begun on an earlier line.
RE_STR_END = re.compile(r'^[^"]*";.*')


def cast_value(value: str) -> Any:
    """Infer the type of a raw CSV field and return the converted value.

    Empty fields become None, double-quoted fields become str (quotes
    stripped), 'T'/'F' become bool, integer/float shaped fields are
    parsed as numbers, YYYY-MM-DD shapes as dates and HH:MM:SS shapes
    as times.  Raises RuntimeError when no rule applies.
    """
    if value == '':
        return None
    if value.startswith('"') and value.endswith('"'):
        return value[1:-1]
    if value == 'T':
        return True
    if value == 'F':
        return False
    if RE_INT.fullmatch(value) is not None:
        return int(value)
    if RE_FLOAT.fullmatch(value) is not None:
        return float(value)
    if len(value) == 10 and value[4] == '-' and value[7] == '-':
        return datetime.datetime.strptime(value, '%Y-%m-%d').date()
    if len(value) == 8 and value[2] == ':' and value[5] == ':':
        return datetime.time.fromisoformat(value)
    raise RuntimeError(f'unable to infer type of value "{value}"')
def convert_value(value: Any, table: str = None, column: str = None) -> str:
    """Serialize *value* into its CSV field representation.

    None becomes the empty field, strings are wrapped in double quotes,
    booleans become 'T'/'F'.  A datetime (only when both table and
    column are given) is rendered as a bare time when its date is
    1899-12-30 (presumably the MS Access zero date — verify), or as a
    bare date when the time part is exactly midnight.  Everything else
    falls back to str().
    """
    if value is None:
        return ''
    if type(value) is str:
        return f'"{value}"'
    if type(value) is bool:
        return 'T' if value else 'F'
    if type(value) is datetime.datetime and table is not None and column is not None:
        if (value.year, value.month, value.day) == (1899, 12, 30):
            return value.strftime('%H:%M:%S')
        if (value.hour, value.minute, value.second) == (0, 0, 0):
            return value.strftime('%Y-%m-%d')
    return str(value)
def parse_line(line_str: str) -> Iterator[str]:
    """Split one raw CSV line on ';', honouring double-quoted sections.

    Yields each field stripped of surrounding whitespace; quoted fields
    keep their surrounding quotes.  Blanks before a field start are
    skipped, and a ';' inside an open quoted section does not split.
    """
    field = None    # accumulated text of the current field; None = between fields
    quoted = False  # True while inside an unterminated '"' section
    for ch in line_str:
        if field is None:
            # Between fields: an immediate ';' is an empty field, leading
            # blanks are dropped, anything else opens a new field.
            if ch == ';':
                yield ''
            elif ch not in (' ', '\t'):
                field = ch
                quoted = ch == '"'
        elif not quoted and ch in (';', '\n'):
            # Unquoted separator or line terminator ends the field.
            yield field.strip()
            field = None
        else:
            # A closing quote ends the quoted section but is kept in the text.
            if quoted and ch == '"':
                quoted = False
            field += ch
    if field is not None:
        yield field.strip()
def parse(filename: str) -> Iterator[Tuple]:
    """Parse a ';'-separated CSV file into tuples.

    The first yielded tuple is the header row (fields stripped, not
    type-cast); every following tuple holds values converted through
    cast_value.  Quoted values spanning several physical lines (see
    RE_STR_START / RE_STR_END) are joined before parsing.
    """
    with open(filename, 'r', encoding='utf-8') as fh:
        it = iter(fh)
        yield tuple(col.strip() for col in next(it).split(';'))
        pending = None  # logical line still open inside a quoted value
        for raw in it:
            if pending is not None:
                pending += raw
                if not RE_STR_END.match(raw):
                    continue  # quoted value still not closed
            else:
                pending = raw
                if RE_STR_START.match(raw):
                    continue  # quoted value spills onto the next line
            yield tuple(cast_value(part) for part in parse_line(pending))
            pending = None
def parse_dict(filename: str) -> Iterator[Dict[str, Any]]:
    """Parse a CSV file and yield one dict per data row, keyed by header names."""
    rows = parse(filename)
    header = next(rows)
    for row in rows:
        record = {}
        for idx, field in enumerate(row):
            record[header[idx]] = field
        yield record
def format_row(*values) -> str:
    """Render one CSV line: each value via convert_value, ';'-joined, newline-terminated."""
    return ';'.join(map(convert_value, values)) + '\n'

View File

@ -11,7 +11,7 @@ import ctypes.wintypes
import hashlib import hashlib
import pypyodbc import pypyodbc
import csv import utils
IGNORED_NAMES = ['Windows', 'Program Files', 'Program Files (x86)', 'AppData'] IGNORED_NAMES = ['Windows', 'Program Files', 'Program Files (x86)', 'AppData']
@ -138,11 +138,10 @@ def main() -> None:
cur.execute(f"SELECT * FROM {t_name} ORDER BY `{desc[0][0]}`;") cur.execute(f"SELECT * FROM {t_name} ORDER BY `{desc[0][0]}`;")
cols = [t[0] for t in cur.description] cols = [t[0] for t in cur.description]
with open(f'{args.output}/{t_name}.csv', 'wb+') as f: with utils.csv_open(f'{args.output}/{t_name}.csv') as f:
f.write((';'.join(cols) + '\n').encode('utf-8')) f.header(cols)
for row in cur: for row in cur:
values = [csv.convert_value(val, table=t_name, column=col) for col, val in zip(cols, row)] f.row((utils.convert_value(val, table=t_name, column=col) for col, val in zip(cols, row)), raw=True)
f.write((';'.join(values) + '\n').encode('utf-8'))
print(f'Exported {t_name} successfully!', flush=True) print(f'Exported {t_name} successfully!', flush=True)
finally: finally:

View File

@ -8,7 +8,7 @@ import os
import re import re
import datetime import datetime
import csv import utils
DIR: str DIR: str
@ -46,7 +46,7 @@ def sqlite_regexp(pattern: str, value: Optional[str]) -> Optional[bool]:
def import_csv(cur: sqlite3.Cursor, table_name: str) -> None: def import_csv(cur: sqlite3.Cursor, table_name: str) -> None:
rows = csv.parse(f'{DIR}/{table_name}.csv') rows = utils.csv_parse(f'{DIR}/{table_name}.csv')
names = next(rows) names = next(rows)
sql = f'INSERT INTO {table_name} ({", ".join(names)}) VALUES ({", ".join(["?"] * len(names))})' sql = f'INSERT INTO {table_name} ({", ".join(names)}) VALUES ({", ".join(["?"] * len(names))})'

View File

@ -10,7 +10,7 @@ import sqlite3
import requests import requests
import datetime import datetime
import csv import utils
DB_CNX: Optional[sqlite3.Connection] = None DB_CNX: Optional[sqlite3.Connection] = None
@ -170,9 +170,8 @@ def get_bev_gst_size(kgnr: int, gstnr: str) -> Optional[int]:
def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]: def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]:
fbs = csv.parse_dict(f'{in_dir}/TFlaechenbindungen.csv')
members = {} members = {}
for f in fbs: for f in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
if f['MGNR'] not in members: if f['MGNR'] not in members:
members[f['MGNR']] = {} members[f['MGNR']] = {}
members[f['MGNR']][f['FBNR']] = f members[f['MGNR']][f['FBNR']] = f
@ -260,7 +259,7 @@ def lookup_kg_name(kgnr: int) -> str:
def migrate_gradation(in_dir: str, out_dir: str) -> None: def migrate_gradation(in_dir: str, out_dir: str) -> None:
global GRADATION_MAP global GRADATION_MAP
GRADATION_MAP = {} GRADATION_MAP = {}
for g in csv.parse_dict(f'{in_dir}/TUmrechnung.csv'): for g in utils.csv_parse_dict(f'{in_dir}/TUmrechnung.csv'):
GRADATION_MAP[g['Oechsle']] = g['KW'] GRADATION_MAP[g['Oechsle']] = g['KW']
@ -268,13 +267,13 @@ def migrate_branches(in_dir: str, out_dir: str) -> None:
global BRANCH_MAP global BRANCH_MAP
BRANCH_MAP = {} BRANCH_MAP = {}
with open(f'{out_dir}/branch.csv', 'w+', encoding='utf-8') as f: with utils.csv_open(f'{out_dir}/branch.csv') as f:
f.write('zwstid;name;country;postal_dest;address;phone_nr\n') f.header('zwstid', 'name', 'country', 'postal_dest', 'address', 'phone_nr')
for b in csv.parse_dict(f'{in_dir}/TZweigstellen.csv'): for b in utils.csv_parse_dict(f'{in_dir}/TZweigstellen.csv'):
BRANCH_MAP[b['ZNR']] = b['Kennbst'] BRANCH_MAP[b['ZNR']] = b['Kennbst']
address = b['Straße'] address = b['Straße']
postal_dest = lookup_plz(int(b['PLZ']) if b['PLZ'] else None, b['Ort'], address) postal_dest = lookup_plz(int(b['PLZ']) if b['PLZ'] else None, b['Ort'], address)
f.write(csv.format_row(b['Kennbst'], b['Name'].strip().title(), 'AT', postal_dest, address, b['Telefon'])) f.row(b['Kennbst'], b['Name'].strip().title(), 'AT', postal_dest, address, b['Telefon'])
def migrate_grosslagen(in_dir: str, out_dir: str) -> None: def migrate_grosslagen(in_dir: str, out_dir: str) -> None:
@ -282,34 +281,34 @@ def migrate_grosslagen(in_dir: str, out_dir: str) -> None:
GROSSLAGE_MAP = {} GROSSLAGE_MAP = {}
glnr = 0 glnr = 0
with open(f'{out_dir}/wb_gl.csv', 'w+', encoding='utf-8') as f: with utils.csv_open(f'{out_dir}/wb_gl.csv') as f:
f.write('glnr;name\n') f.header('glnr', 'name')
for gl in csv.parse_dict(f'{in_dir}/TGrosslagen.csv'): for gl in utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv'):
glnr += 1 glnr += 1
GROSSLAGE_MAP[gl['GLNR']] = glnr GROSSLAGE_MAP[gl['GLNR']] = glnr
f.write(csv.format_row(glnr, gl['Bezeichnung'])) f.row(glnr, gl['Bezeichnung'])
def migrate_gemeinden(in_dir: str, out_dir: str) -> None: def migrate_gemeinden(in_dir: str, out_dir: str) -> None:
global GEM_MAP global GEM_MAP
GEM_MAP = {} GEM_MAP = {}
with open(f'{out_dir}/wb_kg.csv', 'w+', encoding='utf-8') as f: with utils.csv_open(f'{out_dir}/wb_kg.csv') as f:
f.write('kgnr;glnr\n') f.header('kgnr', 'glnr')
for g in csv.parse_dict(f'{in_dir}/TGemeinden.csv'): for g in utils.csv_parse_dict(f'{in_dir}/TGemeinden.csv'):
gems = lookup_gem_name(g['Bezeichnung']) gems = lookup_gem_name(g['Bezeichnung'])
GEM_MAP[g['GNR']] = gems GEM_MAP[g['GNR']] = gems
for kgnr, gkz in gems: for kgnr, gkz in gems:
f.write(csv.format_row(kgnr, GROSSLAGE_MAP[g['GLNR']])) f.row(kgnr, GROSSLAGE_MAP[g['GLNR']])
def migrate_reeds(in_dir: str, out_dir: str) -> None: def migrate_reeds(in_dir: str, out_dir: str) -> None:
global REED_MAP global REED_MAP
REED_MAP = {} REED_MAP = {}
with open(f'{out_dir}/wb_rd.csv', 'w+', encoding='utf-8') as f: with utils.csv_open(f'{out_dir}/wb_rd.csv') as f:
f.write('kgnr;rdnr;name\n') f.header('kgnr', 'rdnr', 'name')
for r in csv.parse_dict(f'{in_dir}/TRiede.csv'): for r in utils.csv_parse_dict(f'{in_dir}/TRiede.csv'):
name: str = r['Bezeichnung'].strip() name: str = r['Bezeichnung'].strip()
if name.isupper(): if name.isupper():
name = name.title() name = name.title()
@ -321,26 +320,26 @@ def migrate_reeds(in_dir: str, out_dir: str) -> None:
rdnr = max([n for k, n in REED_MAP.values() if k == kgnr] or [0]) + 1 rdnr = max([n for k, n in REED_MAP.values() if k == kgnr] or [0]) + 1
REED_MAP[r['RNR']] = (kgnr, rdnr) REED_MAP[r['RNR']] = (kgnr, rdnr)
f.write(csv.format_row(kgnr, rdnr, name)) f.row(kgnr, rdnr, name)
def migrate_attributes(in_dir: str, out_dir: str) -> None: def migrate_attributes(in_dir: str, out_dir: str) -> None:
with open(f'{out_dir}/wine_attribute.csv', 'w+', encoding='utf-8') as f: with utils.csv_open(f'{out_dir}/wine_attribute.csv') as f:
f.write('attrid;name;kg_per_ha\n') f.header('attrid', 'name', 'kg_per_ha')
for a in csv.parse_dict(f'{in_dir}/TSortenAttribute.csv'): for a in utils.csv_parse_dict(f'{in_dir}/TSortenAttribute.csv'):
f.write(csv.format_row(a['SANR'], a['Attribut'], int(a['KgProHa']))) f.row(a['SANR'], a['Attribut'], int(a['KgProHa']))
if WG == 'MATZEN': if WG == 'MATZEN':
f.write(csv.format_row('M', 'Matzen', 10000)) f.row('M', 'Matzen', 10000)
f.write(csv.format_row('HU', 'Huber', 10000)) f.row('HU', 'Huber', 10000)
def migrate_cultivations(in_dir: str, out_dir: str) -> None: def migrate_cultivations(in_dir: str, out_dir: str) -> None:
global CULTIVATION_MAP global CULTIVATION_MAP
CULTIVATION_MAP = {} CULTIVATION_MAP = {}
with open(f'{out_dir}/wine_cultivation.csv', 'w+', encoding='utf-8') as f: with utils.csv_open(f'{out_dir}/wine_cultivation.csv') as f:
f.write('cultid;name\n') f.header('cultid', 'name')
for c in csv.parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'): for c in utils.csv_parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'):
name: str = c['Bezeichnung'] name: str = c['Bezeichnung']
cultid = name[0].upper() cultid = name[0].upper()
if name.isupper(): if name.isupper():
@ -348,27 +347,28 @@ def migrate_cultivations(in_dir: str, out_dir: str) -> None:
elif 'biolog' in name.lower(): elif 'biolog' in name.lower():
cultid = 'BIO' cultid = 'BIO'
CULTIVATION_MAP[c['BANR']] = cultid CULTIVATION_MAP[c['BANR']] = cultid
f.write(csv.format_row(cultid, name)) f.row(cultid, name)
def migrate_members(in_dir: str, out_dir: str) -> None: def migrate_members(in_dir: str, out_dir: str) -> None:
global MEMBER_MAP global MEMBER_MAP
MEMBER_MAP = {} MEMBER_MAP = {}
members = [m for m in csv.parse_dict(f'{in_dir}/TMitglieder.csv')] members = [m for m in utils.csv_parse_dict(f'{in_dir}/TMitglieder.csv')]
mgnrs = [m['MGNR'] for m in members] mgnrs = [m['MGNR'] for m in members]
fbs = parse_flaechenbindungen(in_dir) fbs = parse_flaechenbindungen(in_dir)
with open(f'{out_dir}/member.csv', 'w+', encoding='utf-8') as f_m,\ with utils.csv_open(f'{out_dir}/member.csv') as f_m,\
open(f'{out_dir}/member_billing_address.csv', 'w+', encoding='utf-8') as f_mba,\ utils.csv_open(f'{out_dir}/member_billing_address.csv') as f_mba,\
open(f'{out_dir}/wb_kg.csv', 'a', encoding='utf-8') as f_kg: utils.csv_open(f'{out_dir}/wb_kg.csv', 'a') as f_kg:
f_m.write('mgnr;predecessor_mgnr;prefix;given_name;middle_names;family_name;suffix;' f_m.header(
'birthday;entry_date;exit_date;business_shares;accounting_nr;zwstid;' 'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix',
'lfbis_nr;ustid;volllieferant;buchführend;funktionär;active;iban;bic;' 'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid',
'country;postal_dest;address;' 'lfbis_nr', 'ustid', 'volllieferant', 'buchführend', 'funktionär', 'active', 'iban', 'bic',
'email;phone_landline;phone_mobile_1;phone_mobile_2;' 'country', 'postal_dest', 'address',
'default_kgnr;comment\n') 'email', 'phone_landline', 'phone_mobile_1', 'phone_mobile_2',
f_mba.write('mgnr;name;country;postal_dest;address\n') 'default_kgnr', 'comment')
f_mba.header('mgnr', 'name', 'country', 'postal_dest', 'address')
for m in members: for m in members:
mgnr: int = m['MGNR'] mgnr: int = m['MGNR']
@ -573,7 +573,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]: elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]:
glnr = list(GROSSLAGE_MAP.values())[0] glnr = list(GROSSLAGE_MAP.values())[0]
print(f'New KG: {lookup_kg_name(kgnr)} ({kgnr}, GL {glnr})') print(f'New KG: {lookup_kg_name(kgnr)} ({kgnr}, GL {glnr})')
f_kg.write(csv.format_row(kgnr, glnr)) f_kg.row(kgnr, glnr)
if 9999 not in GEM_MAP: if 9999 not in GEM_MAP:
GEM_MAP[9999] = [] GEM_MAP[9999] = []
GEM_MAP[9999].append((kgnr, 0)) GEM_MAP[9999].append((kgnr, 0))
@ -583,7 +583,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
continue continue
pred = m['MGNR-Vorgänger'] if m['MGNR-Vorgänger'] in mgnrs else None pred = m['MGNR-Vorgänger'] if m['MGNR-Vorgänger'] in mgnrs else None
f_m.write(csv.format_row( f_m.row(
mgnr, pred, prefix, given_name, middle_names, family_name, suffix, mgnr, pred, prefix, given_name, middle_names, family_name, suffix,
m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0, m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0,
m['BHKontonummer'], zwstid, bnr, ustid, m['BHKontonummer'], zwstid, bnr, ustid,
@ -591,12 +591,12 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
iban, bic, 'AT', postal_dest, address or '-', email, phone_landline, iban, bic, 'AT', postal_dest, address or '-', email, phone_landline,
phone_mobile[0] if len(phone_mobile) > 0 else None, phone_mobile[1] if len(phone_mobile) > 1 else None, phone_mobile[0] if len(phone_mobile) > 0 else None, phone_mobile[1] if len(phone_mobile) > 1 else None,
kgnr, m['Anmerkung'] kgnr, m['Anmerkung']
)) )
MEMBER_MAP[mgnr] = { MEMBER_MAP[mgnr] = {
'default_kgnr': kgnr 'default_kgnr': kgnr
} }
if billing_name: if billing_name:
f_mba.write(csv.format_row(mgnr, billing_name, 'AT', postal_dest, address or '-')) f_mba.row(mgnr, billing_name, 'AT', postal_dest, address or '-')
def migrate_area_commitments(in_dir: str, out_dir: str) -> None: def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
@ -692,12 +692,13 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
text = re.sub(r'([0-9]+(, |$)){3,}', lambda m: replace_nrs(m, ', '), text) text = re.sub(r'([0-9]+(, |$)){3,}', lambda m: replace_nrs(m, ', '), text)
return text return text
with open(f'{out_dir}/area_commitment.csv', 'w+', encoding='utf-8') as f_fb, \ with utils.csv_open(f'{out_dir}/area_commitment.csv') as f_fb, \
open(f'{out_dir}/area_commitment_attribute.csv', 'w+', encoding='utf-8') as f_attr: utils.csv_open(f'{out_dir}/area_commitment_attribute.csv',) as f_attr:
f_fb.write('fbnr;mgnr;sortid;cultid;area;kgnr;gstnr;rdnr;year_from;year_to;comment\n') f_fb.header('fbnr', 'mgnr', 'sortid', 'cultid', 'area', 'kgnr', 'gstnr', 'rdnr',
f_attr.write('fbnr;attrid\n') 'year_from', 'year_to', 'comment')
f_attr.header('fbnr', 'attrid')
for fb in csv.parse_dict(f'{in_dir}/TFlaechenbindungen.csv'): for fb in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
if fb['Von'] is None and fb['Bis'] is None: if fb['Von'] is None and fb['Bis'] is None:
continue continue
parz: str = fb['Parzellennummer'] parz: str = fb['Parzellennummer']
@ -723,10 +724,10 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
rdnr = REED_MAP[fb['RNR']][1] if fb['RNR'] else None rdnr = REED_MAP[fb['RNR']][1] if fb['RNR'] else None
to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None
f_fb.write(csv.format_row(fbnr, mgnr, fb['SNR'], CULTIVATION_MAP[fb['BANR']], area, f_fb.row(fbnr, mgnr, fb['SNR'], CULTIVATION_MAP[fb['BANR']], area,
kgnr, gstnr, rdnr, fb['Von'], to, comment)) kgnr, gstnr, rdnr, fb['Von'], to, comment)
if fb['SANR']: if fb['SANR']:
f_attr.write(csv.format_row(fbnr, fb['SANR'])) f_attr.row(fbnr, fb['SANR'])
def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, List[int], datetime.date]]: def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, List[int], datetime.date]]:
@ -782,7 +783,7 @@ def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str,
def migrate_deliveries(in_dir: str, out_dir: str) -> None: def migrate_deliveries(in_dir: str, out_dir: str) -> None:
modifiers = {m['ASNR']: m for m in csv.parse_dict(f'{in_dir}/TAbschlaege.csv') if m['Bezeichnung']} modifiers = {m['ASNR']: m for m in utils.csv_parse_dict(f'{in_dir}/TAbschlaege.csv') if m['Bezeichnung']}
delivery_map = {} delivery_map = {}
seasons = {} seasons = {}
branches = {} branches = {}
@ -796,17 +797,18 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
else: else:
raise NotImplementedError() raise NotImplementedError()
deliveries = list(csv.parse_dict(f'{in_dir}/TLieferungen.csv')) deliveries = list(utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv'))
delivery_dict = {d['LINR']: d for d in deliveries} delivery_dict = {d['LINR']: d for d in deliveries}
fixed = fix_deliveries(deliveries) fixed = fix_deliveries(deliveries)
with open(f'{out_dir}/delivery.csv', 'w+', encoding='utf-8') as f_delivery, \ with utils.csv_open(f'{out_dir}/delivery.csv') as f_delivery, \
open(f'{out_dir}/delivery_part.csv', 'w+', encoding='utf-8') as f_part, \ utils.csv_open(f'{out_dir}/delivery_part.csv') as f_part, \
open(f'{out_dir}/delivery_part_attribute.csv', 'w+', encoding='utf-8') as f_attr: utils.csv_open(f'{out_dir}/delivery_part_attribute.csv') as f_attr:
f_delivery.write('year;did;date;time;zwstid;lnr;lsnr;mgnr;comment\n') f_delivery.header('year', 'did', 'date', 'time', 'zwstid', 'lnr', 'lsnr', 'mgnr', 'comment')
f_part.write('year;did;dpnr;sortid;weight;kmw;qualid;hkid;kgnr;rdnr;gerebelt;manual_weighing;spl_check;' f_part.header('year', 'did', 'dpnr', 'sortid', 'weight', 'kmw', 'qualid', 'hkid', 'kgnr', 'rdnr',
'hand_picked;lesewagen;temperature;acid;scale_id;weighing_id;comment\n') 'gerebelt', 'manual_weighing', 'spl_check', 'hand_picked', 'lesewagen',
f_attr.write('year;did;dpnr;attrid\n') 'temperature', 'acid', 'scale_id', 'weighing_id', 'comment')
f_attr.header('year', 'did', 'dpnr', 'attrid')
for lsnr, linrs, date in fixed: for lsnr, linrs, date in fixed:
if date.year not in seasons: if date.year not in seasons:
@ -908,37 +910,33 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
if comment: if comment:
comments.append(comment) comments.append(comment)
f_part.write(csv.format_row( f_part.row(
date.year, snr, dpnr, sortid, int(d['Gewicht']), kmw, QUAL_MAP[d['QSNR']], HKID, kgnr, rdnr, date.year, snr, dpnr, sortid, int(d['Gewicht']), kmw, QUAL_MAP[d['QSNR']], HKID, kgnr, rdnr,
d['Gerebelt'] or False, d['Handwiegung'] or False, d['Spaetlese-Ueberpruefung'] or False, d['Gerebelt'] or False, d['Handwiegung'] or False, d['Spaetlese-Ueberpruefung'] or False,
hand, lesemaschine, d['Temperatur'], acid, scale_id, weighing_id, comment hand, lesemaschine, d['Temperatur'], acid, scale_id, weighing_id, comment
)) )
for attrid in attributes: for attrid in attributes:
f_attr.write(csv.format_row(date.year, snr, dpnr, attrid)) f_attr.row(date.year, snr, dpnr, attrid)
f_delivery.write(csv.format_row( f_delivery.row(date.year, snr, date, d['Uhrzeit'], BRANCH_MAP[d['ZNR']], lnr, lsnr, d['MGNR'],
date.year, snr, date, d['Uhrzeit'], BRANCH_MAP[d['ZNR']], lnr, lsnr, d['MGNR'], '; '.join(comments) or None)
'; '.join(comments) or None
))
with open(f'{out_dir}/delivery_part_modifier.csv', 'w+', encoding='utf-8') as f_part_mod: with utils.csv_open(f'{out_dir}/delivery_part_modifier.csv') as f_part_mod:
f_part_mod.write('year;did;dpnr;modid\n') f_part_mod.header('year', 'did', 'dpnr', 'modid')
for m in csv.parse_dict(f'{in_dir}/TLieferungAbschlag.csv'): for m in utils.csv_parse_dict(f'{in_dir}/TLieferungAbschlag.csv'):
if m['LINR'] not in delivery_map: if m['LINR'] not in delivery_map:
continue continue
nid = delivery_map[m['LINR']] nid = delivery_map[m['LINR']]
f_part_mod.write(csv.format_row(nid[0], nid[1], nid[2], modifiers[m['ASNR']]['id'])) f_part_mod.row(nid[0], nid[1], nid[2], modifiers[m['ASNR']]['id'])
with open(f'{out_dir}/season.csv', 'w+', encoding='utf-8') as f_season, \ with utils.csv_open(f'{out_dir}/season.csv') as f_season, \
open(f'{out_dir}/modifier.csv', 'w+', encoding='utf-8') as f_mod: utils.csv_open(f'{out_dir}/modifier.csv') as f_mod:
f_season.write('year;currency;precision;start_date;end_date\n') f_season.header('year', 'currency', 'precision', 'start_date', 'end_date')
f_mod.write('year;modid;name;abs;rel;standard;quick_select\n') f_mod.header('year', 'modid', 'name', 'abs', 'rel', 'standard', 'quick_select')
for y, s in seasons.items(): for y, s in seasons.items():
f_season.write(csv.format_row(y, s['currency'], s['precision'], s['start'], s['end'])) f_season.row(y, s['currency'], s['precision'], s['start'], s['end'])
for m in modifiers.values(): for m in modifiers.values():
abs_v = int(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None abs_v = int(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None
f_mod.write(csv.format_row( f_mod.row(y, m['id'], m['Bezeichnung'], abs_v, m['AZASProzent'], m['Standard'], m['Schnellauswahl'])
y, m['id'], m['Bezeichnung'], abs_v, m['AZASProzent'], m['Standard'], m['Schnellauswahl']
))
def migrate_payments(in_dir: str, out_dir: str) -> None: def migrate_payments(in_dir: str, out_dir: str) -> None:
@ -946,7 +944,7 @@ def migrate_payments(in_dir: str, out_dir: str) -> None:
def migrate_parameters(in_dir: str, out_dir: str) -> None: def migrate_parameters(in_dir: str, out_dir: str) -> None:
params: Dict[str, str] = {p['Bezeichnung']: p['Wert'] for p in csv.parse_dict(f'{in_dir}/TParameter.csv')} params: Dict[str, str] = {p['Bezeichnung']: p['Wert'] for p in utils.csv_parse_dict(f'{in_dir}/TParameter.csv')}
name = params['MANDANTENNAME1'].title().replace('F.', 'für').replace('U.', 'und') name = params['MANDANTENNAME1'].title().replace('F.', 'für').replace('U.', 'und')
shortened = name.replace(' für ', ' f. ').replace(' und ', ' u. ') shortened = name.replace(' für ', ' f. ').replace(' und ', ' u. ')
suffix = params['MANDANTENNAME2'].lower().replace(' ', '').replace('.', '') suffix = params['MANDANTENNAME2'].lower().replace(' ', '').replace('.', '')
@ -974,10 +972,10 @@ def migrate_parameters(in_dir: str, out_dir: str) -> None:
'WEBSITE': params['MANDANTENHOMEPAGE'], 'WEBSITE': params['MANDANTENHOMEPAGE'],
} }
with open(f'{out_dir}/client_parameter.csv', 'w+', encoding='utf-8') as f: with utils.csv_open(f'{out_dir}/client_parameter.csv') as f:
f.write('param;value\n') f.header('param', 'value')
for param, value in new_params.items(): for param, value in new_params.items():
f.write(csv.format_row(param, value)) f.row(param, value)
def main() -> None: def main() -> None:

94
wgmaster/utils.py Normal file
View File

@@ -0,0 +1,94 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
from typing import Iterator, Dict, Any, Tuple, TextIO, List
import re
import datetime
import csv
# Field looks like a whole number (optional leading minus).
RE_INT = re.compile(r'-?[0-9]+')
# Field looks like a decimal number (optional leading minus).
RE_FLOAT = re.compile(r'-?[0-9]+\.[0-9]+')
# Line opens a double-quoted value that continues on the next line
# (note: ',' delimiter here, unlike the old ';'-based module).
RE_STR_START = re.compile(r'.*,"[^"]*$')
# Line closes a double-quoted value begun on an earlier line.
RE_STR_END = re.compile(r'^[^"]*",.*')


def cast_value(value: str) -> Any:
    """Infer the type of a raw CSV field and return the converted value.

    Empty fields become None, double-quoted fields become str (quotes
    stripped), 'T'/'F' become bool, integer/float shaped fields are
    parsed as numbers, YYYY-MM-DD shapes as dates and HH:MM:SS shapes
    as times.  Raises RuntimeError when no rule applies.
    """
    if value == '':
        return None
    if value.startswith('"') and value.endswith('"'):
        return value[1:-1]
    if value == 'T':
        return True
    if value == 'F':
        return False
    if RE_INT.fullmatch(value) is not None:
        return int(value)
    if RE_FLOAT.fullmatch(value) is not None:
        return float(value)
    if len(value) == 10 and value[4] == '-' and value[7] == '-':
        return datetime.datetime.strptime(value, '%Y-%m-%d').date()
    if len(value) == 8 and value[2] == ':' and value[5] == ':':
        return datetime.time.fromisoformat(value)
    raise RuntimeError(f'unable to infer type of value "{value}"')
def convert_value(value: Any, table: str = None, column: str = None) -> str:
    """Serialize *value* into its CSV field representation.

    None becomes the empty field; strings are wrapped in double quotes
    with any embedded '"' doubled; booleans become 'T'/'F'.  A datetime
    (only when both table and column are given) is rendered as a bare
    time when its date is 1899-12-30 (presumably the MS Access zero
    date — verify), or as a bare date when the time part is exactly
    midnight.  Everything else falls back to str().
    """
    if value is None:
        return ''
    if type(value) is str:
        escaped = value.replace('"', '""')
        return f'"{escaped}"'
    if type(value) is bool:
        return 'T' if value else 'F'
    if type(value) is datetime.datetime and table is not None and column is not None:
        if (value.year, value.month, value.day) == (1899, 12, 30):
            return value.strftime('%H:%M:%S')
        if (value.hour, value.minute, value.second) == (0, 0, 0):
            return value.strftime('%Y-%m-%d')
    return str(value)
class CsvFile:
    """Thin wrapper pairing a text file with a csv reader and writer.

    Both sides use quoting=csv.QUOTE_NONE with doublequote=False, so
    fields are passed through as-is; any quoting/escaping is expected
    to be done by the caller (e.g. via convert_value with raw=True
    handled in row()).
    """
    # Underlying text file handle; __exit__ delegates to it for cleanup.
    file: TextIO
    # NOTE(review): csv.reader / csv.writer are factory functions, not
    # types — these annotations are nominal only (harmless here because
    # of `from __future__ import annotations`).
    reader: csv.reader
    writer: csv.writer
    def __init__(self, file: TextIO):
        """Wrap *file*; a reader and a writer are both created eagerly."""
        self.file = file
        # NOTE(review): with QUOTE_NONE and escapechar=None the writer
        # raises csv.Error if a field contains the delimiter — presumably
        # convert_value output never does; verify against callers.
        self.writer = csv.writer(self.file, doublequote=False, quoting=csv.QUOTE_NONE, escapechar=None)
        self.reader = csv.reader(self.file, doublequote=False, quoting=csv.QUOTE_NONE, escapechar=None)
    def __enter__(self) -> CsvFile:
        return self
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Delegate to the file object so the handle is closed on exit.
        return self.file.__exit__(exc_type, exc_val, exc_tb)
    def __iter__(self) -> Iterator[List[str]]:
        """Iterate rows as lists of raw (unparsed) string fields."""
        return self.reader.__iter__()
    def row(self, *fields, raw: bool = False) -> None:
        """Write one data row; unless raw=True, fields pass through convert_value."""
        if not raw:
            fields = (convert_value(field) for field in fields)
        self.writer.writerow(fields)
    def header(self, *headers) -> None:
        """Write the header row; every name is stringified first."""
        self.writer.writerow(str(h) for h in headers)
def csv_open(filename: str, mode: str = 'w+') -> CsvFile:
    """Open *filename* (UTF-8, newline='' as the csv module requires) as a CsvFile."""
    handle = open(filename, mode, encoding='utf-8', newline='')
    return CsvFile(handle)
def csv_parse(filename: str) -> Iterator[Tuple]:
    """Parse a CSV file into tuples.

    The first yielded tuple is the header row (fields stripped, not
    type-cast); each following tuple holds values converted via cast_value.
    """
    with csv_open(filename, 'r') as f:
        rows = iter(f)
        yield tuple(name.strip() for name in next(rows))
        for row in rows:
            yield tuple(cast_value(field) for field in row)
def csv_parse_dict(filename: str) -> Iterator[Dict[str, Any]]:
    """Parse a CSV file into dicts keyed by the header row.

    Deliberately not a generator function: the header row is consumed
    eagerly when this is called, matching the original behaviour.
    """
    rows = csv_parse(filename)
    header = next(rows)

    def to_record(row: Tuple) -> Dict[str, Any]:
        # Indexed by position; a row longer than the header would raise
        # IndexError, same as the original comprehension.
        return {header[i]: field for i, field in enumerate(row)}

    return map(to_record, rows)