# diff --git a/.gitignore b/.gitignore
# index e458ed5..36e402c 100644
# --- a/.gitignore
# +++ b/.gitignore
# @@ -1 +1,10 @@
#  .worktrees/
# +__pycache__/
# +*.pyc
# +*.pyo
# +.venv/
# +venv/
# +*.egg-info/
# +dist/
# +.env
# +data/
# diff --git a/CLAUDE.md b/CLAUDE.md
# new file mode 100644
# index 0000000..291bea9
# --- /dev/null
# +++ b/CLAUDE.md
# @@ -0,0 +1,21 @@
# +# CLAUDE.md — tüit Transkriptor
# +
# +Desktop transcription tool. Python, no Docker.
# +
# +## Key Commands
# +
# +    # Install dependencies
# +    pip install -r requirements.txt
# +
# +    # Run
# +    python main.py
# +
# +    # Run tests
# +    pytest -v
# +
# +    # Trigger recording toggle via signal
# +    pkill -USR1 -f main.py
# +
# +## Architecture
# +
# +See docs/plans/2026-04-01-desktop-transcription-design.md
# diff --git a/api/__init__.py b/api/__init__.py
# new file mode 100644
# index 0000000..e69de29
# diff --git a/api/pipeline.py b/api/pipeline.py
# new file mode 100644
# index 0000000..b2b5cd0
# --- /dev/null
# +++ b/api/pipeline.py
# @@ -0,0 +1,73 @@
import os
import tempfile

from api.state import state, Status
from config import load as load_config
from transcription import engine as transcription_engine
from llm import OllamaClient
from output import save_transcript
from api.router import broadcast


async def run_pipeline():
    """Stop the active recording, transcribe it, refine it and save the result.

    Progress is pushed to WebSocket clients through ``broadcast`` as the
    events ``processing``, ``transcribed``, ``refined`` and ``saved``; any
    failure is reported as an ``error`` event and leaves the state in
    ``Status.ERROR``.  Returns immediately when no recorder is attached to
    the shared state.
    """
    # Claim the recorder synchronously, before the first await: if the
    # pipeline gets scheduled twice for one recording, the second run finds
    # nothing to process instead of saving the same audio again.
    recorder = getattr(state, "_recorder", None)
    if recorder is None:
        return
    state._recorder = None

    wav_path = None
    try:
        # Everything that can fail lives inside the try block so the state
        # never stays stuck in RECORDING after an exception.
        recorder.stop()
        cfg = load_config()

        # The attributes exist but hold None / "" after a previous run, so a
        # getattr default alone would not provide the fallback.
        output_dir = getattr(state, "_recording_output_dir", None) or cfg["output"]["path"]
        instructions = getattr(state, "_recording_instructions", "") or ""

        await state.set_status(Status.PROCESSING)
        await broadcast({"event": "processing"})

        # Only the unique name is needed; the recorder reopens the path itself.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            wav_path = f.name
        recorder.save_wav(wav_path)

        raw_text = await transcription_engine.transcribe_file(
            wav_path,
            language=cfg["whisper"]["language"],
            model_name=cfg["whisper"]["model"],
            device=cfg["whisper"]["device"],
        )
        await broadcast({"event": "transcribed", "raw": raw_text})

        client = OllamaClient(base_url=cfg["ollama"]["base_url"])
        refined = await client.refine(
            raw_text=raw_text,
            instructions=instructions,
            model=cfg["ollama"]["model"],
        )
        await broadcast({"event": "refined", "markdown": refined})

        # The first level-1 Markdown heading becomes the title.
        title = next(
            (line[2:].strip() for line in refined.splitlines() if line.startswith("# ")),
            "Diktat",
        )

        path = save_transcript(
            title=title,
            content=refined,
            output_dir=output_dir,
        )
        await broadcast({"event": "saved", "path": path, "title": title})
        await state.set_status(Status.IDLE)

    except Exception as e:
        # Task boundary: nothing awaits this coroutine's result, so the error
        # is surfaced through the shared state and the WebSocket instead.
        state.last_error = str(e)
        await state.set_status(Status.ERROR)
        await broadcast({"event": "error", "message": str(e)})
    finally:
        state.recording_user = None
        state._recording_output_dir = None
        state._recording_instructions = ""
        if wav_path:
            try:
                os.unlink(wav_path)
            except OSError:
                pass  # best effort: the temp file may already be gone


# diff --git a/api/router.py b/api/router.py
# new file mode 100644
# index 0000000..42d8c98
# --- /dev/null
# +++ b/api/router.py
# @@ -0,0 +1,144 @@
import asyncio
import os
from typing import Optional

from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, HTTPException, Header

from api.state import state, Status
from config import load as load_config
from output import list_transcripts

router = APIRouter()
_ws_clients: list[WebSocket] = []


# ---------------------------------------------------------------------------
# Auth dependency
# ---------------------------------------------------------------------------

async def current_user(authorization: Optional[str] = Header(None)) -> dict:
    """FastAPI dependency: resolve the ``Authorization: Bearer`` header to a user.

    Returns the dict produced by ``auth.get_user_for_token``.

    Raises:
        HTTPException: 401 when the header is missing, is not a Bearer
            token, or the token is not known to the session store.
    """
    from auth import get_user_for_token
    token = None
    if authorization and authorization.startswith("Bearer "):
        token = authorization[7:]
    if not token:
        raise HTTPException(status_code=401, detail="Nicht angemeldet")
    user = get_user_for_token(token)
    if not user:
        raise HTTPException(status_code=401, detail="Ungültiger oder abgelaufener Token")
    return user


