Files
tueit_Transkriptor/api/router.py
T

325 lines
12 KiB
Python

import asyncio
import os
from typing import Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, HTTPException, Header
from api.state import state, Status
from config import load as load_config
from output import list_transcripts, read_transcript
# Router on which every HTTP and WebSocket endpoint of this module is registered.
router = APIRouter()
# WebSocket connections that are currently accepted; broadcast() sends to each
# of them and drops the ones whose send fails.
_ws_clients: list[WebSocket] = []
def _guest_user() -> dict:
    """Build a user dict from the first entry of the user store.

    Used for hotkey/tray-triggered recordings, where no authenticated
    request is available.

    Returns:
        A dict with the keys ``username``, ``output_dir`` and ``is_admin``
        (``is_admin`` falls back to ``False`` when the entry lacks it).

    Raises:
        RuntimeError: if the user store is empty.
    """
    from auth import _load_users

    registered = _load_users()
    if not registered:
        raise RuntimeError("Kein Nutzer eingerichtet")

    # Only the very first (name, record) pair is of interest.
    first_name, first_record = next(iter(registered.items()))
    guest = {"username": first_name}
    guest["output_dir"] = first_record["output_dir"]
    guest["is_admin"] = first_record.get("is_admin", False)
    return guest
# ---------------------------------------------------------------------------
# Auth dependency
# ---------------------------------------------------------------------------
async def current_user(authorization: Optional[str] = Header(None)) -> dict:
    """FastAPI dependency that resolves the bearer token to a user.

    Args:
        authorization: value of the ``Authorization`` request header.

    Returns:
        Whatever ``get_user_for_token`` returns for the presented token.

    Raises:
        HTTPException: 401 if the header is missing, does not start with
            ``"Bearer "``, carries an empty token, or the token lookup
            yields a falsy result.
    """
    from auth import get_user_for_token

    scheme = "Bearer "
    if authorization and authorization.startswith(scheme):
        bearer = authorization[len(scheme):]
    else:
        bearer = None

    if not bearer:
        raise HTTPException(status_code=401, detail="Nicht angemeldet")

    resolved = get_user_for_token(bearer)
    if resolved:
        return resolved
    raise HTTPException(status_code=401, detail="Ungültiger oder abgelaufener Token")
# ---------------------------------------------------------------------------
# Auth endpoints (no current_user dependency — these are unauthenticated)
# ---------------------------------------------------------------------------
@router.post("/login")
async def login(body: dict):
    """Exchange a username/password pair for a token.

    Args:
        body: request payload; the keys ``username`` and ``password`` are read.

    Returns:
        ``{"token": ..., "username": ...}`` on success.

    Raises:
        HTTPException: 400 if either field is missing or empty, 401 if
            ``authenticate`` returns a falsy value.
    """
    from auth import authenticate

    name, secret = body.get("username", ""), body.get("password", "")
    if not (name and secret):
        raise HTTPException(status_code=400, detail="Benutzername und Passwort erforderlich")

    issued = authenticate(name, secret)
    if issued:
        return {"token": issued, "username": name}
    raise HTTPException(status_code=401, detail="Ungültige Anmeldedaten")
@router.post("/logout")
async def logout(authorization: Optional[str] = Header(None)):
    """Invalidate the presented bearer token, if there is one.

    The endpoint always answers ``{"ok": True}``, including when no
    ``Bearer`` header was sent.
    """
    from auth import invalidate_token

    scheme = "Bearer "
    has_bearer = bool(authorization) and authorization.startswith(scheme)
    if has_bearer:
        invalidate_token(authorization[len(scheme):])
    return {"ok": True}
@router.get("/setup")
async def setup_page():
    """Serve the setup page, or redirect to ``/`` once a user exists."""
    from pathlib import Path

    from auth import has_users
    from fastapi.responses import FileResponse, RedirectResponse

    if not has_users():
        # frontend/ lives two directory levels above this module.
        page = Path(__file__).parent.parent / "frontend" / "setup.html"
        return FileResponse(str(page))
    return RedirectResponse("/")
