elwig-backend: Add /auth endpoint

This commit is contained in:
2025-04-28 11:24:58 +02:00
parent bc17b842a4
commit 60be49d31d
2 changed files with 275 additions and 24 deletions

View File

@ -11,6 +11,65 @@ Authorization
------------- -------------
All API endpoints are secured via an HTTP basic authentication. All API endpoints are secured via an HTTP basic authentication.
Additionally, the `/auth` endpoint may be used to obtain a Bearer/JWT token.
### `GET /auth`
#### Example
```http
GET /auth HTTP/1.1
Host: example.com
Accept: application/json
Authorization: Basic dGVzdDp0ZXN0
```
```http
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 185
Access-Control-Allow-Origin: *
Access-Control-Allow-Headers: Authorization
Access-Control-Allow-Methods: HEAD, GET, POST, OPTIONS
```
### `POST /auth`
#### Example
```http
POST /auth HTTP/1.1
Host: example.com
Accept: application/json
Content-Type: application/json
Content-Length: 44
```
```http
POST /auth HTTP/1.1
Host: example.com
Accept: application/json
Content-Type: application/x-www-form-urlencoded
Content-Length: 31
username=test&password=password
```
```http
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 185
Access-Control-Allow-Origin: *
Access-Control-Allow-Headers: Authorization
Access-Control-Allow-Methods: HEAD, GET, POST, OPTIONS
```
Base Data
--------- ---------
### `GET /wine/varieties` ### `GET /wine/varieties`

View File

