feat: multi-user auth — per-user spaces, pbkdf2 passwords, session tokens, login page
This commit is contained in:
+59
-23
@@ -1,6 +1,8 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import os
|
import os
|
||||||
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
|
from typing import Optional
|
||||||
|
|
||||||
|
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, HTTPException, Header
|
||||||
|
|
||||||
from api.state import state, Status
|
from api.state import state, Status
|
||||||
from config import load as load_config
|
from config import load as load_config
|
||||||
@@ -11,28 +13,54 @@ _ws_clients: list[WebSocket] = []
|
|||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Auth stub — replaced by auth.py in Task 13.
|
# Auth dependency
|
||||||
# current_user() returns a dict with at least {"username": str, "output_dir": str}.
|
|
||||||
# Until auth is wired up, every request runs as an anonymous guest using the
|
|
||||||
# global output path from config.
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def _guest_user() -> dict:
|
async def current_user(authorization: Optional[str] = Header(None)) -> dict:
|
||||||
cfg = load_config()
|
from auth import get_user_for_token
|
||||||
return {"username": "guest", "output_dir": cfg["output"]["path"]}
|
token = None
|
||||||
|
if authorization and authorization.startswith("Bearer "):
|
||||||
|
token = authorization[7:]
|
||||||
def current_user(user: dict = Depends(_guest_user)) -> dict:
|
if not token:
|
||||||
|
raise HTTPException(status_code=401, detail="Nicht angemeldet")
|
||||||
|
user = get_user_for_token(token)
|
||||||
|
if not user:
|
||||||
|
raise HTTPException(status_code=401, detail="Ungültiger oder abgelaufener Token")
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Endpoints
|
# Auth endpoints (no current_user dependency — these are unauthenticated)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@router.post("/login")
|
||||||
|
async def login(body: dict):
|
||||||
|
from auth import authenticate
|
||||||
|
username = body.get("username", "")
|
||||||
|
password = body.get("password", "")
|
||||||
|
if not username or not password:
|
||||||
|
raise HTTPException(status_code=400, detail="Benutzername und Passwort erforderlich")
|
||||||
|
token = authenticate(username, password)
|
||||||
|
if not token:
|
||||||
|
raise HTTPException(status_code=401, detail="Ungültige Anmeldedaten")
|
||||||
|
return {"token": token, "username": username}
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/logout")
|
||||||
|
async def logout(authorization: Optional[str] = Header(None)):
|
||||||
|
from auth import invalidate_token
|
||||||
|
if authorization and authorization.startswith("Bearer "):
|
||||||
|
invalidate_token(authorization[7:])
|
||||||
|
return {"ok": True}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Protected endpoints
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
@router.get("/status")
|
@router.get("/status")
|
||||||
async def get_status():
|
async def get_status(user: dict = Depends(current_user)):
|
||||||
return {"status": state.status}
|
return {"status": state.status, "username": user["username"]}
|
||||||
|
|
||||||
|
|
||||||
@router.post("/toggle")
|
@router.post("/toggle")
|
||||||
@@ -46,7 +74,7 @@ async def toggle_recording(user: dict = Depends(current_user)):
|
|||||||
state._recorder = AudioRecorder()
|
state._recorder = AudioRecorder()
|
||||||
state._recorder.start()
|
state._recorder.start()
|
||||||
state.recording_user = user["username"]
|
state.recording_user = user["username"]
|
||||||
state._recording_output_dir = user["output_dir"]
|
state._recording_output_dir = os.path.join(user["output_dir"], user["username"])
|
||||||
state._recording_instructions = user.get("instructions", "")
|
state._recording_instructions = user.get("instructions", "")
|
||||||
await state.set_status(Status.RECORDING)
|
await state.set_status(Status.RECORDING)
|
||||||
return {"action": "started"}
|
return {"action": "started"}
|
||||||
@@ -55,40 +83,48 @@ async def toggle_recording(user: dict = Depends(current_user)):
|
|||||||
|
|
||||||
@router.post("/instructions")
|
@router.post("/instructions")
|
||||||
async def set_instructions(body: dict, user: dict = Depends(current_user)):
|
async def set_instructions(body: dict, user: dict = Depends(current_user)):
|
||||||
# Instructions are per-user; stored on state only for the active recording.
|
|
||||||
# Full per-user persistence comes in Task 13.
|
|
||||||
user["instructions"] = body.get("instructions", "")
|
user["instructions"] = body.get("instructions", "")
|
||||||
return {"ok": True}
|
return {"ok": True}
|
||||||
|
|
||||||
|
|
||||||
@router.get("/transcripts")
|
@router.get("/transcripts")
|
||||||
async def get_transcripts(user: dict = Depends(current_user)):
|
async def get_transcripts(user: dict = Depends(current_user)):
|
||||||
return list_transcripts(user["output_dir"])
|
user_dir = os.path.join(user["output_dir"], user["username"])
|
||||||
|
return list_transcripts(user_dir)
|
||||||
|
|
||||||
|
|
||||||
@router.get("/config")
|
@router.get("/config")
|
||||||
async def get_config():
|
async def get_config(user: dict = Depends(current_user)):
|
||||||
return load_config()
|
return load_config()
|
||||||
|
|
||||||
|
|
||||||
@router.put("/config")
|
@router.put("/config")
|
||||||
async def put_config(body: dict):
|
async def put_config(body: dict, user: dict = Depends(current_user)):
|
||||||
|
if not user.get("is_admin"):
|
||||||
|
raise HTTPException(status_code=403, detail="Nur Administratoren können die Config ändern")
|
||||||
cfg = load_config()
|
cfg = load_config()
|
||||||
cfg.update(body)
|
cfg.update(body)
|
||||||
return cfg
|
return cfg
|
||||||
|
|
||||||
|
|
||||||
@router.post("/open")
|
@router.post("/open")
|
||||||
async def open_file(body: dict):
|
async def open_file(body: dict, user: dict = Depends(current_user)):
|
||||||
import subprocess
|
import subprocess
|
||||||
path = body.get("path", "")
|
path = body.get("path", "")
|
||||||
if path and os.path.exists(path):
|
# Only allow opening files within the user's own output directory
|
||||||
|
user_dir = os.path.join(user["output_dir"], user["username"])
|
||||||
|
if path and os.path.exists(path) and os.path.abspath(path).startswith(os.path.abspath(user_dir)):
|
||||||
subprocess.Popen(["xdg-open", path])
|
subprocess.Popen(["xdg-open", path])
|
||||||
return {"ok": True}
|
return {"ok": True}
|
||||||
|
|
||||||
|
|
||||||
@router.websocket("/ws")
|
@router.websocket("/ws")
|
||||||
async def websocket_endpoint(ws: WebSocket):
|
async def websocket_endpoint(ws: WebSocket, token: str = ""):
|
||||||
|
from auth import get_user_for_token
|
||||||
|
user = get_user_for_token(token)
|
||||||
|
if not user:
|
||||||
|
await ws.close(code=4001)
|
||||||
|
return
|
||||||
await ws.accept()
|
await ws.accept()
|
||||||
_ws_clients.append(ws)
|
_ws_clients.append(ws)
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -0,0 +1,128 @@
|
|||||||
|
import getpass
|
||||||
|
import hashlib
|
||||||
|
import os
|
||||||
|
import secrets
|
||||||
|
import tomllib
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import tomli_w
|
||||||
|
|
||||||
|
USERS_PATH = os.path.expanduser("~/.config/tueit-transcriber/users.toml")
|
||||||
|
|
||||||
|
# In-memory session store: token → username
|
||||||
|
# Users must re-login after server restart — acceptable for a desktop app.
|
||||||
|
_sessions: dict[str, str] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def _hash_password(password: str) -> str:
|
||||||
|
salt = secrets.token_hex(16)
|
||||||
|
key = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 200_000).hex()
|
||||||
|
return f"{salt}:{key}"
|
||||||
|
|
||||||
|
|
||||||
|
def _verify_password(password: str, stored: str) -> bool:
|
||||||
|
try:
|
||||||
|
salt, key = stored.split(":", 1)
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
new_key = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 200_000).hex()
|
||||||
|
return secrets.compare_digest(new_key, key)
|
||||||
|
|
||||||
|
|
||||||
|
# ── User store ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def has_users() -> bool:
|
||||||
|
return bool(_load_users())
|
||||||
|
|
||||||
|
|
||||||
|
def _load_users() -> dict:
|
||||||
|
if not os.path.exists(USERS_PATH):
|
||||||
|
return {}
|
||||||
|
with open(USERS_PATH, "rb") as f:
|
||||||
|
return tomllib.load(f).get("users", {})
|
||||||
|
|
||||||
|
|
||||||
|
def _save_users(users: dict):
|
||||||
|
os.makedirs(os.path.dirname(USERS_PATH), exist_ok=True)
|
||||||
|
with open(USERS_PATH, "wb") as f:
|
||||||
|
tomli_w.dump({"users": users}, f)
|
||||||
|
|
||||||
|
|
||||||
|
def create_user(username: str, password: str, output_dir: str, is_admin: bool = False):
|
||||||
|
users = _load_users()
|
||||||
|
users[username] = {
|
||||||
|
"password_hash": _hash_password(password),
|
||||||
|
"output_dir": output_dir,
|
||||||
|
"is_admin": is_admin,
|
||||||
|
}
|
||||||
|
_save_users(users)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Session management ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def authenticate(username: str, password: str) -> Optional[str]:
|
||||||
|
"""Verify credentials. Returns a session token on success, None on failure."""
|
||||||
|
users = _load_users()
|
||||||
|
user = users.get(username)
|
||||||
|
if not user:
|
||||||
|
return None
|
||||||
|
if not _verify_password(password, user["password_hash"]):
|
||||||
|
return None
|
||||||
|
token = secrets.token_urlsafe(32)
|
||||||
|
_sessions[token] = username
|
||||||
|
return token
|
||||||
|
|
||||||
|
|
||||||
|
def get_user_for_token(token: str) -> Optional[dict]:
|
||||||
|
"""Return user info dict for a valid token, or None."""
|
||||||
|
username = _sessions.get(token)
|
||||||
|
if not username:
|
||||||
|
return None
|
||||||
|
users = _load_users()
|
||||||
|
user = users.get(username)
|
||||||
|
if not user:
|
||||||
|
return None
|
||||||
|
return {
|
||||||
|
"username": username,
|
||||||
|
"output_dir": user["output_dir"],
|
||||||
|
"is_admin": user.get("is_admin", False),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def invalidate_token(token: str):
|
||||||
|
_sessions.pop(token, None)
|
||||||
|
|
||||||
|
|
||||||
|
# ── First-run setup wizard ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def setup_wizard():
|
||||||
|
"""Interactive console setup. Runs when no users exist yet."""
|
||||||
|
print("\n=== tüit Transkriptor — Ersteinrichtung ===\n")
|
||||||
|
print("Bitte richte den ersten Nutzer ein (wird Administrator).\n")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
username = input("Benutzername: ").strip()
|
||||||
|
if username:
|
||||||
|
break
|
||||||
|
print("Benutzername darf nicht leer sein.")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
password = getpass.getpass("Passwort: ")
|
||||||
|
confirm = getpass.getpass("Passwort bestätigen: ")
|
||||||
|
if password != confirm:
|
||||||
|
print("Passwörter stimmen nicht überein.")
|
||||||
|
continue
|
||||||
|
if len(password) < 6:
|
||||||
|
print("Passwort muss mindestens 6 Zeichen lang sein.")
|
||||||
|
continue
|
||||||
|
break
|
||||||
|
|
||||||
|
default_dir = os.path.expanduser(f"~/Transkripte/{username}")
|
||||||
|
answer = input(f"Transkripte speichern unter [{default_dir}]: ").strip()
|
||||||
|
output_dir = answer if answer else default_dir
|
||||||
|
|
||||||
|
create_user(username, password, output_dir, is_admin=True)
|
||||||
|
|
||||||
|
print(f"\nNutzer '{username}' wurde angelegt.")
|
||||||
|
print(f"Transkripte werden gespeichert unter: {output_dir}")
|
||||||
|
print("\nWeitere Nutzer können später über die Web-Oberfläche hinzugefügt werden.\n")
|
||||||
@@ -21,6 +21,9 @@ DEFAULTS = {
|
|||||||
"~/cloud.shron.de/Hetzner Storagebox/work"
|
"~/cloud.shron.de/Hetzner Storagebox/work"
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
|
"network": {
|
||||||
|
"host": "127.0.0.1",
|
||||||
|
},
|
||||||
"pid_file": os.path.expanduser("~/.local/run/tueit-transcriber.pid"),
|
"pid_file": os.path.expanduser("~/.local/run/tueit-transcriber.pid"),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,151 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html lang="de">
|
||||||
|
<head>
|
||||||
|
<meta charset="UTF-8">
|
||||||
|
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||||
|
<title>tüit Transkriptor — Anmelden</title>
|
||||||
|
<link rel="preconnect" href="https://fonts.googleapis.com">
|
||||||
|
<link href="https://fonts.googleapis.com/css2?family=Overpass:wght@300;400;600;700&display=swap" rel="stylesheet">
|
||||||
|
<style>
|
||||||
|
:root {
|
||||||
|
--red: #DA251C;
|
||||||
|
--yellow: #FFD802;
|
||||||
|
--bg: #111;
|
||||||
|
--surface: #1a1a1a;
|
||||||
|
--surface2: #232323;
|
||||||
|
--text: #e8e8e8;
|
||||||
|
--muted: #888;
|
||||||
|
--border: #2e2e2e;
|
||||||
|
}
|
||||||
|
* { box-sizing: border-box; margin: 0; padding: 0; }
|
||||||
|
body {
|
||||||
|
font-family: 'Overpass', system-ui, sans-serif;
|
||||||
|
background: var(--bg);
|
||||||
|
color: var(--text);
|
||||||
|
min-height: 100vh;
|
||||||
|
display: flex;
|
||||||
|
flex-direction: column;
|
||||||
|
align-items: center;
|
||||||
|
justify-content: center;
|
||||||
|
}
|
||||||
|
.card {
|
||||||
|
background: var(--surface);
|
||||||
|
border: 1px solid var(--border);
|
||||||
|
border-radius: 12px;
|
||||||
|
padding: 40px;
|
||||||
|
width: 100%;
|
||||||
|
max-width: 360px;
|
||||||
|
}
|
||||||
|
.logo {
|
||||||
|
display: flex;
|
||||||
|
align-items: center;
|
||||||
|
gap: 10px;
|
||||||
|
margin-bottom: 32px;
|
||||||
|
}
|
||||||
|
.logo-dot { width: 12px; height: 12px; background: var(--red); border-radius: 50%; flex-shrink: 0; }
|
||||||
|
.logo h1 { font-size: 1.1rem; font-weight: 600; letter-spacing: 0.04em; }
|
||||||
|
.logo h1 span { color: var(--red); }
|
||||||
|
.field { display: flex; flex-direction: column; gap: 6px; margin-bottom: 16px; }
|
||||||
|
label { font-size: 0.78rem; color: var(--muted); text-transform: uppercase; letter-spacing: 0.06em; }
|
||||||
|
input {
|
||||||
|
background: var(--surface2);
|
||||||
|
border: 1px solid var(--border);
|
||||||
|
color: var(--text);
|
||||||
|
border-radius: 8px;
|
||||||
|
padding: 10px 12px;
|
||||||
|
font-family: inherit;
|
||||||
|
font-size: 0.95rem;
|
||||||
|
outline: none;
|
||||||
|
transition: border-color 0.15s;
|
||||||
|
width: 100%;
|
||||||
|
}
|
||||||
|
input:focus { border-color: var(--yellow); }
|
||||||
|
input::placeholder { color: var(--muted); }
|
||||||
|
button[type="submit"] {
|
||||||
|
width: 100%;
|
||||||
|
margin-top: 8px;
|
||||||
|
padding: 12px;
|
||||||
|
background: var(--red);
|
||||||
|
color: #fff;
|
||||||
|
border: none;
|
||||||
|
border-radius: 8px;
|
||||||
|
font-family: inherit;
|
||||||
|
font-size: 1rem;
|
||||||
|
font-weight: 600;
|
||||||
|
cursor: pointer;
|
||||||
|
transition: opacity 0.15s;
|
||||||
|
}
|
||||||
|
button[type="submit"]:hover { opacity: 0.88; }
|
||||||
|
button[type="submit"]:disabled { opacity: 0.5; cursor: default; }
|
||||||
|
#error {
|
||||||
|
display: none;
|
||||||
|
margin-top: 14px;
|
||||||
|
padding: 10px 12px;
|
||||||
|
background: rgba(218, 37, 28, 0.12);
|
||||||
|
border: 1px solid rgba(218, 37, 28, 0.4);
|
||||||
|
border-radius: 6px;
|
||||||
|
font-size: 0.85rem;
|
||||||
|
color: #ff6b6b;
|
||||||
|
}
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<div class="card">
|
||||||
|
<div class="logo">
|
||||||
|
<div class="logo-dot"></div>
|
||||||
|
<h1>tüit <span>Transkriptor</span></h1>
|
||||||
|
</div>
|
||||||
|
<form id="login-form">
|
||||||
|
<div class="field">
|
||||||
|
<label for="username">Benutzername</label>
|
||||||
|
<input type="text" id="username" name="username" autocomplete="username" autofocus placeholder="Benutzername">
|
||||||
|
</div>
|
||||||
|
<div class="field">
|
||||||
|
<label for="password">Passwort</label>
|
||||||
|
<input type="password" id="password" name="password" autocomplete="current-password" placeholder="Passwort">
|
||||||
|
</div>
|
||||||
|
<button type="submit" id="submit-btn">Anmelden</button>
|
||||||
|
<div id="error"></div>
|
||||||
|
</form>
|
||||||
|
</div>
|
||||||
|
<script>
|
||||||
|
const form = document.getElementById('login-form');
|
||||||
|
const errorEl = document.getElementById('error');
|
||||||
|
const submitBtn = document.getElementById('submit-btn');
|
||||||
|
|
||||||
|
form.addEventListener('submit', async (e) => {
|
||||||
|
e.preventDefault();
|
||||||
|
errorEl.style.display = 'none';
|
||||||
|
submitBtn.disabled = true;
|
||||||
|
submitBtn.textContent = 'Anmelden…';
|
||||||
|
|
||||||
|
// Read values directly — no innerHTML with untrusted data
|
||||||
|
const username = document.getElementById('username').value;
|
||||||
|
const password = document.getElementById('password').value;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const r = await fetch('/login', {
|
||||||
|
method: 'POST',
|
||||||
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify({ username, password }),
|
||||||
|
});
|
||||||
|
if (r.ok) {
|
||||||
|
const data = await r.json();
|
||||||
|
sessionStorage.setItem('token', data.token);
|
||||||
|
location.href = '/';
|
||||||
|
} else {
|
||||||
|
const data = await r.json().catch(() => ({}));
|
||||||
|
errorEl.textContent = data.detail || 'Anmeldung fehlgeschlagen.';
|
||||||
|
errorEl.style.display = 'block';
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
errorEl.textContent = 'Server nicht erreichbar.';
|
||||||
|
errorEl.style.display = 'block';
|
||||||
|
} finally {
|
||||||
|
submitBtn.disabled = false;
|
||||||
|
submitBtn.textContent = 'Anmelden';
|
||||||
|
}
|
||||||
|
});
|
||||||
|
</script>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
@@ -29,6 +29,11 @@ async def index():
|
|||||||
return FileResponse(str(FRONTEND_DIR / "index.html"))
|
return FileResponse(str(FRONTEND_DIR / "index.html"))
|
||||||
|
|
||||||
|
|
||||||
|
@app.get("/login")
|
||||||
|
async def login_page():
|
||||||
|
return FileResponse(str(FRONTEND_DIR / "login.html"))
|
||||||
|
|
||||||
|
|
||||||
@app.get("/app.js")
|
@app.get("/app.js")
|
||||||
async def appjs():
|
async def appjs():
|
||||||
return FileResponse(str(FRONTEND_DIR / "app.js"))
|
return FileResponse(str(FRONTEND_DIR / "app.js"))
|
||||||
|
|||||||
+31
-1
@@ -1,10 +1,14 @@
|
|||||||
from fastapi.testclient import TestClient
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
_TEST_USER = {"username": "testuser", "output_dir": "/tmp", "is_admin": False}
|
||||||
|
|
||||||
|
|
||||||
def make_app():
|
def make_app():
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from api.router import router
|
from api.router import router, current_user
|
||||||
app = FastAPI()
|
app = FastAPI()
|
||||||
|
# Override auth for tests — no real credentials needed
|
||||||
|
app.dependency_overrides[current_user] = lambda: _TEST_USER
|
||||||
app.include_router(router)
|
app.include_router(router)
|
||||||
return app
|
return app
|
||||||
|
|
||||||
@@ -14,6 +18,7 @@ def test_status_returns_idle():
|
|||||||
r = client.get("/status")
|
r = client.get("/status")
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
assert r.json()["status"] == "idle"
|
assert r.json()["status"] == "idle"
|
||||||
|
assert r.json()["username"] == "testuser"
|
||||||
|
|
||||||
|
|
||||||
def test_config_get_returns_dict():
|
def test_config_get_returns_dict():
|
||||||
@@ -28,3 +33,28 @@ def test_transcripts_returns_list():
|
|||||||
r = client.get("/transcripts")
|
r = client.get("/transcripts")
|
||||||
assert r.status_code == 200
|
assert r.status_code == 200
|
||||||
assert isinstance(r.json(), list)
|
assert isinstance(r.json(), list)
|
||||||
|
|
||||||
|
|
||||||
|
def test_status_requires_auth():
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from api.router import router
|
||||||
|
app = FastAPI()
|
||||||
|
app.include_router(router)
|
||||||
|
client = TestClient(app, raise_server_exceptions=False)
|
||||||
|
r = client.get("/status")
|
||||||
|
assert r.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
def test_login_rejects_wrong_credentials():
|
||||||
|
import tempfile, os
|
||||||
|
from unittest.mock import patch
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from api.router import router
|
||||||
|
app = FastAPI()
|
||||||
|
app.include_router(router)
|
||||||
|
client = TestClient(app, raise_server_exceptions=False)
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
users_path = os.path.join(tmpdir, "users.toml")
|
||||||
|
with patch("auth.USERS_PATH", users_path):
|
||||||
|
r = client.post("/login", json={"username": "nobody", "password": "wrong"})
|
||||||
|
assert r.status_code == 401
|
||||||
|
|||||||
@@ -0,0 +1,91 @@
|
|||||||
|
import importlib
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
|
||||||
|
def _fresh_auth(tmpdir):
|
||||||
|
"""Reload auth module with a temp users file and clear sessions."""
|
||||||
|
import auth
|
||||||
|
importlib.reload(auth)
|
||||||
|
auth._sessions.clear()
|
||||||
|
return os.path.join(tmpdir, "users.toml")
|
||||||
|
|
||||||
|
|
||||||
|
def test_has_users_false_when_empty():
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
import auth
|
||||||
|
importlib.reload(auth)
|
||||||
|
users_path = os.path.join(tmpdir, "users.toml")
|
||||||
|
with patch("auth.USERS_PATH", users_path):
|
||||||
|
assert not auth.has_users()
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_and_authenticate():
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
import auth
|
||||||
|
importlib.reload(auth)
|
||||||
|
auth._sessions.clear()
|
||||||
|
users_path = os.path.join(tmpdir, "users.toml")
|
||||||
|
with patch("auth.USERS_PATH", users_path):
|
||||||
|
auth.create_user("thomas", "geheim123", "/tmp/transkripte", is_admin=True)
|
||||||
|
token = auth.authenticate("thomas", "geheim123")
|
||||||
|
assert token is not None
|
||||||
|
assert len(token) > 10
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticate_wrong_password():
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
import auth
|
||||||
|
importlib.reload(auth)
|
||||||
|
auth._sessions.clear()
|
||||||
|
users_path = os.path.join(tmpdir, "users.toml")
|
||||||
|
with patch("auth.USERS_PATH", users_path):
|
||||||
|
auth.create_user("thomas", "geheim123", "/tmp/transkripte")
|
||||||
|
assert auth.authenticate("thomas", "falsch") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticate_unknown_user():
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
import auth
|
||||||
|
importlib.reload(auth)
|
||||||
|
users_path = os.path.join(tmpdir, "users.toml")
|
||||||
|
with patch("auth.USERS_PATH", users_path):
|
||||||
|
assert auth.authenticate("niemand", "irgendwas") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_user_for_token():
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
import auth
|
||||||
|
importlib.reload(auth)
|
||||||
|
auth._sessions.clear()
|
||||||
|
users_path = os.path.join(tmpdir, "users.toml")
|
||||||
|
with patch("auth.USERS_PATH", users_path):
|
||||||
|
auth.create_user("anna", "secret456", "/tmp/anna")
|
||||||
|
token = auth.authenticate("anna", "secret456")
|
||||||
|
user = auth.get_user_for_token(token)
|
||||||
|
assert user["username"] == "anna"
|
||||||
|
assert user["output_dir"] == "/tmp/anna"
|
||||||
|
|
||||||
|
|
||||||
|
def test_invalidate_token():
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
import auth
|
||||||
|
importlib.reload(auth)
|
||||||
|
auth._sessions.clear()
|
||||||
|
users_path = os.path.join(tmpdir, "users.toml")
|
||||||
|
with patch("auth.USERS_PATH", users_path):
|
||||||
|
auth.create_user("bob", "pass789!", "/tmp/bob")
|
||||||
|
token = auth.authenticate("bob", "pass789!")
|
||||||
|
auth.invalidate_token(token)
|
||||||
|
assert auth.get_user_for_token(token) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_has_users_true_after_create():
|
||||||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||||||
|
import auth
|
||||||
|
importlib.reload(auth)
|
||||||
|
users_path = os.path.join(tmpdir, "users.toml")
|
||||||
|
with patch("auth.USERS_PATH", users_path):
|
||||||
|
auth.create_user("lisa", "abc123!", "/tmp/lisa")
|
||||||
|
assert auth.has_users()
|
||||||
Reference in New Issue
Block a user