@router.post("/setup")
async def setup_post(body: dict):
    """Create the first (administrator) account.

    Args:
        body: request payload. ``username`` and ``password`` are required;
            ``output_dir`` is optional and falls back to the configured
            ``output.path``.

    Returns:
        ``{"ok": True}`` once the user has been created.

    Raises:
        HTTPException: 403 if a user already exists; 400 if ``username`` or
            ``password`` is not a string, the stripped username is empty, or
            the password is shorter than 6 characters.
    """
    from auth import has_users, create_user

    if has_users():
        raise HTTPException(status_code=403, detail="Bereits eingerichtet")

    username = body.get("username", "")
    password = body.get("password", "")
    # JSON numbers, nulls or lists would otherwise blow up in .strip()/len()
    # and surface as a 500 instead of a validation error.
    if not isinstance(username, str) or not isinstance(password, str):
        raise HTTPException(status_code=400, detail="Ungültige Eingabe")

    username = username.strip()
    if not username or len(password) < 6:
        raise HTTPException(status_code=400, detail="Ungültige Eingabe")

    cfg = load_config()
    default_dir = cfg["output"]["path"]
    output_dir = body.get("output_dir") or default_dir
    create_user(username, password, output_dir, is_admin=True)
    return {"ok": True}
# ---------------------------------------------------------------------------
# Protected endpoints
# ---------------------------------------------------------------------------
@router.get("/status")
async def get_status(user: dict = Depends(current_user)):
    """Report the current application status together with the caller's identity."""
    payload = {"status": state.status}
    payload["username"] = user["username"]
    payload["is_admin"] = user.get("is_admin", False)
    return payload
# Strong references to pipeline tasks that are still running. The event loop
# itself only holds weak references to tasks, so a task whose handle is thrown
# away may be garbage-collected before it completes.
_pipeline_tasks: set = set()


@router.post("/toggle")
async def toggle_recording(user: dict = Depends(current_user)):
    """Advance the recording state machine by one step.

    Depending on ``state.status``:

    * ``RECORDING`` - schedule ``run_pipeline()`` as a background task and
      answer ``{"action": "stopped"}``.
    * ``ERROR`` - reset the status to ``IDLE`` and answer ``{"action": "reset"}``.
    * ``IDLE`` - create and start an ``AudioRecorder`` on the configured audio
      device, remember the caller's username, output directory and
      instructions on ``state``, switch to ``RECORDING`` and answer
      ``{"action": "started"}``.
    * anything else - answer ``{"action": "busy", "status": <current status>}``.
    """
    from api.pipeline import run_pipeline

    if state.status == Status.RECORDING:
        task = asyncio.create_task(run_pipeline())
        # Keep the task alive until it is done, then drop the reference again.
        _pipeline_tasks.add(task)
        task.add_done_callback(_pipeline_tasks.discard)
        return {"action": "stopped"}

    if state.status == Status.ERROR:
        await state.set_status(Status.IDLE)
        return {"action": "reset"}

    if state.status == Status.IDLE:
        from audio import AudioRecorder

        cfg = load_config()
        # An empty/missing device setting is normalised to None.
        audio_device = cfg.get("audio", {}).get("device") or None
        state._recorder = AudioRecorder(device=audio_device)
        state._recorder.start()
        state.recording_user = user["username"]
        state._recording_output_dir = os.path.join(user["output_dir"], user["username"])
        state._recording_instructions = user.get("instructions", "")
        await state.set_status(Status.RECORDING)
        return {"action": "started"}

    return {"action": "busy", "status": state.status}
@router.post("/instructions")
async def set_instructions(body: dict, user: dict = Depends(current_user)):
    """Store the submitted instructions on the caller's user dict.

    NOTE(review): whether this survives beyond the current request depends on
    ``current_user`` handing out the same dict object each time - confirm.
    """
    submitted = body.get("instructions", "")
    user["instructions"] = submitted
    return {"ok": True}
@router.get("/transcripts")
async def get_transcripts(user: dict = Depends(current_user)):
    """Return ``list_transcripts`` for the caller's ``<output_dir>/<username>`` folder."""
    base_dir, owner = user["output_dir"], user["username"]
    return list_transcripts(os.path.join(base_dir, owner))
@router.get("/transcripts/{filename}")
async def get_transcript(filename: str, user: dict = Depends(current_user)):
    """Return one transcript from the caller's folder as plain text.

    Raises:
        HTTPException: 404 if ``read_transcript`` returns ``None``.
    """
    from fastapi.responses import PlainTextResponse

    folder = os.path.join(user["output_dir"], user["username"])
    text = read_transcript(folder, filename)
    if text is not None:
        return PlainTextResponse(text)
    raise HTTPException(status_code=404, detail="Nicht gefunden")
