Files
tueit_Transkriptor/api/router.py
T

378 lines
15 KiB
Python

import asyncio
import os
from typing import Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, HTTPException, Header
from api.state import state, Status
from config import load as load_config
from output import list_transcripts, read_transcript
router = APIRouter()
_ws_clients: list[WebSocket] = []
def _guest_user() -> dict:
"""Return the first registered user — used for hotkey/tray-triggered recordings."""
from auth import _load_users
users = _load_users()
if not users:
raise RuntimeError("Kein Nutzer eingerichtet")
username, data = next(iter(users.items()))
return {"username": username, "output_dir": data["output_dir"], "is_admin": data.get("is_admin", False)}
# ---------------------------------------------------------------------------
# Auth dependency
# ---------------------------------------------------------------------------
async def current_user(authorization: Optional[str] = Header(None)) -> dict:
from auth import get_user_for_token
token = None
if authorization and authorization.startswith("Bearer "):
token = authorization[7:]
if not token:
raise HTTPException(status_code=401, detail="Nicht angemeldet")
user = get_user_for_token(token)
if not user:
raise HTTPException(status_code=401, detail="Ungültiger oder abgelaufener Token")
return user
# ---------------------------------------------------------------------------
# Auth endpoints (no current_user dependency — these are unauthenticated)
# ---------------------------------------------------------------------------
@router.post("/login")
async def login(body: dict):
from auth import authenticate
username = body.get("username", "")
password = body.get("password", "")
if not username or not password:
raise HTTPException(status_code=400, detail="Benutzername und Passwort erforderlich")
token = authenticate(username, password)
if not token:
raise HTTPException(status_code=401, detail="Ungültige Anmeldedaten")
return {"token": token, "username": username}
@router.post("/logout")
async def logout(authorization: Optional[str] = Header(None)):
from auth import invalidate_token
if authorization and authorization.startswith("Bearer "):
invalidate_token(authorization[7:])
return {"ok": True}
@router.get("/setup")
async def setup_page():
from fastapi.responses import FileResponse
from auth import has_users
from pathlib import Path
if has_users():
from fastapi.responses import RedirectResponse
return RedirectResponse("/")
return FileResponse(str(Path(__file__).parent.parent / "frontend" / "setup.html"))
@router.post("/setup")
async def setup_post(body: dict):
from auth import has_users, create_user
from config import load as load_config
if has_users():
raise HTTPException(status_code=403, detail="Bereits eingerichtet")
username = body.get("username", "").strip()
password = body.get("password", "")
if not username or len(password) < 6:
raise HTTPException(status_code=400, detail="Ungültige Eingabe")
cfg = load_config()
default_dir = cfg["output"]["path"]
output_dir = body.get("output_dir") or default_dir
create_user(username, password, output_dir, is_admin=True)
return {"ok": True}
# ---------------------------------------------------------------------------
# Protected endpoints
# ---------------------------------------------------------------------------
@router.get("/status")
async def get_status(user: dict = Depends(current_user)):
return {"status": state.status, "username": user["username"], "is_admin": user.get("is_admin", False)}
@router.post("/toggle")
async def toggle_recording(user: dict = Depends(current_user)):
from api.pipeline import run_pipeline
if state.status == Status.RECORDING:
asyncio.create_task(run_pipeline())
return {"action": "stopped"}
if state.status == Status.ERROR:
await state.set_status(Status.IDLE)
return {"action": "reset"}
if state.status == Status.IDLE:
from audio import AudioRecorder
cfg = load_config()
audio_device = cfg.get("audio", {}).get("device") or None
state._recorder = AudioRecorder(device=audio_device)
state._recorder.start()
state.recording_user = user["username"]
state._recording_output_dir = os.path.join(user["output_dir"], user["username"])
state._recording_instructions = user.get("instructions", "")
await state.set_status(Status.RECORDING)
return {"action": "started"}
return {"action": "busy", "status": state.status}
@router.post("/instructions")
async def set_instructions(body: dict, user: dict = Depends(current_user)):
user["instructions"] = body.get("instructions", "")
return {"ok": True}
@router.get("/transcripts")
async def get_transcripts(user: dict = Depends(current_user)):
user_dir = os.path.join(user["output_dir"], user["username"])
return list_transcripts(user_dir)
@router.get("/transcripts/{filename}")
async def get_transcript(filename: str, user: dict = Depends(current_user)):
from fastapi.responses import PlainTextResponse
user_dir = os.path.join(user["output_dir"], user["username"])
content = read_transcript(user_dir, filename)
if content is None:
raise HTTPException(status_code=404, detail="Nicht gefunden")
return PlainTextResponse(content)
@router.post("/transcripts/{filename}/reprocess")
async def reprocess_transcript(filename: str, body: dict, user: dict = Depends(current_user)):
from output import read_transcript
from fastapi.responses import PlainTextResponse
from llm import OllamaClient
user_dir = os.path.join(user["output_dir"], user["username"])
content = read_transcript(user_dir, filename)
if content is None:
raise HTTPException(status_code=404, detail="Nicht gefunden")
# Strip YAML frontmatter before sending to LLM
body_text = content
if content.startswith("---\n"):
end = content.find("\n---\n", 4)
if end != -1:
body_text = content[end + 5:].lstrip("\n")
cfg = load_config()
instructions = body.get("instructions", "")
client = OllamaClient(base_url=cfg["ollama"]["base_url"])
refined = await client.refine(body_text, instructions=instructions, model=cfg["ollama"]["model"])
# Overwrite same file (keep filename stable, update frontmatter date)
from datetime import datetime
path = os.path.join(user_dir, filename)
with open(path, "w", encoding="utf-8") as f:
now = datetime.now()
f.write(f"---\ndate: {now.isoformat(timespec='seconds')}\ntags: [transkript]\n---\n\n")
f.write(refined if refined.endswith("\n") else refined + "\n")
return PlainTextResponse(refined)
@router.delete("/transcripts/{filename}")
async def delete_transcript(filename: str, user: dict = Depends(current_user)):
user_dir = os.path.join(user["output_dir"], user["username"])
if os.path.basename(filename) != filename or not filename.endswith(".md"):
raise HTTPException(status_code=404, detail="Nicht gefunden")
path = os.path.join(user_dir, filename)
if not os.path.exists(path):
raise HTTPException(status_code=404, detail="Nicht gefunden")
os.unlink(path)
return {"ok": True}
@router.get("/config")
async def get_config(user: dict = Depends(current_user)):
return load_config()
@router.put("/config")
async def put_config(body: dict, user: dict = Depends(current_user)):
if not user.get("is_admin"):
raise HTTPException(status_code=403, detail="Nur Administratoren können die Config ändern")
from config import _deep_merge, CONFIG_PATH
import tomli_w
cfg = load_config()
merged = _deep_merge(cfg, body)
os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True)
with open(CONFIG_PATH, "wb") as f:
tomli_w.dump(merged, f)
return merged
@router.post("/open")
async def open_file(body: dict, user: dict = Depends(current_user)):
import subprocess, shutil
user_dir = os.path.join(user["output_dir"], user["username"])
abs_user_dir = os.path.abspath(user_dir)
# Accept either a single path or a list of paths (for 3-file recordings)
raw_paths = body.get("paths") or ([body.get("path")] if body.get("path") else [])
paths = [p for p in raw_paths if p and os.path.exists(p) and os.path.abspath(p).startswith(abs_user_dir)]
if not paths:
return {"ok": False}
mode = body.get("mode", "editor") # "editor" | "folder" | "obsidian"
if mode == "obsidian":
from urllib.parse import quote
cfg = load_config()
vault = cfg.get("obsidian", {}).get("vault", "").strip()
# If only the index was passed, also include sibling transkript/zusammenfassung
all_paths = list(paths)
for p in paths:
if p.endswith("-index.md"):
base = p[: -len("-index.md")]
for suffix in ("-transkript.md", "-zusammenfassung.md"):
sibling = base + suffix
if os.path.exists(sibling) and sibling not in all_paths:
all_paths.append(sibling)
open_target = all_paths[0]
if vault and os.path.isdir(vault):
for p in all_paths:
dest = os.path.join(vault, os.path.basename(p))
shutil.copy2(p, dest)
open_target = os.path.join(vault, os.path.basename(all_paths[0]))
uri = f"obsidian://open?path={quote(open_target, safe='/')}"
obsidian_bin = shutil.which("obsidian") or "/usr/bin/obsidian"
subprocess.Popen([obsidian_bin, uri])
elif mode == "folder" and shutil.which("dolphin"):
subprocess.Popen(["dolphin", "--select", paths[0]])
elif mode == "folder":
subprocess.Popen(["xdg-open", os.path.dirname(paths[0])])
else:
subprocess.Popen(["xdg-open", paths[0]])
return {"ok": True}
def _pactl_source_for_sd_name(sd_name: str) -> str:
"""Map a sounddevice device name to its pactl source name via description matching.
sounddevice strips the 'Monitor of ' prefix from pactl source descriptions.
Falls back to sd_name if no match found."""
import subprocess
try:
out = subprocess.check_output(
["pactl", "list", "sources"], stderr=subprocess.DEVNULL, timeout=5
).decode()
current_name = None
for line in out.splitlines():
line = line.strip()
if line.startswith("Name:"):
current_name = line.split(":", 1)[1].strip()
elif line.startswith("Description:") and current_name:
desc = line.split(":", 1)[1].strip().removeprefix("Monitor of ")
if desc == sd_name:
return current_name
current_name = None
except Exception:
pass
return sd_name
@router.get("/audio/devices")
async def list_audio_devices(user: dict = Depends(current_user)):
import sounddevice as sd
if not user.get("is_admin"):
raise HTTPException(status_code=403, detail="Nur Administratoren")
try:
devices = [
{"index": i, "name": d["name"]}
for i, d in enumerate(sd.query_devices())
if d["max_input_channels"] > 0
]
except Exception as e:
raise HTTPException(status_code=500, detail=f"sounddevice fehlgeschlagen: {e}")
return devices
@router.post("/audio/combined")
async def create_combined_source(body: dict, user: dict = Depends(current_user)):
import subprocess, json, pathlib
if not user.get("is_admin"):
raise HTTPException(status_code=403, detail="Nur Administratoren")
mic_sd = body.get("mic", "")
monitor_sd = body.get("monitor", "")
if not mic_sd or not monitor_sd:
raise HTTPException(status_code=400, detail="mic und monitor erforderlich")
# Map sounddevice names → pactl source names for loopback commands
mic = _pactl_source_for_sd_name(mic_sd)
monitor = _pactl_source_for_sd_name(monitor_sd)
# Validate pactl names exist
out = subprocess.check_output(
["pactl", "list", "sources", "short"], stderr=subprocess.DEVNULL, timeout=5
).decode()
known = {line.split("\t")[1] for line in out.strip().splitlines() if "\t" in line}
if mic not in known or monitor not in known:
raise HTTPException(status_code=400, detail="Unbekanntes Audio-Device")
# Use description without spaces so sounddevice name == sink_name
sink_id = subprocess.check_output([
"pactl", "load-module", "module-null-sink",
"sink_name=transkriptor-combined",
"sink_properties=device.description=transkriptor-combined",
], timeout=5).decode().strip()
mic_id = subprocess.check_output([
"pactl", "load-module", "module-loopback",
f"source={mic}", "sink=transkriptor-combined",
], timeout=5).decode().strip()
mon_id = subprocess.check_output([
"pactl", "load-module", "module-loopback",
f"source={monitor}", "sink=transkriptor-combined",
], timeout=5).decode().strip()
state_path = pathlib.Path(
os.path.expanduser("~/.config/tueit-transcriber/pipewire-modules.json")
)
state_path.parent.mkdir(parents=True, exist_ok=True)
ids = [int(sink_id), int(mic_id), int(mon_id)]
# Store pactl names for restore, sounddevice name as device
state_path.write_text(json.dumps({"ids": ids, "mic": mic, "monitor": monitor}))
return {"device": "transkriptor-combined", "module_ids": ids}
@router.get("/settings")
async def settings_page_route():
from fastapi.responses import FileResponse
from pathlib import Path
return FileResponse(str(Path(__file__).parent.parent / "frontend" / "settings.html"))
@router.post("/speakers")
async def post_speakers(body: dict, user: dict = Depends(current_user)):
if state._speakers_event is None:
raise HTTPException(status_code=409, detail="Keine ausstehende Sprecher-Zuordnung")
state._speaker_names = {k: v for k, v in body.items() if isinstance(k, str)}
state._speakers_event.set()
return {"ok": True}
@router.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
from auth import get_user_for_token
token = ws.query_params.get("token", "")
user = get_user_for_token(token)
if not user:
await ws.close(code=4001)
return
await ws.accept()
_ws_clients.append(ws)
try:
while True:
await ws.receive_text()
except WebSocketDisconnect:
if ws in _ws_clients:
_ws_clients.remove(ws)
async def broadcast(message: dict):
for ws in list(_ws_clients):
try:
await ws.send_json(message)
except Exception:
if ws in _ws_clients:
_ws_clients.remove(ws)