From 3b7a28cc2379f5a70be09ae1964020fcbbc1e333 Mon Sep 17 00:00:00 2001
From: Lorenz Stechauner <lorenz.stechauner@necronda.net>
Date: Thu, 4 May 2023 23:07:06 +0200
Subject: [PATCH] Update csv handling

---
 wgmaster/csv.py     | 101 --------------------------
 wgmaster/export.py  |   9 ++-
 wgmaster/import.py  |   4 +-
 wgmaster/migrate.py | 168 ++++++++++++++++++++++----------------------
 wgmaster/utils.py   |  94 +++++++++++++++++++++++++
 5 files changed, 183 insertions(+), 193 deletions(-)
 delete mode 100644 wgmaster/csv.py
 create mode 100644 wgmaster/utils.py

diff --git a/wgmaster/csv.py b/wgmaster/csv.py
deleted file mode 100644
index 151c37f..0000000
--- a/wgmaster/csv.py
+++ /dev/null
@@ -1,101 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-
-from typing import Iterator, Dict, Any, Tuple
-import re
-import datetime
-
-RE_INT = re.compile(r'-?[0-9]+')
-RE_FLOAT = re.compile(r'-?[0-9]+\.[0-9]+')
-RE_STR_START = re.compile(r'.*;"[^"]*$')
-RE_STR_END = re.compile(r'^[^"]*";.*')
-
-
-def cast_value(value: str) -> Any:
-    if value == '':
-        return None
-    elif value[0] == '"' and value[-1] == '"':
-        return value[1:-1]
-    elif value == 'T':
-        return True
-    elif value == 'F':
-        return False
-    elif RE_INT.fullmatch(value):
-        return int(value)
-    elif RE_FLOAT.fullmatch(value):
-        return float(value)
-    elif len(value) == 10 and value[4] == '-' and value[7] == '-':
-        return datetime.datetime.strptime(value, '%Y-%m-%d').date()
-    elif len(value) == 8 and value[2] == ':' and value[5] == ':':
-        return datetime.time.fromisoformat(value)
-    else:
-        raise RuntimeError(f'unable to infer type of value "{value}"')
-
-
-def convert_value(value: Any, table: str = None, column: str = None) -> str:
-    if value is None:
-        return ''
-    if type(value) == str:
-        return f'"{value}"'
-    elif type(value) == bool:
-        return 'T' if value else 'F'
-    elif type(value) == datetime.datetime and table is not None and column is not None:
-        if value.year == 1899 and value.month == 12 and value.day == 30:
-            return value.strftime('%H:%M:%S')
-        elif value.hour == 0 and value.minute == 0 and value.second == 0:
-            return value.strftime('%Y-%m-%d')
-    return str(value)
-
-
-def parse_line(line_str: str) -> Iterator[str]:
-    w = None
-    s = False
-    for ch in line_str:
-        if w is None:
-            if ch == ';':
-                yield ''
-                continue
-            elif ch in (' ', '\t'):
-                continue
-            w = ch
-            s = ch == '"'
-            continue
-        elif not s and ch in (';', '\n'):
-            yield w.strip()
-            w = None
-            continue
-        elif s and ch == '"':
-            s = False
-        w += ch
-    if w is not None:
-        yield w.strip()
-
-
-def parse(filename: str) -> Iterator[Tuple]:
-    with open(filename, 'r', encoding='utf-8') as f:
-        lines = f.__iter__()
-        yield tuple([part.strip() for part in next(lines).split(';')])
-        in_str = False
-        for cur_line in lines:
-            if in_str:
-                line += cur_line
-                if not RE_STR_END.match(cur_line):
-                    continue
-                in_str = False
-            else:
-                line = cur_line
-                if RE_STR_START.match(cur_line):
-                    in_str = True
-                    continue
-            yield tuple([cast_value(part) for part in parse_line(line)])
-
-
-def parse_dict(filename: str) -> Iterator[Dict[str, Any]]:
-    rows = parse(filename)
-    header = next(rows)
-    for row in rows:
-        yield {header[i]: part for i, part in enumerate(row)}
-
-
-def format_row(*values) -> str:
-    return ';'.join([convert_value(v) for v in values]) + '\n'
diff --git a/wgmaster/export.py b/wgmaster/export.py
index 19ee1cf..a66fdd2 100644
--- a/wgmaster/export.py
+++ b/wgmaster/export.py
@@ -11,7 +11,7 @@ import ctypes.wintypes
 import hashlib
 import pypyodbc
 