def _strip_frontmatter(text: str) -> str:
    """Return *text* without a leading ``---`` ... ``---`` frontmatter block.

    Text that does not start with a frontmatter fence, or whose fence is
    never closed, is returned unchanged.
    """
    if not text.startswith("---\n"):
        return text
    # Search from index 3 (the newline that ends the opening fence) so that an
    # empty frontmatter block ("---\n---\n") is recognised as well.
    end = text.find("\n---\n", 3)
    if end == -1:
        return text
    return text[end + 5:].lstrip("\n")


@router.post("/transcripts/{filename}/reprocess")
async def reprocess_transcript(filename: str, body: dict, user: dict = Depends(current_user)):
    """Run a stored transcript through the LLM again and overwrite it in place.

    The transcript body (without frontmatter) is passed to
    ``OllamaClient.refine`` together with the optional ``instructions`` from
    the request body. The result is written back under the same filename with
    a fresh frontmatter block carrying the current timestamp.

    Returns:
        The refined text as a plain-text response.

    Raises:
        HTTPException: 404 if the filename is not a bare ``*.md`` name or the
            transcript cannot be read; 502 if the LLM returns no usable text.
    """
    from datetime import datetime

    from fastapi.responses import PlainTextResponse
    from llm import OllamaClient

    # Same filename guard as delete_transcript: this endpoint writes to the
    # path, so it must not accept anything but a bare "*.md" name.
    if os.path.basename(filename) != filename or not filename.endswith(".md"):
        raise HTTPException(status_code=404, detail="Nicht gefunden")

    user_dir = os.path.join(user["output_dir"], user["username"])
    content = read_transcript(user_dir, filename)
    if content is None:
        raise HTTPException(status_code=404, detail="Nicht gefunden")

    # Strip YAML frontmatter before sending to LLM
    body_text = _strip_frontmatter(content)

    cfg = load_config()
    instructions = body.get("instructions", "")
    client = OllamaClient(base_url=cfg["ollama"]["base_url"])
    refined = await client.refine(body_text, instructions=instructions, model=cfg["ollama"]["model"])

    # Validate BEFORE touching the file: opening with "w" truncates it, so a
    # missing or empty result would otherwise destroy the stored transcript.
    if not isinstance(refined, str) or not refined.strip():
        raise HTTPException(status_code=502, detail="LLM lieferte kein Ergebnis")

    # Overwrite same file (keep filename stable, update frontmatter date)
    now = datetime.now()
    document = (
        f"---\ndate: {now.isoformat(timespec='seconds')}\ntags: [transkript]\n---\n\n"
        + (refined if refined.endswith("\n") else refined + "\n")
    )
    path = os.path.join(user_dir, filename)
    with open(path, "w", encoding="utf-8") as f:
        f.write(document)
    return PlainTextResponse(refined)
@router.delete("/transcripts/{filename}")
async def delete_transcript(filename: str, user: dict = Depends(current_user)):
    """Delete one transcript from the caller's folder.

    Only bare ``*.md`` filenames (no directory component) are accepted.

    Returns:
        ``{"ok": True}`` after the file has been removed.

    Raises:
        HTTPException: 404 if the filename is rejected or the file does not
            exist.
    """
    if os.path.basename(filename) != filename or not filename.endswith(".md"):
        raise HTTPException(status_code=404, detail="Nicht gefunden")

    path = os.path.join(user["output_dir"], user["username"], filename)
    # Attempt the removal directly instead of checking for existence first:
    # a file that disappears between the check and the unlink would otherwise
    # surface as an unhandled error.
    try:
        os.unlink(path)
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Nicht gefunden") from None
    return {"ok": True}
@router.get("/config")
async def get_config(user: dict = Depends(current_user)):
    """Return the loaded configuration to any authenticated caller."""
    current = load_config()
    return current
