1026 lines
41 KiB
Python
Executable File
1026 lines
41 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from typing import Dict, Any, Tuple, Optional, List, Iterable
|
|
import argparse
|
|
import os
|
|
import re
|
|
import sys
|
|
import sqlite3
|
|
import requests
|
|
import datetime
|
|
|
|
import utils
|
|
|
|
|
|
# SQLite connection used by the lookup_* helpers; opened in main().
DB_CNX: Optional[sqlite3.Connection] = None
# Set from the --quiet option; silences the purely informational log helpers.
QUIET: bool = False

# Default wine origin identifier (value of the --origin option).
HKID: Optional[str] = None
# Selected cooperative (value of the --genossenschaft option), or None.
WG: Optional[str] = None

# Syntactic patterns for identifiers; callers apply them with fullmatch().
USTID_RE = re.compile(r'[A-Z]{2}[A-Z0-9]{2,12}')
BIC_RE = re.compile(r'[A-Z0-9]{4}[A-Z]{2}[A-Z0-9]{2}([A-Z0-9]{3})?')
IBAN_RE = re.compile(r'[A-Z]{2}[0-9]{2}[A-Z0-9]{8,30}')
EMAIL_RE = re.compile(r'[^@\s]+@([a-z0-9_äöüß-]+\.)+[a-z]{2,}')

# Lookup tables, filled by the migrate_* functions and read by later steps.
# Oechsle -> KW (from TUmrechnung.csv)
GRADATION_MAP: Optional[Dict[float, float]] = None
# BANR -> cultid
CULTIVATION_MAP: Optional[Dict[int, str]] = None
# ZNR -> Kennbst (written as zwstid)
BRANCH_MAP: Optional[Dict[int, str]] = None
# GNR -> list of (kgnr, gkz)
GEM_MAP: Optional[Dict[int, List[Tuple[int, int]]]] = None
# RNR -> (kgnr, rdnr)
REED_MAP: Optional[Dict[int, Tuple[int, int]]] = None
# legacy GLNR -> newly assigned glnr
GROSSLAGE_MAP: Optional[Dict[int, int]] = None
# mgnr -> {'default_kgnr': ...} for every member that was written
MEMBER_MAP: Optional[Dict[int, Dict[str, Any]]] = None
|
|
|
|
# Maps the legacy QSNR of a delivery row to the qualid written to delivery_part.csv.
QUAL_MAP: Dict[int, str] = {
    0: 'WEI',
    1: 'RSW',
    2: 'LDW',
    3: 'QUW',
    4: 'KAB',
    5: 'SPL',
}
|
|
|
|
# Corrections for street names as they appear after the generic address
# clean-up in migrate_members(): key = cleaned legacy spelling, value = the
# spelling to store. Looked up for the text in front of the house number.
STREET_NAMES: Dict[str, str] = {
    'Hans-Wagnerstraße': 'Hans-Wagner-Straße',
    'J.Seitzstraße': 'Josef-Seitz-Straße',
    'Kurhaus-Str.': 'Kurhausstraße',
    'Kurhaus-Straße': 'Kurhausstraße',
    'Pirawartherstraße': 'Pirawarther Straße',
    'Raggendorferstraße': 'Raggendorfer Straße',
    'Matznerstraße': 'Matzner Straße',
    'Stillfriederstraße': 'Stillfrieder Straße',
    'Harraserstraße': 'Harraser Straße',
    # The town is Gänserndorf; the replacement used to drop the second "n".
    'Gänserndorferstraße': 'Gänserndorfer Straße',
    'Hofrat Döltlstraße': 'Hofrat-Döltl-Straße',
    'Sulzerstraße': 'Sulzer Straße',
    'Brünnerstraße': 'Brünner Straße',
    'Flustraße': 'Flurstraße',
    'Wienerstraße': 'Wiener Straße',
    'St.Laurentstraße': 'St.-Laurentstraße',
    'Angernerstraße': 'Angerner Straße',
    'Schweinbartherstraße': 'Schweinbarther Straße',
    'Hohenruppersdorferstraße': 'Hohenruppersdorfer Straße',
    'Gruberhauptstraße': 'Gruber Hauptstraße',
    'Josef Seitzstraße': 'Josef-Seitz-Straße',
    # The town is Auersthal; the replacement used to swap "th" and "hl".
    'Auersthalerstraße': 'Auersthaler Straße',
    'Ollersdorferstraße': 'Ollersdorfer Straße',
    'Ritter Zoppelstraße': 'Ritter-Zoppel-Straße',
    'Spannbergerstraße': 'Spannberger Straße',
    'Ritter Zoppel Straße': 'Ritter-Zoppel-Straße',
    'R. Virchow-Straße': 'Rudolf-Virchow-Straße',
    'Ebenthalerstraße': 'Ebenthaler Straße',
    'Bockfließerstraße': 'Bockfließer Straße',
    'Dörfleserstraße': 'Dörfleser Straße',
    'Dörflesserstraße': 'Dörfleser Straße',
    'Grubere Hauptstraße': 'Gruber Hauptstraße',
    'Groß Inzersdorf': 'Großinzersdorf',
}
|
|
|
|
|
|
def success(mgnr: int, key: str, value) -> None:
    """Log a member-related success line (bold green) to stderr unless QUIET is set."""
    if QUIET:
        return
    line = f'\x1B[1;32m{mgnr:>6}: {key:<12} {value}\x1B[0m'
    print(line, file=sys.stderr)
|
|
|
|
|
|
def warning(mgnr: int, key: str, value) -> None:
    """Log a member-related warning line (bold yellow) to stderr; never suppressed."""
    line = f'\x1B[1;33m{mgnr:>6}: {key:<12} {value}\x1B[0m'
    print(line, file=sys.stderr)
|
|
|
|
|
|
def invalid(mgnr: int, key: str, value) -> None:
    """Log a member-related invalid-value line (bold red) to stderr; never suppressed."""
    line = f'\x1B[1;31m{mgnr:>6}: {key:<12} {value}\x1B[0m'
    print(line, file=sys.stderr)
|
|
|
|
|
|
def renumber_delivery(lsnr_1: str, lsnr_2: str) -> None:
    """Print (to stdout, bold) that a delivery note number was changed, unless QUIET is set."""
    if QUIET:
        return
    line = f'\x1B[1m{lsnr_1:<14} -> {lsnr_2:<14}\x1B[0m'
    print(line)