-import csv
+import utils
 
 
 IGNORED_NAMES = ['Windows', 'Program Files', 'Program Files (x86)', 'AppData']
@@ -138,11 +138,10 @@ def main() -> None:
             cur.execute(f"SELECT * FROM {t_name} ORDER BY `{desc[0][0]}`;")
             cols = [t[0] for t in cur.description]
 
-            with open(f'{args.output}/{t_name}.csv', 'wb+') as f:
-                f.write((';'.join(cols) + '\n').encode('utf-8'))
+            with utils.csv_open(f'{args.output}/{t_name}.csv') as f:
+                f.header(cols)
                 for row in cur:
-                    values = [csv.convert_value(val, table=t_name, column=col) for col, val in zip(cols, row)]
-                    f.write((';'.join(values) + '\n').encode('utf-8'))
+                    f.row((utils.convert_value(val, table=t_name, column=col) for col, val in zip(cols, row)), raw=True)
 
             print(f'Exported {t_name} successfully!', flush=True)
     finally:
diff --git a/wgmaster/import.py b/wgmaster/import.py
index 149e9dc..48ad95e 100755
--- a/wgmaster/import.py
+++ b/wgmaster/import.py
@@ -8,7 +8,7 @@ import os
 import re
 import datetime
 
-import csv
+import utils
 
 
 DIR: str
@@ -46,7 +46,7 @@ def sqlite_regexp(pattern: str, value: Optional[str]) -> Optional[bool]:
 
 
 def import_csv(cur: sqlite3.Cursor, table_name: str) -> None:
-    rows = csv.parse(f'{DIR}/{table_name}.csv')
+    rows = utils.csv_parse(f'{DIR}/{table_name}.csv')
     names = next(rows)
 
     sql = f'INSERT INTO {table_name} ({", ".join(names)}) VALUES ({", ".join(["?"] * len(names))})'
diff --git a/wgmaster/migrate.py b/wgmaster/migrate.py
index d097a75..ca2bc4f 100755
--- a/wgmaster/migrate.py
+++ b/wgmaster/migrate.py
@@ -10,7 +10,7 @@ import sqlite3
 import requests
 import datetime
 
-import csv
+import utils
 
 
 DB_CNX: Optional[sqlite3.Connection] = None
@@ -170,9 +170,8 @@ def get_bev_gst_size(kgnr: int, gstnr: str) -> Optional[int]:
 
 
 def parse_flaechenbindungen(in_dir: str) -> Dict[int, Dict[int, Dict[str, Any]]]:
-    fbs = csv.parse_dict(f'{in_dir}/TFlaechenbindungen.csv')
     members = {}
-    for f in fbs:
+    for f in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
         if f['MGNR'] not in members:
             members[f['MGNR']] = {}
         members[f['MGNR']][f['FBNR']] = f
@@ -260,7 +259,7 @@ def lookup_kg_name(kgnr: int) -> str:
 def migrate_gradation(in_dir: str, out_dir: str) -> None:
     global GRADATION_MAP
     GRADATION_MAP = {}
-    for g in csv.parse_dict(f'{in_dir}/TUmrechnung.csv'):
+    for g in utils.csv_parse_dict(f'{in_dir}/TUmrechnung.csv'):
         GRADATION_MAP[g['Oechsle']] = g['KW']
 
 
@@ -268,13 +267,13 @@ def migrate_branches(in_dir: str, out_dir: str) -> None:
     global BRANCH_MAP
     BRANCH_MAP = {}
 
