migrate.py: First step to migrate baden

This commit is contained in:
2024-02-15 07:27:56 +01:00
parent fa8568af2b
commit 9a7a8e7c29
3 changed files with 255 additions and 71 deletions

View File

@ -2,10 +2,16 @@
INSERT INTO AT_plz_dest (plz, okz, dest)
VALUES (2241, 3560, 'Schönkirchen-Reyersdorf'),
(2165, 5013, 'Drasenhofen'),
(2134, 5115, 'Staatz-Kautendorf');
(2134, 5115, 'Staatz-Kautendorf'),
(2560, 3388, 'Grillenberg');
DELETE FROM AT_plz_dest
WHERE (plz, okz)
IN ((2231, 5011),
(2561, 3388));
UPDATE AT_ort SET name = 'Etzmannsdorf am Kamp' WHERE okz = 3938;
UPDATE AT_ort SET kgnr = 18002 WHERE okz = 3779;
UPDATE AT_ort SET kgnr = 5005 WHERE okz = 3818;
DELETE FROM AT_plz_dest WHERE (plz, okz) = (2231, 5011);
UPDATE AT_ort SET kgnr = 23351 WHERE okz = 5280;
UPDATE AT_ort SET kgnr = 4311 WHERE okz = 3388;

View File

@ -1,3 +1,3 @@
-- This value MUST NOT be changed while other connections are open!
PRAGMA schema_version = 1500;
PRAGMA schema_version = 1600;

View File