|
|
|
|
|
|
def warning_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Log a delivery-related warning line (bold yellow) to stderr; never suppressed."""
    line = f'\x1B[1;33m{lsnr:<13} ({mgnr:>6}): {key:<12} {value}\x1B[0m'
    print(line, file=sys.stderr)
|
|
|
|
|
|
def invalid_delivery(lsnr: str, mgnr: int, key: str, value) -> None:
    """Log a delivery-related invalid-value line (bold red) to stderr; never suppressed."""
    line = f'\x1B[1;31m{lsnr:<13} ({mgnr:>6}): {key:<12} {value}\x1B[0m'
    print(line, file=sys.stderr)
|
|
|
|
|
|
def convert(mgnr: int, key: str, old_value: str, new_value) -> None:
    """Log (bold, stderr) that a member field was rewritten from old_value to new_value, unless QUIET is set."""
    if QUIET:
        return
    line = f'\x1B[1m{mgnr:>6}: {key:<12} "{old_value}" -> "{new_value}"\x1B[0m'
    print(line, file=sys.stderr)
|
|
|
|
|
|
def convert_name(mgnr: int, old_name: Tuple[str, str], new_name: Tuple[str, str, str, str, str],
                 billing: Optional[str] = None) -> None:
    """Log (bold, stderr) how a member's name was split up, unless QUIET is set.

    Missing (None/empty) name components are shown as empty strings; the
    billing name, if any, is appended in parentheses.
    """
    if QUIET:
        return
    before = ' / '.join([part or '' for part in old_name])
    after = ' / '.join([part or '' for part in new_name])
    billing_note = '(' + billing + ')' if billing else ''
    print(f'\x1B[1m{mgnr:>6}: {before} -> {after}{billing_note}\x1B[0m', file=sys.stderr)
|
|
|
|
|
|
def check_lfbis_nr(nr: str) -> bool:
    """Return True if nr is a 7-digit string whose last digit matches the check digit.

    The first six digits are weighted 7, 6, 5, 4, 3, 2; the check digit is
    (11 - weighted_sum % 11) % 10.
    """
    # https://statistik.at/fileadmin/shared/QM/Standarddokumentationen/RW/std_r_land-forstw_register.pdf#page=41
    if len(nr) != 7 or not nr.isdigit():
        return False
    weighted_sum = sum(int(digit) * weight for digit, weight in zip(nr[:6], range(7, 1, -1)))
    expected = (11 - weighted_sum % 11) % 10
    return expected == int(nr[6])
|
|
|
|
|
|
def check_ustid_at(nr: str) -> bool:
    """Return True if nr has the form 'ATU' + 8 digits and its last digit is the correct check digit.

    The seven digits after 'ATU' are multiplied alternately by 1 and 2, the
    digit sums of the products are added up and the check digit is
    (96 - total) % 10.
    """
    # http://www.pruefziffernberechnung.de/U/USt-IdNr.shtml
    well_formed = nr.startswith('ATU') and len(nr) == 11 and nr[3:].isdigit()
    if not well_formed:
        return False
    total = 0
    for pos, digit in enumerate(nr[3:10]):
        product = int(digit) * (2 if pos % 2 else 1)
        # product is at most 18, so its digit sum is tens + ones
        total += product // 10 + product % 10
    return (96 - total) % 10 == int(nr[10])
|
|
|
|
|
|
def modulo(a: str, b: int) -> int:
    """Return the decimal number written in string a, reduced modulo b, processing digit by digit."""
    remainder = 0
    for digit in map(int, a):
        remainder = (remainder * 10 + digit) % b
    return remainder
|
|
|
|
|
|
def check_iban(iban: str) -> bool:
    """Return True if iban matches IBAN_RE and passes the mod-97 check.

    The first four characters are moved to the end, every letter is replaced
    by its number (A=10 ... Z=35) and the resulting digit string must leave
    remainder 1 when divided by 97.
    """
    if IBAN_RE.fullmatch(iban) is None:
        return False
    rearranged = iban[4:] + iban[:4]
    digits = re.sub('[A-Z]', lambda letter: str(ord(letter.group(0)) - ord('A') + 10), rearranged)
    return modulo(digits, 97) == 1
|
|
|
|
|
|
def normalize_phone_nr(nr: str) -> str:
    """Strip separators from a phone number and convert it to international form.

    Spaces, slashes and dashes are removed. A leading '00' (international
    call prefix) becomes '+'; any other leading '0' (national trunk prefix)
    becomes '+43'. Numbers that are empty after stripping, or that do not
    start with '0', are returned as stripped.
    """
    nr = re.sub('[ /-]', '', nr)
    if nr.startswith('00'):
        # already carries a country code, e.g. 0043664... -> +43664...
        nr = '+' + nr[2:]
    elif nr.startswith('0'):
        nr = '+43' + nr[1:]
    return nr
|
|
|
|
|
|
def get_bev_gst_size(kgnr: int, gstnr: str, timeout: float = 30.0) -> Optional[int]:
    """Query the BEV cadastre API for a parcel and return the sum of its 'fl' values.

    :param kgnr: KG number; formatted as five digits in the URL
    :param gstnr: parcel number within the KG
    :param timeout: seconds to wait for the server before requests raises a
        timeout error (without it the call could block indefinitely)
    :return: sum of n['fl'] over data['properties']['nutzungen'],
        or None if the response status is not 200
    """
    r = requests.get(f'https://kataster.bev.gv.at/api/gst/{kgnr:05}/{gstnr}/', timeout=timeout)
    if r.status_code != 200:
        return None
    data = r.json()
    return sum(n['fl'] for n in data['properties']['nutzungen'])
|
|
|
|
|
|
def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]:
    """Read TFlaechenbindungen.csv and group its rows as {MGNR: {FBNR: row}}."""
    by_member = {}
    for row in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
        by_member.setdefault(row['MGNR'], {})[row['FBNR']] = row
    return by_member
|
|
|
|
|
|
def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] = None) -> Optional[int]:
    """Resolve a postal code and place name to a postal destination id.

    The id is ``plz * 100000 + okz``. The places registered for ``plz`` are
    read from the database; if exactly one of their names contains the
    normalised ``ort`` it is used. Otherwise only 'VELM-GÖTZENDORF' can be
    resolved, by street and house number.

    :param plz: postal code, or None
    :param ort: place name as stored in the legacy data, or None
    :param address: street and house number, used for the Velm/Götzendorf split
    :return: the postal destination id, or None if plz or ort is None
    :raises RuntimeError: if the place cannot be resolved
    """
    if plz is None or ort is None:
        return None

    cur = DB_CNX.cursor()
    cur.execute("SELECT o.okz, p.dest, o.name FROM AT_plz_dest p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?", (plz,))
    rows: List[Tuple[int, str, str]] = cur.fetchall()
    cur.close()

    ort_m = ort.lower().replace('gr.', 'groß').replace(' ', '').replace('-', '').replace('ß', 'ss')
    rows_m = [r[0] for r in rows if ort_m in r[2].lower().replace(' ', '').replace('-', '').replace('ß', 'ss')]
    if len(rows_m) == 1:
        return plz * 100000 + rows_m[0]

    if ort == 'VELM-GÖTZENDORF' and address:
        # The street must be a string to compare against the names below
        # (it used to be a list, so the Velm branch could never match).
        street, _, nr_part = address.rpartition(' ')
        # Leading digits only: '12-14' -> 12, '12a' -> 12
        nr_match = re.match(r'[0-9]+', nr_part)
        if nr_match is not None:
            nr = int(nr_match.group(0))
            if street == 'Landstraße' and nr <= 48 \
                    or street == 'Winterzeile' and (nr <= 49 or nr in (52, 54, 56)) \
                    or street == 'Hauptstraße' and (nr <= 106 or nr in (117, 115, 113, 111, 109, 107)):
                # Velm
                return plz * 100000 + 3572
            # Götzendorf
            return plz * 100000 + 3571

    raise RuntimeError(f'PLZ not found ({plz} {ort}, {address})')
|
|
|
|
|
|
def lookup_kgnr(okz: Optional[int]) -> Optional[int]:
    """Return the KG number joined to place okz via AT_ort/wb_kg.

    Returns None if okz is None or the query does not yield exactly one row.
    """
    if okz is None:
        return None

    cursor = DB_CNX.cursor()
    cursor.execute("SELECT k.kgnr FROM AT_ort o JOIN wb_kg k ON k.kgnr = o.kgnr WHERE okz = ?", (okz,))
    matches: List[Tuple[int]] = cursor.fetchall()
    cursor.close()

    return matches[0][0] if len(matches) == 1 else None
|
|
|
|
|
|
def lookup_gem_name(name: str) -> List[Tuple[int, int]]:
    """Resolve a legacy municipality name to a list of (kgnr, gkz) pairs.

    'Dörfles' and 'Velm-Götzendorf' (any case) are answered from hard-coded
    values, 'Grub' is looked up as 'Grub an der March'. Every other name is
    matched as a case-insensitive prefix of the KG names that belong to
    origin 'WLWV'.

    :raises RuntimeError: if the database does not yield exactly one match;
        the message contains the searched name and the rows found
    """
    lowered = name.lower()
    if lowered == 'dörfles':
        return [(6004, 30860)]
    if lowered == 'velm-götzendorf':
        return [(6027, 30859), (6007, 30859)]
    if lowered == 'grub':
        name = 'Grub an der March'

    cur = DB_CNX.cursor()
    cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name "
                "FROM AT_kg k "
                "JOIN AT_gem g ON g.gkz = k.gkz "
                "JOIN wb_gem wg ON wg.gkz = g.gkz "
                "WHERE LOWER(k.name) LIKE (LOWER(?) || '%') AND wg.hkid = 'WLWV'",
                (name.replace('Gr.', 'Groß ').replace('Groß ', 'Groß').replace('-', ''),))
    rows: List[Tuple[int, str, int, str]] = cur.fetchall()
    cur.close()

    if len(rows) == 1:
        return [(k, g) for k, _, g, _ in rows]

    # Put the diagnostics into the exception instead of printing them to stdout.
    raise RuntimeError(f'Gemeinde not uniquely found ({name}): {rows}')
|
|
|
|
|
|
def lookup_kg_name(kgnr: int) -> str:
    """Return the name stored in AT_kg for kgnr (IndexError if there is no such row)."""
    cursor = DB_CNX.cursor()
    cursor.execute("SELECT name FROM AT_kg WHERE kgnr = ?", (kgnr,))
    name_rows = cursor.fetchall()
    cursor.close()
    first_row = name_rows[0]
    return first_row[0]
|
|
|
|
|
|
def migrate_gradation(in_dir: str, out_dir: str) -> None:
    """Fill GRADATION_MAP (Oechsle -> KW) from TUmrechnung.csv; writes no output file."""
    global GRADATION_MAP
    GRADATION_MAP = {}
    GRADATION_MAP.update(
        (row['Oechsle'], row['KW'])
        for row in utils.csv_parse_dict(f'{in_dir}/TUmrechnung.csv')
    )
|
|
|
|
|
|
def migrate_branches(in_dir: str, out_dir: str) -> None:
    """Write branch.csv from TZweigstellen.csv and fill BRANCH_MAP (ZNR -> Kennbst)."""
    global BRANCH_MAP
    BRANCH_MAP = {}

    with utils.csv_open(f'{out_dir}/branch.csv') as out:
        out.header('zwstid', 'name', 'country', 'postal_dest', 'address', 'phone_nr')
        for branch in utils.csv_parse_dict(f'{in_dir}/TZweigstellen.csv'):
            BRANCH_MAP[branch['ZNR']] = branch['Kennbst']
            street = branch['Straße']
            plz = int(branch['PLZ']) if branch['PLZ'] else None
            destination = lookup_plz(plz, branch['Ort'], street)
            display_name = branch['Name'].strip().title()
            out.row(branch['Kennbst'], display_name, 'AT', destination, street, branch['Telefon'])
|
|
|
|
|
|
def migrate_grosslagen(in_dir: str, out_dir: str) -> None:
    """Write wb_gl.csv from TGrosslagen.csv, numbering rows from 1, and fill GROSSLAGE_MAP (GLNR -> new glnr)."""
    global GROSSLAGE_MAP
    GROSSLAGE_MAP = {}

    with utils.csv_open(f'{out_dir}/wb_gl.csv') as out:
        out.header('glnr', 'name')
        rows = utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv')
        for new_nr, row in enumerate(rows, start=1):
            GROSSLAGE_MAP[row['GLNR']] = new_nr
            out.row(new_nr, row['Bezeichnung'])
|
|
|
|
|
|
def migrate_gemeinden(in_dir: str, out_dir: str) -> None:
    """Write wb_kg.csv (kgnr, glnr) from TGemeinden.csv and fill GEM_MAP (GNR -> [(kgnr, gkz), ...])."""
    global GEM_MAP
    GEM_MAP = {}

    with utils.csv_open(f'{out_dir}/wb_kg.csv') as out:
        out.header('kgnr', 'glnr')
        for row in utils.csv_parse_dict(f'{in_dir}/TGemeinden.csv'):
            resolved = lookup_gem_name(row['Bezeichnung'])
            GEM_MAP[row['GNR']] = resolved
            for kgnr, _gkz in resolved:
                out.row(kgnr, GROSSLAGE_MAP[row['GLNR']])
|
|
|
|
|
|
def migrate_reeds(in_dir: str, out_dir: str) -> None:
    """Write wb_rd.csv from TRiede.csv and fill REED_MAP (RNR -> (kgnr, rdnr)).

    A reed is assigned to the first KG of its municipality; rdnr is one more
    than the highest rdnr already handed out for that KG. All-uppercase
    names are converted to title case.
    """
    global REED_MAP
    REED_MAP = {}

    with utils.csv_open(f'{out_dir}/wb_rd.csv') as out:
        out.header('kgnr', 'rdnr', 'name')
        for reed in utils.csv_parse_dict(f'{in_dir}/TRiede.csv'):
            name: str = reed['Bezeichnung'].strip()
            if name.isupper():
                name = name.title()

            gem = GEM_MAP[reed['GNR']]
            kgnr = gem[0][0]
            if len(gem) != 1:
                # several KGs for this municipality: report which one is used
                print(gem, name, '->', gem[0])

            taken = [nr for kg, nr in REED_MAP.values() if kg == kgnr]
            rdnr = max(taken, default=0) + 1
            REED_MAP[reed['RNR']] = (kgnr, rdnr)
            out.row(kgnr, rdnr, name)
|
|
|
|
|
|
def migrate_attributes(in_dir: str, out_dir: str) -> None:
    """Write wine_attribute.csv from TSortenAttribute.csv; for WG 'MATZEN' two extra attributes are appended."""
    with utils.csv_open(f'{out_dir}/wine_attribute.csv') as out:
        out.header('attrid', 'name', 'kg_per_ha')
        for attr in utils.csv_parse_dict(f'{in_dir}/TSortenAttribute.csv'):
            kg_per_ha = int(attr['KgProHa'])
            out.row(attr['SANR'], attr['Attribut'], kg_per_ha)
        if WG == 'MATZEN':
            for attrid, name in (('M', 'Matzen'), ('HU', 'Huber')):
                out.row(attrid, name, 10000)
|
|
|
|
|
|
def migrate_cultivations(in_dir: str, out_dir: str) -> None:
    """Write wine_cultivation.csv from TBewirtschaftungsarten.csv and fill CULTIVATION_MAP (BANR -> cultid).

    cultid is the name itself if it is all uppercase, 'BIO' if the name
    contains 'biolog' (case-insensitive), otherwise the uppercased first
    character of the name.
    """
    global CULTIVATION_MAP
    CULTIVATION_MAP = {}

    with utils.csv_open(f'{out_dir}/wine_cultivation.csv') as out:
        out.header('cultid', 'name')
        for cult in utils.csv_parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'):
            label: str = cult['Bezeichnung']
            ident = label[0].upper()
            if label.isupper():
                ident = label
            elif 'biolog' in label.lower():
                ident = 'BIO'
            CULTIVATION_MAP[cult['BANR']] = ident
            out.row(ident, label)
|
|
|
|
|
|
def migrate_members(in_dir: str, out_dir: str) -> None:
    """Migrate TMitglieder.csv to member.csv and member_billing_address.csv.

    For every member the name is split into prefix / given name / family
    name / suffix (plus an optional billing name), and the LFBIS number,
    UID, IBAN, BIC, address, e-mail and phone numbers are normalised and
    validated. Values that fail validation are reported via invalid() and
    written as empty. Members without a family and given name, and members
    whose postal destination cannot be determined, are skipped. KGs that are
    not yet known are appended to wb_kg.csv and registered in GEM_MAP under
    key 9999. MEMBER_MAP receives an entry for every member written.

    :param in_dir: directory containing the exported legacy CSV files
    :param out_dir: directory the migrated CSV files are written to
    """
    # NOTE(review): the nesting of this function was reconstructed from a
    # listing that had lost its indentation - compare against the original.
    global MEMBER_MAP
    MEMBER_MAP = {}

    members = list(utils.csv_parse_dict(f'{in_dir}/TMitglieder.csv'))
    # A set, so the predecessor check below does not scan a list per member.
    mgnrs = {m['MGNR'] for m in members}

    with utils.csv_open(f'{out_dir}/member.csv') as f_m,\
            utils.csv_open(f'{out_dir}/member_billing_address.csv') as f_mba,\
            utils.csv_open(f'{out_dir}/wb_kg.csv', 'a') as f_kg:
        f_m.header(
            'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix',
            'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid',
            'lfbis_nr', 'ustid', 'volllieferant', 'buchführend', 'funktionär', 'active', 'iban', 'bic',
            'country', 'postal_dest', 'address',
            'email', 'phone_landline', 'fax', 'phone_mobile_1', 'phone_mobile_2',
            'default_kgnr', 'comment')
        f_mba.header('mgnr', 'name', 'country', 'postal_dest', 'address')

        for m in members:
            mgnr: int = m['MGNR']
            family_name: str = m['Nachname']
            given_name: str = m['Vorname']
            prefix: Optional[str] = None
            middle_names: Optional[str] = None
            suffix: Optional[str] = None
            billing_name: Optional[str] = None
            funktionaer = False

            if family_name is None and given_name is None:
                continue

            # Only one of the two may be missing here; treat it as empty so
            # the string operations below cannot fail on None.
            family_name = family_name or ''
            given_name = given_name or ''
            if WG == 'MATZEN' and given_name.startswith(' '):
                funktionaer = True

            family_name = re.sub(r'\s+', ' ', family_name).strip()
            given_name = re.sub(r'\s+', ' ', given_name).strip().replace(', ', ',')

            if ' ' in family_name or '.' in family_name or ',' in family_name:
                if family_name.endswith(' KG'):
                    parts = family_name.split(' ')
                    family_name = parts[0].title()
                    billing_name = f'{family_name} KG'

                convert_name(mgnr, (m['Nachname'], m['Vorname']),
                             (prefix, given_name, middle_names, family_name, suffix), billing_name)
            elif (' ' in given_name or '.' in given_name or ',' in given_name) and given_name != 'EVA MARIA':
                given_lower = given_name.lower()
                if ' u. ' in given_lower or ' u ' in given_lower or ' und ' in given_lower:
                    # two persons: keep the first one, both go into the billing name
                    parts = given_name.split(' ')
                    family_name = family_name.title()
                    billing_name = f'{family_name} {parts[0].title()} und {parts[-1].title()}'
                    given_name = parts[0].title()
                elif given_lower.endswith(' gesbr'):
                    family_name = family_name.title()
                    given_name = given_name.split(' ')[0].title()
                    billing_name = f'{family_name} {given_name} GesbR'
                elif given_name.endswith(' KeG.'):
                    family_name = family_name.title()
                    given_name = given_name.split(' ')[0].title()
                    billing_name = f'{family_name} {given_name} KEG'
                elif given_lower.endswith((' jun', ' jun.', ' sen', ' sen.')):
                    family_name = family_name.title()
                    parts = given_name.split(' ')
                    suffix = parts[-1].lower()
                    if suffix[-1] != '.':
                        suffix += '.'
                    given_name = parts[0].title()
                elif ',' in given_name:
                    family_name = family_name.title()
                    parts = given_name.split(',')
                    given_name = parts[0].title()
                    prefix = ' '.join([p.title() for p in parts[1:]])
                elif given_name.endswith(' DI'):
                    family_name = family_name.title()
                    given_name = given_name.split(' ')[0].title()
                    prefix = 'Dipl.-Ing.'
                elif given_lower.endswith((' ing', ' ing.', ' dr', ' dr.')):
                    family_name = family_name.title()
                    parts = given_name.split(' ')
                    given_name = parts[0].title()
                    prefix = parts[-1].title()
                    if prefix[-1] != '.':
                        prefix += '.'

                convert_name(mgnr, (m['Nachname'], m['Vorname']),
                             (prefix, given_name, middle_names, family_name, suffix), billing_name)
            else:
                family_name = family_name.title()
                given_name = given_name.title()

            bnr: Optional[str] = m['Betriebsnummer'] if m['Betriebsnummer'] != '-' else None
            if bnr is not None:
                bnr = bnr.replace('.', '')
                if len(bnr) == 10:
                    bnr = bnr.removesuffix('000')
                elif len(bnr) == 6:
                    bnr = '0' + bnr
                if not check_lfbis_nr(bnr):
                    if bnr == '1234567':
                        warning(mgnr, 'BetriebsNr.', bnr)
                    else:
                        invalid(mgnr, 'BetriebsNr.', bnr)
                    # NOTE(review): assumed to apply to both branches above - confirm
                    bnr = None

            ustid: Optional[str] = m['UID']
            if ustid is not None:
                ustid = ustid.replace(' ', '')
                if len(ustid) == 8 and ustid.isdigit():
                    ustid = 'ATU' + ustid
                elif not USTID_RE.fullmatch(ustid):
                    invalid(mgnr, 'UID', ustid)
                    ustid = None
                if ustid and not check_ustid_at(ustid):
                    if ustid == 'ATU11111111':
                        warning(mgnr, 'UID', ustid)
                    else:
                        invalid(mgnr, 'UID', ustid)
                    # NOTE(review): assumed to apply to both branches above - confirm
                    ustid = None

            iban: Optional[str] = m['IBAN']
            bic: Optional[str] = m['BIC']
            # read from the legacy data but currently not migrated
            blz: Optional[int] = m['BLZ']
            kto_nr: Optional[str] = m['KontoNr']

            if iban is not None:
                iban = iban.replace(' ', '')
                if not check_iban(iban):
                    invalid(mgnr, 'IBAN', iban)
                    iban = None

            if bic is not None:
                bic = bic.upper()
                if bic == 'RLNWATAUE':
                    bic = 'RLNWATWWAUE'
                if not BIC_RE.fullmatch(bic):
                    invalid(mgnr, 'BIC', bic)
                    bic = None
            if bic is not None:
                # an 11-character BIC ending in XXX is shortened to its 8-character form
                if len(bic) == 11 and bic.endswith('XXX'):
                    bic = bic[:-3]

            ort: Optional[str] = m['Ort']
            address: Optional[str] = m['Straße']
            if address is not None:
                address_old = address
                address = re.sub('([0-9])([A-Z])', lambda a: a.group(1) + a.group(2).lower(),
                                 re.sub(r'\s+', ' ', address).strip().title())
                address = address.replace('strasse', 'straße').replace('strassse', 'straße')\
                    .replace('Strasse', 'Straße').replace('Str.', 'Straße')\
                    .replace('str.', 'straße').replace('ster.', 'straße').replace('g. ', 'gasse ')\
                    .replace('Gross', 'Groß').replace('Bockfliess', 'Bockfließ').replace('Weiss', 'Weiß')\
                    .replace('Preussen', 'Preußen').replace('Schloss', 'Schloß').replace('luss', 'luß')\
                    .replace('Haupstraße', 'Hauptstraße')
                address = re.sub('([a-z])([0-9])', lambda a: a.group(1) + ' ' + a.group(2), address)
                if address.startswith(('Nr. ', 'Nr ')) or address.isdigit():
                    # house number only: use the place name as street name
                    if ort is not None:
                        address = ort.title() + ' ' + address.split(' ')[-1]
                elif address.startswith('Ob. '):
                    address = address.replace('Ob. ', 'Obere ', 1)
                address = address.replace(' Nr. ', ' ')
                address = re.sub(r'([^0-9]+?)( [0-9])',
                                 lambda a: STREET_NAMES.get(a.group(1), a.group(1)) + a.group(2), address)
                if address_old != address:
                    convert(mgnr, 'Adresse', address_old, address)

            phone_1: Optional[str] = m['Telefon']
            phone_2: Optional[str] = m['Mobiltelefon']
            email: Optional[str] = m['EMail']
            phone_landline, fax = None, None
            phone_mobile = []

            if email is not None:
                if email.isupper():
                    email = email.lower()
                if not EMAIL_RE.fullmatch(email):
                    invalid(mgnr, 'E-Mail', m['EMail'])
                    email = None

            if phone_1:
                phone_1 = normalize_phone_nr(phone_1)
                if len(phone_1) <= 8 or phone_1[0] != '+':
                    invalid(mgnr, 'Tel.Nr.', m['Telefon'])
                else:
                    # first digit after the 3-character '+43': 6 is treated as mobile
                    if phone_1[3] == '6':
                        phone_mobile.append(phone_1)
                    else:
                        phone_landline = phone_1
            if phone_2:
                phone_2 = normalize_phone_nr(phone_2)
                if len(phone_2) <= 8 or phone_2[0] != '+':
                    invalid(mgnr, 'Tel.Nr.', m['Mobiltelefon'])
                else:
                    if phone_2[3] == '6':
                        if phone_2 not in phone_mobile:
                            phone_mobile.append(phone_2)
                    elif phone_landline is None:
                        phone_landline = phone_2
                    elif phone_landline != phone_2:
                        invalid(mgnr, 'Tel.Nr.', phone_2)

            # Explicit branches instead of `a and b or c`, which could yield
            # False (and write it to the CSV) when no branch can be determined.
            if m['ZNR']:
                zwstid = BRANCH_MAP[m['ZNR']]
            elif len(BRANCH_MAP) == 1:
                zwstid = next(iter(BRANCH_MAP.values()))
            else:
                zwstid = None
            postal_dest = lookup_plz(int(m['PLZ']) if m['PLZ'] else None, m['Ort'], address)

            okz = postal_dest % 100000 if postal_dest else None
            kgnr = lookup_kgnr(okz)
            active = m['Aktives Mitglied'] or False
            if kgnr is None:
                invalid(mgnr, 'KGNr.', ort)
                active = False
            elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]:
                # unknown KG: register it with the first Großlage
                glnr = list(GROSSLAGE_MAP.values())[0]
                print(f'New KG: {lookup_kg_name(kgnr)} ({kgnr}, GL {glnr})')
                f_kg.row(kgnr, glnr)
                if 9999 not in GEM_MAP:
                    GEM_MAP[9999] = []
                GEM_MAP[9999].append((kgnr, 0))

            if postal_dest is None:
                invalid(mgnr, 'PLZ', None)
                continue

            pred = m['MGNR-Vorgänger'] if m['MGNR-Vorgänger'] in mgnrs else None
            f_m.row(
                mgnr, pred, prefix, given_name, middle_names, family_name, suffix,
                m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0,
                m['BHKontonummer'], zwstid, bnr, ustid,
                m['Volllieferant'] or False, m['Buchführend'] or False, funktionaer, active,
                iban, bic, 'AT', postal_dest, address or '-', email, phone_landline, fax,
                phone_mobile[0] if len(phone_mobile) > 0 else None, phone_mobile[1] if len(phone_mobile) > 1 else None,
                kgnr, m['Anmerkung']
            )
            MEMBER_MAP[mgnr] = {
                'default_kgnr': kgnr
            }
            if billing_name:
                f_mba.row(mgnr, billing_name, 'AT', postal_dest, address or '-')
|
|
|
|
|
|
def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
    """Migrate TFlaechenbindungen.csv to area_commitment.csv and area_commitment_attribute.csv.

    Rows without both 'Von' and 'Bis' and rows of members missing from
    MEMBER_MAP are skipped. The free-text parcel field is parsed into
    individual parcel numbers and re-formatted; if nothing can be parsed the
    original text is kept in the comment column.
    """
    # NOTE(review): the nesting of this function was reconstructed from a
    # listing that had lost its indentation - compare against the original.

    def parse_gstnrs(nr_str: str, kgnr: int, mgnr: int) -> List[str]:
        """Split a free-text parcel field into a list of parcel numbers.

        Returns an empty list (after reporting via invalid()) if the text
        cannot be interpreted; None yields an empty list without a report.
        """
        if nr_str is None:
            return []
        elif nr_str.isdigit() and len(nr_str) <= 6:
            # a single plain number
            return [nr_str]
        elif nr_str.count('/') == 1:
            parts = nr_str.split('/')
            if len(parts[0]) == len(parts[1]) and parts[0].isdigit() and parts[1].isdigit() and len(parts[0]) >= 3:
                # two numbers of equal length (3+ digits) are taken as two parcels
                return [parts[0], parts[1]]
            elif parts[0].isdigit() and len(parts[0]) <= 6 and parts[1].isdigit() and len(parts[1]) <= 3:
                # otherwise number/sub-number is one parcel
                return [nr_str]
        if nr_str.count('/') > 1:
            parts = nr_str.split('/')
            if all([p.isdigit() for p in parts]):
                if all([len(p) <= 1 for p in parts[1:]]):
                    # base/1/2/3 -> base/1, base/2, base/3
                    return [f'{parts[0]}/{p}' for p in parts[1:]]
                elif all([len(p) == len(parts[0]) for p in parts]):
                    return parts
        if nr_str.startswith(f'{kgnr:05}'):
            # text is prefixed with the five-digit KG number: drop it and retry
            return parse_gstnrs(nr_str[5:].strip(), kgnr, mgnr)
        if nr_str.endswith(' 2000'):
            # trailing ' 2000' is dropped and the rest is parsed again
            return parse_gstnrs(nr_str[:-5].strip(), kgnr, mgnr)
        # try the list separators one after the other
        parts = re.split(r' *[,;+&] *', nr_str)
        if len(parts) == 1:
            parts = nr_str.split(' / ')
        # NOTE(review): as written the next two splits cannot produce more than
        # one part (they split on a string the condition requires to be absent);
        # the listing may have collapsed whitespace inside the literals - confirm.
        if len(parts) == 1 and ' ' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) == 1 and ' ' not in nr_str and '-' not in nr_str:
            parts = nr_str.split(' ')
        if len(parts) > 1:
            return [gst for p in parts for gst in parse_gstnrs(p, kgnr, mgnr)]

        # ranges: "[base/]from-to"
        m = re.fullmatch(r'([0-9]+/)?([0-9]+) *- *([0-9]+)', nr_str)
        if m is not None:
            b = m.group(1)
            f = int(m.group(2))
            t = int(m.group(3))
            if t < f:
                # abbreviated upper bound: complete it with the leading digits of the lower bound
                t += f - (f % pow(10, len(m.group(3))))
            if t - f < 50:
                return [
                    gst
                    for counter in range(f, t + 1)
                    for p in [f'{b or ""}{counter}']
                    for gst in parse_gstnrs(p, kgnr, mgnr)
                ]

        invalid(mgnr, 'GstNr.', f'{kgnr:05}-{nr_str}')
        return []

    def replace_nrs(m: re.Match, sep: str) -> str:
        """Collapse runs of three or more consecutive numbers in the matched text into 'first-last'."""
        end = m.group(0).endswith(sep)
        parts = [int(p) for p in m.group(0).split(sep)]
        text = ''
        # last number of the run currently being collapsed, or None
        last = None
        for i, p in enumerate(parts):
            if last is not None:
                if last + 1 == p:
                    last = p
                    continue
                else:
                    # run ended: emit its upper bound
                    text += f'{last}{sep}'
                    last = None
            if len(parts) > i + 2 and p + 1 == parts[i + 1] and p + 2 == parts[i + 2]:
                # p starts a run of at least three consecutive numbers
                last = p
                text += f'{p}-'
            else:
                text += f'{p}{sep}'
        if last is not None:
            text += str(last)
        return text.strip().strip(sep) + (sep if end else '')

    def format_gstnr(nrs: List[str]) -> Optional[str]:
        """Format parcel numbers as one sorted, compact string; None for an empty list."""
        if len(nrs) == 0:
            return None
        # zero-pad every number so the string sort is numeric, then strip the padding again
        nrs = [re.sub(r'\b0+', '', nr)
               for nr in sorted([re.sub(r'[0-9]+', lambda m: m.group(0).rjust(6, '0'), nr)
                                 for nr in nrs])]
        last = None
        text = ''
        for nr in nrs:
            if last is None:
                text += nr
            elif '/' in last and last.split('/')[:-1] == nr.split('/')[:-1]:
                # same base as the previous parcel: append only the sub-number
                text += f'+{nr.split("/")[-1]}'
            else:
                text += f', {nr}'
            last = nr
        text = re.sub(r'[0-9]+\+[0-9]+(\+[0-9]+)+', lambda m: replace_nrs(m, '+'), text)
        text = re.sub(r'([0-9]+(, |$)){3,}', lambda m: replace_nrs(m, ', '), text)
        return text

    with utils.csv_open(f'{out_dir}/area_commitment.csv') as f_fb, \
            utils.csv_open(f'{out_dir}/area_commitment_attribute.csv',) as f_attr:
        f_fb.header('fbnr', 'mgnr', 'sortid', 'cultid', 'area', 'kgnr', 'gstnr', 'rdnr',
                    'year_from', 'year_to', 'comment')
        f_attr.header('fbnr', 'attrid')

        for fb in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
            if fb['Von'] is None and fb['Bis'] is None:
                continue
            parz: str = fb['Parzellennummer']
            fbnr: int = fb['FBNR']
            mgnr: int = fb['MGNR']
            gem = GEM_MAP[fb['GNR']]
            # the first KG of the municipality is used
            kgnr = gem[0][0]
            if mgnr not in MEMBER_MAP:
                continue

            area = int(fb['Flaeche'])
            gstnrs = parse_gstnrs(parz, kgnr, fb['MGNR'])
            comment, gstnr = None, None
            if parz is None or parz == '0000':
                invalid(mgnr, 'GstNr.', f'{kgnr or 0:05}-{parz}')
                gstnrs = []
                gstnr = '-'
            if len(gstnrs) == 0:
                # nothing parsed: keep the original text in the comment
                comment = f'KG {kgnr:05}: {parz}'
            gstnr = format_gstnr(gstnrs) or gstnr or parz
            if parz != gstnr.replace('+', '/'):
                convert(mgnr, f'GstNr. ({fbnr})', parz, gstnr)

            rdnr = REED_MAP[fb['RNR']][1] if fb['RNR'] else None
            # end years of 3000 and above are written as empty
            to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None
            f_fb.row(fbnr, mgnr, fb['SNR'], CULTIVATION_MAP[fb['BANR']], area,
                     kgnr, gstnr, rdnr, fb['Von'], to, comment)
            if fb['SANR']:
                f_attr.row(fbnr, fb['SANR'])
|
|
|
|
|
|
def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, List[int], datetime.date]]:
    """Group delivery rows by (corrected) delivery note number.

    Rows without a delivery note number and cancelled rows are ignored.

    :param deliveries: rows with at least the keys LINR, Lieferscheinnummer,
        Datum, ZNR, MGNR and Storniert
    :return: list of (lsnr, [LINR, ...], date) sorted by lsnr, where numbers
        starting with '9' are sorted as if prefixed with '19'
    """
    # NOTE(review): the nesting of this function was reconstructed from a
    # listing that had lost its indentation - compare against the original.
    # lsnr -> date of the first row added for it
    dates = {}
    # lsnr -> list of LINRs
    fixed = {}
    # ZNR -> latest date seen so far (only written in this function)
    last_dates = {}

    def add(lsnr: str, linr: int, date: datetime.date, unique: bool = False) -> None:
        """Append linr to lsnr; with unique=True an already used lsnr is retried with '/2' appended."""
        if lsnr not in fixed:
            fixed[lsnr] = []
            dates[lsnr] = date
        elif unique:
            return add(lsnr + '/2', linr, date, unique)
        fixed[lsnr].append(linr)

    def get_lsnr(date: datetime.date, lsnr: str) -> str:
        """Return lsnr with its first eight characters replaced by a date prefix built from date."""
        if date.year < 2000:
            return date.strftime('%y%m%d00') + lsnr[8:]
        else:
            return date.strftime('%Y%m%d') + lsnr[8:]

    deliveries: List[Tuple[int, str, datetime.date, int, int]] = [
        (d['LINR'], d['Lieferscheinnummer'], d['Datum'], d['ZNR'], d['MGNR'])
        for d in deliveries
        if d['Lieferscheinnummer'] and not d['Storniert']
    ]

    # all numbers in use, to avoid renumbering onto an existing one
    lsnrs = {d[1] for d in deliveries}

    for lnr, lsnr, date, zwstnr, mgnr in deliveries:
        # date encoded in the number: YYYYMMDD, or YYMMDD (year 19YY) if it starts with '9'
        lsdate = datetime.date(int(lsnr[:4]), int(lsnr[4:6]), int(lsnr[6:8])) if not lsnr.startswith('9') \
            else datetime.date(1900 + int(lsnr[:2]), int(lsnr[2:4]), int(lsnr[4:6]))

        if len(lsnr) == 12:
            if date != lsdate:
                if date.year == lsdate.year:
                    # same year: renumber to match the date, unless that number is taken
                    lsnr_n = get_lsnr(date, lsnr)
                    if lsnr_n not in lsnrs:
                        lsnr = lsnr_n
                    else:
                        warning_delivery(lsnr, mgnr, 'date', date)
                else:
                    # different year: keep month and day, take the year from the number
                    date = datetime.date(lsdate.year, date.month, date.day)

            if zwstnr not in last_dates or not date < last_dates[zwstnr]:
                last_dates[zwstnr] = date
            add(lsnr, lnr, date, unique=True)
        else:
            # any other length: attach to the delivery named by the first 12 characters
            add(lsnr[:12], lnr, date)

    return sorted([(f[0], f[1], dates[f[0]]) for f in fixed.items()],
                  key=lambda f: f[0] if not f[0].startswith('9') else '19' + f[0])
|
|
|
|
|
|
def migrate_deliveries(in_dir: str, out_dir: str) -> None:
    """Migrate deliveries, their parts, attributes and modifiers, and derive the seasons.

    Reads TAbschlaege.csv, TLieferungen.csv and TLieferungAbschlag.csv and
    writes delivery.csv, delivery_part.csv, delivery_part_attribute.csv,
    delivery_part_modifier.csv, season.csv and modifier.csv. Deliveries are
    grouped by fix_deliveries() and numbered per year (did) and per branch
    and day (lnr).

    :raises NotImplementedError: if there are modifiers and WG is neither
        None nor 'MATZEN'
    """
    # NOTE(review): the nesting of this function was reconstructed from a
    # listing that had lost its indentation - compare against the original.
    modifiers = {m['ASNR']: m for m in utils.csv_parse_dict(f'{in_dir}/TAbschlaege.csv') if m['Bezeichnung']}
    delivery_map = {}
    seasons = {}
    branches = {}

    for mod in modifiers.values():
        name: str = mod['Bezeichnung']
        if WG is None:
            mod['id'] = str(mod['ASNR'])
        elif WG == 'MATZEN':
            mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'PZS'
        else:
            raise NotImplementedError()

    deliveries = list(utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv'))
    delivery_dict = {d['LINR']: d for d in deliveries}
    fixed = fix_deliveries(deliveries)
    # GEM_MAP is not modified below, so the set of known KG numbers is built
    # once instead of rebuilding a list for every delivery part.
    known_kgnrs = {kg[0] for gem in GEM_MAP.values() for kg in gem}

    with utils.csv_open(f'{out_dir}/delivery.csv') as f_delivery, \
            utils.csv_open(f'{out_dir}/delivery_part.csv') as f_part, \
            utils.csv_open(f'{out_dir}/delivery_part_attribute.csv') as f_attr:
        f_delivery.header('year', 'did', 'date', 'time', 'zwstid', 'lnr', 'lsnr', 'mgnr', 'comment')
        f_part.header('year', 'did', 'dpnr', 'sortid', 'weight', 'kmw', 'qualid', 'hkid', 'kgnr', 'rdnr',
                      'gerebelt', 'manual_weighing', 'spl_check', 'hand_picked', 'lesewagen',
                      'temperature', 'acid', 'scale_id', 'weighing_id', 'comment')
        f_attr.header('year', 'did', 'dpnr', 'attrid')

        for lsnr, linrs, date in fixed:
            if date.year not in seasons:
                seasons[date.year] = {
                    'currency': 'EUR' if date.year >= 2001 else 'ATS',
                    'precision': 4,
                    'start': date,
                    'end': date,
                    'nr': 0,
                }
            s = seasons[date.year]
            if date > s['end']:
                s['end'] = date
            # running delivery number within the year
            s['nr'] += 1
            snr = s['nr']

            # running delivery number per branch and day
            znr = delivery_dict[linrs[0]]['ZNR']
            if znr not in branches:
                branches[znr] = {}
            if date not in branches[znr]:
                branches[znr][date] = 0
            branches[znr][date] += 1
            lnr = branches[znr][date]

            comments = []
            # NOTE(review): shared by all parts of this delivery, so a later
            # part also receives the attributes of earlier parts - confirm.
            attributes = set()
            for dpnr, linr in enumerate(linrs, start=1):
                d = delivery_dict[linr]
                delivery_map[linr] = (date.year, snr, dpnr)
                if lsnr != d['Lieferscheinnummer']:
                    renumber_delivery(d['Lieferscheinnummer'], lsnr)

                oe = d['OechsleOriginal'] or d['Oechsle']
                kmw = GRADATION_MAP[oe]
                sortid = d['SNR'].upper()
                if d['SANR']:
                    attributes.add(d['SANR'])
                if len(sortid) != 2:
                    # characters after the two-letter variety id are an attribute
                    attributes.add(sortid[2:])
                    sortid = sortid[:2]

                if WG == 'MATZEN':
                    if sortid == 'HU':
                        # Gr.Veltliner (Huber)
                        sortid = 'GV'
                        attributes.add('HU')
                    elif sortid == 'SV':
                        sortid = 'SW'
                    elif sortid == 'WC':
                        # WEIẞBURGUNDER/CHARDONNAY
                        sortid = 'SW'
                    if 'H' in attributes:
                        attributes.remove('H')
                        attributes.add('HK')
                    if 'W' in attributes:
                        attributes.remove('W')

                if d['SNR'] != sortid:
                    print(f'{d["SNR"]}/{d["SANR"]} -> {sortid}/{attributes}')

                kgnr, rdnr = None, None
                if d['GNR']:
                    gem = GEM_MAP[d['GNR']]
                    if len(gem) == 1:
                        kgnr = gem[0][0]
                if d['RNR']:
                    rd = REED_MAP[d['RNR']]
                    # TODO reed nr
                if kgnr is None:
                    # fall back to the member's default KG
                    m = MEMBER_MAP[d['MGNR']]
                    kgnr = m['default_kgnr']
                if kgnr is None:
                    warning_delivery(lsnr, d['MGNR'], 'KGNr.', None)
                elif kgnr not in known_kgnrs:
                    warning_delivery(lsnr, d['MGNR'], 'KGNr.', kgnr)
                    kgnr = None

                waage = d['Waagentext']
                scale_id, weighing_id = None, None
                if waage:
                    # second and fourth whitespace-separated token
                    waage = re.split(r' +', waage)
                    scale_id = waage[1]
                    weighing_id = waage[3]

                comment: Optional[str] = d['Anmerkung']
                acid = d['Säure']
                hand, lesemaschine = None, None
                if comment:
                    comment = comment.replace('Söure', 'Säure')
                    if comment.startswith('Säure'):
                        # the acid value was entered as a comment
                        acid = float(comment.split(' ')[-1].replace(',', '.'))
                        comment = None
                    elif comment == 'Maschine':
                        hand = False
                        comment = None
                    elif comment == 'Hand':
                        hand = True
                        comment = None
                if comment:
                    comments.append(comment)

                f_part.row(
                    date.year, snr, dpnr, sortid, int(d['Gewicht']), kmw, QUAL_MAP[d['QSNR']], HKID, kgnr, rdnr,
                    d['Gerebelt'] or False, d['Handwiegung'] or False, d['Spaetlese-Ueberpruefung'] or False,
                    hand, lesemaschine, d['Temperatur'], acid, scale_id, weighing_id, comment
                )
                for attrid in attributes:
                    f_attr.row(date.year, snr, dpnr, attrid)
            # time, branch and member are taken from the last part of the delivery
            f_delivery.row(date.year, snr, date, d['Uhrzeit'], BRANCH_MAP[d['ZNR']], lnr, lsnr, d['MGNR'],
                           '; '.join(comments) or None)

    with utils.csv_open(f'{out_dir}/delivery_part_modifier.csv') as f_part_mod:
        f_part_mod.header('year', 'did', 'dpnr', 'modid')
        for m in utils.csv_parse_dict(f'{in_dir}/TLieferungAbschlag.csv'):
            if m['LINR'] not in delivery_map:
                continue
            nid = delivery_map[m['LINR']]
            f_part_mod.row(nid[0], nid[1], nid[2], modifiers[m['ASNR']]['id'])

    with utils.csv_open(f'{out_dir}/season.csv') as f_season, \
            utils.csv_open(f'{out_dir}/modifier.csv') as f_mod:
        f_season.header('year', 'currency', 'precision', 'start_date', 'end_date')
        f_mod.header('year', 'modid', 'name', 'abs', 'rel', 'standard', 'quick_select')
        for y, s in seasons.items():
            f_season.row(y, s['currency'], s['precision'], s['start'], s['end'])
            # every modifier is written once per season
            for m in modifiers.values():
                abs_v = int(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None
                f_mod.row(y, m['id'], m['Bezeichnung'], abs_v, m['AZASProzent'], m['Standard'], m['Schnellauswahl'])
|
|
|
|
|
|
def migrate_payments(in_dir: str, out_dir: str) -> None:
    """Placeholder: payments are not migrated yet, so this does nothing."""
    # TODO migrate payments
    return None
|
|
|
|
|
|
def migrate_parameters(in_dir: str, out_dir: str) -> None:
    """Write client_parameter.csv from the MANDANTEN*/ABSENDERTEXT2 values in TParameter.csv.

    The client name is title-cased with 'F.'/'U.' expanded to 'für'/'und';
    NAME_SHORTENED carries the abbreviated form. The legal-form suffix is
    normalised through a lookup table; a suffix that is not in the table is
    passed through unchanged instead of aborting the migration.
    """
    params: Dict[str, str] = {p['Bezeichnung']: p['Wert'] for p in utils.csv_parse_dict(f'{in_dir}/TParameter.csv')}
    name = params['MANDANTENNAME1'].title().replace('F.', 'für').replace('U.', 'und')
    shortened = name.replace(' für ', ' f. ').replace(' und ', ' u. ')
    raw_suffix = params['MANDANTENNAME2']
    # lookup key: lower case, without blanks and dots
    suffix = (raw_suffix or '').lower().replace(' ', '').replace('.', '')
    suffixes = {
        'reggenmbh': 'reg. Gen.m.b.H.'
    }
    ustid = params['MANDANTENUID']

    new_params: Dict[str, Optional[str]] = {
        'NAME_TOKEN': None,
        'NAME_SHORT': None,
        'NAME_SHORTENED': shortened,
        'NAME': name,
        'NAME_SUFFIX': suffixes.get(suffix, raw_suffix),
        'PLZ': params['MANDANTENPLZ'],
        'ORT': params['MANDANTENORT'],
        'ADDRESS': params['MANDANTENSTRASSE'],
        'DOCUMENT_SENDER': params['ABSENDERTEXT2'],
        'IBAN': None,
        'BIC': None,
        'USTID': ustid.replace(' ', '') if ustid else ustid,
        'LFBISNR': params['MANDANTENBETRIEBSNUMMER'],
        'PHONE': params['MANDANTENTELEFON'],
        'FAX': params['MANDANTENTELEFAX'],
        'EMAIL': params['MANDANTENEMAIL'],
        'WEBSITE': params['MANDANTENHOMEPAGE'],
    }

    with utils.csv_open(f'{out_dir}/client_parameter.csv') as f:
        f.header('param', 'value')
        for param, value in new_params.items():
            f.row(param, value)
|
|
|
|
|
|
def main() -> None:
    """Parse the command line, open the lookup database and run all migration steps in order.

    The steps depend on the lookup maps filled by earlier steps, so their
    order matters. The database connection is closed even if a step fails.
    """
    global DB_CNX, QUIET, HKID, WG

    parser = argparse.ArgumentParser()
    parser.add_argument('in_dir', type=str,
                        help='The input directory where the exported csv files are stored')
    parser.add_argument('out_dir', type=str,
                        help='The output directory where the migrated csv file should be stored')
    parser.add_argument('-q', '--quiet', action='store_true', default=False,
                        help='Be less verbose')
    parser.add_argument('-d', '--database', metavar='DB', required=True,
                        help='The sqlite database file to look up information')
    parser.add_argument('-g', '--genossenschaft', metavar='WG', required=False, type=str,
                        choices=('MATZEN', 'WOLKERSDORF'))
    parser.add_argument('-o', '--origin', metavar='HKID', required=True,
                        help='The default wine origin identifier '
                             '(consider that the origin is ALWAYS set according to the KGNr if available)')
    args = parser.parse_args()

    os.makedirs(args.out_dir, exist_ok=True)

    QUIET = args.quiet
    HKID = args.origin
    WG = args.genossenschaft

    DB_CNX = sqlite3.connect(args.database)
    try:
        migrate_gradation(args.in_dir, args.out_dir)
        migrate_branches(args.in_dir, args.out_dir)
        migrate_grosslagen(args.in_dir, args.out_dir)
        migrate_gemeinden(args.in_dir, args.out_dir)
        migrate_reeds(args.in_dir, args.out_dir)
        migrate_attributes(args.in_dir, args.out_dir)
        migrate_cultivations(args.in_dir, args.out_dir)
        migrate_members(args.in_dir, args.out_dir)
        migrate_area_commitments(args.in_dir, args.out_dir)
        migrate_deliveries(args.in_dir, args.out_dir)
        migrate_payments(args.in_dir, args.out_dir)
        migrate_parameters(args.in_dir, args.out_dir)
    finally:
        # release the database even when a migration step raises
        DB_CNX.close()
|
|
|
|
|
|
# Run the migration only when this file is executed as a script.
if __name__ == '__main__':
    main()
|