-    with open(f'{out_dir}/branch.csv', 'w+', encoding='utf-8') as f:
-        f.write('zwstid;name;country;postal_dest;address;phone_nr\n')
-        for b in csv.parse_dict(f'{in_dir}/TZweigstellen.csv'):
+    with utils.csv_open(f'{out_dir}/branch.csv') as f:
+        f.header('zwstid', 'name', 'country', 'postal_dest', 'address', 'phone_nr')
+        for b in utils.csv_parse_dict(f'{in_dir}/TZweigstellen.csv'):
             BRANCH_MAP[b['ZNR']] = b['Kennbst']
             address = b['Straße']
             postal_dest = lookup_plz(int(b['PLZ']) if b['PLZ'] else None, b['Ort'], address)
-            f.write(csv.format_row(b['Kennbst'], b['Name'].strip().title(), 'AT', postal_dest, address, b['Telefon']))
+            f.row(b['Kennbst'], b['Name'].strip().title(), 'AT', postal_dest, address, b['Telefon'])
 
 
 def migrate_grosslagen(in_dir: str, out_dir: str) -> None:
@@ -282,34 +281,34 @@ def migrate_grosslagen(in_dir: str, out_dir: str) -> None:
     GROSSLAGE_MAP = {}
 
     glnr = 0
-    with open(f'{out_dir}/wb_gl.csv', 'w+', encoding='utf-8') as f:
-        f.write('glnr;name\n')
-        for gl in csv.parse_dict(f'{in_dir}/TGrosslagen.csv'):
+    with utils.csv_open(f'{out_dir}/wb_gl.csv') as f:
+        f.header('glnr', 'name')
+        for gl in utils.csv_parse_dict(f'{in_dir}/TGrosslagen.csv'):
             glnr += 1
             GROSSLAGE_MAP[gl['GLNR']] = glnr
-            f.write(csv.format_row(glnr, gl['Bezeichnung']))
+            f.row(glnr, gl['Bezeichnung'])
 
 
 def migrate_gemeinden(in_dir: str, out_dir: str) -> None:
     global GEM_MAP
     GEM_MAP = {}
 
-    with open(f'{out_dir}/wb_kg.csv', 'w+', encoding='utf-8') as f:
-        f.write('kgnr;glnr\n')
-        for g in csv.parse_dict(f'{in_dir}/TGemeinden.csv'):
+    with utils.csv_open(f'{out_dir}/wb_kg.csv') as f:
+        f.header('kgnr', 'glnr')
+        for g in utils.csv_parse_dict(f'{in_dir}/TGemeinden.csv'):
             gems = lookup_gem_name(g['Bezeichnung'])
             GEM_MAP[g['GNR']] = gems
             for kgnr, gkz in gems:
-                f.write(csv.format_row(kgnr, GROSSLAGE_MAP[g['GLNR']]))
+                f.row(kgnr, GROSSLAGE_MAP[g['GLNR']])
 
 
 def migrate_reeds(in_dir: str, out_dir: str) -> None:
     global REED_MAP
     REED_MAP = {}
 
-    with open(f'{out_dir}/wb_rd.csv', 'w+', encoding='utf-8') as f:
-        f.write('kgnr;rdnr;name\n')
-        for r in csv.parse_dict(f'{in_dir}/TRiede.csv'):
+    with utils.csv_open(f'{out_dir}/wb_rd.csv') as f:
+        f.header('kgnr', 'rdnr', 'name')
+        for r in utils.csv_parse_dict(f'{in_dir}/TRiede.csv'):
             name: str = r['Bezeichnung'].strip()
             if name.isupper():
                 name = name.title()
@@ -321,26 +320,26 @@ def migrate_reeds(in_dir: str, out_dir: str) -> None:
 
             rdnr = max([n for k, n in REED_MAP.values() if k == kgnr] or [0]) + 1
             REED_MAP[r['RNR']] = (kgnr, rdnr)
