Migrate payments

This commit is contained in:
2023-07-18 12:29:33 +02:00
parent b8faeb636f
commit 7e6e0c0fca
3 changed files with 220 additions and 47 deletions

View File

@ -9,6 +9,7 @@ import sys
import sqlite3
import requests
import datetime
import json
import utils
@ -30,8 +31,11 @@ REED_MAP: Optional[Dict[int, Tuple[int, int, str]]] = None
GROSSLAGE_MAP: Optional[Dict[int, int]] = None
MEMBER_MAP: Optional[Dict[int, Dict[str, Any]]] = None
GROSSLAGE_KG_MAP: Optional[Dict[int, int]] = None
DELIVERY_MAP: Optional[Dict[int, Tuple[int, int, int]]] = None
MODIFIER_MAP: Optional[Dict[str, Dict]] = None
AUSTRIA = 40
WGMASTER_PRECISION = 4
QUAL_MAP: Dict[int, str] = {
0: 'WEI',
@ -1221,14 +1225,17 @@ def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str,
def migrate_deliveries(in_dir: str, out_dir: str) -> None:
global DELIVERY_MAP, MODIFIER_MAP
DELIVERY_MAP, MODIFIER_MAP = {}, {}
modifiers = {m['ASNR']: m for m in utils.csv_parse_dict(f'{in_dir}/TAbschlaege.csv') if m['Bezeichnung']}
delivery_map = {}
seasons = {}
branches = {}
for mod in modifiers.values():
name: str = mod['Bezeichnung']
nr: int = mod['ASNR']
MODIFIER_MAP[name] = mod
if WG == 'MATZEN':
mod['id'] = name[-1] if name.startswith('Klasse') else 'TB' if name == 'Treuebonus' else 'PZS'
elif WG == 'GWK':
@ -1254,7 +1261,7 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
if date.year not in seasons:
seasons[date.year] = {
'currency': 'EUR' if date.year >= 2001 else 'ATS',
'precision': 4,
'precision': WGMASTER_PRECISION,
'start': date,
'end': date,
'nr': 0,
@ -1284,7 +1291,7 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
attributes = set()
for dpnr, linr in enumerate(linrs, start=1):
d = delivery_dict[linr]
delivery_map[linr] = (date.year, snr, dpnr)
DELIVERY_MAP[linr] = (date.year, snr, dpnr)
if lsnr != d['Lieferscheinnummer']:
renumber_delivery(d['Lieferscheinnummer'], lsnr)
@ -1377,10 +1384,10 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
with utils.csv_open(f'{out_dir}/delivery_part_modifier.csv') as f_part_mod:
f_part_mod.header('year', 'did', 'dpnr', 'modid')
for m in utils.csv_parse_dict(f'{in_dir}/TLieferungAbschlag.csv'):
if m['LINR'] not in delivery_map:
if m['LINR'] not in DELIVERY_MAP:
continue
nid = delivery_map[m['LINR']]
f_part_mod.row(nid[0], nid[1], nid[2], modifiers[m['ASNR']]['id'])
y, did, dpnr = DELIVERY_MAP[m['LINR']]
f_part_mod.row(y, did, dpnr, modifiers[m['ASNR']]['id'])
with utils.csv_open(f'{out_dir}/season.csv') as f_season, \
utils.csv_open(f'{out_dir}/modifier.csv') as f_mod:
@ -1389,13 +1396,159 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
for y, s in seasons.items():
f_season.row(y, s['currency'], s['precision'], s['start'], s['end'])
for m in modifiers.values():
abs_v = int(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None
abs_v = round(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None
f_mod.row(y, m['id'], m['Bezeichnung'], abs_v, m['AZASProzent'],
m.get('Standard', False), m['Schnellauswahl'])
def migrate_payments(in_dir: str, out_dir: str) -> None:
pass # TODO migrate payments
variant_map: Dict[int, Tuple[int, int]] = {}
variant_year_map: Dict[int, List[Tuple[int, int, int]]] = {}
year_map = {}
az_map = {}
p_sort = list(utils.csv_parse_dict(f'{in_dir}/TAuszahlungSorten.csv'))
sort_map = {i: [s for s in p_sort if s['AZNR'] == i] for i in set([s['AZNR'] for s in p_sort])}
p_qual = list(utils.csv_parse_dict(f'{in_dir}/TAuszahlungSortenQualitätsstufe.csv'))
qual_map = {i: [s for s in p_qual if s['AZNR'] == i] for i in set([s['AZNR'] for s in p_qual])}
with utils.csv_open(f'{out_dir}/payment_variant.csv') as f_payment:
f_payment.header('year', 'avnr', 'name', 'date', 'test_variant',
'bucket_1_name', 'bucket_2_name', 'bucket_3_name', 'comment', 'data')
for p in utils.csv_parse_dict(f'{in_dir}/TAuszahlung.csv'):
year = p['Lesejahr']
if year is None:
continue
if year not in year_map:
year_map[year] = 0
year_map[year] += 1
variant_map[p['AZNR']] = (year, year_map[year])
var = p.copy()
del var['AZNR']
del var['Datum']
del var['Beschreibung']
del var['Lesejahr']
del var['Titel']
del var['TeilzahlungNr']
data = {
'mode': 'wgmaster',
**var,
'AuszahlungSorten': {},
'AuszahlungSortenQualitätsstufe': {},
}
azs = data['AuszahlungSorten']
for s in sort_map[p['AZNR']]:
del s['AZNR']
del s['ID']
if s['Oechsle'] is None:
continue
snr = s['SNR'].upper()
sanr = s['SANR'] or ''
azs[snr] = azs.get(snr, {})
azs[snr][sanr] = azs[snr].get(sanr, {})
geb = 'Gebunden' if s['gebunden'] else 'NichtGebunden'
azs[snr][sanr][geb] = azs[snr][sanr].get(geb, {})
azs[snr][sanr][geb][s['Oechsle']] = s['Betrag']
for sortid, d1 in azs.items():
for attrid, d2 in d1.items():
for geb, d3 in d2.items():
oe = [d3.get(n, 0.0) for n in range(max(d3.keys()) + 1)]
azs[sortid][attrid][geb] = oe
azq = data['AuszahlungSortenQualitätsstufe']
for q in qual_map.get(p['AZNR'], []):
del q['AZNR']
del q['ID']
qualid = QUAL_MAP[q['QSNR']]
snr = q['SNR']
sanr = q['SANR'] or ''
azq[qualid] = azq.get(qualid, {})
azq[qualid][snr] = azq[qualid].get(snr, {})
azq[qualid][snr][sanr] = q['Betrag']
for qualid, d1 in azq.items():
for sortid, d2 in d1.items():
if len(set(d2.values())) == 1:
azq[qualid][sortid] = list(d2.values())[0]
for qualid, d1 in azq.items():
try:
if len(set(d1.values())) == 1:
azq[qualid] = list(d1.values())[0]
except TypeError:
pass
for k, v in data.copy().items():
if v is None or (type(v) == bool and not v):
del data[k]
az_map[p['AZNR']] = data
test = (p['TeilzahlungNr'] == 7)
if not test:
if year not in variant_year_map:
variant_year_map[year] = []
variant_year_map[year].append((p['AZNR'], year_map[year], p['TeilzahlungNr']))
f_payment.row(year, year_map[year], p['Titel'], p['Datum'], test,
'Gebunden', 'Nicht gebunden', 'Abgewertet', p['Beschreibung'], json.dumps(data))
def get_modifiers(modifiers: str) -> Tuple[int, float]:
if modifiers is None or modifiers == '':
return 0, 0.0
a, r = 0, 0.0
for name in modifiers.split(' / '):
mod = MODIFIER_MAP[name]
if mod['AZASProzent']:
r += mod['AZASProzent']
if mod['AZAS']:
a += round(mod['AZAS'] * pow(10, WGMASTER_PRECISION))
return a, r
def get_prices(aznr: int, sortid: str, attribute: Optional[str], oe: int) -> Tuple[int, int, int]:
az = az_map[aznr]
qs = az['AuszahlungSortenQualitätsstufe']
so = az['AuszahlungSorten']
p1, p2, p3 = 0, 0, 0
if qs:
p3 = qs['WEI']
if type(p3) == dict:
p3 = p3[sortid]
if type(p3) == dict:
p3 = p3[attribute or '']
if sortid.upper() in so:
so = so[sortid.upper()][attribute or '']
p2 = so['NichtGebunden'][oe]
p1 = so['Gebunden'][oe] if 'Gebunden' in so else p2
prec = pow(10, WGMASTER_PRECISION)
return round(p1 * prec), round(p2 * prec), round(p3 * prec)
with utils.csv_open(f'{out_dir}/payment_delivery.csv') as f_del_pay:
f_del_pay.header('year', 'did', 'dpnr', 'avnr', 'bucket_1', 'bucket_2', 'bucket_3', 'amount')
for p in utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv'):
if p['LINR'] not in DELIVERY_MAP:
continue
y, did, dpnr = DELIVERY_MAP[p['LINR']]
if y not in variant_year_map:
continue
for aznr, avnr, tznr in variant_year_map[y]:
val = p[f'BTeilzahlung{tznr}' if tznr < 6 else 'BEndauszahlung']
val = round(val * pow(10, WGMASTER_PRECISION))
b1, b2, b3 = 0, 0, 0
# prices = get_prices(aznr, p['SNR'], p['SANR'], int(p['Oechsle']))
# mod = get_modifiers(p['BAbschlaegeString'])
# if not az_map[aznr].get('AbschlägeBerücksichtigen', False):
# mod = 0, 0.0
gew, geb_gew = int(p['Gewicht']), int(p['BGewichtGebunden'])
if QUAL_MAP[p['QSNR']] == 'WEI':
b3 += gew
else:
b2 += gew - geb_gew
b1 += geb_gew
# check_val = b1 * (prices[0] + mod[0]) + b2 * (prices[1] + mod[0]) + b3 * (prices[2] + mod[0])
# check_val *= 1 + mod[1]
# check_val = round(check_val / 100) * 100
# if check_val != val:
# print(p['LINR'], y, did, dpnr, avnr, val, check_val)
# else:
# print("GOOD")
f_del_pay.row(y, did, dpnr, avnr, b1, b2, b3, val)
def migrate_parameters(in_dir: str, out_dir: str) -> None: