Files
tueit_Transkriptor/api/router.py
T

187 lines
6.4 KiB
Python

import asyncio
import os
from typing import Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, HTTPException, Header
from api.state import state, Status
from config import load as load_config
from output import list_transcripts, read_transcript
router = APIRouter()
_ws_clients: list[WebSocket] = []
# ---------------------------------------------------------------------------
# Auth dependency
# ---------------------------------------------------------------------------
async def current_user(authorization: Optional[str] = Header(None)) -> dict:
from auth import get_user_for_token
token = None
if authorization and authorization.startswith("Bearer "):
token = authorization[7:]
if not token:
raise HTTPException(status_code=401, detail="Nicht angemeldet")
user = get_user_for_token(token)
if not user:
raise HTTPException(status_code=401, detail="Ungültiger oder abgelaufener Token")
return user
# ---------------------------------------------------------------------------
# Auth endpoints (no current_user dependency — these are unauthenticated)
# ---------------------------------------------------------------------------
@router.post("/login")
async def login(body: dict):
from auth import authenticate
username = body.get("username", "")
password = body.get("password", "")
if not username or not password:
raise HTTPException(status_code=400, detail="Benutzername und Passwort erforderlich")
token = authenticate(username, password)
if not token:
raise HTTPException(status_code=401, detail="Ungültige Anmeldedaten")
return {"token": token, "username": username}
@router.post("/logout")
async def logout(authorization: Optional[str] = Header(None)):
from auth import invalidate_token
if authorization and authorization.startswith("Bearer "):
invalidate_token(authorization[7:])
return {"ok": True}
@router.get("/setup")
async def setup_page():
from fastapi.responses import FileResponse
from auth import has_users
from pathlib import Path
if has_users():
from fastapi.responses import RedirectResponse
return RedirectResponse("/")
return FileResponse(str(Path(__file__).parent.parent / "frontend" / "setup.html"))
@router.post("/setup")
async def setup_post(body: dict):
from auth import has_users, create_user
from config import load as load_config
if has_users():
raise HTTPException(status_code=403, detail="Bereits eingerichtet")
username = body.get("username", "").strip()
password = body.get("password", "")
if not username or len(password) < 6:
raise HTTPException(status_code=400, detail="Ungültige Eingabe")
cfg = load_config()
default_dir = cfg["output"]["path"]
output_dir = body.get("output_dir") or default_dir
create_user(username, password, output_dir, is_admin=True)
return {"ok": True}
# ---------------------------------------------------------------------------
# Protected endpoints
# ---------------------------------------------------------------------------
@router.get("/status")
async def get_status(user: dict = Depends(current_user)):
return {"status": state.status, "username": user["username"]}
@router.post("/toggle")
async def toggle_recording(user: dict = Depends(current_user)):
from api.pipeline import run_pipeline
if state.status == Status.RECORDING:
asyncio.create_task(run_pipeline())
return {"action": "stopped"}
if state.status == Status.ERROR:
await state.set_status(Status.IDLE)
return {"action": "reset"}
if state.status == Status.IDLE:
from audio import AudioRecorder
state._recorder = AudioRecorder()
state._recorder.start()
state.recording_user = user["username"]
state._recording_output_dir = os.path.join(user["output_dir"], user["username"])
state._recording_instructions = user.get("instructions", "")
await state.set_status(Status.RECORDING)
return {"action": "started"}
return {"action": "busy", "status": state.status}
@router.post("/instructions")
async def set_instructions(body: dict, user: dict = Depends(current_user)):
user["instructions"] = body.get("instructions", "")
return {"ok": True}
@router.get("/transcripts")
async def get_transcripts(user: dict = Depends(current_user)):
user_dir = os.path.join(user["output_dir"], user["username"])
return list_transcripts(user_dir)
@router.get("/transcripts/{filename}")
async def get_transcript(filename: str, user: dict = Depends(current_user)):
from fastapi.responses import PlainTextResponse
user_dir = os.path.join(user["output_dir"], user["username"])
content = read_transcript(user_dir, filename)
if content is None:
raise HTTPException(status_code=404, detail="Nicht gefunden")
return PlainTextResponse(content)
@router.get("/config")
async def get_config(user: dict = Depends(current_user)):
return load_config()
@router.put("/config")
async def put_config(body: dict, user: dict = Depends(current_user)):
if not user.get("is_admin"):
raise HTTPException(status_code=403, detail="Nur Administratoren können die Config ändern")
cfg = load_config()
cfg.update(body)
return cfg
@router.post("/open")
async def open_file(body: dict, user: dict = Depends(current_user)):
import subprocess
path = body.get("path", "")
# Only allow opening files within the user's own output directory
user_dir = os.path.join(user["output_dir"], user["username"])
if path and os.path.exists(path) and os.path.abspath(path).startswith(os.path.abspath(user_dir)):
subprocess.Popen(["xdg-open", path])
return {"ok": True}
@router.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
from auth import get_user_for_token
token = ws.query_params.get("token", "")
user = get_user_for_token(token)
if not user:
await ws.close(code=4001)
return
await ws.accept()
_ws_clients.append(ws)
try:
while True:
await ws.receive_text()
except WebSocketDisconnect:
if ws in _ws_clients:
_ws_clients.remove(ws)
async def broadcast(message: dict):
for ws in list(_ws_clients):
try:
await ws.send_json(message)
except Exception:
if ws in _ws_clients:
_ws_clients.remove(ws)