Refactor csv.py
This commit is contained in:
104
wgmaster/csv.py
104
wgmaster/csv.py
@ -1,64 +1,72 @@
|
||||
#!/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from typing import Iterator, Dict, Any, Optional, Tuple
|
||||
from typing import Iterator, Dict, Any, Tuple
|
||||
import re
|
||||
import datetime
|
||||
|
||||
RE_INT = re.compile(r'-?[0-9]+')
|
||||
RE_FLOAT = re.compile(r'-?[0-9]+\.[0-9]+')
|
||||
|
||||
def parse(filename: str) -> Iterator[Dict[str, Any]]:
|
||||
def parse_line(line_str: str) -> Iterator[str]:
|
||||
w = None
|
||||
s = False
|
||||
for ch in line_str:
|
||||
if w is None:
|
||||
if ch == ';':
|
||||
yield ''
|
||||
continue
|
||||
elif ch in (' ', '\t'):
|
||||
continue
|
||||
w = ch
|
||||
s = ch == '"'
|
||||
|
||||
def cast_value(value: str) -> Any:
    """Infer the Python type of one raw CSV field and convert it.

    Rules are tried in this order:

    * ``''``                      -> ``None``
    * ``"..."`` (double-quoted)   -> the text between the quotes
    * ``T`` / ``F``               -> ``True`` / ``False``
    * optional ``-`` and digits   -> ``int``
    * digits ``.`` digits         -> ``float``
    * ``YYYY-MM-DD`` shaped text  -> ``datetime.date``
    * ``HH:MM:SS`` shaped text    -> ``datetime.time``

    :param value: the raw field text.
    :return: the converted value.
    :raises RuntimeError: if no rule matches the value.
    :raises ValueError: propagated from the date/time parsers when the value
        has the right shape but is not a valid date or time.
    """
    if value == '':
        return None
    elif value[0] == '"' and value[-1] == '"':
        if len(value) < 2:
            # A lone '"' is both the first and the last character; it is an
            # unterminated string, not an empty quoted one.
            raise RuntimeError(f'unable to infer type of value "{value}"')
        return value[1:-1]
    elif value == 'T':
        return True
    elif value == 'F':
        return False
    elif RE_INT.fullmatch(value):
        return int(value)
    elif RE_FLOAT.fullmatch(value):
        return float(value)
    elif len(value) == 10 and value[4] == '-' and value[7] == '-':
        # Only the shape is checked here; strptime validates the digits.
        return datetime.datetime.strptime(value, '%Y-%m-%d').date()
    elif len(value) == 8 and value[2] == ':' and value[5] == ':':
        return datetime.time.fromisoformat(value)
    else:
        raise RuntimeError(f'unable to infer type of value "{value}"')
|
||||
|
||||
|
||||
def parse_line(line_str: str) -> Iterator[str]:
    """Split one ``;``-separated line into its raw field strings.

    Spaces and tabs before a field are skipped and each yielded field is
    stripped of surrounding whitespace. A field that starts with ``"`` is
    read as a quoted string: ``;`` and newline characters do not end the
    field until the closing ``"`` has been seen. The quote characters are
    kept in the yielded text. Empty fields are yielded as ``''``.

    :param line_str: the text of one line, with or without a trailing
        newline.
    :return: an iterator over the field strings, in order.
    """
    field = None  # text of the field being collected; None between fields
    in_quotes = False  # True while inside an unterminated "..." string
    after_separator = False  # True when the last thing consumed was a ';'
    for ch in line_str:
        if field is None:
            if ch == ';':
                yield ''
                after_separator = True
                continue
            elif ch in (' ', '\t'):
                continue
            field = ch
            in_quotes = ch == '"'
            after_separator = False
            continue
        elif not in_quotes and ch in (';', '\n'):
            yield field.strip()
            field = None
            after_separator = ch == ';'
            continue
        elif in_quotes and ch == '"':
            in_quotes = False
        field += ch
    if field is not None:
        yield field.strip()
    elif after_separator:
        # The line ended right after a ';' without a newline: the final
        # field is empty. Yield it so 'a;' and 'a;\n' give the same result.
        yield ''
|
||||
|
||||
|
||||
def parse(filename: str) -> Iterator[Tuple]:
    """Read a ``;``-separated file and yield its rows as tuples.

    The first tuple yielded is the header: the first line split on ``;``
    with each name stripped of surrounding whitespace. Every following line
    is split with ``parse_line`` and each field is converted with
    ``cast_value``. An empty file yields nothing.

    :param filename: path of the file to read; it is opened as UTF-8 text.
    :return: an iterator over the header tuple followed by the data rows.
    """
    with open(filename, 'r', encoding='utf-8') as f:
        header_line = next(f, None)
        if header_line is None:
            # Empty file. A bare next() would raise StopIteration here, which
            # a generator turns into RuntimeError (PEP 479).
            return
        yield tuple(part.strip() for part in header_line.split(';'))
        for line in f:
            yield tuple(cast_value(part) for part in parse_line(line))
|
||||
|
||||
def parse_dict(filename: str) -> Iterator[Dict[str, Any]]:
    """Yield each data row of a file as a dict keyed by the header names.

    The first item produced by ``parse(filename)`` is taken as the header;
    every later item is paired with it by position. If ``parse`` produces
    nothing, this generator also produces nothing.

    :param filename: path of the file, passed unchanged to ``parse``.
    :return: an iterator over one dict per data row.
    """
    rows = parse(filename)
    header = next(rows, None)
    if header is None:
        # No header row: finish cleanly. A bare next() would raise
        # StopIteration, which a generator turns into RuntimeError (PEP 479).
        return
    for row in rows:
        # Indexing (rather than zip) keeps a row with more values than the
        # header an error instead of silently dropping the extras.
        yield {header[i]: part for i, part in enumerate(row)}
|
||||
|
||||
|
||||
def format_row(*values) -> str:
|
||||
|
Reference in New Issue
Block a user