@router.put("/config")
async def put_config(body: dict, user: dict = Depends(current_user)):
    """Merge *body* into the configuration and persist it as TOML.

    Returns:
        The merged configuration.

    Raises:
        HTTPException: 403 if the caller is not an administrator; 400 if the
            merged configuration cannot be serialised to TOML.
    """
    if not user.get("is_admin"):
        raise HTTPException(status_code=403, detail="Nur Administratoren können die Config ändern")
    from config import _deep_merge, CONFIG_PATH
    import tomli_w

    cfg = load_config()
    merged = _deep_merge(cfg, body)

    # Serialise first: open(..., "wb") truncates the file immediately, so a
    # serialisation error after opening (e.g. a JSON null, which TOML cannot
    # represent) would leave an empty or partial config behind.
    try:
        payload = tomli_w.dumps(merged).encode("utf-8")
    except (TypeError, ValueError) as exc:
        raise HTTPException(status_code=400, detail=f"Ungültige Config: {exc}") from exc

    config_dir = os.path.dirname(CONFIG_PATH)
    if config_dir:  # makedirs("") raises when CONFIG_PATH has no directory part
        os.makedirs(config_dir, exist_ok=True)
    with open(CONFIG_PATH, "wb") as f:
        f.write(payload)
    return merged
@router.post("/open")
async def open_file(body: dict, user: dict = Depends(current_user)):
    """Open a file from the caller's output folder with ``xdg-open``.

    Requests for paths outside ``<output_dir>/<username>``, for paths that do
    not exist, or with a missing/non-string ``path`` are ignored. The
    endpoint answers ``{"ok": True}`` in every case.
    """
    import subprocess

    path = body.get("path", "")
    if not isinstance(path, str) or not path:
        return {"ok": True}

    # Only allow opening files within the user's own output directory.
    # Compare resolved paths component-wise: a plain startswith() test would
    # also accept sibling directories such as ".../alice2" for user "alice",
    # and would not see through symlinks pointing outside the folder.
    try:
        user_dir = os.path.realpath(os.path.join(user["output_dir"], user["username"]))
        target = os.path.realpath(path)
        inside = os.path.commonpath([user_dir, target]) == user_dir
    except ValueError:
        # e.g. embedded NUL byte, or paths that cannot be compared
        inside = False

    if inside and os.path.exists(target):
        # Pass the resolved absolute path so the argument can never be
        # mistaken for a command-line option.
        subprocess.Popen(["xdg-open", target])
    return {"ok": True}
@router.get("/audio/devices")
async def list_audio_devices(user: dict = Depends(current_user)):
    """List audio sources as reported by ``pactl list sources short``.

    Returns:
        One dict per output line that has at least two tab-separated columns,
        with ``index`` (column 0), ``name`` (column 1) and ``state``
        (column 4, or ``""`` when the line has fewer columns).

    Raises:
        HTTPException: 403 for non-administrators; 500 if the ``pactl`` call
            or decoding its output fails.
    """
    import subprocess

    if not user.get("is_admin"):
        raise HTTPException(status_code=403, detail="Nur Administratoren")

    command = ["pactl", "list", "sources", "short"]
    try:
        raw = subprocess.check_output(command, stderr=subprocess.DEVNULL, timeout=5)
        listing = raw.decode()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"pactl fehlgeschlagen: {e}")

    rows = (line.split("\t") for line in listing.strip().splitlines())
    return [
        {
            "index": cols[0],
            "name": cols[1],
            "state": cols[4] if len(cols) > 4 else "",
        }
        for cols in rows
        if len(cols) >= 2
    ]