@ -19,6 +19,8 @@ import utils
class WG(Enum):
MATZEN = 1
WINZERKELLER = 2
WEINLAND = 3
BADEN = 4
@classmethod
def from_str(cls, name: str):
@ -45,6 +47,7 @@ GROSSLAGE_KG_MAP: Optional[Dict[int, int]] = None
DELIVERY_MAP: Optional[Dict[int, Tuple[int, int, int]]] = None
MODIFIER_MAP: Optional[Dict[str, Dict]] = None
SORT_MAP: Optional[Dict[str, str]] = None
ATTRIBUTE_MAP: Optional[Dict[str, str]] = None
PARAMETERS: Optional[Dict[str, str]] = None
AUSTRIA = 40
@ -112,6 +115,25 @@ ORT_NAMES: Dict[str, Optional[str]] = {
'Obersdorf': None,
'Sechshaus': None,
'Rußbach': None,
'Pfaffstätten': 'Pfaffstättner',
'Berndorf': None,
'Teesdorf': None,
'Baden': 'Badner',
'Dornau': None,
'Pottendorf': None,
'Möllersdorf': None,
'Wienersdorf': None,
'Münchendorf': None,
'Hernstein': None,
'Großau': None,
'Oberwaltersdorf': None,
'Vöslau': None,
'Tribuswinkel': 'Tribuswinkler',
'Sollenau': None,
'Gutenbrunn': None,
'Kottingbrunn': None,
'Siebenhaus': None,
'Mariazell': None,
}
STREET_NAMES: Dict[str, str] = {
@ -152,6 +174,40 @@ STREET_NAMES: Dict[str, str] = {
'U. Weißgasse Straße': 'Untere Weißgerberstraße',
'Dr. Josef-Levit-Straße': 'Dr.-Josef-Levit-Straße',
'Karl Katschthaler-Straße': 'Karl-Katschthaler-Straße',
'Pfaffstättnerstraße': 'Paffstättner Straße',
'Badnerstraße': 'Badner Straße',
'Anton Krennstraße': 'Anton-Krenn-Straße',
'Fr.Jonasstraße': 'Franz-Jonas-Straße',
'Wr.Neustädterstraße': 'Wiener Neustädter Straße',
'Wr. Neustädterstraße': 'Wiener Neustädter Straße',
'Wr. Neustäderstraße': 'Wiener Neustädter Straße',
'Ob.Ödlitzerstraße': 'Obere Ödlitzer Straße',
'Obere Ödlitzerstraße': 'Obere Ödlitzer Straße',
'Triesterstraße': 'Triester Straße',
'Dr. Dolpstraße': 'Dr.-Dolp-Straße',
'Wienersd.Hauptstr.': 'Wienersdorfer Hauptstraße',
'Wienersd.Hauptstraße': 'Wienersdorfer Hauptstraße',
'Tr.Bundesstr.': 'Triester Bundesstraße',
'Tr.Bundesstraße': 'Triester Bundesstraße',
'J.Brunastraße': 'Josef-Bruna-Straße',
'J. Brunastraße': 'Josef-Bruna-Straße',
'Ferdinand Pichlergasse': 'Ferdinand-Pichler-Gasse',
'Dr. Figlstraße': 'Dr.-Figl-Straße',
'Franz Broschekplatz': 'Franz-Borschek-Platz',
'Tribuswinklerstraße': 'Tribuswinkler Straße',
'Rudolf Kaspargasse': 'Rudolf-Kaspar-Gasse',
'Traiskirchnerstraße': 'Traiskirchner Straße',
'Dr. Theodor Körnerstraße': 'Dr.-Theodor-Körner-Straße',
'Richard Klingerstraße': 'Richard-Klinger-Straße',
'Karl Langegasse': 'Karl-Lange-Gasse',
'Leopold Hörbingerstraße': 'Leopold-Hörbiger-Straße',
'Leopold Hörbinger Straße': 'Leopold-Hörbiger-Straße',
'Rudolf Zöllnergasse': 'Rudolf-Zöllner-Gasse',
'Anton Rauchstraße': 'Anton-Rauch-Straße',
'Isabellestraße': 'Erzherzogin-Isabelle-Straße',
'Erzherzogin Isabelle Straße': 'Erzherzogin-Isabelle-Straße',
'E. Penzig Franz Straße': 'Edgar-Penzing-Franz-Straße',
'Hernsteinerstr Straße': 'Hernsteiner Straße',
}
@ -249,16 +305,14 @@ def normalize_phone_nr(nr: Optional[str], ort: str = None) -> Optional[str]:
nr = '+' + nr
elif CLIENT == WG.WINZERKELLER and ort:
ort = ort.upper().strip()
if ort in ('PILLICHSDORF', 'OBERSDORF', 'WOLKERSDORF', 'WOLFPASSING', 'PUTZING', 'GROSSENGERSDORF',
'EIBESBRUNN'):
if ort in ('PILLICHSDORF', 'OBERSDORF', 'WOLKERSDORF', 'WOLFPASSING', 'PUTZING', 'GROSSENGERSDORF', 'EIBESBRUNN'):
nr = f'+43 2245 {nr}'
elif ort in ('ALBERNDORF', 'HAUGSDORF', 'AUGGENTHAL', 'HAUGSDORF'):
nr = f'+43 2944 {nr}'
elif ort in ('HADRES'):
nr = f'+43 2943 {nr}'
else:
print(nr, ort)
raise RuntimeError()
raise RuntimeError(f'Unable to find telephone number of "{ort}" ({nr})')
if nr.startswith('+43'):
if nr[4] == '6':
nr = nr.replace(' ', '')
@ -272,6 +326,22 @@ def normalize_phone_nr(nr: Optional[str], ort: str = None) -> Optional[str]:
return nr.strip()
def check_phone_nr(nr: str, mgnr: int, active: Optional[bool]) -> Tuple[Optional[str], Optional[str], Optional[str]]:
m = re.fullmatch(r'(.*?) ([A-Za-zäöüÄÖÜßẞ]+)$', nr)
if m is not None:
nr = m.group(1)
comment = m.group(2).strip()
if comment == 'Fi':
comment = 'Firma'
else:
comment = None
nnr = normalize_phone_nr(nr)
if len(nnr) <= 10 or nnr[0] != '+' or re.fullmatch(r'[+0-9 \-]+', nnr) is None:
invalid(mgnr, 'Tel.Nr.', nr, active)
return nnr, None, None
return nnr, 'mobile' if nnr[4] == '6' else 'landline', comment
def fix_street_name(name: str) -> str:
if name in STREET_NAMES:
return STREET_NAMES[name]
@ -322,6 +392,22 @@ def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] =
plz = 2023
elif ort.upper() == 'NIEDERSCHLEINZ' and plz == 3721:
plz = 3714
elif ort.upper() == 'OEYNHAUSEN' and plz == 2500:
plz = 2512
elif ort.upper() == 'MÖLLERSDORF' and plz == 2513:
plz = 2514
elif ort.upper() == 'SOOSS' and plz == 2500:
ort = 'SOOẞ'
plz = 2504
elif ort.upper() == 'ÖDLITZ' and plz == 2560:
ort = 'BERNDORF'
elif ort.upper() == 'ST.VEIT' and plz == 2562:
ort = 'BERNDORF'
plz = 2560
elif ort.upper() == 'SCHÖNAU/TRIESTING' and plz == 2525:
ort = 'SCHÖNAU AN DER TRIESTING'
elif ort.upper() == 'BAD FISCHAU - BRUNN' and plz == 2721:
ort = 'BAD FISCHAU-BRUNN'
cur = DB_CNX.cursor()
cur.execute("SELECT o.okz, p.dest, o.name FROM AT_plz_dest p JOIN AT_ort o ON o.okz = p.okz WHERE plz = ?", (plz,))
@ -347,6 +433,11 @@ def lookup_plz(plz: Optional[int], ort: Optional[str], address: Optional[str] =
else:
# Götzendorf
return plz * 100000 + 3571
elif ort == 'BAD FISCHAU-BRUNN':
if 'viaduktstraße' in address.lower():
return plz * 100000 + 6560
elif 'teichplatz' in address.lower():
return plz * 100000 + 6560
raise RuntimeError(f'PLZ not found ({plz} {ort}, {address})')
@ -439,6 +530,38 @@ def lookup_gem_name(name: str) -> List[Tuple[int, int]]:
gem_name = 'Poysdorf'
elif name.lower() == 'nappersdorf-kammersdorf':
return [(9008, 31028), (9026, 31028), (9032, 31028), (9037, 31028), (9051, 31028), (9067, 31028)]
elif CLIENT == WG.WEINLAND:
hkid = "'WLWV'"
elif CLIENT == WG.BADEN:
hkid = "'WLTH'"
if name.lower() == 'baden':
gem_name = 'Baden'
elif name.lower() in ('bad fischau-brunn', 'bad fischau - brunn'):
return [(23402, 32301), (23401, 32301)]
elif name.lower() == 'bad vöslau':
return [(4005, 30603), (4009, 30603), (4035, 30603)]
elif name.lower() == 'berndorf':
return [(4303, 30605), (4304, 30605), (4032, 30605), (4305, 30605)]
elif name.lower() in ('berndorf-ödlitz', 'ödlitz'):
return [(4304, 30605)]
elif name.lower() == 'eggendorf':
return [(23437, 32305), (23426, 32305)]
elif name.lower() == 'purkersdorf':
return []
elif name.lower() == 'schönau':
gem_name = 'Schönau an der Triesting'
elif name.lower() == 'siegersdorf':
gem_name = 'Pottendorf'
elif name.lower() == 'sooss':
name = 'Sooß'
elif name.lower() == 'st.veit':
return [(4303, 30605)]
elif name.lower() == 'wien':
return []
elif name.lower() == 'gramatneusiedl':
return []
else:
raise NotImplementedError(f'Gemeinde lookup for {CLIENT} not yet implemented')
cur = DB_CNX.cursor()
cur.execute("SELECT k.kgnr, k.name, g.gkz, g.name "
@ -456,8 +579,7 @@ def lookup_gem_name(name: str) -> List[Tuple[int, int]]:
if len(rows) == 1:
return [(k, g) for k, _, g, _ in rows]
print(name, rows)
raise RuntimeError()
raise RuntimeError(f'Unable to find Gemeinde "{name}" ({rows})')
def lookup_kg_name(kgnr: int) -> str:
@ -477,8 +599,10 @@ def lookup_hkid(kgnr: Optional[int], qualid: str) -> str:
if qualid in ('WEI', 'RSW'):
return 'OEST'
elif kgnr is None:
if CLIENT in (WG.MATZEN, WG.WINZERKELLER):
if CLIENT in (WG.MATZEN, WG.WINZERKELLER, WG.BADEN):
hkid = 'WLNO'
else:
raise NotImplementedError(f'Default hkid for {CLIENT} not implemented yet')
else:
cur = DB_CNX.cursor()
cur.execute("SELECT wb.hkid FROM AT_kg kg JOIN AT_gem g ON g.gkz = kg.gkz JOIN wb_gem wb ON wb.gkz = g.gkz "
@ -605,6 +729,8 @@ def migrate_reeds(in_dir: str, out_dir: str) -> None:
def migrate_attributes(in_dir: str, out_dir: str) -> None:
global ATTRIBUTE_MAP
ATTRIBUTE_MAP = {}
with utils.csv_open(f'{out_dir}/wine_attribute.csv') as f:
f.header('attrid', 'name', 'active', 'max_kg_per_ha', 'strict', 'fill_lower')
for a in utils.csv_parse_dict(f'{in_dir}/TSortenAttribute.csv'):
@ -613,10 +739,21 @@ def migrate_attributes(in_dir: str, out_dir: str) -> None:
max_kg = int(a['KgProHa']) if a['KgProHa'] is not None else None
if max_kg == 10_000:
max_kg = None
f.row(a['SANR'], a['Attribut'], True, max_kg, False, 0)
attrid = a['SANR']
if attrid == 'BIO':
attrid = 'B'
ATTRIBUTE_MAP[a['SANR']] = attrid
f.row(attrid, a['Attribut'], True, max_kg, False, 0)
if CLIENT == WG.MATZEN:
f.row('M', 'Matzen', False, None, False, 0)
f.row('HU', 'Huber', False, None, False, 0)
ATTRIBUTE_MAP['M'] = 'M'
ATTRIBUTE_MAP['HU'] = 'HU'
elif CLIENT == WG.BADEN:
f.row('D', 'DAC', False, 7500, False, 0)
f.row('K', 'Kabinett', False, None, False, 0)
ATTRIBUTE_MAP['D'] = 'D'
ATTRIBUTE_MAP['K'] = 'K'
def migrate_cultivations(in_dir: str, out_dir: str) -> None:
@ -645,7 +782,8 @@ def migrate_area_commitment_types(in_dir: str, out_dir: str) -> None:
if not sortid or sortid == 'SV':
continue
menge = int(t['ErwarteteLiefermengeProHa'])
f.row(sortid + (t['SANR'] or ''), sortid[:2], t['SANR'] or sortid[2:] or None, None, menge,
attrid = ATTRIBUTE_MAP[t['SANR']] if t['SANR'] else None
f.row(sortid + (attrid or ''), sortid[:2], attrid or sortid[2:] or None, None, menge,
None, None, None)
bio = []
if CLIENT == WG.MATZEN:
@ -671,6 +809,9 @@ def normalize_name(family_name: str, given_name: str) -> Tuple[Optional[str], Op
return None, None, None, None, None, 'Landwirtschaftliche Fachschule Mistelbach'
elif 'lagerhaus' in family_name.lower() and given_name == 'HOLLABRUNN-HORN':
return None, None, None, None, None, 'Raiffeisen-Lagerhaus Hollabrunn-Horn eGen'
elif CLIENT == WG.BADEN:
if family_name in ('Marktgemeinde', 'Weinbauverein'):
return None, None, None, None, None, f'{family_name} {given_name}'
if given_name.lower() not in ('kg', 'gesbr', 'gesnbr') and \
len(family_name) > 0 and len(given_name) > 0 and is_alpha(family_name) and is_alpha(given_name):
@ -689,7 +830,8 @@ def normalize_name(family_name: str, given_name: str) -> Tuple[Optional[str], Op
given_name = given_name.replace('+', ' + ').replace('JOS ', 'JOS. ')
given_name = re.sub(r' ?\((.+?)(, ?(.*?))?\)',
lambda m: f' + {m.group(1)}{" + " + m.group(3) if m.group(2) else ""}', given_name)
lambda m: m.group(0) if m.group(1) == 'FH' else
f' + {m.group(1)}{" + " + m.group(3) if m.group(2) else ""}', given_name)
given_name = re.sub(r' u\. ?| und ', ' + ', given_name, flags=re.IGNORECASE)
titles = ''
@ -703,35 +845,52 @@ def normalize_name(family_name: str, given_name: str) -> Tuple[Optional[str], Op
case 'dr': titles += 'Dr. '
case 'mag': titles += 'Mag. '
case 'ing': titles += 'Ing. '
case 'dipling': titles += 'Dipl.-Ing. '
case 'di': titles += 'Dipl.-Ing. '
case 'di(fh)': titles += 'DI (FH) '
case 'dipling': titles += 'DI '
case 'dipli': titles += 'DI '
case 'di': titles += 'DI '
case 'dkfm': titles += 'Dipl.-Kfm. '
case 'ökrat': titles += 'ÖkR '
case 'lkr': titles += 'ÖkR '
return ' '
title_re = re.compile(r',?\b(dr|ing|mag|jun|sen|dkfm|dipl\. ?-?ing|di|ök\.rat|lkr)\b\.?', re.IGNORECASE)
title_re = re.compile(r',?((di ?\(fh\))|\b(dr|ing|mag|jun|sen|dkfm|dipl\. ?-?i(ng)?|di|ök\.rat|lkr)\b)\.?', re.IGNORECASE)
given_name = utils.remove_spaces(re.sub(title_re, repl_title, given_name))
family_name = utils.remove_spaces(re.sub(title_re, repl_title, family_name))
if titles:
prefix = titles.strip()
if given_name.lower() in ('weingut', 'weinbau'):
parts = family_name.split(' ')
return prefix, None, middle_names, ' '.join(parts[:-1]), suffix, given_name + ' ' + ' '.join(parts)
elif given_name.lower().startswith('weinbau ') or given_name.startswith('weingut '):
parts = given_name.split(' ')
return prefix, None, middle_names, family_name, suffix, ' '.join(parts[:-1]) + ' ' + family_name + ' ' + parts[-1]
elif family_name.lower() in ('weingut', 'weinbau'):
parts = given_name.split(' ')
return prefix, None, middle_names, ' '.join(parts[:-1]), suffix, family_name + ' ' + ' '.join(parts)
family_parts = family_name.split(' ')
last = family_parts[-1].lower()
if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr'):
if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr', 'og', 'gmbh'):
family_name = ' '.join(family_parts[:-1])
if ' ' not in family_name and len(family_name) > 4:
family_name = family_name.title()
billing_name = family_name + ' ' + ('KG' if last == 'kg' else 'KEG' if last == 'keg.' else 'GesbR')
if is_alpha(given_name):
if family_name.startswith('Gem.'):
family_name = 'GeM ' + family_name[5:]
billing_name = family_name + ' ' + {'kg': 'KG', 'keg.': 'KEG', 'og': 'OG', 'gmbh': 'GmbH'}.get(last, 'GesbR')
if given_name.count(' ') == 1:
parts = given_name.split(' ')
return None, parts[0], None, parts[1], None, billing_name
elif is_alpha(given_name):
return prefix, given_name.title(), middle_names, family_name, suffix, billing_name
given_parts = given_name.split(' ')
last = given_parts[-1].lower()
if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr'):
if last in ('kg', 'keg.', 'gesbr', 'gnbr', 'gesnbr', 'gsbr', 'og', 'gmbh'):
given_name = ' '.join(given_parts[:-1]).title()
family_name = family_name.title()
billing_name = f'{family_name} {"KG" if last == "kg" else "KEG" if last == "keg." else "GesbR"}'
billing_name = family_name + ' ' + {'kg': 'KG', 'keg.': 'KEG', 'og': 'OG', 'gmbh': 'GmbH'}.get(last, 'GesbR')
return prefix, given_name, middle_names, family_name, suffix, billing_name
if ' ' in family_name or '.' in family_name:
@ -767,11 +926,11 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
mgnrs = [m['MGNR'] for m in members]
fbs = parse_flaechenbindungen(in_dir)
with utils.csv_open(f'{out_dir}/member.csv') as f_m, \
with (utils.csv_open(f'{out_dir}/member.csv') as f_m, \
utils.csv_open(f'{out_dir}/member_billing_address.csv') as f_mba, \
utils.csv_open(f'{out_dir}/member_telephone_number.csv') as f_tel, \
utils.csv_open(f'{out_dir}/member_email_address.csv') as f_email, \
utils.csv_open(f'{out_dir}/wb_kg.csv', 'a') as f_kg:
utils.csv_open(f'{out_dir}/wb_kg.csv', 'a') as f_kg):
f_m.header(
'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix',
'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid',
@ -789,6 +948,10 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
if family_name is None and given_name is None:
continue
elif m['Anmerkung'] == 'Musterbetrieb':
continue
elif CLIENT == WG.BADEN and family_name == 'Winzergenoss.':
continue
given_name = given_name or ''
if CLIENT == WG.MATZEN and given_name.startswith(' '):
@ -861,6 +1024,14 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
bic = 'RLNWATWWAUE'
elif bic == 'RLNWATWMIB':
bic = 'RLNWATWWMIB'
elif bic == 'VBÖEATWW':
bic = 'VBOEATWW'
elif bic == 'RLNWATBAD':
bic = 'RLNWATWWBAD'
elif bic == 'SPBDATXXX':
bic = 'SPBDAT21'
elif bic == 'IBAATWWXXX':
bic = 'GIBAATWW'
if not BIC_RE.fullmatch(bic):
invalid(mgnr, 'BIC', bic, active)
bic = None
@ -881,8 +1052,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
ort = ' '.join(parts[:-1])
new_address = parts[-1]
if address is not None and address != ' ' and address != new_address:
print(address, new_address)
raise RuntimeError()
raise RuntimeError(f'Unable to rewrite address: "{address}" -> "{new_address}"')
address = parts[-1]
if CLIENT == WG.WINZERKELLER and ort == 'JETZELDORF':
ort = 'JETZELSDORF'
@ -898,9 +1068,9 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
address.startswith('Nr ') or \
(len(address) > 0 and address[0].isdigit()):
address = ort.title() + ' ' + address.split(' ')[-1]
address = address.replace('strasse', 'straße').replace('strassse', 'straße')\
.replace('Strasse', 'Straße').replace('Str.', 'Straße').replace('stasse', 'straße')\
.replace('str.', 'straße').replace('ster.', 'straße').replace('g. ', 'gasse ')\
address = address.replace('Pelz.', 'Pelzg.').replace('strasse', 'straße').replace('strassse', 'straße')\
.replace('Strasse', 'Straße').replace('Str.', 'Straße').replace('stasse', 'straße').replace('st.', 'straße ')\
.replace('str.', 'straße').replace('ster.', 'straße').replace('g.', 'gasse ').replace('pl.', 'platz ')\
.replace('Gross', 'Groß').replace('Bockfliess', 'Bockfließ').replace('Weiss', 'Weiß')\
.replace('Preussen', 'Preußen').replace('Schloss', 'Schloß').replace('luss', 'luß')\
.replace('Haupstraße', 'Hauptstraße').replace('Russ', 'Ruß').replace('Ross', 'Roß')
@ -908,7 +1078,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
if address.startswith('Ob. '):
address = address.replace('Ob. ', 'Obere ', 1)
address = address.replace(' Nr. ', ' ')
address = re.sub(r'([^0-9]+?)( [0-9])', lambda a: fix_street_name(a.group(1)) + a.group(2), address)
address = re.sub(r'([^0-9]+?)( +[0-9])', lambda a: fix_street_name(a.group(1)) + a.group(2), address)
address = utils.remove_spaces(address)
if address_old != address:
convert(mgnr, 'Adresse', address_old, address)
@ -916,7 +1086,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
email: Optional[str] = m['EMail']
emails = []
if email is not None:
for email in email.split(' + '):
for email in (email.split(' ') if CLIENT == WG.BADEN else email.split(' + ')):
if email.isupper():
email = email.lower()
if not EMAIL_RE.fullmatch(email):
@ -967,6 +1137,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
phone_1: Optional[str] = m['Telefon']
phone_2: Optional[str] = m['Telefax']
phone_3: Optional[str] = m['Mobiltelefon']
phone_4: Optional[str] = m['EMail'] if m['EMail'] and '@' not in m['EMail'] else None
numbers = []
if CLIENT == WG.WINZERKELLER:
@ -1063,35 +1234,27 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
f_tel.row(mgnr, count, 'fax', nr, data['comment'])
else:
if phone_1:
phone_1 = normalize_phone_nr(phone_1)
if len(phone_1) <= 10 or phone_1[0] != '+':
invalid(mgnr, 'Tel.Nr.', m['Telefon'], active)
else:
phone_1, t, c = check_phone_nr(phone_1, mgnr, active)
if t is not None:
numbers.append(phone_1)
if phone_1[4] == '6':
f_tel.row(mgnr, len(numbers), 'mobile', phone_1, None)
else:
f_tel.row(mgnr, len(numbers), 'landline', phone_1, None)
f_tel.row(mgnr, len(numbers), t, phone_1, c)
if phone_2:
phone_2 = normalize_phone_nr(phone_2)
if len(phone_2) <= 8 or phone_2[0] != '+':
invalid(mgnr, 'Fax.Nr.', m['Telefax'], active)
else:
phone_2, t, c = check_phone_nr(phone_2, mgnr, active)
if t is not None:
numbers.append(phone_2)
if phone_2[4] == '6':
f_tel.row(mgnr, len(numbers), 'mobile', phone_2, None)
else:
f_tel.row(mgnr, len(numbers), 'fax', phone_2, None)
f_tel.row(mgnr, len(numbers), 'fax' if t == 'landline' else t, phone_2, c)
if phone_3:
phone_3 = normalize_phone_nr(phone_3)
if len(phone_3) <= 10 or phone_3[0] != '+':
invalid(mgnr, 'Tel.Nr.', m['Mobiltelefon'], active)
elif phone_3 not in numbers:
if phone_3.startswith('Handy'):
phone_3 = phone_3[5:].strip(':').strip()
phone_3, t, c = check_phone_nr(phone_3, mgnr, active)
if t is not None and phone_3 not in numbers:
numbers.append(phone_3)
if phone_3[4] == '6':
f_tel.row(mgnr, len(numbers), 'mobile', phone_3, None)
else:
f_tel.row(mgnr, len(numbers), 'landline', phone_3, None)
f_tel.row(mgnr, len(numbers), t, phone_3, c)
if phone_4:
phone_4, t, c = check_phone_nr(phone_4, mgnr, active)
if t is not None and phone_4 not in numbers:
numbers.append(phone_4)
f_tel.row(mgnr, len(numbers), t, phone_4, c)
for i, email in enumerate(emails):
f_email.row(mgnr, i + 1, email, None)
@ -1267,7 +1430,8 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
convert(mgnr, f'GstNr. ({fbnr})', parz, gstnr)
to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None
f_fb.row(fbnr, mgnr, fb['SNR'] + (fb['SANR'] or ''), CULTIVATION_MAP[fb['BANR'] or 1], area,
attrid = ATTRIBUTE_MAP[fb['SANR']] if fb['SANR'] else None
f_fb.row(fbnr, mgnr, fb['SNR'] + (attrid or ''), CULTIVATION_MAP[fb['BANR'] or 1], area,
kgnr, gstnr, rdnr, fb['Von'], to, comment)
@ -1355,8 +1519,10 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
5: 'VT', 6: 'MV', 7: 'UP', 8: 'VL',
9: 'DN', 10: 'SA', 11: 'DA', 12: 'EG',
}[nr]
elif CLIENT == WG.BADEN:
mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'UE'
else:
raise NotImplementedError()
raise NotImplementedError(f'Modifier migration for {CLIENT} not yet implemented')
deliveries = list(utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv'))
delivery_dict = {d['LINR']: d for d in deliveries}
@ -1412,8 +1578,9 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
oe = d['OechsleOriginal'] or d['Oechsle']
kmw = GRADATION_MAP[oe]
sortid = d['SNR'].upper()
if d['SANR']:
attributes.add(d['SANR'])
attrid = ATTRIBUTE_MAP[d['SANR']] if d['SANR'] else None
if attrid:
attributes.add(attrid)
if len(sortid) != 2:
attributes.add(sortid[2:])
sortid = sortid[:2]
@ -1437,10 +1604,13 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
elif CLIENT == WG.WINZERKELLER:
if 'F' in attributes:
attributes.remove('F')
elif CLIENT == WG.BADEN:
if sortid == 'GO':
sortid = 'SO'
if d['SNR'] != sortid:
SORT_MAP[f'{d["SNR"]}/{d["SANR"] or ""}'] = f'{sortid}/{",".join(list(attributes)) or ""}'
line = f'{d["SNR"]}/{d["SANR"]} -> {sortid}/{",".join(list(attributes)) or None}'
SORT_MAP[f'{d["SNR"]}/{attrid or ""}'] = f'{sortid}/{",".join(list(attributes)) or ""}'
line = f'{d["SNR"]}/{attrid} -> {sortid}/{",".join(list(attributes)) or None}'
if line not in updated_varieties:
updated_varieties[line] = 0
updated_varieties[line] += 1
@ -1481,9 +1651,10 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
if waage:
# Waagenr: 1 ID: 19
# Waagennummer: 1 Speichernummer: 9166
# 1
waage = re.split(r' +', waage)
scale_id = waage[1]
weighing_id = waage[3] if waage[2] == 'Speichernummer:' else f'{date}/{waage[3]}'
scale_id = waage[1] if len(waage) > 2 else '1'
weighing_id = waage[-1] if len(waage) > 2 and waage[2] == 'Speichernummer:' else f'{date}/{waage[-1]}'
elif len(glob_waage) == 0 and not handwiegung:
handwiegung = True
@ -1639,17 +1810,20 @@ def migrate_payments(in_dir: str, out_dir: str) -> None:
'AuszahlungSortenQualitätsstufe': {},
}
gb = data['Grundbetrag'] or 0
gbzs = data['GBZS']
azs = data['AuszahlungSorten']
for s in sort_map.get(p['AZNR'], []):
del s['AZNR']
del s['ID']
if s['Oechsle'] is None:
continue
key = SORT_MAP.get(f'{s["SNR"]}/{s["SANR"] or ""}', f'{s["SNR"].upper()}/{s["SANR"] or ""}')
attrid = ATTRIBUTE_MAP[s['SANR']] if s['SANR'] else None
key = SORT_MAP.get(f'{s["SNR"]}/{attrid or ""}', f'{s["SNR"].upper()}/{attrid or ""}')
if key is None or len(key) < 3:
continue
azs[key] = azs.get(key, {'Gebunden': {}, 'NichtGebunden': {}})
azs[key]['Gebunden' if s['gebunden'] else 'NichtGebunden'][s['Oechsle']] = s['Betrag']
azs[key]['Gebunden' if s['gebunden'] else 'NichtGebunden'][s['Oechsle']] = round(s['Betrag'] + gb, WGMASTER_PRECISION)
curves = []
curve_zero = False
for key, d1 in azs.items():
@ -1692,8 +1866,8 @@ def migrate_payments(in_dir: str, out_dir: str) -> None:
curves.insert(0, {
'id': 0,
'mode': 'oe',
'data': data['Grundbetrag'],
'geb': data['GBZS'] or 0
'data': gb,
'geb': gbzs or 0
})
data['Kurven'] = curves
azq = data['AuszahlungSortenQualitätsstufe']
@ -1701,11 +1875,12 @@ def migrate_payments(in_dir: str, out_dir: str) -> None:
del q['AZNR']
del q['ID']
qualid = QUAL_MAP[q['QSNR']]
key = SORT_MAP.get(f'{q["SNR"]}/{q["SANR"] or ""}', f'{q["SNR"].upper()}/{q["SANR"] or ""}')
attrid = ATTRIBUTE_MAP[s['SANR']] if s['SANR'] else None
key = SORT_MAP.get(f'{q["SNR"]}/{attrid or ""}', f'{q["SNR"].upper()}/{attrid or ""}')
if key is None or len(key) < 3:
continue
azq[qualid] = azq.get(qualid, {})
azq[qualid][key] = q['Betrag'] or 0
azq[qualid][key] = round((q['Betrag'] or 0) + gb, WGMASTER_PRECISION)
for qualid, d1 in azq.items():
azq[qualid] = collapse_data(d1)
@ -1735,7 +1910,8 @@ def migrate_payments(in_dir: str, out_dir: str) -> None:
b1 = gew - geb_gew
b2 = geb_gew
f_bucket.row(y, did, dpnr, 0, '_', b1)
f_bucket.row(y, did, dpnr, 1, p['SANR'] or '', b2)
attrid = ATTRIBUTE_MAP[p['SANR']] if p['SANR'] else None
f_bucket.row(y, did, dpnr, 1, attrid or '', b2)
for aznr, avnr, tznr in variant_year_map[y]:
val = p[f'BTeilzahlung{tznr}' if tznr < 6 else 'BEndauszahlung']
val = round(val * pow(10, WGMASTER_PRECISION))
@ -1752,7 +1928,9 @@ def migrate_parameters(in_dir: str, out_dir: str) -> None:
}
tokens = {
WG.MATZEN: ('WGM', 'WG Matzen'),
WG.WINZERKELLER: ('WKW', 'Winzerkeller')
WG.WINZERKELLER: ('WKW', 'Winzerkeller'),
WG.WEINLAND: ('WGW', 'WG Weinland'),
WG.BADEN: ('WGB', 'WG Baden')
}.get(CLIENT, (None, None))
ort = PARAMETERS['MANDANTENORT'].title()