-            f.write(csv.format_row(kgnr, rdnr, name))
+            f.row(kgnr, rdnr, name)
 
 
 def migrate_attributes(in_dir: str, out_dir: str) -> None:
-    with open(f'{out_dir}/wine_attribute.csv', 'w+', encoding='utf-8') as f:
-        f.write('attrid;name;kg_per_ha\n')
-        for a in csv.parse_dict(f'{in_dir}/TSortenAttribute.csv'):
-            f.write(csv.format_row(a['SANR'], a['Attribut'], int(a['KgProHa'])))
+    with utils.csv_open(f'{out_dir}/wine_attribute.csv') as f:
+        f.header('attrid', 'name', 'kg_per_ha')
+        for a in utils.csv_parse_dict(f'{in_dir}/TSortenAttribute.csv'):
+            f.row(a['SANR'], a['Attribut'], int(a['KgProHa']))
         if WG == 'MATZEN':
-            f.write(csv.format_row('M', 'Matzen', 10000))
-            f.write(csv.format_row('HU', 'Huber', 10000))
+            f.row('M', 'Matzen', 10000)
+            f.row('HU', 'Huber', 10000)
 
 
 def migrate_cultivations(in_dir: str, out_dir: str) -> None:
     global CULTIVATION_MAP
     CULTIVATION_MAP = {}
 
-    with open(f'{out_dir}/wine_cultivation.csv', 'w+', encoding='utf-8') as f:
-        f.write('cultid;name\n')
-        for c in csv.parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'):
+    with utils.csv_open(f'{out_dir}/wine_cultivation.csv') as f:
+        f.header('cultid', 'name')
+        for c in utils.csv_parse_dict(f'{in_dir}/TBewirtschaftungsarten.csv'):
             name: str = c['Bezeichnung']
             cultid = name[0].upper()
             if name.isupper():
@@ -348,27 +347,28 @@ def migrate_cultivations(in_dir: str, out_dir: str) -> None:
             elif 'biolog' in name.lower():
                 cultid = 'BIO'
             CULTIVATION_MAP[c['BANR']] = cultid
-            f.write(csv.format_row(cultid, name))
+            f.row(cultid, name)
 
 
 def migrate_members(in_dir: str, out_dir: str) -> None:
     global MEMBER_MAP
     MEMBER_MAP = {}
 
-    members = [m for m in csv.parse_dict(f'{in_dir}/TMitglieder.csv')]
+    members = [m for m in utils.csv_parse_dict(f'{in_dir}/TMitglieder.csv')]
     mgnrs = [m['MGNR'] for m in members]
     fbs = parse_flaechenbindungen(in_dir)
 
-    with open(f'{out_dir}/member.csv', 'w+', encoding='utf-8') as f_m,\
-            open(f'{out_dir}/member_billing_address.csv', 'w+', encoding='utf-8') as f_mba,\
-            open(f'{out_dir}/wb_kg.csv', 'a', encoding='utf-8') as f_kg:
-        f_m.write('mgnr;predecessor_mgnr;prefix;given_name;middle_names;family_name;suffix;'
-                  'birthday;entry_date;exit_date;business_shares;accounting_nr;zwstid;'
-                  'lfbis_nr;ustid;volllieferant;buchführend;funktionär;active;iban;bic;'
-                  'country;postal_dest;address;'
-                  'email;phone_landline;phone_mobile_1;phone_mobile_2;'
-                  'default_kgnr;comment\n')
-        f_mba.write('mgnr;name;country;postal_dest;address\n')
+    with utils.csv_open(f'{out_dir}/member.csv') as f_m,\
+            utils.csv_open(f'{out_dir}/member_billing_address.csv') as f_mba,\
+            utils.csv_open(f'{out_dir}/wb_kg.csv', 'a') as f_kg:
+        f_m.header(
+            'mgnr', 'predecessor_mgnr', 'prefix', 'given_name', 'middle_names', 'family_name', 'suffix',
+            'birthday', 'entry_date', 'exit_date', 'business_shares', 'accounting_nr', 'zwstid',
+            'lfbis_nr', 'ustid', 'volllieferant', 'buchführend', 'funktionär', 'active', 'iban', 'bic',
+            'country', 'postal_dest', 'address',
+            'email', 'phone_landline', 'phone_mobile_1', 'phone_mobile_2',
+            'default_kgnr', 'comment')
+        f_mba.header('mgnr', 'name', 'country', 'postal_dest', 'address')
 
         for m in members:
             mgnr: int = m['MGNR']