@ -6,6 +6,7 @@ from typing import Callable, Optional
from http.server import BaseHTTPRequestHandler, HTTPServer from http.server import BaseHTTPRequestHandler, HTTPServer
import argparse import argparse
import datetime import datetime
import time
import traceback import traceback
import re import re
import base64 import base64
@ -13,15 +14,21 @@ import json
import sqlite3 import sqlite3
import urllib.parse import urllib.parse
import gzip import gzip
import hashlib
import hmac
VERSION: str = '0.0.3' VERSION: str = '0.0.4'
CNX: sqlite3.Cursor CNX: sqlite3.Cursor
USER_FILE: str USER_FILE: str
JWT_SECRET: bytes
JWT_ISSUER: str
JWT_INVALIDATE_BEFORE: int = 0
JWT_USER_INVALIDATE_BEFORE: dict[str, int] = {}
class HttpError(Exception): class HttpError(BaseException):
status_code: int status_code: int
def __init__(self, status_code: int, message: str): def __init__(self, status_code: int, message: str):
@ -39,6 +46,11 @@ class UnauthorizedError(HttpError):
super().__init__(401, message) super().__init__(401, message)
class ForbiddenError(HttpError):
def __init__(self, message: str = 'Forbidden'):
super().__init__(403, message)
class NotFoundError(HttpError): class NotFoundError(HttpError):
def __init__(self, message: str = 'Not found'): def __init__(self, message: str = 'Not found'):
super().__init__(404, message) super().__init__(404, message)
@ -108,6 +120,19 @@ def jdmp(value, is_bool: bool = False) -> str:
return json.dumps(value, ensure_ascii=False) return json.dumps(value, ensure_ascii=False)
def check_password(stored_pwd: str, check_pwd: str) -> bool:
return stored_pwd == check_pwd
def check_user_password(username: str, password: str) -> tuple[str, str]:
with open(USER_FILE, 'r') as file:
for line in file:
(u, r, i, p) = line.strip().split(':', 3)
if u == username and check_password(p, password):
return u, r
raise UnauthorizedError()
def get_delivery_schedule_filter_clauses(filters: list[Filter]) -> list[str]: def get_delivery_schedule_filter_clauses(filters: list[Filter]) -> list[str]:
clauses = [] clauses = []
for f in filters: for f in filters:
@ -128,6 +153,9 @@ class ElwigApi(BaseHTTPRequestHandler):
self.send_response(status_code) self.send_response(status_code)
self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Headers', 'Authorization') self.send_header('Access-Control-Allow-Headers', 'Authorization')
if self.path in ('/auth',):
self.send_header('Access-Control-Allow-Methods', 'HEAD, GET, POST, OPTIONS')
else:
self.send_header('Access-Control-Allow-Methods', 'HEAD, GET, OPTIONS') self.send_header('Access-Control-Allow-Methods', 'HEAD, GET, OPTIONS')
if 300 <= status_code < 400 and status_code != 304 and url: if 300 <= status_code < 400 and status_code != 304 and url:
self.send_header('Location', url) self.send_header('Location', url)
@ -150,24 +178,72 @@ class ElwigApi(BaseHTTPRequestHandler):
def see_other(self, url: str) -> None: def see_other(self, url: str) -> None:
self.send(f'{{"url": {jdmp(url)}}}\n', status_code=303, url=url) self.send(f'{{"url": {jdmp(url)}}}\n', status_code=303, url=url)
def authorize(self) -> tuple[str, str]: def authorize(self) -> tuple[str, str, str]:
try:
auth = self.headers.get('Authorization') auth = self.headers.get('Authorization')
if auth is None or not auth.startswith('Basic '): if auth and auth.startswith('Basic '):
u, r = ElwigApi.authorize_basic(auth[6:])
return u, r, 'Basic'
elif auth and auth.startswith('Bearer '):
u, r = ElwigApi.authorize_bearer(auth[7:])
return u, r, 'Bearer'
raise UnauthorizedError() raise UnauthorizedError()
auth = base64.b64decode(auth[6:]).split(b':', 1)
if len(auth) != 2: @staticmethod
raise UnauthorizedError('Invalid Authorization header') def authorize_basic(auth: str) -> tuple[str, str]:
username, password = auth[0].decode('utf-8'), auth[1].decode('utf-8') try:
username, password = base64.b64decode(auth.strip() + '==').decode('utf-8').split(':', 1)
except: except:
raise UnauthorizedError('Invalid Authorization header') raise BadRequestError('Invalid Authorization header')
with open(USER_FILE, 'r') as file: return check_user_password(username, password)
for line in file:
(u, r, p) = line.strip().split(':', 2) @staticmethod
if u == username: def authorize_bearer(token: str) -> tuple[str, str]:
if p == password: try:
return u, r hdr_r, payload_r, sig = token.strip().split('.')
raise UnauthorizedError() hdr = json.loads(base64.urlsafe_b64decode(hdr_r + '==').decode('utf-8'))
payload = json.loads(base64.urlsafe_b64decode(payload_r + '==').decode('utf-8'))
if hdr['typ'] != 'JWT':
raise ValueError()
mac = hmac.new(JWT_SECRET, digestmod={
'HS224': hashlib.sha224,
'HS256': hashlib.sha256,
'HS384': hashlib.sha384,
'HS512': hashlib.sha512,
}[hdr['alg']])
mac.update((hdr_r + '.' + payload_r).encode('ascii'))
digest = mac.digest()
except Exception:
raise BadRequestError('Invalid Authorization header')
try:
if digest != base64.urlsafe_b64decode(sig + '=='):
raise UnauthorizedError('Invalid JWT signature')
elif payload['iss'] != JWT_ISSUER:
raise UnauthorizedError('Invalid JWT issuer')
elif 'exp' in payload and payload['exp'] < int(time.time()):
raise UnauthorizedError('JWT token expired')
elif 'nbf' in payload and payload['nbf'] > int(time.time()):
raise UnauthorizedError('JWT token not yet valid')
elif payload['iat'] < JWT_INVALIDATE_BEFORE:
raise UnauthorizedError('Invalidated JWT token')
elif payload['iat'] < JWT_USER_INVALIDATE_BEFORE.get(payload['sub'], 0):
raise UnauthorizedError('Invalidated JWT token')
return payload['sub'], payload['rol']
except Exception:
raise UnauthorizedError('Invalid JWT token')
@staticmethod
def issue_jwt(username: str, role: str) -> str:
hdr = base64.urlsafe_b64encode(b'{"typ":"JWT","alg":"HS256"}').strip(b'=')
payload = base64.urlsafe_b64encode(json.dumps({
'iss': JWT_ISSUER,
'sub': username,
'rol': role,
'iat': int(time.time()),
}, ensure_ascii=False, separators=(',', ':')).encode('utf-8')).strip(b'=')
mac = hmac.new(JWT_SECRET, digestmod=hashlib.sha256)
mac.update(hdr + b'.' + payload)
sig = base64.urlsafe_b64encode(mac.digest()).strip(b'=')
return (hdr + b'.' + payload + b'.' + sig).decode('ascii')
def exec_collection(self, sql_query: str, fmt: Callable, filters: list[Filter], def exec_collection(self, sql_query: str, fmt: Callable, filters: list[Filter],
offset: int = None, limit: int = None, offset: int = None, limit: int = None,
@ -304,7 +380,7 @@ class ElwigApi(BaseHTTPRequestHandler):
self.send(OPEN_API_DOC) self.send(OPEN_API_DOC)
return return
username, role = self.authorize() username, role, auth_method = self.authorize()
parts = self.path.split('?', 1) parts = self.path.split('?', 1)
if len(parts) == 1: if len(parts) == 1:
@ -320,7 +396,11 @@ class ElwigApi(BaseHTTPRequestHandler):
raise BadRequestError('Invalid integer value in query') raise BadRequestError('Invalid integer value in query')
order = query['order'] if 'order' in query else None order = query['order'] if 'order' in query else None
if path == '/wine/varieties': if path == '/auth':
if auth_method == 'Bearer':
raise ForbiddenError('Tokens must not be renewed')
self.send( f'{{"token":{jdmp(ElwigApi.issue_jwt(username, role))}}}\n')
elif path == '/wine/varieties':
self.exec_collection( self.exec_collection(
"SELECT sortid, type, name, comment FROM wine_variety", "SELECT sortid, type, name, comment FROM wine_variety",
lambda r: f'{{"sortid":{jdmp(r[0])},"type":{jdmp(r[1])},"name":{jdmp(r[2])},"comment":{jdmp(r[3])}}}', lambda r: f'{{"sortid":{jdmp(r[0])},"type":{jdmp(r[1])},"name":{jdmp(r[2])},"comment":{jdmp(r[3])}}}',
@ -355,21 +435,77 @@ class ElwigApi(BaseHTTPRequestHandler):
traceback.print_exception(e) traceback.print_exception(e)
self.error(500, str(e)) self.error(500, str(e))
def do_POST(self) -> None:
try:
parts = self.path.split('?', 1)
path = parts[0]
if path == '/auth':
content_type = [p.strip() for p in self.headers.get('Content-Type', '').split(';')][0]
content_len = self.headers.get('Content-Length')
if content_len is None:
raise HttpError(411, 'Length required')
content_len = int(content_len)
if content_len > 4096 or content_len < 0:
raise HttpError(413, 'Content too large')
elif content_type == 'application/x-www-form-urlencoded':
payload = self.rfile.read(content_len)
try:
data = {urllib.parse.unquote(s[0]): urllib.parse.unquote(s[-1])
for s in [p.split(b'=', 1) for p in payload.split(b'&')]}
username, password = data['username'], data['password']
except Exception:
raise BadRequestError('Invalid URL encoded payload')
elif content_type == 'application/json':
payload = self.rfile.read(content_len)
try:
data = json.loads(payload.decode('utf-8'))
username, password = data['username'], data['password']
except Exception:
raise BadRequestError('Invalid JSON object')
else:
raise HttpError(415, 'Unsupported media type')
username, role = check_user_password(username, password)
self.send(f'{{"token":{jdmp(ElwigApi.issue_jwt(username, role))}}}\n')
elif path in ('/wine/varieties', '/wine/quality_levels', '/wine/attributes', '/wine/cultivations', '/modifiers', '/delivery_schedules'):
raise MethodNotAllowedError()
else:
raise NotFoundError('Invalid path')
except HttpError as e:
self.error(e.status_code, str(e))
except Exception as e:
traceback.print_exception(e)
self.error(500, str(e))
def main() -> None: def main() -> None:
global CNX global CNX
global USER_FILE global USER_FILE
global JWT_ISSUER
global JWT_SECRET
global JWT_INVALIDATE_BEFORE
sqlite3.register_adapter(datetime.date, lambda d: d.strftime('%Y-%m-%d')) sqlite3.register_adapter(datetime.date, lambda d: d.strftime('%Y-%m-%d'))
sqlite3.register_adapter(datetime.time, lambda t: t.strftime('%H:%M:%S')) sqlite3.register_adapter(datetime.time, lambda t: t.strftime('%H:%M:%S'))
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('db', type=str, metavar='DB') parser.add_argument('db', type=str, metavar='DB')
parser.add_argument('jwt_file', type=str, metavar='JWT-FILE')
parser.add_argument('user_file', type=str, metavar='USER-FILE') parser.add_argument('user_file', type=str, metavar='USER-FILE')
parser.add_argument('-p', '--port', type=int, default=8080) parser.add_argument('-p', '--port', type=int, default=8080)
args = parser.parse_args() args = parser.parse_args()
jwt_file = args.jwt_file
with open(jwt_file, 'rb') as file:
iss, dt, JWT_SECRET = file.readline().strip().split(b':', 2)
JWT_ISSUER = iss.decode('utf-8')
JWT_INVALIDATE_BEFORE = int(datetime.datetime.fromisoformat(dt.decode('ascii')).timestamp()) if dt else 0
USER_FILE = args.user_file USER_FILE = args.user_file
with open(USER_FILE, 'r') as file:
for line in file:
(u, r, i, p) = line.strip().split(':', 3)
JWT_USER_INVALIDATE_BEFORE[u.strip()] = int(datetime.datetime.fromisoformat(i).timestamp()) if i else 0
CNX = sqlite3.connect(f'file:{args.db}?mode=ro', uri=True) CNX = sqlite3.connect(f'file:{args.db}?mode=ro', uri=True)
CNX.create_function('REGEXP', 2, sqlite_regexp, deterministic=True) CNX.create_function('REGEXP', 2, sqlite_regexp, deterministic=True)
@ -396,9 +532,65 @@ OPEN_API_DOC: str = '''{
"url": "https://wgm.elwig.at/elwig/api/v1", "url": "https://wgm.elwig.at/elwig/api/v1",
"description": "WG Matzen" "description": "WG Matzen"
}], }],
"components": {"securitySchemes": {"basicAuth": {"type": "http", "scheme": "basic"}}}, "components": {
"security": [{"basicAuth": []}], "securitySchemes": {
"basicAuth": {"type": "http", "scheme": "basic"},
"bearerAuth": {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"}
}
},
"security": [{"basicAuth": []}, {"bearerAuth": []}],
"paths": { "paths": {
"/auth": {
"get": {
"tags": ["Authentication"],
"summary": "Authentication",
"security": [{"basicAuth": []}],
"responses": {
"200": {
"description": "Success",
"content": {
"application/json": {
"schema": {"type": "object", "required": ["token"], "properties": {"token": {"type": "string"}}},
"examples": {"simple": {"value": {"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJleGFtcGxlLmNvbSIsInN1YiI6InRlc3QiLCJyb2wiOiJleHRlcm5hbCIsImlhdCI6MTc0NTgzMTQzN30.VMLz20aWI8nSd7ocT2W750Cy80OJs8OMiQCtq5Df0rE"}}}
}
}
},
"400": {"description": "Bad Request", "content": {"application/json": {"schema": {"type": "object", "required": ["message"], "properties": {"message": {"oneOf": [{"type": "string"}, {"type": "null"}]}}}, "examples": {"simple": {"value": {"message": "Invalid Authorization header"}}}}}},
"401": {"description": "Unauthorized", "content": {"application/json": {"schema": {"type": "object", "required": ["message"], "properties": {"message": {"oneOf": [{"type": "string"}, {"type": "null"}]}}}, "examples": {"simple": {"value": {"message": "Unauthorized"}}}}}},
"500": {"description": "Internal Server Error", "content": {"application/json": {"schema": {"type": "object", "required": ["message"], "properties": {"message": {"oneOf": [{"type": "string"}, {"type": "null"}]}}}, "examples": {"simple": {"value": {"message": "Unknown error"}}}}}}
}
},
"post": {
"tags": ["Authentication"],
"summary": "Authentication",
"security": [],
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {"type": "object", "required": ["username", "password"], "properties": {"username": {"type": "string"}, "password": {"type": "string"}}}
},
"application/x-www-form-urlencoded": {
"schema": {"type": "object", "required": ["username", "password"], "properties": {"username": {"type": "string"}, "password": {"type": "string"}}}
}
}
},
"responses": {
"200": {
"description": "Success",
"content": {
"application/json": {
"schema": {"type": "object", "required": ["token"], "properties": {"token": {"type": "string"}}},
"examples": {"simple": {"value": {"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpc3MiOiJleGFtcGxlLmNvbSIsInN1YiI6InRlc3QiLCJyb2wiOiJleHRlcm5hbCIsImlhdCI6MTc0NTgzMTQzN30.VMLz20aWI8nSd7ocT2W750Cy80OJs8OMiQCtq5Df0rE"}}}
}
}
},
"400": {"description": "Bad Request", "content": {"application/json": {"schema": {"type": "object", "required": ["message"], "properties": {"message": {"oneOf": [{"type": "string"}, {"type": "null"}]}}}, "examples": {"simple": {"value": {"message": "Invalid Authorization header"}}}}}},
"401": {"description": "Unauthorized", "content": {"application/json": {"schema": {"type": "object", "required": ["message"], "properties": {"message": {"oneOf": [{"type": "string"}, {"type": "null"}]}}}, "examples": {"simple": {"value": {"message": "Unauthorized"}}}}}},
"500": {"description": "Internal Server Error", "content": {"application/json": {"schema": {"type": "object", "required": ["message"], "properties": {"message": {"oneOf": [{"type": "string"}, {"type": "null"}]}}}, "examples": {"simple": {"value": {"message": "Unknown error"}}}}}}
}
}
},
"/wine/varieties": { "/wine/varieties": {
"get": { "get": {
"tags": ["Base Data"], "tags": ["Base Data"],