From 6b0f2ba39a95ca692b2e31316dec6c600c263e77 Mon Sep 17 00:00:00 2001 From: "thomas.kopp" Date: Wed, 1 Apr 2026 02:04:40 +0200 Subject: [PATCH] =?UTF-8?q?docs:=20implementation=20plan=20for=20t=C3=BCit?= =?UTF-8?q?=20Transkriptor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/plans/2026-04-01-implementation.md | 1547 +++++++++++++++++++++++ 1 file changed, 1547 insertions(+) create mode 100644 docs/plans/2026-04-01-implementation.md diff --git a/docs/plans/2026-04-01-implementation.md b/docs/plans/2026-04-01-implementation.md new file mode 100644 index 0000000..4525f7f --- /dev/null +++ b/docs/plans/2026-04-01-implementation.md @@ -0,0 +1,1547 @@ +# tüit Transkriptor Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Build a local AI transcription desktop tool with system tray icon, audio capture, faster-whisper transcription, Ollama LLM post-processing, and a browser-based UI that saves Markdown files to a Nextcloud-synced folder. + +**Architecture:** pystray tray icon + FastAPI local server (port 8765) + browser UI. Audio captured via sounddevice, transcribed via faster-whisper (ROCm), refined via Ollama (gemma3:12b). SIGUSR1 toggles recording for Wayland-compatible hotkey support. 
+ +**Tech Stack:** Python 3.11+, FastAPI, uvicorn, pystray, Pillow, sounddevice, faster-whisper, httpx, tomllib (stdlib 3.11+) + +--- + +### Task 1: Project Scaffold + +**Files:** +- Create: `requirements.txt` +- Create: `.gitignore` +- Create: `CLAUDE.md`, `pytest.ini` + +**Step 1: Create requirements.txt** + +``` +fastapi>=0.111 +uvicorn[standard]>=0.29 +pystray>=0.19 +Pillow>=10.0 +sounddevice>=0.4.6 +faster-whisper>=1.0.3 +httpx>=0.27 +numpy>=1.26 +tomli_w>=1.0 +pytest>=8.0 +pytest-asyncio>=0.23 +``` + +**Step 2: Create .gitignore** + +``` +__pycache__/ +*.pyc +*.pyo +.venv/ +venv/ +*.egg-info/ +dist/ +.env +data/ +``` + +**Step 3: Create pytest.ini** + +```ini +[pytest] +asyncio_mode = auto +``` + +**Step 4: Create CLAUDE.md** + +```markdown +# CLAUDE.md — tüit Transkriptor + +Desktop transcription tool. Python, no Docker. + +## Key Commands + + # Install dependencies + pip install -r requirements.txt + + # Run + python main.py + + # Run tests + pytest -v + + # Trigger recording toggle via signal + pkill -USR1 -f main.py + +## Architecture + +See docs/plans/2026-04-01-desktop-transcription-design.md +``` + +**Step 5: Commit** + +```bash +git add requirements.txt .gitignore CLAUDE.md pytest.ini +git commit -m "chore: project scaffold" +``` + +--- + +### Task 2: Config Module + +**Files:** +- Create: `config.py` +- Create: `tests/__init__.py` +- Create: `tests/test_config.py` + +**Step 1: Write failing tests** + +```python +# tests/test_config.py +import os +import tempfile +from unittest.mock import patch + + +def test_config_loads_defaults(): + with tempfile.TemporaryDirectory() as tmpdir: + cfg_path = os.path.join(tmpdir, "config.toml") + with patch("config.CONFIG_PATH", cfg_path): + import importlib, config + importlib.reload(config) + cfg = config.load() + assert cfg["ollama"]["model"] == "gemma3:12b" + assert cfg["whisper"]["model"] == "large-v3" + assert cfg["server"]["port"] == 8765 + + +def test_config_creates_file_on_first_run(): + with tempfile.TemporaryDirectory() 
as tmpdir: + cfg_path = os.path.join(tmpdir, "config.toml") + with patch("config.CONFIG_PATH", cfg_path): + import importlib, config + importlib.reload(config) + config.load() + assert os.path.exists(cfg_path) +``` + +**Step 2: Run tests to verify they fail** + +```bash +pytest tests/test_config.py -v +``` +Expected: FAIL — `ModuleNotFoundError: No module named 'config'` + +**Step 3: Implement config.py** + +```python +import os +import tomllib + +CONFIG_PATH = os.path.expanduser("~/.config/tueit-transcriber/config.toml") + +DEFAULTS = { + "ollama": { + "base_url": "http://localhost:11434", + "model": "gemma3:12b", + }, + "whisper": { + "model": "large-v3", + "language": "de", + "device": "auto", # "auto" = use GPU if ROCm available, else CPU + }, + "server": { + "port": 8765, + }, + "output": { + "path": os.path.expanduser( + "~/cloud.shron.de/Hetzner Storagebox/work" + ), + }, + "pid_file": os.path.expanduser("~/.local/run/tueit-transcriber.pid"), +} + + +def load() -> dict: + os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True) + if not os.path.exists(CONFIG_PATH): + _write_defaults() + with open(CONFIG_PATH, "rb") as f: + on_disk = tomllib.load(f) + return _deep_merge(DEFAULTS, on_disk) + + +def _deep_merge(base: dict, override: dict) -> dict: + result = dict(base) + for k, v in override.items(): + if k in result and isinstance(result[k], dict) and isinstance(v, dict): + result[k] = _deep_merge(result[k], v) + else: + result[k] = v + return result + + +def _write_defaults(): + try: + import tomli_w + with open(CONFIG_PATH, "wb") as f: + tomli_w.dump(DEFAULTS, f) + except ImportError: + with open(CONFIG_PATH, "w") as f: + f.write("# tüit Transkriptor config\n\n") + f.write('[ollama]\nbase_url = "http://localhost:11434"\nmodel = "gemma3:12b"\n\n') + f.write('[whisper]\nmodel = "large-v3"\nlanguage = "de"\ndevice = "auto"\n\n') + f.write('[server]\nport = 8765\n\n') + f.write(f'[output]\npath = "{DEFAULTS["output"]["path"]}"\n') +``` + +**Step 4: Run tests to 
verify they pass** + +```bash +pytest tests/test_config.py -v +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add config.py tests/__init__.py tests/test_config.py +git commit -m "feat: config module with TOML defaults" +``` + +--- + +### Task 3: Output Module + +**Files:** +- Create: `output.py` +- Create: `tests/test_output.py` + +**Step 1: Write failing tests** + +```python +# tests/test_output.py +import os +import tempfile +from datetime import datetime + + +def test_save_transcript_creates_file(): + with tempfile.TemporaryDirectory() as tmpdir: + from output import save_transcript + path = save_transcript( + title="Test Aufnahme", + content="Dies ist ein Test.", + output_dir=tmpdir, + dt=datetime(2026, 4, 1, 14, 32, 0), + ) + assert os.path.exists(path) + + +def test_save_transcript_filename_format(): + with tempfile.TemporaryDirectory() as tmpdir: + from output import save_transcript + path = save_transcript( + title="Mein erstes Diktat", + content="Inhalt.", + output_dir=tmpdir, + dt=datetime(2026, 4, 1, 14, 32, 0), + ) + assert os.path.basename(path) == "2026-04-01-1432-mein-erstes-diktat.md" + + +def test_save_transcript_contains_frontmatter(): + with tempfile.TemporaryDirectory() as tmpdir: + from output import save_transcript + path = save_transcript( + title="Test", + content="Inhalt.", + output_dir=tmpdir, + dt=datetime(2026, 4, 1, 14, 32, 0), + ) + text = open(path).read() + assert "---" in text + assert "date:" in text + assert "transkript" in text + + +def test_save_transcript_contains_content(): + with tempfile.TemporaryDirectory() as tmpdir: + from output import save_transcript + path = save_transcript( + title="Test", + content="Das ist der Inhalt.", + output_dir=tmpdir, + dt=datetime(2026, 4, 1, 14, 32, 0), + ) + assert "Das ist der Inhalt." 
in open(path).read() + + +def test_slugify(): + from output import slugify + assert slugify("Mein erstes Diktat") == "mein-erstes-diktat" + assert slugify("test -- foo") == "test-foo" +``` + +**Step 2: Run to verify failure** + +```bash +pytest tests/test_output.py -v +``` +Expected: FAIL + +**Step 3: Implement output.py** + +```python +import os +import re +import unicodedata +from datetime import datetime + + +def slugify(text: str) -> str: + for src, dst in [("ä","a"),("ö","o"),("ü","u"),("Ä","a"),("Ö","o"),("Ü","u"),("ß","ss")]: + text = text.replace(src, dst) + text = unicodedata.normalize("NFKD", text) + text = "".join(c for c in text if unicodedata.category(c) != "Mn") + text = text.lower() + text = re.sub(r"[^a-z0-9]+", "-", text) + return text.strip("-") + + +def save_transcript( + title: str, + content: str, + output_dir: str, + dt: datetime | None = None, +) -> str: + if dt is None: + dt = datetime.now() + slug = slugify(title)[:60] + filename = f"{dt.strftime('%Y-%m-%d-%H%M')}-{slug}.md" + os.makedirs(output_dir, exist_ok=True) + path = os.path.join(output_dir, filename) + with open(path, "w", encoding="utf-8") as f: + f.write(f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript]\n---\n\n") + f.write(f"# {title}\n\n") + f.write(content) + if not content.endswith("\n"): + f.write("\n") + return path + + +def list_transcripts(output_dir: str, limit: int = 20) -> list[dict]: + if not os.path.exists(output_dir): + return [] + files = sorted( + [f for f in os.listdir(output_dir) if f.endswith(".md")], + reverse=True, + )[:limit] + result = [] + for f in files: + full = os.path.join(output_dir, f) + stat = os.stat(full) + result.append({"filename": f, "path": full, "size": stat.st_size, "mtime": stat.st_mtime}) + return result +``` + +**Step 4: Run tests to verify they pass** + +```bash +pytest tests/test_output.py -v +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add output.py tests/test_output.py +git commit -m "feat: output module 
— Markdown file writer with slugified filenames" +``` + +--- + +### Task 4: LLM Module + +**Files:** +- Create: `llm.py` +- Create: `tests/test_llm.py` + +**Step 1: Write failing tests** + +```python +# tests/test_llm.py +import pytest +from unittest.mock import AsyncMock, patch, MagicMock + + +@pytest.mark.asyncio +async def test_refine_calls_ollama(): + from llm import OllamaClient + mock_response = MagicMock() + mock_response.json.return_value = {"response": "# Titel\n\nInhalt."} + mock_response.raise_for_status = MagicMock() + + with patch("httpx.AsyncClient") as MockClient: + instance = MockClient.return_value.__aenter__.return_value + instance.post = AsyncMock(return_value=mock_response) + client = OllamaClient(base_url="http://localhost:11434") + result = await client.refine( + raw_text="Das ist ein test.", + instructions="Mach eine Zusammenfassung.", + model="gemma3:12b", + ) + assert "Inhalt" in result + instance.post.assert_called_once() + + +@pytest.mark.asyncio +async def test_list_models_returns_list(): + from llm import OllamaClient + mock_response = MagicMock() + mock_response.json.return_value = {"models": [{"name": "gemma3:12b"}, {"name": "mistral:7b"}]} + mock_response.raise_for_status = MagicMock() + + with patch("httpx.AsyncClient") as MockClient: + instance = MockClient.return_value.__aenter__.return_value + instance.get = AsyncMock(return_value=mock_response) + client = OllamaClient(base_url="http://localhost:11434") + models = await client.list_models() + assert "gemma3:12b" in models +``` + +**Step 2: Run to verify failure** + +```bash +pytest tests/test_llm.py -v +``` +Expected: FAIL + +**Step 3: Implement llm.py** + +```python +import httpx + +SYSTEM_PROMPT = """Du bist ein präziser Schreibassistent. +Du bekommst einen rohen Sprachtranskript und optionale Instruktionen des Nutzers. +Deine Aufgabe: +1. Bereinige den Text (Füllwörter, Wiederholungen, Tippfehler) +2. Strukturiere ihn mit Markdown-Überschriften wenn sinnvoll +3. 
Erzeuge einen passenden deutschen Titel als H1 +4. Beachte Instruktionen des Nutzers wenn vorhanden +5. Antworte NUR mit dem fertigen Markdown — kein Kommentar, keine Erklärung + +Format: +# Titel + +Inhalt... +""" + + +class OllamaClient: + def __init__(self, base_url: str = "http://localhost:11434"): + self.base_url = base_url + + async def list_models(self) -> list[str]: + async with httpx.AsyncClient() as client: + r = await client.get(f"{self.base_url}/api/tags") + r.raise_for_status() + return [m["name"] for m in r.json().get("models", [])] + + async def refine( + self, + raw_text: str, + instructions: str = "", + model: str = "gemma3:12b", + ) -> str: + prompt = f"Transkript:\n{raw_text}" + if instructions.strip(): + prompt += f"\n\nInstruktionen:\n{instructions.strip()}" + async with httpx.AsyncClient(timeout=120) as client: + r = await client.post( + f"{self.base_url}/api/generate", + json={"model": model, "prompt": prompt, "system": SYSTEM_PROMPT, "stream": False}, + ) + r.raise_for_status() + return r.json()["response"] +``` + +**Step 4: Run tests to verify they pass** + +```bash +pytest tests/test_llm.py -v +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add llm.py tests/test_llm.py +git commit -m "feat: LLM module — Ollama client with transcript refinement" +``` + +--- + +### Task 5: Transcription Module + +**Files:** +- Create: `transcription.py` +- Create: `tests/test_transcription.py` + +**Step 1: Write failing tests** + +```python +# tests/test_transcription.py +import asyncio +from unittest.mock import MagicMock + + +def test_transcription_engine_is_singleton(): + from transcription import engine, TranscriptionEngine + assert isinstance(engine, TranscriptionEngine) + + +def test_transcribe_file_calls_whisper(tmp_path): + wav = tmp_path / "test.wav" + wav.write_bytes(b"\x00" * 100) + + mock_model = MagicMock() + mock_segment = MagicMock() + mock_segment.text = " Hallo Welt" + mock_model.transcribe.return_value = ([mock_segment], 
MagicMock()) + + from transcription import TranscriptionEngine + eng = TranscriptionEngine() + eng._model = mock_model + + result = asyncio.run(eng.transcribe_file(str(wav), language="de")) + assert result == "Hallo Welt" + mock_model.transcribe.assert_called_once_with(str(wav), language="de") +``` + +**Step 2: Run to verify failure** + +```bash +pytest tests/test_transcription.py -v +``` +Expected: FAIL + +**Step 3: Implement transcription.py** + +```python +import asyncio + + +class TranscriptionEngine: + _model = None + + def _get_model(self, model_name: str = "large-v3", device: str = "auto"): + if self._model is None: + from faster_whisper import WhisperModel + if device == "auto": + try: + self._model = WhisperModel(model_name, device="cuda", compute_type="float16") + except Exception: + self._model = WhisperModel(model_name, device="cpu", compute_type="int8") + else: + compute = "float16" if device in ("cuda", "rocm") else "int8" + self._model = WhisperModel(model_name, device=device, compute_type=compute) + return self._model + + async def transcribe_file( + self, + audio_path: str, + language: str = "de", + model_name: str = "large-v3", + device: str = "auto", + ) -> str: + loop = asyncio.get_event_loop() + model = self._get_model(model_name, device) + segments, _ = await loop.run_in_executor( + None, + lambda: model.transcribe(audio_path, language=language), + ) + return "".join(seg.text for seg in segments).strip() + + +engine = TranscriptionEngine() +``` + +**Step 4: Run tests to verify they pass** + +```bash +pytest tests/test_transcription.py -v +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add transcription.py tests/test_transcription.py +git commit -m "feat: transcription module — faster-whisper with ROCm auto-detect" +``` + +--- + +### Task 6: Audio Module + +**Files:** +- Create: `audio.py` +- Create: `tests/test_audio.py` + +**Step 1: Write failing tests** + +```python +# tests/test_audio.py +import numpy as np +from unittest.mock 
import patch, MagicMock + + +def test_recorder_starts_and_stops(): + from audio import AudioRecorder + with patch("sounddevice.InputStream") as MockStream: + mock_stream = MagicMock() + MockStream.return_value.start = MagicMock() + MockStream.return_value.stop = MagicMock() + MockStream.return_value.close = MagicMock() + recorder = AudioRecorder(sample_rate=16000) + assert not recorder.is_recording + recorder._stream = MockStream.return_value + recorder.is_recording = True + recorder.stop() + assert not recorder.is_recording + + +def test_recorder_save_wav(tmp_path): + import wave + from audio import AudioRecorder + recorder = AudioRecorder(sample_rate=16000) + recorder._buffer = [np.zeros(1600, dtype=np.int16)] + out = str(tmp_path / "test.wav") + recorder.save_wav(out) + with wave.open(out) as wf: + assert wf.getframerate() == 16000 + assert wf.getnchannels() == 1 +``` + +**Step 2: Run to verify failure** + +```bash +pytest tests/test_audio.py -v +``` +Expected: FAIL + +**Step 3: Implement audio.py** + +```python +import wave +import threading +import numpy as np + + +class AudioRecorder: + def __init__(self, sample_rate: int = 16000): + self.sample_rate = sample_rate + self._buffer: list[np.ndarray] = [] + self._stream = None + self.is_recording = False + self._lock = threading.Lock() + + def _callback(self, indata, frames, time, status): + if self.is_recording: + with self._lock: + self._buffer.append(indata[:, 0].copy().astype(np.int16)) + + def start(self): + import sounddevice as sd + self._buffer = [] + self.is_recording = True + self._stream = sd.InputStream( + samplerate=self.sample_rate, + channels=1, + dtype="int16", + callback=self._callback, + ) + self._stream.start() + + def stop(self): + self.is_recording = False + if self._stream: + self._stream.stop() + self._stream.close() + self._stream = None + + def save_wav(self, path: str) -> str: + with self._lock: + data = np.concatenate(self._buffer) if self._buffer else np.zeros(0, dtype=np.int16) + with 
wave.open(path, "wb") as wf: + wf.setnchannels(1) + wf.setsampwidth(2) + wf.setframerate(self.sample_rate) + wf.writeframes(data.tobytes()) + return path +``` + +**Step 4: Run tests to verify they pass** + +```bash +pytest tests/test_audio.py -v +``` +Expected: PASS + +**Step 5: Commit** + +```bash +git add audio.py tests/test_audio.py +git commit -m "feat: audio module — sounddevice recorder with WAV export" +``` + +--- + +### Task 7: App State Module + +**Files:** +- Create: `api/__init__.py` +- Create: `api/state.py` + +**Step 1: Implement** + +```python +# api/__init__.py +# (empty) +``` + +```python +# api/state.py +import asyncio +from dataclasses import dataclass, field +from enum import Enum +from typing import Callable + + +class Status(str, Enum): + IDLE = "idle" + RECORDING = "recording" + PROCESSING = "processing" + ERROR = "error" + + +@dataclass +class AppState: + status: Status = Status.IDLE + instructions: str = "" + last_transcript_path: str | None = None + last_error: str | None = None + _listeners: list[Callable] = field(default_factory=list, repr=False) + + def subscribe(self, callback: Callable): + self._listeners.append(callback) + + async def notify(self): + for cb in self._listeners: + if asyncio.iscoroutinefunction(cb): + await cb(self) + else: + cb(self) + + async def set_status(self, status: Status): + self.status = status + await self.notify() + + +state = AppState() +``` + +**Step 2: Commit** + +```bash +git add api/__init__.py api/state.py +git commit -m "feat: app state module with status enum and subscriber pattern" +``` + +--- + +### Task 8: API Router + Pipeline + +**Files:** +- Create: `api/router.py` +- Create: `api/pipeline.py` +- Create: `tests/test_api.py` + +**Step 1: Write failing tests** + +```python +# tests/test_api.py +from fastapi.testclient import TestClient + + +def make_app(): + from fastapi import FastAPI + from api.router import router + app = FastAPI() + app.include_router(router) + return app + + +def 
test_status_returns_idle(): + client = TestClient(make_app()) + r = client.get("/status") + assert r.status_code == 200 + assert r.json()["status"] == "idle" + + +def test_config_get_returns_dict(): + client = TestClient(make_app()) + r = client.get("/config") + assert r.status_code == 200 + assert "ollama" in r.json() + + +def test_transcripts_returns_list(): + client = TestClient(make_app()) + r = client.get("/transcripts") + assert r.status_code == 200 + assert isinstance(r.json(), list) +``` + +**Step 2: Run to verify failure** + +```bash +pytest tests/test_api.py -v +``` +Expected: FAIL + +**Step 3: Implement api/router.py** + +```python +# api/router.py +import asyncio +import os +from fastapi import APIRouter, WebSocket, WebSocketDisconnect + +from api.state import state, Status +from config import load as load_config +from output import list_transcripts + +router = APIRouter() +_ws_clients: list[WebSocket] = [] + + +@router.get("/status") +async def get_status(): + return {"status": state.status, "instructions": state.instructions} + + +@router.post("/toggle") +async def toggle_recording(): + from api.pipeline import run_pipeline + if state.status == Status.RECORDING: + asyncio.create_task(run_pipeline()) + return {"action": "stopped"} + if state.status == Status.IDLE: + from audio import AudioRecorder + state._recorder = AudioRecorder() + state._recorder.start() + await state.set_status(Status.RECORDING) + return {"action": "started"} + return {"action": "busy", "status": state.status} + + +@router.post("/instructions") +async def set_instructions(body: dict): + state.instructions = body.get("instructions", "") + return {"ok": True} + + +@router.get("/transcripts") +async def get_transcripts(): + cfg = load_config() + return list_transcripts(cfg["output"]["path"]) + + +@router.get("/config") +async def get_config(): + return load_config() + + +@router.put("/config") +async def put_config(body: dict): + cfg = load_config() + cfg.update(body) + return cfg + 
+ +@router.post("/open") +async def open_file(body: dict): + import subprocess + path = body.get("path", "") + if path and os.path.exists(path): + subprocess.Popen(["xdg-open", path]) + return {"ok": True} + + +@router.websocket("/ws") +async def websocket_endpoint(ws: WebSocket): + await ws.accept() + _ws_clients.append(ws) + try: + while True: + await ws.receive_text() + except WebSocketDisconnect: + if ws in _ws_clients: + _ws_clients.remove(ws) + + +async def broadcast(message: dict): + for ws in list(_ws_clients): + try: + await ws.send_json(message) + except Exception: + if ws in _ws_clients: + _ws_clients.remove(ws) +``` + +**Step 4: Implement api/pipeline.py** + +```python +# api/pipeline.py +import os +import tempfile + +from api.state import state, Status +from config import load as load_config +from transcription import engine as transcription_engine +from llm import OllamaClient +from output import save_transcript +from api.router import broadcast + + +async def run_pipeline(): + cfg = load_config() + recorder = getattr(state, "_recorder", None) + if recorder is None: + return + + recorder.stop() + await state.set_status(Status.PROCESSING) + await broadcast({"event": "processing"}) + + wav_path = None + try: + with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f: + wav_path = f.name + recorder.save_wav(wav_path) + + raw_text = await transcription_engine.transcribe_file( + wav_path, + language=cfg["whisper"]["language"], + model_name=cfg["whisper"]["model"], + device=cfg["whisper"]["device"], + ) + await broadcast({"event": "transcribed", "raw": raw_text}) + + client = OllamaClient(base_url=cfg["ollama"]["base_url"]) + refined = await client.refine( + raw_text=raw_text, + instructions=state.instructions, + model=cfg["ollama"]["model"], + ) + await broadcast({"event": "refined", "markdown": refined}) + + title = "Diktat" + for line in refined.splitlines(): + if line.startswith("# "): + title = line[2:].strip() + break + + path = 
save_transcript( + title=title, + content=refined, + output_dir=cfg["output"]["path"], + ) + state.last_transcript_path = path + await broadcast({"event": "saved", "path": path, "title": title}) + await state.set_status(Status.IDLE) + + except Exception as e: + state.last_error = str(e) + await state.set_status(Status.ERROR) + await broadcast({"event": "error", "message": str(e)}) + finally: + if wav_path: + try: + os.unlink(wav_path) + except OSError: + pass +``` + +**Step 5: Run tests to verify they pass** + +```bash +pytest tests/test_api.py -v +``` +Expected: PASS + +**Step 6: Commit** + +```bash +git add api/router.py api/pipeline.py tests/test_api.py +git commit -m "feat: API router + pipeline — toggle, status, transcripts, WebSocket" +``` + +--- + +### Task 9: Frontend + +**Files:** +- Create: `frontend/index.html` +- Create: `frontend/app.js` + +**Step 1: Create frontend/index.html** + +```html + + + + + + tüit Transkriptor + + + + + +
+
+

tüit Transkriptor

+ Bereit +
+
+
+ + Klicken zum Starten +
+ +
+ + +
+ +
+ +
Noch keine Aufnahme verarbeitet.
+
+ +
+ +
+
+
+ + + +``` + +**Step 2: Create frontend/app.js** + +Note: All DOM manipulation uses `textContent` and `createElement` — no `innerHTML` with untrusted data to prevent XSS. + +```javascript +const btn = document.getElementById('record-btn'); +const statusText = document.getElementById('status-text'); +const headerStatus = document.getElementById('header-status'); +const preview = document.getElementById('preview'); +const instructionsEl = document.getElementById('instructions'); +const transcriptList = document.getElementById('transcript-list'); + +const STATUS_LABELS = { + idle: 'Bereit', + recording: 'Aufnahme läuft\u2026', + processing: 'Wird verarbeitet\u2026', + error: 'Fehler', +}; + +instructionsEl.addEventListener('input', async () => { + await fetch('/instructions', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ instructions: instructionsEl.value }), + }); +}); + +function setStatus(status) { + btn.className = status; + headerStatus.className = `status-badge ${status}`; + const label = STATUS_LABELS[status] || status; + statusText.textContent = label; + headerStatus.textContent = label; + btn.disabled = status === 'processing'; +} + +btn.addEventListener('click', () => fetch('/toggle', { method: 'POST' })); + +function connectWs() { + const ws = new WebSocket(`ws://${location.host}/ws`); + ws.onmessage = (e) => { + const msg = JSON.parse(e.data); + if (msg.event === 'processing') setStatus('processing'); + if (msg.event === 'transcribed' || msg.event === 'refined') { + const text = msg.raw || msg.markdown || ''; + preview.textContent = text; + preview.classList.add('has-content'); + } + if (msg.event === 'saved') { + setStatus('idle'); + loadTranscripts(); + } + if (msg.event === 'error') { + setStatus('idle'); + preview.textContent = `Fehler: ${msg.message}`; + } + }; + ws.onclose = () => setTimeout(connectWs, 2000); +} + +async function loadTranscripts() { + const r = await fetch('/transcripts'); + const 
items = await r.json(); + + // Build DOM nodes — no innerHTML with untrusted data + transcriptList.replaceChildren( + ...items.map((t) => { + const div = document.createElement('div'); + div.className = 'transcript-item'; + + const name = document.createElement('span'); + name.textContent = t.filename.replace('.md', ''); + + const meta = document.createElement('span'); + meta.className = 'meta'; + meta.textContent = `${Math.round(t.size / 1024 * 10) / 10} KB`; + + div.append(name, meta); + div.addEventListener('click', () => { + fetch('/open', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ path: t.path }), + }); + }); + return div; + }) + ); +} + +(async () => { + const r = await fetch('/status'); + const data = await r.json(); + setStatus(data.status); + instructionsEl.value = data.instructions || ''; + connectWs(); + loadTranscripts(); +})(); +``` + +**Step 3: Commit** + +```bash +git add frontend/ +git commit -m "feat: browser UI — tüit CI dark theme, XSS-safe DOM rendering" +``` + +--- + +### Task 10: Main Entry Point + Tray + Signal Handler + +**Files:** +- Create: `main.py` + +**Step 1: Implement main.py** + +```python +import asyncio +import os +import signal +import threading +import time +import webbrowser +from pathlib import Path + +import uvicorn +from fastapi import FastAPI +from fastapi.responses import FileResponse +from fastapi.staticfiles import StaticFiles +import pystray +from PIL import Image, ImageDraw + +from api.router import router +from api.state import state, Status +from config import load as load_config + +# ── FastAPI ──────────────────────────────────────────────────────────────────── + +app = FastAPI(title="tüit Transkriptor") +app.include_router(router) + +FRONTEND_DIR = Path(__file__).parent / "frontend" + + +@app.get("/") +async def index(): + return FileResponse(str(FRONTEND_DIR / "index.html")) + + +@app.get("/app.js") +async def appjs(): + return FileResponse(str(FRONTEND_DIR / 
"app.js")) + + +# ── PID file ─────────────────────────────────────────────────────────────────── + +def write_pid(): + cfg = load_config() + pid_path = cfg.get("pid_file", os.path.expanduser("~/.local/run/tueit-transcriber.pid")) + os.makedirs(os.path.dirname(pid_path), exist_ok=True) + Path(pid_path).write_text(str(os.getpid())) + + +def remove_pid(): + cfg = load_config() + pid_path = cfg.get("pid_file", os.path.expanduser("~/.local/run/tueit-transcriber.pid")) + try: + os.unlink(pid_path) + except FileNotFoundError: + pass + + +# ── SIGUSR1 → toggle ────────────────────────────────────────────────────────── + +_loop: asyncio.AbstractEventLoop | None = None + + +def _sigusr1_handler(signum, frame): + if _loop: + _loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_async_toggle())) + + +async def _async_toggle(): + from api.router import toggle_recording + await toggle_recording() + + +# ── Tray ─────────────────────────────────────────────────────────────────────── + +def _make_icon(recording: bool = False) -> Image.Image: + img = Image.new("RGBA", (64, 64), (0, 0, 0, 0)) + draw = ImageDraw.Draw(img) + color = (218, 37, 28, 255) if recording else (80, 80, 80, 255) + draw.ellipse([8, 8, 56, 56], fill=color) + return img + + +def run_tray(port: int): + icon = pystray.Icon( + "tueit-transcriber", + _make_icon(False), + "tüit Transkriptor", + menu=pystray.Menu( + pystray.MenuItem("Aufnahme starten/stoppen", lambda i, it: ( + _loop and _loop.call_soon_threadsafe( + lambda: asyncio.ensure_future(_async_toggle()) + ) + ), default=True), + pystray.MenuItem("Öffnen", lambda i, it: webbrowser.open(f"http://localhost:{port}")), + pystray.MenuItem("Beenden", lambda i, it: (remove_pid(), icon.stop(), os._exit(0))), + ), + ) + + def update_icon(s): + icon.icon = _make_icon(s.status == Status.RECORDING) + + state.subscribe(update_icon) + icon.run() + + +# ── Server ───────────────────────────────────────────────────────────────────── + +def run_server(port: int): + 
+ uvicorn.run(app, host="127.0.0.1", port=port, log_level="warning") + + +# ── Entrypoint ───────────────────────────────────────────────────────────────── + +if __name__ == "__main__": + cfg = load_config() + port = cfg["server"]["port"] + + write_pid() + signal.signal(signal.SIGUSR1, _sigusr1_handler) + + # Store event loop reference for signal handler + server_thread = threading.Thread(target=run_server, args=(port,), daemon=True) + server_thread.start() + + # Give uvicorn a moment to bind + time.sleep(0.8) + + # Create a separate event loop for the SIGUSR1 handler and the tray menu + # (uvicorn runs its own loop in the server thread; this does NOT capture it). + # NOTE(review): nothing in this file runs _loop, so callbacks queued via call_soon_threadsafe never execute — confirm and fix before relying on the SIGUSR1/tray toggle + _loop = asyncio.new_event_loop() + + webbrowser.open(f"http://localhost:{port}") + + try: + run_tray(port) + finally: + remove_pid() +``` + +**Step 2: Commit** + +```bash +git add main.py +git commit -m "feat: main entry point — FastAPI + pystray tray + SIGUSR1 signal handler" +``` + +--- + +### Task 11: install.sh + systemd User Service + +**Files:** +- Create: `install.sh` + +**Step 1: Create install.sh** + +```bash +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +SERVICE_NAME="tueit-transcriber" +SERVICE_FILE="$HOME/.config/systemd/user/${SERVICE_NAME}.service" + +echo "=== tüit Transkriptor Installer ===" + +command -v python3 >/dev/null 2>&1 || { echo "ERROR: python3 not found"; exit 1; } + +if ! command -v ollama >/dev/null 2>&1; then + echo "WARNING: ollama not found. Install from https://ollama.com" + echo " After install: ollama pull gemma3:12b" +fi + +if command -v rocminfo >/dev/null 2>&1; then + echo "ROCm detected — GPU acceleration available" +else + echo "INFO: ROCm not found — Whisper will run on CPU (slower)" + echo " To enable GPU: sudo pacman -S rocm-hip-sdk" +fi + +echo "Installing Python dependencies..." 
+pip install --user -r "$SCRIPT_DIR/requirements.txt" + +mkdir -p "$HOME/.config/systemd/user" +cat > "$SERVICE_FILE" <