@@ -573,7 +573,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
             elif kgnr not in [kg[0] for gem in GEM_MAP.values() for kg in gem]:
                 glnr = list(GROSSLAGE_MAP.values())[0]
                 print(f'New KG: {lookup_kg_name(kgnr)} ({kgnr}, GL {glnr})')
-                f_kg.write(csv.format_row(kgnr, glnr))
+                f_kg.row(kgnr, glnr)
                 if 9999 not in GEM_MAP:
                     GEM_MAP[9999] = []
                 GEM_MAP[9999].append((kgnr, 0))
@@ -583,7 +583,7 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
                 continue
 
             pred = m['MGNR-Vorgänger'] if m['MGNR-Vorgänger'] in mgnrs else None
-            f_m.write(csv.format_row(
+            f_m.row(
                 mgnr, pred, prefix, given_name, middle_names, family_name, suffix,
                 m['Geburtsjahr'], m['Eintrittsdatum'], m['Austrittsdatum'], m['Geschäftsanteile1'] or 0,
                 m['BHKontonummer'], zwstid, bnr, ustid,
@@ -591,12 +591,12 @@ def migrate_members(in_dir: str, out_dir: str) -> None:
                 iban, bic, 'AT', postal_dest, address or '-', email, phone_landline,
                 phone_mobile[0] if len(phone_mobile) > 0 else None, phone_mobile[1] if len(phone_mobile) > 1 else None,
                 kgnr, m['Anmerkung']
-            ))
+            )
             MEMBER_MAP[mgnr] = {
                 'default_kgnr': kgnr
             }
             if billing_name:
-                f_mba.write(csv.format_row(mgnr, billing_name, 'AT', postal_dest, address or '-'))
+                f_mba.row(mgnr, billing_name, 'AT', postal_dest, address or '-')
 
 
 def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
@@ -692,12 +692,13 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
         text = re.sub(r'([0-9]+(, |$)){3,}', lambda m: replace_nrs(m, ', '), text)
         return text
 
-    with open(f'{out_dir}/area_commitment.csv', 'w+', encoding='utf-8') as f_fb, \
-            open(f'{out_dir}/area_commitment_attribute.csv', 'w+', encoding='utf-8') as f_attr:
-        f_fb.write('fbnr;mgnr;sortid;cultid;area;kgnr;gstnr;rdnr;year_from;year_to;comment\n')
-        f_attr.write('fbnr;attrid\n')
+    with utils.csv_open(f'{out_dir}/area_commitment.csv') as f_fb, \
+            utils.csv_open(f'{out_dir}/area_commitment_attribute.csv',) as f_attr:
+        f_fb.header('fbnr', 'mgnr', 'sortid', 'cultid', 'area', 'kgnr', 'gstnr', 'rdnr',
+                    'year_from', 'year_to', 'comment')
+        f_attr.header('fbnr', 'attrid')
 
-        for fb in csv.parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
+        for fb in utils.csv_parse_dict(f'{in_dir}/TFlaechenbindungen.csv'):
             if fb['Von'] is None and fb['Bis'] is None:
                 continue
             parz: str = fb['Parzellennummer']
@@ -723,10 +724,10 @@ def migrate_area_commitments(in_dir: str, out_dir: str) -> None:
 
             rdnr = REED_MAP[fb['RNR']][1] if fb['RNR'] else None
             to = fb['Bis'] if fb['Bis'] and fb['Bis'] < 3000 else None