#
# ---------------------------------------------------------------------------
# Auth endpoints (no current_user dependency — these are unauthenticated)
# ---------------------------------------------------------------------------

@router.post("/login")
async def login(body: dict):
    """Exchange username and password for a session token.

    Raises:
        HTTPException: 400 when either field is missing, empty or not a
            string; 401 when the credentials are rejected.
    """
    from auth import authenticate
    username = body.get("username", "")
    password = body.get("password", "")
    # JSON bodies can carry numbers or objects here; rejecting them up front
    # gives a clean 400 instead of a 500 from str-only code further down.
    if not (isinstance(username, str) and isinstance(password, str) and username and password):
        raise HTTPException(status_code=400, detail="Benutzername und Passwort erforderlich")
    token = authenticate(username, password)
    if not token:
        raise HTTPException(status_code=401, detail="Ungültige Anmeldedaten")
    return {"token": token, "username": username}


@router.post("/logout")
async def logout(authorization: Optional[str] = Header(None)):
    """Invalidate the bearer token of the caller, if one was sent."""
    from auth import invalidate_token
    if authorization and authorization.startswith("Bearer "):
        invalidate_token(authorization[7:])
    return {"ok": True}


# ---------------------------------------------------------------------------
# Protected endpoints
# ---------------------------------------------------------------------------

# Refinement instructions per username.  The dict handed out by current_user
# is rebuilt on every request, so instructions have to be kept here to survive
# until the next /toggle call.
_instructions: dict[str, str] = {}

# Reference to the running pipeline task.  The event loop only keeps weak
# references to tasks, and the handle doubles as an "already stopping" guard.
_pipeline_task: Optional[asyncio.Task] = None


def _guest_user() -> dict:
    """Return the user context for toggles that arrive without a login.

    main.py imports this for recordings triggered via SIGUSR1 or the tray
    menu.  Transcripts of such recordings go below the globally configured
    output path.
    NOTE(review): username "guest" and the non-admin flag are assumptions —
    confirm the intended identity for signal-triggered recordings.
    """
    return {
        "username": "guest",
        "output_dir": load_config()["output"]["path"],
        "is_admin": False,
    }


def _is_inside(path: str, root: str) -> bool:
    """Return True when *path* resolves to *root* itself or something below it."""
    real_root = os.path.realpath(root)
    real_path = os.path.realpath(path)
    try:
        # commonpath compares whole path components, so "/out/alice2" is not
        # mistaken for a child of "/out/alice" the way a prefix test would.
        return os.path.commonpath([real_root, real_path]) == real_root
    except ValueError:
        # Raised e.g. for paths on different drives.
        return False


@router.get("/status")
async def get_status(user: dict = Depends(current_user)):
    """Return the current application status and the caller's username."""
    return {"status": state.status, "username": user["username"]}


@router.post("/toggle")
async def toggle_recording(user: dict = Depends(current_user)):
    """Start a recording, or stop the running one and schedule the pipeline.

    Returns ``{"action": "started"}``, ``{"action": "stopped"}`` or
    ``{"action": "busy", "status": ...}``.
    """
    global _pipeline_task
    from api.pipeline import run_pipeline

    pipeline_running = _pipeline_task is not None and not _pipeline_task.done()

    if state.status == Status.RECORDING:
        # The status only changes once the task actually runs; without this
        # guard a quick double toggle would schedule the pipeline twice.
        if pipeline_running:
            return {"action": "busy", "status": state.status}
        _pipeline_task = asyncio.create_task(run_pipeline())
        return {"action": "stopped"}

    # ERROR is a resting state as well: nothing else ever resets it, so
    # refusing to start from it would block recording until a restart.
    if state.status in (Status.IDLE, Status.ERROR) and not pipeline_running:
        from audio import AudioRecorder
        recorder = AudioRecorder()
        recorder.start()
        # Publish the recorder only after the stream opened successfully.
        state._recorder = recorder
        username = user["username"]
        state.recording_user = username
        state._recording_output_dir = os.path.join(user["output_dir"], username)
        state._recording_instructions = user.get(
            "instructions", _instructions.get(username, "")
        )
        await state.set_status(Status.RECORDING)
        return {"action": "started"}

    return {"action": "busy", "status": state.status}


@router.post("/instructions")
async def set_instructions(body: dict, user: dict = Depends(current_user)):
    """Store the caller's refinement instructions for their recordings."""
    text = body.get("instructions", "")
    if not isinstance(text, str):
        raise HTTPException(status_code=400, detail="instructions must be a string")
    username = user["username"]
    _instructions[username] = text
    # Keeps the old in-place write for any caller that passes a dict it retains.
    user["instructions"] = text
    # Edits made while this user's recording is still running should reach
    # the pipeline, which reads the value from the shared state.
    if state.status == Status.RECORDING and state.recording_user == username:
        state._recording_instructions = text
    return {"ok": True}


@router.get("/transcripts")
async def get_transcripts(user: dict = Depends(current_user)):
    """List the transcripts in the caller's own output directory."""
    user_dir = os.path.join(user["output_dir"], user["username"])
    return list_transcripts(user_dir)


@router.get("/config")
async def get_config(user: dict = Depends(current_user)):
    """Return the effective configuration."""
    return load_config()


