From 9a240a6e9890fe17d5de3c9050842705bf6e5cac Mon Sep 17 00:00:00 2001 From: Lorenz Stechauner Date: Mon, 8 Jan 2024 19:32:13 +0100 Subject: [PATCH] migrate.py: Change payment variant data for wg master --- wgmaster/migrate.py | 211 ++++++++++++++++++++++++-------------------- 1 file changed, 114 insertions(+), 97 deletions(-) diff --git a/wgmaster/migrate.py b/wgmaster/migrate.py index 8007d1b..dd8aea0 100755 --- a/wgmaster/migrate.py +++ b/wgmaster/migrate.py @@ -1554,7 +1554,66 @@ def migrate_payments(in_dir: str, out_dir: str) -> None: p_qual = list(utils.csv_parse_dict(f'{in_dir}/TAuszahlungSortenQualitätsstufe.csv')) qual_map = {i: [s for s in p_qual if s['AZNR'] == i] for i in set([s['AZNR'] for s in p_qual])} - with utils.csv_open(f'{out_dir}/payment_variant.csv') as f_payment: + def collapse_data(data: dict[str, Any]): + rev = {} + for k, v in data.items(): + if k == 'default' or k.startswith('/'): + continue + rev[v] = rev.get(v, []) + rev[v].append(k) + if 'default' not in data.keys(): + if len(rev) == 1: + return set(rev.keys()).pop() + for v, ks in rev.items(): + if len(ks) >= len(data) / 2: + for k in ks: + del data[k] + data['default'] = v + return collapse_data(data) + for idx1 in {'/' + k.split('/')[1] for k in data.keys() if '/' in k and len(k) > 3}: + len1 = len(list(k for k, _ in data.items() if k.endswith(idx1))) + for v, ks in rev.items(): + my_ks = list(k for k in ks if k.endswith(idx1)) + if len(my_ks) > 1 and len(my_ks) >= len1 / 2: + for k in my_ks: + del data[k] + data[idx1] = v + return data + + def collapse_curve(curve: list[float]): + n = {} + d = 0 + for oe, p0, p1, p2 in zip(range(0, len(curve) + 1), [0] + curve, curve, curve[1:] + [curve[len(curve) - 1]]): + d1, d2 = round(p1 - p0, WGMASTER_PRECISION), round(p2 - p1, WGMASTER_PRECISION) + if d1 == d: + continue + d = d2 + if p0 > 0: + n[f'{oe - 1}oe'] = p0 + n[f'{oe}oe'] = p1 + if curve[len(curve) - 1] > 0: + n[f'{len(curve) - 1}oe'] = curve[len(curve) - 1] + keys = list(n.keys()) + vals = list(n.values()) + while len(n) >= 2 and vals[0] == vals[1]: + del n[keys[0]] + del n[keys[1]] + n = {keys[1]: vals[1], **n} + keys = list(n.keys()) + vals = list(n.values()) + while len(n) >= 2 and vals[len(vals) - 1] == vals[len(vals) - 2]: + del n[keys[len(vals) - 1]] + del n[keys[len(vals) - 2]] + n = {**n, keys[len(vals) - 2]: vals[len(vals) - 2]} + keys = list(n.keys()) + vals = list(n.values()) + if len(n) == 0: + n = {'73oe': 0} + elif len(n) == 1: + n = {'73oe': list(n.values())[0]} + return n + + with (utils.csv_open(f'{out_dir}/payment_variant.csv') as f_payment): f_payment.header('year', 'avnr', 'name', 'date', 'test_variant', 'calc_time', 'comment', 'data') for p in utils.csv_parse_dict(f'{in_dir}/TAuszahlung.csv'): year = p['Lesejahr'] @@ -1584,68 +1643,69 @@ def migrate_payments(in_dir: str, out_dir: str) -> None: del s['ID'] if s['Oechsle'] is None: continue - snr = s['SNR'].upper() - sanr = s['SANR'] or '' - azs[snr] = azs.get(snr, {}) - azs[snr][sanr] = azs[snr].get(sanr, {}) - geb = 'Gebunden' if s['gebunden'] else 'NichtGebunden' - azs[snr][sanr][geb] = azs[snr][sanr].get(geb, {}) - azs[snr][sanr][geb][s['Oechsle']] = s['Betrag'] + key = f'{s["SNR"].upper()}/{s["SANR"] or ""}' + azs[key] = azs.get(key, {'Gebunden': {}, 'NichtGebunden': {}}) + azs[key]['Gebunden' if s['gebunden'] else 'NichtGebunden'][s['Oechsle']] = s['Betrag'] curves = [] - for sortid, d1 in azs.items(): - for attrid, d2 in d1.items(): - for geb, d3 in d2.items(): - oe = [d3.get(n, 0.0) for n in range(max(d3.keys()) + 1)] - if oe not in curves: - curves.append(oe) - azs[sortid][attrid][geb] = curves.index(oe) - for i, c in enumerate(curves): - n = {} - d = 0 - for oe, p0, p1, p2 in zip(range(0, len(c) + 1), [0] + c, c, c[1:] + [c[len(c) - 1]]): - d1, d2 = round(p1 - p0, 4), round(p2 - p1, 4) - if d1 == d: - continue - d = d2 - if p0 > 0: - n[f'{oe - 1}oe'] = p0 - n[f'{oe}oe'] = p1 - if c[len(c) - 1] > 0: - n[f'{len(c) - 1}oe'] = c[len(c) - 1] - keys = list(n.keys()) - vals = list(n.values()) - if len(n) >= 2 and vals[0] == vals[1]: - del n[keys[0]] - del n[keys[1]] - n = {keys[1]: vals[1], **n} - if len(n) == 1: - n = {'73oe': list(n.values())[0]} - curves[i] = n - azs['Kurven'] = curves + curve_zero = False + for key, d1 in azs.items(): + oe = [d1['NichtGebunden'].get(n, 0.0) for n in range(max(d1['NichtGebunden'].keys()) + 1)] + if len(d1['Gebunden']) > 0: + oe_geb = [d1['Gebunden'].get(n, 0.0) for n in range(max(d1['Gebunden'].keys()) + 1)] + else: + oe_geb = None + + if len(set(oe)) <= 2 and oe[0] == 0 and \ + (oe_geb is None or (len(set(oe_geb)) <= 2 and oe[len(oe) - 1] == oe_geb[len(oe_geb) - 1])): + azs[key] = oe[len(oe) - 1] + if azs[key] == 0: + azs[key] = 'curve:0' + curve_zero = True + else: + c = (oe, oe_geb) + if c not in curves: + curves.append(c) + azs[key] = f'curve:{curves.index(c) + 1}' + data['AuszahlungSorten'] = collapse_data(azs) + for i, cs in enumerate(curves): + c, c_geb = cs + geb = None + if c_geb is not None: + diff = {round(b - a, WGMASTER_PRECISION) for a, b in zip(c, c_geb)} + diff.remove(0.0) + if len(diff) == 1: + geb = diff.pop() + elif len(diff) > 1: + geb = collapse_curve(c_geb) + curves[i] = { + 'id': i + 1, + 'mode': 'oe', + 'data': collapse_curve(c), + } + if geb is not None: + curves[i]['geb'] = geb + if curve_zero: + curves.insert(0, { + 'id': 0, + 'mode': 'oe', + 'data': data['Grundbetrag'], + 'geb': data['GBZS'] or 0 + }) + data['Kurven'] = curves azq = data['AuszahlungSortenQualitätsstufe'] for q in qual_map.get(p['AZNR'], []): del q['AZNR'] del q['ID'] qualid = QUAL_MAP[q['QSNR']] - snr = q['SNR'] - sanr = q['SANR'] or '' + key = f'{q["SNR"].upper()}/{q["SANR"] or ""}' azq[qualid] = azq.get(qualid, {}) - azq[qualid][snr] = azq[qualid].get(snr, {}) - azq[qualid][snr][sanr] = q['Betrag'] + azq[qualid][key] = q['Betrag'] for qualid, d1 in azq.items(): - for sortid, d2 in d1.items(): - if len(set(d2.values())) == 1: - azq[qualid][sortid] = list(d2.values())[0] - for qualid, d1 in azq.items(): - try: - if len(set(d1.values())) == 1: - azq[qualid] = list(d1.values())[0] - except TypeError: - pass + azq[qualid] = collapse_data(d1) - for k, v in data.copy().items(): - if v is None or (type(v) == bool and not v): - del data[k] + for data_k, data_v in data.copy().items(): + if data_v is None or isinstance(data_v, bool) and not data_v: + del data[data_k] az_map[p['AZNR']] = data test = (p['TeilzahlungNr'] == 7) @@ -1656,37 +1716,6 @@ def migrate_payments(in_dir: str, out_dir: str) -> None: f_payment.row(year, year_map[year], p['Titel'], p['Datum'], test, None, p['Beschreibung'], json.dumps(data)) - def get_modifiers(modifiers: str) -> Tuple[int, float]: - if modifiers is None or modifiers == '': - return 0, 0.0 - a, r = 0, 0.0 - for name in modifiers.split(' / '): - mod = MODIFIER_MAP[name] - if mod['AZASProzent']: - r += mod['AZASProzent'] / 100.0 - if mod['AZAS']: - a += round(mod['AZAS'] * pow(10, WGMASTER_PRECISION)) - return a, r - - def get_prices(aznr: int, sortid: str, attribute: Optional[str], oe: int) -> Tuple[int, int, int]: - az = az_map[aznr] - qs = az['AuszahlungSortenQualitätsstufe'] - so = az['AuszahlungSorten'] - p1, p2, p3 = 0, 0, 0 - if qs: - p3 = qs['WEI'] - if type(p3) == dict: - p3 = p3[sortid] - if type(p3) == dict: - p3 = p3[attribute or ''] - if sortid.upper() in so: - so = so[sortid.upper()][attribute or ''] - p2 = so['NichtGebunden'][oe] - p1 = so['Gebunden'][oe] if 'Gebunden' in so else p2 - prec = pow(10, WGMASTER_PRECISION) - return round(p1 * prec), round(p2 * prec), round(p3 * prec) - - # TODO database migration with utils.csv_open(f'{out_dir}/payment_delivery_part.csv') as f_del_pay, \ utils.csv_open(f'{out_dir}/delivery_part_bucket.csv') as f_bucket: f_del_pay.header('year', 'did', 'dpnr', 'avnr', 'net_amount') @@ -1704,18 +1733,6 @@ def migrate_payments(in_dir: str, out_dir: str) -> None: for aznr, avnr, tznr in variant_year_map[y]: val = p[f'BTeilzahlung{tznr}' if tznr < 6 else 'BEndauszahlung'] val = round(val * pow(10, WGMASTER_PRECISION)) - # prices = get_prices(aznr, p['SNR'], p['SANR'], int(p['Oechsle'])) - # mod = get_modifiers(p['BAbschlaegeString']) - # if not az_map[aznr].get('AbschlägeBerücksichtigen', False): - # mod = 0, 0.0 - - # check_val = b1 * (prices[0] + mod[0]) + b2 * (prices[1] + mod[0]) + b3 * (prices[2] + mod[0]) - # check_val *= 1 + mod[1] - # check_val = round(check_val / 100) * 100 - # if check_val != val: - # print(p['LINR'], y, did, dpnr, avnr, val, check_val) - # else: - # print("GOOD") f_del_pay.row(y, did, dpnr, avnr, val)