@router.post("/audio/combined")
async def create_combined_source(body: dict, user: dict = Depends(current_user)):
    """Create a null sink that mixes a microphone and a monitor source.

    Loads three PulseAudio modules via ``pactl load-module`` (one
    ``module-null-sink`` named ``transkriptor-combined`` and one
    ``module-loopback`` per source) and records their ids in
    ``~/.config/tueit-transcriber/pipewire-modules.json``.

    Args:
        body: request payload with the source names ``mic`` and ``monitor``.

    Returns:
        ``{"device": "transkriptor-combined.monitor", "module_ids": [...]}``.

    Raises:
        HTTPException: 403 for non-administrators; 400 if a name is missing,
            not a string, or not among the sources ``pactl`` reports; 500 if
            a ``pactl`` call fails.
    """
    import json
    import pathlib
    import subprocess

    if not user.get("is_admin"):
        raise HTTPException(status_code=403, detail="Nur Administratoren")

    mic = body.get("mic", "")
    monitor = body.get("monitor", "")
    # Non-string values (e.g. JSON lists) could not be looked up in the set of
    # known names below, so they are rejected together with empty ones.
    if not (isinstance(mic, str) and isinstance(monitor, str) and mic and monitor):
        raise HTTPException(status_code=400, detail="mic und monitor erforderlich")

    # Validate: names must come from pactl list — no shell injection via user input
    try:
        out = subprocess.check_output(
            ["pactl", "list", "sources", "short"], stderr=subprocess.DEVNULL, timeout=5
        ).decode()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"pactl fehlgeschlagen: {e}") from e
    known = {line.split("\t")[1] for line in out.strip().splitlines() if "\t" in line}
    if mic not in known or monitor not in known:
        raise HTTPException(status_code=400, detail="Unbekanntes Audio-Device")

    # Order matters: the sink must exist before the loopbacks can target it.
    module_args = [
        [
            "module-null-sink",
            "sink_name=transkriptor-combined",
            "sink_properties=device.description=Transkriptor Combined",
        ],
        ["module-loopback", f"source={mic}", "sink=transkriptor-combined"],
        ["module-loopback", f"source={monitor}", "sink=transkriptor-combined"],
    ]
    ids: list[int] = []
    try:
        for args in module_args:
            raw_id = subprocess.check_output(["pactl", "load-module", *args], timeout=5)
            ids.append(int(raw_id.decode().strip()))
    except Exception as e:
        # Roll back whatever was loaded so a partial failure does not leave
        # orphaned modules behind. Unloading is best effort: the original
        # error is the one reported to the caller.
        for module_id in reversed(ids):
            try:
                subprocess.run(
                    ["pactl", "unload-module", str(module_id)],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    timeout=5,
                    check=False,
                )
            except (OSError, subprocess.SubprocessError):
                pass
        raise HTTPException(status_code=500, detail=f"pactl fehlgeschlagen: {e}") from e

    state_path = pathlib.Path(
        os.path.expanduser("~/.config/tueit-transcriber/pipewire-modules.json")
    )
    state_path.parent.mkdir(parents=True, exist_ok=True)
    state_path.write_text(json.dumps({"ids": ids}))
    return {"device": "transkriptor-combined.monitor", "module_ids": ids}
@router.get("/settings")
async def settings_page_route(user: dict = Depends(current_user)):
    """Serve the settings page to administrators; redirect everyone else to ``/``."""
    from pathlib import Path

    from fastapi.responses import FileResponse, RedirectResponse

    if user.get("is_admin"):
        # frontend/ lives two directory levels above this module.
        page = Path(__file__).parent.parent / "frontend" / "settings.html"
        return FileResponse(str(page))
    return RedirectResponse("/")
@router.post("/speakers")
async def post_speakers(body: dict, user: dict = Depends(current_user)):
    """Store the submitted speaker names and set the pending speakers event.

    Raises:
        HTTPException: 409 if ``state._speakers_event`` is ``None``.
    """
    if state._speakers_event is None:
        raise HTTPException(status_code=409, detail="Keine ausstehende Sprecher-Zuordnung")

    # Keep only entries whose key is a string.
    names = {}
    for label, assigned in body.items():
        if isinstance(label, str):
            names[label] = assigned
    state._speaker_names = names

    state._speakers_event.set()
    return {"ok": True}
@router.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    """Register an authenticated WebSocket client for broadcasts.

    The token is taken from the ``token`` query parameter. Connections whose
    token does not resolve to a user are closed with code 4001. Accepted
    connections are kept in ``_ws_clients`` until the socket goes away;
    incoming text messages are read and discarded.
    """
    from auth import get_user_for_token

    token = ws.query_params.get("token", "")
    user = get_user_for_token(token)
    if not user:
        await ws.close(code=4001)
        return

    await ws.accept()
    _ws_clients.append(ws)
    try:
        while True:
            await ws.receive_text()
    except WebSocketDisconnect:
        # Normal termination: the client went away.
        pass
    finally:
        # Deregister on EVERY exit path (disconnect, cancellation, unexpected
        # errors) so no stale socket is left behind in the client list.
        if ws in _ws_clients:
            _ws_clients.remove(ws)
async def broadcast(message: dict):
    """Send *message* as JSON to every registered WebSocket client.

    Clients are served one after another. A client whose send raises is
    removed from ``_ws_clients``; the error itself is not propagated.
    """
    # Iterate over a snapshot because the list is modified on failure.
    recipients = tuple(_ws_clients)
    for client in recipients:
        try:
            await client.send_json(message)
        except Exception:
            # The client may already have been removed elsewhere in the meantime.
            try:
                _ws_clients.remove(client)
            except ValueError:
                pass