@router.put("/config")
async def put_config(body: dict, user: dict = Depends(current_user)):
    """Return the configuration with *body* applied on top (admins only).

    NOTE(review): the result is only returned, never written back, and
    ``update`` replaces whole top-level sections — confirm whether
    persisting and deep-merging are intended.
    """
    if not user.get("is_admin"):
        raise HTTPException(status_code=403, detail="Nur Administratoren können die Config ändern")
    cfg = load_config()
    cfg.update(body)
    return cfg


@router.post("/open")
async def open_file(body: dict, user: dict = Depends(current_user)):
    """Open one of the caller's transcripts with the desktop default application.

    Returns ``{"ok": True}`` when the file was handed to ``xdg-open`` and
    ``{"ok": False}`` when the path is missing or outside the caller's
    output directory.
    """
    import subprocess
    path = body.get("path", "")
    # Only allow opening files within the user's own output directory
    user_dir = os.path.join(user["output_dir"], user["username"])
    opened = False
    if isinstance(path, str) and path and os.path.exists(path) and _is_inside(path, user_dir):
        # Pass the resolved path so xdg-open sees exactly what was checked.
        subprocess.Popen(["xdg-open", os.path.realpath(path)])
        opened = True
    return {"ok": opened}


@router.websocket("/ws")
async def websocket_endpoint(ws: WebSocket, token: str = ""):
    """Register an authenticated client for pipeline events until it disconnects."""
    from auth import get_user_for_token
    user = get_user_for_token(token)
    if not user:
        await ws.close(code=4001)
        return
    await ws.accept()
    _ws_clients.append(ws)
    try:
        # Incoming messages are ignored; the loop only detects the disconnect.
        while True:
            await ws.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        # Also runs for unexpected errors, so dead sockets never pile up.
        if ws in _ws_clients:
            _ws_clients.remove(ws)


async def broadcast(message: dict):
    """Send *message* as JSON to every connected client, dropping dead sockets.

    NOTE(review): every client gets every event, transcript text included,
    whichever user started the recording — confirm this is intended when
    several accounts are in use.
    """
    # Iterate over a copy because failed sockets are removed on the way.
    for ws in list(_ws_clients):
        try:
            await ws.send_json(message)
        except Exception:
            # Best effort by design: one broken client must not stop the rest.
            if ws in _ws_clients:
                _ws_clients.remove(ws)