-            f_fb.write(csv.format_row(fbnr, mgnr, fb['SNR'], CULTIVATION_MAP[fb['BANR']], area,
-                                      kgnr, gstnr, rdnr, fb['Von'], to, comment))
+            f_fb.row(fbnr, mgnr, fb['SNR'], CULTIVATION_MAP[fb['BANR']], area,
+                     kgnr, gstnr, rdnr, fb['Von'], to, comment)
             if fb['SANR']:
-                f_attr.write(csv.format_row(fbnr, fb['SANR']))
+                f_attr.row(fbnr, fb['SANR'])
 
 
 def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str, List[int], datetime.date]]:
@@ -782,7 +783,7 @@ def fix_deliveries(deliveries: Iterable[Dict[str, Any]]) -> Iterable[Tuple[str,
 
 
 def migrate_deliveries(in_dir: str, out_dir: str) -> None:
-    modifiers = {m['ASNR']: m for m in csv.parse_dict(f'{in_dir}/TAbschlaege.csv') if m['Bezeichnung']}
+    modifiers = {m['ASNR']: m for m in utils.csv_parse_dict(f'{in_dir}/TAbschlaege.csv') if m['Bezeichnung']}
     delivery_map = {}
     seasons = {}
     branches = {}
@@ -796,17 +797,18 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
         else:
             raise NotImplementedError()
 
-    deliveries = list(csv.parse_dict(f'{in_dir}/TLieferungen.csv'))
+    deliveries = list(utils.csv_parse_dict(f'{in_dir}/TLieferungen.csv'))
     delivery_dict = {d['LINR']: d for d in deliveries}
     fixed = fix_deliveries(deliveries)
 
-    with open(f'{out_dir}/delivery.csv', 'w+', encoding='utf-8') as f_delivery, \
-            open(f'{out_dir}/delivery_part.csv', 'w+', encoding='utf-8') as f_part, \
-            open(f'{out_dir}/delivery_part_attribute.csv', 'w+', encoding='utf-8') as f_attr:
-        f_delivery.write('year;did;date;time;zwstid;lnr;lsnr;mgnr;comment\n')
-        f_part.write('year;did;dpnr;sortid;weight;kmw;qualid;hkid;kgnr;rdnr;gerebelt;manual_weighing;spl_check;'
-                     'hand_picked;lesewagen;temperature;acid;scale_id;weighing_id;comment\n')
-        f_attr.write('year;did;dpnr;attrid\n')
+    with utils.csv_open(f'{out_dir}/delivery.csv') as f_delivery, \
+            utils.csv_open(f'{out_dir}/delivery_part.csv') as f_part, \
+            utils.csv_open(f'{out_dir}/delivery_part_attribute.csv') as f_attr:
+        f_delivery.header('year', 'did', 'date', 'time', 'zwstid', 'lnr', 'lsnr', 'mgnr', 'comment')
+        f_part.header('year', 'did', 'dpnr', 'sortid', 'weight', 'kmw', 'qualid', 'hkid', 'kgnr', 'rdnr',
+                      'gerebelt', 'manual_weighing', 'spl_check', 'hand_picked', 'lesewagen',
+                      'temperature', 'acid', 'scale_id', 'weighing_id', 'comment')
+        f_attr.header('year', 'did', 'dpnr', 'attrid')
 
         for lsnr, linrs, date in fixed:
             if date.year not in seasons:
@@ -908,37 +910,33 @@ def migrate_deliveries(in_dir: str, out_dir: str) -> None:
                 if comment:
                     comments.append(comment)
 
-                f_part.write(csv.format_row(
+                f_part.row(
                     date.year, snr, dpnr, sortid, int(d['Gewicht']), kmw, QUAL_MAP[d['QSNR']], HKID, kgnr, rdnr,
                     d['Gerebelt'] or False, d['Handwiegung'] or False, d['Spaetlese-Ueberpruefung'] or False,
                     hand, lesemaschine, d['Temperatur'], acid, scale_id, weighing_id, comment
-                ))
+                )
                 for attrid in attributes:
-                    f_attr.write(csv.format_row(date.year, snr, dpnr, attrid))
-            f_delivery.write(csv.format_row(
-                date.year, snr, date, d['Uhrzeit'], BRANCH_MAP[d['ZNR']], lnr, lsnr, d['MGNR'],
-                '; '.join(comments) or None
-            ))
+                    f_attr.row(date.year, snr, dpnr, attrid)
+            f_delivery.row(date.year, snr, date, d['Uhrzeit'], BRANCH_MAP[d['ZNR']], lnr, lsnr, d['MGNR'],
+                           '; '.join(comments) or None)
 
-    with open(f'{out_dir}/delivery_part_modifier.csv', 'w+', encoding='utf-8') as f_part_mod:
-        f_part_mod.write('year;did;dpnr;modid\n')
-        for m in csv.parse_dict(f'{in_dir}/TLieferungAbschlag.csv'):
+    with utils.csv_open(f'{out_dir}/delivery_part_modifier.csv') as f_part_mod:
+        f_part_mod.header('year', 'did', 'dpnr', 'modid')
+        for m in utils.csv_parse_dict(f'{in_dir}/TLieferungAbschlag.csv'):
             if m['LINR'] not in delivery_map:
                 continue
             nid = delivery_map[m['LINR']]
-            f_part_mod.write(csv.format_row(nid[0], nid[1], nid[2], modifiers[m['ASNR']]['id']))
+            f_part_mod.row(nid[0], nid[1], nid[2], modifiers[m['ASNR']]['id'])
 
-    with open(f'{out_dir}/season.csv', 'w+', encoding='utf-8') as f_season, \
-            open(f'{out_dir}/modifier.csv', 'w+', encoding='utf-8') as f_mod:
-        f_season.write('year;currency;precision;start_date;end_date\n')
-        f_mod.write('year;modid;name;abs;rel;standard;quick_select\n')
+    with utils.csv_open(f'{out_dir}/season.csv') as f_season, \
+            utils.csv_open(f'{out_dir}/modifier.csv') as f_mod:
+        f_season.header('year', 'currency', 'precision', 'start_date', 'end_date')
+        f_mod.header('year', 'modid', 'name', 'abs', 'rel', 'standard', 'quick_select')
         for y, s in seasons.items():
-            f_season.write(csv.format_row(y, s['currency'], s['precision'], s['start'], s['end']))
+            f_season.row(y, s['currency'], s['precision'], s['start'], s['end'])
             for m in modifiers.values():
                 abs_v = int(m['AZAS'] * pow(10, s['precision'])) if m['AZAS'] is not None else None
-                f_mod.write(csv.format_row(
-                    y, m['id'], m['Bezeichnung'], abs_v, m['AZASProzent'], m['Standard'], m['Schnellauswahl']
-                ))
+                f_mod.row(y, m['id'], m['Bezeichnung'], abs_v, m['AZASProzent'], m['Standard'], m['Schnellauswahl'])
 
 
 def migrate_payments(in_dir: str, out_dir: str) -> None:
@@ -946,7 +944,7 @@ def migrate_payments(in_dir: str, out_dir: str) -> None:
 
 
 def migrate_parameters(in_dir: str, out_dir: str) -> None:
-    params: Dict[str, str] = {p['Bezeichnung']: p['Wert'] for p in csv.parse_dict(f'{in_dir}/TParameter.csv')}
+    params: Dict[str, str] = {p['Bezeichnung']: p['Wert'] for p in utils.csv_parse_dict(f'{in_dir}/TParameter.csv')}
     name = params['MANDANTENNAME1'].title().replace('F.', 'für').replace('U.', 'und')
     shortened = name.replace(' für ', ' f. ').replace(' und ', ' u. ')
     suffix = params['MANDANTENNAME2'].lower().replace(' ', '').replace('.', '')
@@ -974,10 +972,10 @@ def migrate_parameters(in_dir: str, out_dir: str) -> None:
         'WEBSITE': params['MANDANTENHOMEPAGE'],
     }
 
-    with open(f'{out_dir}/client_parameter.csv', 'w+', encoding='utf-8') as f:
-        f.write('param;value\n')
+    with utils.csv_open(f'{out_dir}/client_parameter.csv') as f:
+        f.header('param', 'value')
         for param, value in new_params.items():
-            f.write(csv.format_row(param, value))
+            f.row(param, value)
 
 
 def main() -> None:
diff --git a/wgmaster/utils.py b/wgmaster/utils.py
new file mode 100644
index 0000000..572c1d4
--- /dev/null
+++ b/wgmaster/utils.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from __future__ import annotations
+from typing import Iterator, Dict, Any, Tuple, TextIO, List
+import re
+import datetime
+import csv
+
+RE_INT = re.compile(r'-?[0-9]+')
+RE_FLOAT = re.compile(r'-?[0-9]+\.[0-9]+')
+RE_STR_START = re.compile(r'.*,"[^"]*$')
+RE_STR_END = re.compile(r'^[^"]*",.*')
+
+
+def cast_value(value: str) -> Any:
+    if value == '':
+        return None
+    elif value[0] == '"' and value[-1] == '"':
+        return value[1:-1]
+    elif value == 'T':
+        return True
+    elif value == 'F':
+        return False
+    elif RE_INT.fullmatch(value):
+        return int(value)
+    elif RE_FLOAT.fullmatch(value):
+        return float(value)
+    elif len(value) == 10 and value[4] == '-' and value[7] == '-':
+        return datetime.datetime.strptime(value, '%Y-%m-%d').date()
+    elif len(value) == 8 and value[2] == ':' and value[5] == ':':
+        return datetime.time.fromisoformat(value)
+    else:
+        raise RuntimeError(f'unable to infer type of value "{value}"')
+
+
+def convert_value(value: Any, table: str = None, column: str = None) -> str:
+    if value is None:
+        return ''
+    if type(value) == str:
+        return '"' + value.replace('"', '""') + '"'
+    elif type(value) == bool:
+        return 'T' if value else 'F'
+    elif type(value) == datetime.datetime and table is not None and column is not None:
+        if value.year == 1899 and value.month == 12 and value.day == 30:
+            return value.strftime('%H:%M:%S')
+        elif value.hour == 0 and value.minute == 0 and value.second == 0:
+            return value.strftime('%Y-%m-%d')
+    return str(value)
+
+
+class CsvFile:
+    file: TextIO
+    reader: csv.reader
+    writer: csv.writer
+
+    def __init__(self, file: TextIO):
+        self.file = file
+        self.writer = csv.writer(self.file, doublequote=False, quoting=csv.QUOTE_NONE, escapechar=None)
+        self.reader = csv.reader(self.file, doublequote=False, quoting=csv.QUOTE_NONE, escapechar=None)
+
+    def __enter__(self) -> CsvFile:
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
+        return self.file.__exit__(exc_type, exc_val, exc_tb)
+
+    def __iter__(self) -> Iterator[List[str]]:
+        return self.reader.__iter__()
+
+    def row(self, *fields, raw: bool = False):
+        if not raw:
+            fields = (convert_value(field) for field in fields)
+        self.writer.writerow(fields)
+
+    def header(self, *headers):
+        self.writer.writerow(str(h) for h in headers)
+
+
+def csv_open(filename: str, mode: str = 'w+') -> CsvFile:
+    return CsvFile(open(filename, mode, encoding='utf-8', newline=''))
+
+
+def csv_parse(filename: str) -> Iterator[Tuple]:
+    with csv_open(filename, 'r') as f:
+        rows = f.__iter__()
+        yield tuple([field.strip() for field in next(rows)])
+        yield from (tuple(cast_value(field) for field in row) for row in rows)
+
+
+def csv_parse_dict(filename: str) -> Iterator[Dict[str, Any]]:
+    rows = csv_parse(filename)
+    header = next(rows)
+    return ({header[i]: part for i, part in enumerate(row)} for row in rows)