# diff --git a/api/state.py b/api/state.py
# new file mode 100644
# index 0000000..289466c
# --- /dev/null
# +++ b/api/state.py
# @@ -0,0 +1,36 @@
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable
+class Status(str, Enum): + IDLE = "idle" + RECORDING = "recording" + PROCESSING = "processing" + ERROR = "error" + + +@dataclass +class AppState: + status: Status = Status.IDLE + recording_user: str | None = None # which user triggered the current recording + last_error: str | None = None + _listeners: list[Callable] = field(default_factory=list, repr=False) + + def subscribe(self, callback: Callable): + self._listeners.append(callback) + + async def notify(self): + for cb in self._listeners: + if asyncio.iscoroutinefunction(cb): + await cb(self) + else: + cb(self) + + async def set_status(self, status: Status): + self.status = status + await self.notify() + + +state = AppState() diff --git a/audio.py b/audio.py new file mode 100644 index 0000000..5b64345 --- /dev/null +++ b/audio.py @@ -0,0 +1,46 @@ +import wave +import threading +import numpy as np + + +class AudioRecorder: + def __init__(self, sample_rate: int = 16000): + self.sample_rate = sample_rate + self._buffer: list[np.ndarray] = [] + self._stream = None + self.is_recording = False + self._lock = threading.Lock() + + def _callback(self, indata, frames, time, status): + if self.is_recording: + with self._lock: + self._buffer.append(indata[:, 0].copy().astype(np.int16)) + + def start(self): + import sounddevice as sd + self._buffer = [] + self.is_recording = True + self._stream = sd.InputStream( + samplerate=self.sample_rate, + channels=1, + dtype="int16", + callback=self._callback, + ) + self._stream.start() + + def stop(self): + self.is_recording = False + if self._stream: + self._stream.stop() + self._stream.close() + self._stream = None + + def save_wav(self, path: str) -> str: + with self._lock: + data = np.concatenate(self._buffer) if self._buffer else np.zeros(0, dtype=np.int16) + with wave.open(path, "wb") as wf: + wf.setnchannels(1) + wf.setsampwidth(2) + wf.setframerate(self.sample_rate) + wf.writeframes(data.tobytes()) + return path diff --git a/auth.py b/auth.py new file mode 100644 index 
0000000..e3148a6 --- /dev/null +++ b/auth.py @@ -0,0 +1,128 @@ +import getpass +import hashlib +import os +import secrets +import tomllib +from typing import Optional + +import tomli_w + +USERS_PATH = os.path.expanduser("~/.config/tueit-transcriber/users.toml") + +# In-memory session store: token → username +# Users must re-login after server restart — acceptable for a desktop app. +_sessions: dict[str, str] = {} + + +def _hash_password(password: str) -> str: + salt = secrets.token_hex(16) + key = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 200_000).hex() + return f"{salt}:{key}" + + +def _verify_password(password: str, stored: str) -> bool: + try: + salt, key = stored.split(":", 1) + except ValueError: + return False + new_key = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 200_000).hex() + return secrets.compare_digest(new_key, key) + + +# ── User store ───────────────────────────────────────────────────────────────── + +def has_users() -> bool: + return bool(_load_users()) + + +def _load_users() -> dict: + if not os.path.exists(USERS_PATH): + return {} + with open(USERS_PATH, "rb") as f: + return tomllib.load(f).get("users", {}) + + +def _save_users(users: dict): + os.makedirs(os.path.dirname(USERS_PATH), exist_ok=True) + with open(USERS_PATH, "wb") as f: + tomli_w.dump({"users": users}, f) + + +def create_user(username: str, password: str, output_dir: str, is_admin: bool = False): + users = _load_users() + users[username] = { + "password_hash": _hash_password(password), + "output_dir": output_dir, + "is_admin": is_admin, + } + _save_users(users) + + +# ── Session management ───────────────────────────────────────────────────────── + +def authenticate(username: str, password: str) -> Optional[str]: + """Verify credentials. 
Returns a session token on success, None on failure.""" + users = _load_users() + user = users.get(username) + if not user: + return None + if not _verify_password(password, user["password_hash"]): + return None + token = secrets.token_urlsafe(32) + _sessions[token] = username + return token + + +def get_user_for_token(token: str) -> Optional[dict]: + """Return user info dict for a valid token, or None.""" + username = _sessions.get(token) + if not username: + return None + users = _load_users() + user = users.get(username) + if not user: + return None + return { + "username": username, + "output_dir": user["output_dir"], + "is_admin": user.get("is_admin", False), + } + + +def invalidate_token(token: str): + _sessions.pop(token, None) + + +# ── First-run setup wizard ───────────────────────────────────────────────────── + +def setup_wizard(): + """Interactive console setup. Runs when no users exist yet.""" + print("\n=== tüit Transkriptor — Ersteinrichtung ===\n") + print("Bitte richte den ersten Nutzer ein (wird Administrator).\n") + + while True: + username = input("Benutzername: ").strip() + if username: + break + print("Benutzername darf nicht leer sein.") + + while True: + password = getpass.getpass("Passwort: ") + confirm = getpass.getpass("Passwort bestätigen: ") + if password != confirm: + print("Passwörter stimmen nicht überein.") + continue + if len(password) < 6: + print("Passwort muss mindestens 6 Zeichen lang sein.") + continue + break + + default_dir = os.path.expanduser(f"~/Transkripte/{username}") + answer = input(f"Transkripte speichern unter [{default_dir}]: ").strip() + output_dir = answer if answer else default_dir + + create_user(username, password, output_dir, is_admin=True) + + print(f"\nNutzer '{username}' wurde angelegt.") + print(f"Transkripte werden gespeichert unter: {output_dir}") + print("\nWeitere Nutzer können später über die Web-Oberfläche hinzugefügt werden.\n") diff --git a/config.py b/config.py new file mode 100644 index 
0000000..ceaeb4e --- /dev/null +++ b/config.py @@ -0,0 +1,61 @@ +import os +import tomllib + +CONFIG_PATH = os.path.expanduser("~/.config/tueit-transcriber/config.toml") + +DEFAULTS = { + "ollama": { + "base_url": "http://localhost:11434", + "model": "gemma3:12b", + }, + "whisper": { + "model": "large-v3", + "language": "de", + "device": "auto", # "auto" = use GPU if ROCm available, else CPU + }, + "server": { + "port": 8765, + }, + "output": { + "path": os.path.expanduser( + "~/cloud.shron.de/Hetzner Storagebox/work" + ), + }, + "network": { + "host": "127.0.0.1", + }, + "pid_file": os.path.expanduser("~/.local/run/tueit-transcriber.pid"), +} + + +def load() -> dict: + os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True) + if not os.path.exists(CONFIG_PATH): + _write_defaults() + with open(CONFIG_PATH, "rb") as f: + on_disk = tomllib.load(f) + return _deep_merge(DEFAULTS, on_disk) + + +def _deep_merge(base: dict, override: dict) -> dict: + result = dict(base) + for k, v in override.items(): + if k in result and isinstance(result[k], dict) and isinstance(v, dict): + result[k] = _deep_merge(result[k], v) + else: + result[k] = v + return result + + +def _write_defaults(): + try: + import tomli_w + with open(CONFIG_PATH, "wb") as f: + tomli_w.dump(DEFAULTS, f) + except ImportError: + with open(CONFIG_PATH, "w") as f: + f.write("# tüit Transkriptor config\n\n") + f.write('[ollama]\nbase_url = "http://localhost:11434"\nmodel = "gemma3:12b"\n\n') + f.write('[whisper]\nmodel = "large-v3"\nlanguage = "de"\ndevice = "auto"\n\n') + f.write('[server]\nport = 8765\n\n') + f.write(f'[output]\npath = "{DEFAULTS["output"]["path"]}"\n') diff --git a/frontend/app.js b/frontend/app.js new file mode 100644 index 0000000..4cbb505 --- /dev/null +++ b/frontend/app.js @@ -0,0 +1,122 @@ +const btn = document.getElementById('record-btn'); +const statusText = document.getElementById('status-text'); +const headerStatus = document.getElementById('header-status'); +const preview = 
document.getElementById('preview'); +const instructionsEl = document.getElementById('instructions'); +const transcriptList = document.getElementById('transcript-list'); +const userChip = document.getElementById('user-chip'); +const logoutBtn = document.getElementById('logout-btn'); + +const STATUS_LABELS = { + idle: 'Bereit', + recording: 'Aufnahme läuft\u2026', + processing: 'Wird verarbeitet\u2026', + error: 'Fehler', +}; + +// Auth token is stored in sessionStorage so it's gone when the tab closes. +// On first load, if no token is present the server will redirect to /login. +const token = sessionStorage.getItem('token'); + +function authHeaders() { + return token ? { 'Authorization': `Bearer ${token}` } : {}; +} + +function apiFetch(url, options = {}) { + return fetch(url, { + ...options, + headers: { 'Content-Type': 'application/json', ...authHeaders(), ...(options.headers || {}) }, + }); +} + +logoutBtn.addEventListener('click', () => { + apiFetch('/logout', { method: 'POST' }).finally(() => { + sessionStorage.removeItem('token'); + location.href = '/login'; + }); +}); + +instructionsEl.addEventListener('input', async () => { + await apiFetch('/instructions', { + method: 'POST', + body: JSON.stringify({ instructions: instructionsEl.value }), + }); +}); + +function setStatus(status) { + btn.className = status; + headerStatus.className = `status-badge ${status}`; + const label = STATUS_LABELS[status] || status; + statusText.textContent = label; + headerStatus.textContent = label; + btn.disabled = status === 'processing'; +} + +btn.addEventListener('click', () => apiFetch('/toggle', { method: 'POST' })); + +function connectWs() { + const proto = location.protocol === 'https:' ? 
'wss:' : 'ws:'; + const ws = new WebSocket(`${proto}//${location.host}/ws?token=${encodeURIComponent(token || '')}`); + ws.onmessage = (e) => { + const msg = JSON.parse(e.data); + if (msg.event === 'processing') setStatus('processing'); + if (msg.event === 'transcribed' || msg.event === 'refined') { + const text = msg.raw || msg.markdown || ''; + preview.textContent = text; + preview.classList.add('has-content'); + } + if (msg.event === 'saved') { + setStatus('idle'); + loadTranscripts(); + } + if (msg.event === 'error') { + setStatus('idle'); + preview.textContent = `Fehler: ${msg.message}`; + } + }; + ws.onclose = () => setTimeout(connectWs, 2000); +} + +async function loadTranscripts() { + const r = await apiFetch('/transcripts'); + if (!r.ok) return; + const items = await r.json(); + + transcriptList.replaceChildren( + ...items.map((t) => { + const div = document.createElement('div'); + div.className = 'transcript-item'; + + const name = document.createElement('span'); + name.textContent = t.filename.replace('.md', ''); + + const meta = document.createElement('span'); + meta.className = 'meta'; + meta.textContent = `${Math.round(t.size / 1024 * 10) / 10} KB`; + + div.append(name, meta); + div.addEventListener('click', () => { + apiFetch('/open', { + method: 'POST', + body: JSON.stringify({ path: t.path }), + }); + }); + return div; + }) + ); +} + +(async () => { + const r = await apiFetch('/status'); + if (r.status === 401) { + location.href = '/login'; + return; + } + const data = await r.json(); + setStatus(data.status); + if (data.username) { + userChip.textContent = data.username; + } + connectWs(); + loadTranscripts(); +})(); diff --git a/frontend/index.html b/frontend/index.html new file mode 100644 index 0000000..58bb770 --- /dev/null +++ b/frontend/index.html @@ -0,0 +1,170 @@ + + + + + + tüit Transkriptor + + + + + +
+
+

tüit Transkriptor

+
+ Bereit + + +
+
+
+
+ + Klicken zum Starten +
+ +
+ + +
+ +
+ +
Noch keine Aufnahme verarbeitet.
+
+ +
+ +
+
+
+ + + diff --git a/frontend/login.html b/frontend/login.html new file mode 100644 index 0000000..c37769e --- /dev/null +++ b/frontend/login.html @@ -0,0 +1,151 @@ + + + + + + tüit Transkriptor — Anmelden + + + + + +
+ +
+
+ + +
+
+ + +
+ +
+
+
+ + + diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..40906f9 --- /dev/null +++ b/install.sh @@ -0,0 +1,88 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +SERVICE_NAME="tueit-transcriber" +SERVICE_FILE="$HOME/.config/systemd/user/${SERVICE_NAME}.service" + +echo "=== tüit Transkriptor Installer ===" + +command -v python3 >/dev/null 2>&1 || { echo "ERROR: python3 not found"; exit 1; } + +if ! command -v ollama >/dev/null 2>&1; then + echo "WARNING: ollama not found. Install from https://ollama.com" + echo " Nach der Installation: ollama pull gemma3:12b" +fi + +if command -v rocminfo >/dev/null 2>&1; then + echo "ROCm erkannt — GPU-Beschleunigung verfügbar" +else + echo "INFO: ROCm nicht gefunden — Whisper läuft auf der CPU (langsamer)" + echo " Für GPU: sudo pacman -S rocm-hip-sdk" +fi + +echo "Python-Abhängigkeiten werden installiert..." +pip install --user -r "$SCRIPT_DIR/requirements.txt" + +# ── Netzwerk-Modus abfragen ──────────────────────────────────────────────────── + +echo "" +echo "Soll die App auch von anderen Geräten im Heimnetz erreichbar sein?" +echo " [1] Nur lokal (Standard, sicherer)" +echo " [2] Im Heimnetz (Windows, Android, andere Linux-Geräte)" +read -r -p "Auswahl [1/2]: " NET_MODE + +if [[ "$NET_MODE" == "2" ]]; then + HOST="0.0.0.0" + echo "INFO: App wird auf allen Netzwerk-Interfaces gestartet." + echo " Firewall: sudo ufw allow 8765/tcp" +else + HOST="127.0.0.1" +fi + +# Netzwerk-Host in Config schreiben, falls noch nicht vorhanden +CFG_FILE="$HOME/.config/tueit-transcriber/config.toml" +mkdir -p "$(dirname "$CFG_FILE")" +if ! 
grep -q "\[network\]" "$CFG_FILE" 2>/dev/null; then + printf '\n[network]\nhost = "%s"\n' "$HOST" >> "$CFG_FILE" + echo "Config aktualisiert: $CFG_FILE" +fi + +# ── Systemd User Service ─────────────────────────────────────────────────────── + +mkdir -p "$HOME/.config/systemd/user" +cat > "$SERVICE_FILE" < list[str]: + async with httpx.AsyncClient() as client: + r = await client.get(f"{self.base_url}/api/tags") + r.raise_for_status() + return [m["name"] for m in r.json().get("models", [])] + + async def refine( + self, + raw_text: str, + instructions: str = "", + model: str = "gemma3:12b", + ) -> str: + prompt = f"Transkript:\n{raw_text}" + if instructions.strip(): + prompt += f"\n\nInstruktionen:\n{instructions.strip()}" + async with httpx.AsyncClient(timeout=120) as client: + r = await client.post( + f"{self.base_url}/api/generate", + json={"model": model, "prompt": prompt, "system": SYSTEM_PROMPT, "stream": False}, + ) + r.raise_for_status() + return r.json()["response"] diff --git a/main.py b/main.py new file mode 100644 index 0000000..8292ec4 --- /dev/null +++ b/main.py @@ -0,0 +1,160 @@ +import asyncio +import os +import signal +import threading +import webbrowser +from pathlib import Path + +import uvicorn +from fastapi import FastAPI +from fastapi.responses import FileResponse, RedirectResponse +from fastapi.staticfiles import StaticFiles +import pystray +from PIL import Image, ImageDraw + +from api.router import router +from api.state import state, Status +from config import load as load_config + +# ── FastAPI ──────────────────────────────────────────────────────────────────── + +app = FastAPI(title="tüit Transkriptor") +app.include_router(router) + +FRONTEND_DIR = Path(__file__).parent / "frontend" + + +@app.get("/") +async def index(): + return FileResponse(str(FRONTEND_DIR / "index.html")) + + +@app.get("/login") +async def login_page(): + return FileResponse(str(FRONTEND_DIR / "login.html")) + + +@app.get("/app.js") +async def appjs(): + return 
FileResponse(str(FRONTEND_DIR / "app.js")) + + +# ── PID file ─────────────────────────────────────────────────────────────────── + +def write_pid(pid_path: str): + os.makedirs(os.path.dirname(pid_path), exist_ok=True) + Path(pid_path).write_text(str(os.getpid())) + + +def remove_pid(pid_path: str): + try: + os.unlink(pid_path) + except FileNotFoundError: + pass + + +# ── SIGUSR1 → toggle ────────────────────────────────────────────────────────── +# We capture uvicorn's event loop after it starts, so the signal handler can +# schedule the toggle coroutine in the correct loop — not a separate one. + +_uvicorn_loop: asyncio.AbstractEventLoop | None = None + + +def _sigusr1_handler(signum, frame): + if _uvicorn_loop: + _uvicorn_loop.call_soon_threadsafe( + lambda: asyncio.ensure_future(_async_toggle(), loop=_uvicorn_loop) + ) + + +async def _async_toggle(): + from api.router import toggle_recording + # Toggle without a real user dependency — use guest context for signal-triggered recordings. 
+ from api.router import _guest_user + await toggle_recording(user=_guest_user()) + + +# ── Tray ─────────────────────────────────────────────────────────────────────── + +def _make_icon(recording: bool = False) -> Image.Image: + img = Image.new("RGBA", (64, 64), (0, 0, 0, 0)) + draw = ImageDraw.Draw(img) + color = (218, 37, 28, 255) if recording else (80, 80, 80, 255) + draw.ellipse([8, 8, 56, 56], fill=color) + return img + + +def run_tray(port: int): + icon = pystray.Icon( + "tueit-transcriber", + _make_icon(False), + "tüit Transkriptor", + menu=pystray.Menu( + pystray.MenuItem("Aufnahme starten/stoppen", lambda i, it: ( + _uvicorn_loop and _uvicorn_loop.call_soon_threadsafe( + lambda: asyncio.ensure_future(_async_toggle(), loop=_uvicorn_loop) + ) + ), default=True), + pystray.MenuItem("Öffnen", lambda i, it: webbrowser.open(f"http://localhost:{port}")), + pystray.MenuItem("Beenden", lambda i, it: (icon.stop(), os._exit(0))), + ), + ) + + def update_icon(s): + icon.icon = _make_icon(s.status == Status.RECORDING) + + state.subscribe(update_icon) + icon.run() + + +# ── Server ───────────────────────────────────────────────────────────────────── + +class _LoopCapture(uvicorn.Server): + """Subclass that exposes its event loop for the SIGUSR1 handler.""" + def install_signal_handlers(self): + # Disable uvicorn's own signal handlers so our SIGUSR1 handler works. 
+ pass + + async def startup(self, sockets=None): + global _uvicorn_loop + _uvicorn_loop = asyncio.get_running_loop() + await super().startup(sockets=sockets) + + +def run_server(config: uvicorn.Config): + server = _LoopCapture(config) + server.run() + + +# ── Entrypoint ───────────────────────────────────────────────────────────────── + +if __name__ == "__main__": + from auth import setup_wizard, has_users + if not has_users(): + setup_wizard() + + cfg = load_config() + port = cfg["server"]["port"] + host = cfg.get("network", {}).get("host", "127.0.0.1") + pid_path = cfg.get("pid_file", os.path.expanduser("~/.local/run/tueit-transcriber.pid")) + + write_pid(pid_path) + signal.signal(signal.SIGUSR1, _sigusr1_handler) + + uvicorn_cfg = uvicorn.Config(app, host=host, port=port, log_level="warning") + server_thread = threading.Thread(target=run_server, args=(uvicorn_cfg,), daemon=True) + server_thread.start() + + # Wait until uvicorn has captured its loop + import time + for _ in range(50): + if _uvicorn_loop is not None: + break + time.sleep(0.1) + + webbrowser.open(f"http://localhost:{port}") + + try: + run_tray(port) + finally: + remove_pid(pid_path) diff --git a/output.py b/output.py new file mode 100644 index 0000000..46635da --- /dev/null +++ b/output.py @@ -0,0 +1,50 @@ +import os +import re +import unicodedata +from datetime import datetime + + +def slugify(text: str) -> str: + for src, dst in [("ä","a"),("ö","o"),("ü","u"),("Ä","a"),("Ö","o"),("Ü","u"),("ß","ss")]: + text = text.replace(src, dst) + text = unicodedata.normalize("NFKD", text) + text = "".join(c for c in text if unicodedata.category(c) != "Mn") + text = text.lower() + text = re.sub(r"[^a-z0-9]+", "-", text) + return text.strip("-") + + +def save_transcript( + title: str, + content: str, + output_dir: str, + dt: datetime | None = None, +) -> str: + if dt is None: + dt = datetime.now() + slug = slugify(title)[:60] + filename = f"{dt.strftime('%Y-%m-%d-%H%M')}-{slug}.md" + os.makedirs(output_dir, 
exist_ok=True) + path = os.path.join(output_dir, filename) + with open(path, "w", encoding="utf-8") as f: + f.write(f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript]\n---\n\n") + f.write(f"# {title}\n\n") + f.write(content) + if not content.endswith("\n"): + f.write("\n") + return path + + +def list_transcripts(output_dir: str, limit: int = 20) -> list[dict]: + if not os.path.exists(output_dir): + return [] + files = sorted( + [f for f in os.listdir(output_dir) if f.endswith(".md")], + reverse=True, + )[:limit] + result = [] + for f in files: + full = os.path.join(output_dir, f) + stat = os.stat(full) + result.append({"filename": f, "path": full, "size": stat.st_size, "mtime": stat.st_mtime}) + return result diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..2f4c80e --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +asyncio_mode = auto diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6568721 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,11 @@ +fastapi>=0.111 +uvicorn[standard]>=0.29 +pystray>=0.19 +Pillow>=10.0 +sounddevice>=0.4.6 +faster-whisper>=1.0.3 +httpx>=0.27 +numpy>=1.26 +tomli_w>=1.0 +pytest>=8.0 +pytest-asyncio>=0.23 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..96b8f50 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,60 @@ +from fastapi.testclient import TestClient + +_TEST_USER = {"username": "testuser", "output_dir": "/tmp", "is_admin": False} + + +def make_app(): + from fastapi import FastAPI + from api.router import router, current_user + app = FastAPI() + # Override auth for tests — no real credentials needed + app.dependency_overrides[current_user] = lambda: _TEST_USER + app.include_router(router) + return app + + +def test_status_returns_idle(): + client = TestClient(make_app()) + r = client.get("/status") + assert r.status_code 
== 200 + assert r.json()["status"] == "idle" + assert r.json()["username"] == "testuser" + + +def test_config_get_returns_dict(): + client = TestClient(make_app()) + r = client.get("/config") + assert r.status_code == 200 + assert "ollama" in r.json() + + +def test_transcripts_returns_list(): + client = TestClient(make_app()) + r = client.get("/transcripts") + assert r.status_code == 200 + assert isinstance(r.json(), list) + + +def test_status_requires_auth(): + from fastapi import FastAPI + from api.router import router + app = FastAPI() + app.include_router(router) + client = TestClient(app, raise_server_exceptions=False) + r = client.get("/status") + assert r.status_code == 401 + + +def test_login_rejects_wrong_credentials(): + import tempfile, os + from unittest.mock import patch + from fastapi import FastAPI + from api.router import router + app = FastAPI() + app.include_router(router) + client = TestClient(app, raise_server_exceptions=False) + with tempfile.TemporaryDirectory() as tmpdir: + users_path = os.path.join(tmpdir, "users.toml") + with patch("auth.USERS_PATH", users_path): + r = client.post("/login", json={"username": "nobody", "password": "wrong"}) + assert r.status_code == 401 diff --git a/tests/test_audio.py b/tests/test_audio.py new file mode 100644 index 0000000..fef3f84 --- /dev/null +++ b/tests/test_audio.py @@ -0,0 +1,29 @@ +import numpy as np +from unittest.mock import patch, MagicMock + + +def test_recorder_starts_and_stops(): + from audio import AudioRecorder + with patch("sounddevice.InputStream") as MockStream: + mock_stream = MagicMock() + MockStream.return_value.start = MagicMock() + MockStream.return_value.stop = MagicMock() + MockStream.return_value.close = MagicMock() + recorder = AudioRecorder(sample_rate=16000) + assert not recorder.is_recording + recorder._stream = MockStream.return_value + recorder.is_recording = True + recorder.stop() + assert not recorder.is_recording + + +def test_recorder_save_wav(tmp_path): + import wave + 
def _fresh_auth(tmpdir):
    """Reset the ``auth`` module and return a users-file path inside *tmpdir*.

    The module is reloaded and its in-memory session table is emptied.
    The returned path is only computed here; the caller is responsible for
    pointing ``auth.USERS_PATH`` at it (for example with ``patch``).
    """
    import auth as auth_module

    reloaded = importlib.reload(auth_module)
    reloaded._sessions.clear()
    users_file = os.path.join(tmpdir, "users.toml")
    return users_file
def test_config_loads_defaults():
    """Check that a first load against an empty directory yields the defaults."""
    import importlib
    import config

    with tempfile.TemporaryDirectory() as tmpdir:
        # Reload BEFORE patching. Reloading re-executes the module body, which
        # presumably rebinds CONFIG_PATH (verify against config.py); doing it
        # inside the patch would then undo the patch and let load() read the
        # real user configuration. This matches the order used by
        # test_config_creates_file_on_first_run.
        importlib.reload(config)
        cfg_path = os.path.join(tmpdir, "config.toml")
        with patch("config.CONFIG_PATH", cfg_path):
            cfg = config.load()

        assert cfg["ollama"]["model"] == "gemma3:12b"
        assert cfg["whisper"]["model"] == "large-v3"
        assert cfg["server"]["port"] == 8765
@pytest.mark.asyncio
async def test_list_models_returns_list():
    """list_models() should expose the model names from the mocked HTTP reply."""
    from llm import OllamaClient

    payload = {"models": [{"name": "gemma3:12b"}, {"name": "mistral:7b"}]}
    fake_reply = MagicMock()
    fake_reply.raise_for_status = MagicMock()
    fake_reply.json.return_value = payload

    with patch("httpx.AsyncClient") as http_client_cls:
        # The object bound by ``async with httpx.AsyncClient() as ...``.
        session = http_client_cls.return_value.__aenter__.return_value
        session.get = AsyncMock(return_value=fake_reply)

        ollama = OllamaClient(base_url="http://localhost:11434")
        available = await ollama.list_models()

    assert "gemma3:12b" in available
def test_save_transcript_contains_frontmatter():
    """Check that the saved file carries a front-matter block with date and tag."""
    from output import save_transcript

    with tempfile.TemporaryDirectory() as tmpdir:
        path = save_transcript(
            title="Test",
            content="Inhalt.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        # Read through a context manager so the handle is closed before the
        # temporary directory is removed (an open handle blocks deletion on
        # Windows and triggers ResourceWarning elsewhere).
        with open(path) as fh:
            text = fh.read()

    assert "---" in text
    assert "date:" in text
    assert "transkript" in text
class TranscriptionEngine:
    """Lazily loaded faster-whisper model with an async transcription entry point.

    The model is created on first use and then cached on the instance, so
    importing this module does not require faster-whisper to be installed.
    """

    # Cached model object; stays None until the first transcription needs it.
    _model = None

    def _get_model(self, model_name: str = "large-v3", device: str = "auto"):
        """Return the cached model, creating it on the first call.

        Args:
            model_name: Model identifier passed to ``WhisperModel``.
            device: ``"auto"`` tries CUDA with float16 first and falls back to
                CPU with int8 if constructing the CUDA model raises. Any other
                value is passed through; ``"cuda"`` and ``"rocm"`` use float16,
                everything else int8.

        NOTE: once a model is cached, later calls return it unchanged even if
        ``model_name`` or ``device`` differ from the first call.
        """
        if self._model is None:
            from faster_whisper import WhisperModel

            if device == "auto":
                try:
                    self._model = WhisperModel(
                        model_name, device="cuda", compute_type="float16"
                    )
                except Exception:
                    # Deliberate best-effort: any failure to set up CUDA
                    # (no GPU, missing libraries, ...) falls back to CPU.
                    self._model = WhisperModel(
                        model_name, device="cpu", compute_type="int8"
                    )
            else:
                compute = "float16" if device in ("cuda", "rocm") else "int8"
                self._model = WhisperModel(
                    model_name, device=device, compute_type=compute
                )
        return self._model

    def _transcribe_sync(
        self,
        audio_path: str,
        language: str,
        model_name: str,
        device: str,
    ) -> str:
        """Blocking part of a transcription; meant to run in a worker thread."""
        model = self._get_model(model_name, device)
        segments, _info = model.transcribe(audio_path, language=language)
        # faster-whisper returns the segments as a lazy generator: decoding
        # happens while iterating, so the join has to stay in this thread.
        return "".join(segment.text for segment in segments).strip()

    async def transcribe_file(
        self,
        audio_path: str,
        language: str = "de",
        model_name: str = "large-v3",
        device: str = "auto",
    ) -> str:
        """Transcribe *audio_path* and return the stripped, concatenated text.

        Model loading, transcription and segment iteration all run in the
        event loop's default executor so the loop stays responsive.

        Args:
            audio_path: Path of the audio file handed to the model.
            language: Language code forwarded to ``model.transcribe``.
            model_name: Model to load if none is cached yet.
            device: Device selection used if no model is cached yet.

        Returns:
            The text of all segments joined without separator, with leading
            and trailing whitespace removed ("" when there are no segments).
        """
        # get_running_loop() is the supported call inside a coroutine;
        # get_event_loop() is deprecated for this use.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            self._transcribe_sync,
            audio_path,
            language,
            model_name,
            device,
        )


# Module-level singleton shared by the application.
engine = TranscriptionEngine()