tüit Transkriptor Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Build a local AI transcription desktop tool with system tray icon, audio capture, faster-whisper transcription, Ollama LLM post-processing, and a browser-based UI that saves Markdown files to a Nextcloud-synced folder.

Architecture: pystray tray icon + FastAPI local server (port 8765) + browser UI. Audio captured via sounddevice, transcribed via faster-whisper (ROCm), refined via Ollama (gemma3:12b). SIGUSR1 toggles recording for Wayland-compatible hotkey support.

Tech Stack: Python 3.11+, FastAPI, uvicorn, pystray, Pillow, sounddevice, faster-whisper, httpx, tomllib (stdlib 3.11+)
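
The SIGUSR1 toggle mentioned under Architecture can be sketched in a few lines. This is a minimal illustration and not the plan's main.py: the `recording` dict and the `toggle_recording` handler are hypothetical names, and the sketch assumes a POSIX system where SIGUSR1 exists.

```python
import os
import signal

# Hypothetical stand-in for the application's recording flag.
recording = {"active": False}


def toggle_recording(signum, frame):
    """Flip the flag each time SIGUSR1 arrives (Python runs handlers in the main thread)."""
    recording["active"] = not recording["active"]


# SIGUSR1 is available on POSIX systems (Linux, macOS), not on Windows.
signal.signal(signal.SIGUSR1, toggle_recording)

# Stands in for a desktop keybinding that runs: pkill -USR1 -f main.py
os.kill(os.getpid(), signal.SIGUSR1)
```

Because the hotkey only has to send a signal, it can be bound in the compositor's own settings and does not depend on global key grabbing inside the application.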


Task 1: Project Scaffold

Files:

  • Create: requirements.txt
  • Create: .gitignore
  • Create: pytest.ini
  • Create: CLAUDE.md

Step 1: Create requirements.txt

fastapi>=0.111
uvicorn[standard]>=0.29
pystray>=0.19
Pillow>=10.0
sounddevice>=0.4.6
faster-whisper>=1.0.3
httpx>=0.27
numpy>=1.26
tomli_w>=1.0
pytest>=8.0
pytest-asyncio>=0.23

Step 2: Create .gitignore

__pycache__/
*.pyc
*.pyo
.venv/
venv/
*.egg-info/
dist/
.env
data/

Step 3: Create pytest.ini

[pytest]
asyncio_mode = auto

Step 4: Create CLAUDE.md

# CLAUDE.md — tüit Transkriptor

Desktop transcription tool. Python, no Docker.

## Key Commands

    # Install dependencies
    pip install -r requirements.txt

    # Run
    python main.py

    # Run tests
    pytest -v

    # Trigger recording toggle via signal
    pkill -USR1 -f main.py

## Architecture

See docs/plans/2026-04-01-desktop-transcription-design.md

Step 5: Commit

git add requirements.txt .gitignore CLAUDE.md pytest.ini
git commit -m "chore: project scaffold"

Task 2: Config Module

Files:

  • Create: config.py
  • Create: tests/__init__.py
  • Create: tests/test_config.py

Step 1: Write failing tests

# tests/test_config.py
import os
import tempfile
from unittest.mock import patch


def test_config_loads_defaults():
    with tempfile.TemporaryDirectory() as tmpdir:
        cfg_path = os.path.join(tmpdir, "config.toml")
        # patch() imports config itself. Do not reload the module inside the
        # patch: a reload re-runs config.py and resets CONFIG_PATH to the real
        # path in the home directory.
        with patch("config.CONFIG_PATH", cfg_path):
            import config
            cfg = config.load()
            assert cfg["ollama"]["model"] == "gemma3:12b"
            assert cfg["whisper"]["model"] == "large-v3"
            assert cfg["server"]["port"] == 8765


def test_config_creates_file_on_first_run():
    with tempfile.TemporaryDirectory() as tmpdir:
        cfg_path = os.path.join(tmpdir, "config.toml")
        with patch("config.CONFIG_PATH", cfg_path):
            import config
            config.load()
            assert os.path.exists(cfg_path)

Step 2: Run tests to verify they fail

pytest tests/test_config.py -v

Expected: FAIL — ModuleNotFoundError: No module named 'config'

Step 3: Implement config.py

import os
import tomllib

CONFIG_PATH = os.path.expanduser("~/.config/tueit-transcriber/config.toml")

DEFAULTS = {
    "ollama": {
        "base_url": "http://localhost:11434",
        "model": "gemma3:12b",
    },
    "whisper": {
        "model": "large-v3",
        "language": "de",
        "device": "auto",  # "auto" = use GPU if ROCm available, else CPU
    },
    "server": {
        "port": 8765,
    },
    "output": {
        "path": os.path.expanduser(
            "~/cloud.shron.de/Hetzner Storagebox/work"
        ),
    },
    "pid_file": os.path.expanduser("~/.local/run/tueit-transcriber.pid"),
}


def load() -> dict:
    os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True)
    if not os.path.exists(CONFIG_PATH):
        _write_defaults()
    with open(CONFIG_PATH, "rb") as f:
        on_disk = tomllib.load(f)
    return _deep_merge(DEFAULTS, on_disk)


def _deep_merge(base: dict, override: dict) -> dict:
    result = dict(base)
    for k, v in override.items():
        if k in result and isinstance(result[k], dict) and isinstance(v, dict):
            result[k] = _deep_merge(result[k], v)
        else:
            result[k] = v
    return result


def _write_defaults():
    try:
        import tomli_w
        with open(CONFIG_PATH, "wb") as f:
            tomli_w.dump(DEFAULTS, f)
    except ImportError:
        with open(CONFIG_PATH, "w", encoding="utf-8") as f:  # TOML files must be UTF-8
            f.write("# tüit Transkriptor config\n\n")
            f.write('[ollama]\nbase_url = "http://localhost:11434"\nmodel = "gemma3:12b"\n\n')
            f.write('[whisper]\nmodel = "large-v3"\nlanguage = "de"\ndevice = "auto"\n\n')
            f.write('[server]\nport = 8765\n\n')
            f.write(f'[output]\npath = "{DEFAULTS["output"]["path"]}"\n')

Step 4: Run tests to verify they pass

pytest tests/test_config.py -v

Expected: PASS
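
The merge behaviour of load() is easiest to see with a partial user config. The sketch below repeats the recursive merge under the hypothetical name `deep_merge` so that it runs on its own; the `on_disk` dict stands for a config.toml in which the user changed a single key.

```python
def deep_merge(base: dict, override: dict) -> dict:
    """Recursively overlay `override` on `base` without mutating either dict."""
    result = dict(base)
    for key, value in override.items():
        if isinstance(result.get(key), dict) and isinstance(value, dict):
            result[key] = deep_merge(result[key], value)
        else:
            result[key] = value
    return result


defaults = {"whisper": {"model": "large-v3", "language": "de"}, "server": {"port": 8765}}
on_disk = {"whisper": {"language": "en"}}  # hypothetical user edit

merged = deep_merge(defaults, on_disk)
print(merged["whisper"])  # {'model': 'large-v3', 'language': 'en'}
```

Keys the user did not touch keep their defaults, so a config file written by an older version still works after new default keys are added.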

Step 5: Commit

git add config.py tests/__init__.py tests/test_config.py
git commit -m "feat: config module with TOML defaults"

Task 3: Output Module

Files:

  • Create: output.py
  • Create: tests/test_output.py

Step 1: Write failing tests

# tests/test_output.py
import os
import tempfile
from datetime import datetime


def test_save_transcript_creates_file():
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript
        path = save_transcript(
            title="Test Aufnahme",
            content="Dies ist ein Test.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        assert os.path.exists(path)


def test_save_transcript_filename_format():
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript
        path = save_transcript(
            title="Mein erstes Diktat",
            content="Inhalt.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        assert os.path.basename(path) == "2026-04-01-1432-mein-erstes-diktat.md"


def test_save_transcript_contains_frontmatter():
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript
        path = save_transcript(
            title="Test",
            content="Inhalt.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        text = open(path).read()
        assert "---" in text
        assert "date:" in text
        assert "transkript" in text


def test_save_transcript_contains_content():
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript
        path = save_transcript(
            title="Test",
            content="Das ist der Inhalt.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        assert "Das ist der Inhalt." in open(path).read()


def test_slugify():
    from output import slugify
    assert slugify("Mein erstes Diktat") == "mein-erstes-diktat"
    assert slugify("test  --  foo") == "test-foo"

Step 2: Run to verify failure

pytest tests/test_output.py -v

Expected: FAIL

Step 3: Implement output.py

import os
import re
import unicodedata
from datetime import datetime


def slugify(text: str) -> str:
    for src, dst in [("ä","a"),("ö","o"),("ü","u"),("Ä","a"),("Ö","o"),("Ü","u"),("ß","ss")]:
        text = text.replace(src, dst)
    text = unicodedata.normalize("NFKD", text)
    text = "".join(c for c in text if unicodedata.category(c) != "Mn")
    text = text.lower()
    text = re.sub(r"[^a-z0-9]+", "-", text)
    return text.strip("-")


def save_transcript(
    title: str,
    content: str,
    output_dir: str,
    dt: datetime | None = None,
) -> str:
    if dt is None:
        dt = datetime.now()
    slug = slugify(title)[:60].rstrip("-")  # cutting at 60 chars can leave a trailing hyphen
    filename = f"{dt.strftime('%Y-%m-%d-%H%M')}-{slug}.md"
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, filename)
    with open(path, "w", encoding="utf-8") as f:
        f.write(f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript]\n---\n\n")
        f.write(f"# {title}\n\n")
        f.write(content)
        if not content.endswith("\n"):
            f.write("\n")
    return path


def list_transcripts(output_dir: str, limit: int = 20) -> list[dict]:
    if not os.path.exists(output_dir):
        return []
    files = sorted(
        [f for f in os.listdir(output_dir) if f.endswith(".md")],
        reverse=True,
    )[:limit]
    result = []
    for f in files:
        full = os.path.join(output_dir, f)
        stat = os.stat(full)
        result.append({"filename": f, "path": full, "size": stat.st_size, "mtime": stat.st_mtime})
    return result

Step 4: Run tests to verify they pass

pytest tests/test_output.py -v

Expected: PASS
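
The tests above only use ASCII titles, so here is a small sketch of what the same slug rules do with umlauts and accents. The slug steps are repeated so the block runs on its own; `transcript_filename` is a hypothetical helper that mirrors how save_transcript builds the file name.

```python
import re
import unicodedata
from datetime import datetime


def slugify(text: str) -> str:
    # Same steps as output.slugify, repeated so the sketch is self-contained.
    for src, dst in [("ä", "a"), ("ö", "o"), ("ü", "u"), ("Ä", "a"), ("Ö", "o"), ("Ü", "u"), ("ß", "ss")]:
        text = text.replace(src, dst)
    text = unicodedata.normalize("NFKD", text)
    text = "".join(c for c in text if unicodedata.category(c) != "Mn")
    return re.sub(r"[^a-z0-9]+", "-", text.lower()).strip("-")


def transcript_filename(title: str, dt: datetime) -> str:
    # Hypothetical helper: timestamp prefix plus a slug capped at 60 characters.
    return f"{dt.strftime('%Y-%m-%d-%H%M')}-{slugify(title)[:60]}.md"


print(transcript_filename("Übergabe: Größe & Café", datetime(2026, 4, 1, 14, 32)))
# 2026-04-01-1432-ubergabe-grosse-cafe.md
```

Note that umlauts map to their base vowel ("ü" becomes "u", not "ue"), which keeps names short but differs from the usual German transliteration.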

Step 5: Commit

git add output.py tests/test_output.py
git commit -m "feat: output module — Markdown file writer with slugified filenames"

Task 4: LLM Module

Files:

  • Create: llm.py
  • Create: tests/test_llm.py

Step 1: Write failing tests

# tests/test_llm.py
import pytest
from unittest.mock import AsyncMock, patch, MagicMock


@pytest.mark.asyncio
async def test_refine_calls_ollama():
    from llm import OllamaClient
    mock_response = MagicMock()
    mock_response.json.return_value = {"response": "# Titel\n\nInhalt."}
    mock_response.raise_for_status = MagicMock()

    with patch("httpx.AsyncClient") as MockClient:
        instance = MockClient.return_value.__aenter__.return_value
        instance.post = AsyncMock(return_value=mock_response)
        client = OllamaClient(base_url="http://localhost:11434")
        result = await client.refine(
            raw_text="Das ist ein test.",
            instructions="Mach eine Zusammenfassung.",
            model="gemma3:12b",
        )
        assert "Inhalt" in result
        instance.post.assert_called_once()


@pytest.mark.asyncio
async def test_list_models_returns_list():
    from llm import OllamaClient
    mock_response = MagicMock()
    mock_response.json.return_value = {"models": [{"name": "gemma3:12b"}, {"name": "mistral:7b"}]}
    mock_response.raise_for_status = MagicMock()

    with patch("httpx.AsyncClient") as MockClient:
        instance = MockClient.return_value.__aenter__.return_value
        instance.get = AsyncMock(return_value=mock_response)
        client = OllamaClient(base_url="http://localhost:11434")
        models = await client.list_models()
        assert "gemma3:12b" in models

Step 2: Run to verify failure

pytest tests/test_llm.py -v

Expected: FAIL

Step 3: Implement llm.py

import httpx

SYSTEM_PROMPT = """Du bist ein präziser Schreibassistent.
Du bekommst ein rohes Sprachtranskript und optionale Instruktionen des Nutzers.
Deine Aufgabe:
1. Bereinige den Text (Füllwörter, Wiederholungen, Tippfehler)
2. Strukturiere ihn mit Markdown-Überschriften wenn sinnvoll
3. Erzeuge einen passenden deutschen Titel als H1
4. Beachte Instruktionen des Nutzers wenn vorhanden
5. Antworte NUR mit dem fertigen Markdown — kein Kommentar, keine Erklärung

Format:
# Titel

Inhalt...
"""


class OllamaClient:
    def __init__(self, base_url: str = "http://localhost:11434"):
        self.base_url = base_url

    async def list_models(self) -> list[str]:
        async with httpx.AsyncClient() as client:
            r = await client.get(f"{self.base_url}/api/tags")
            r.raise_for_status()
            return [m["name"] for m in r.json().get("models", [])]

    async def refine(
        self,
        raw_text: str,
        instructions: str = "",
        model: str = "gemma3:12b",
    ) -> str:
        prompt = f"Transkript:\n{raw_text}"
        if instructions.strip():
            prompt += f"\n\nInstruktionen:\n{instructions.strip()}"
        async with httpx.AsyncClient(timeout=120) as client:
            r = await client.post(
                f"{self.base_url}/api/generate",
                json={"model": model, "prompt": prompt, "system": SYSTEM_PROMPT, "stream": False},
            )
            r.raise_for_status()
            return r.json()["response"]

Step 4: Run tests to verify they pass

pytest tests/test_llm.py -v

Expected: PASS

Step 5: Commit

git add llm.py tests/test_llm.py
git commit -m "feat: LLM module — Ollama client with transcript refinement"

Task 5: Transcription Module

Files:

  • Create: transcription.py
  • Create: tests/test_transcription.py

Step 1: Write failing tests

# tests/test_transcription.py
import asyncio
from unittest.mock import MagicMock


def test_transcription_engine_is_singleton():
    from transcription import engine, TranscriptionEngine
    assert isinstance(engine, TranscriptionEngine)


def test_transcribe_file_calls_whisper(tmp_path):
    wav = tmp_path / "test.wav"
    wav.write_bytes(b"\x00" * 100)

    mock_model = MagicMock()
    mock_segment = MagicMock()
    mock_segment.text = " Hallo Welt"
    mock_model.transcribe.return_value = ([mock_segment], MagicMock())

    from transcription import TranscriptionEngine
    eng = TranscriptionEngine()
    eng._model = mock_model

    result = asyncio.run(eng.transcribe_file(str(wav), language="de"))
    assert result == "Hallo Welt"
    mock_model.transcribe.assert_called_once_with(str(wav), language="de")

Step 2: Run to verify failure

pytest tests/test_transcription.py -v

Expected: FAIL

Step 3: Implement transcription.py

import asyncio


class TranscriptionEngine:
    _model = None

    def _get_model(self, model_name: str = "large-v3", device: str = "auto"):
        if self._model is None:
            from faster_whisper import WhisperModel
            if device == "auto":
                try:
                    self._model = WhisperModel(model_name, device="cuda", compute_type="float16")
                except Exception:
                    self._model = WhisperModel(model_name, device="cpu", compute_type="int8")
            else:
                compute = "float16" if device in ("cuda", "rocm") else "int8"
                self._model = WhisperModel(model_name, device=device, compute_type=compute)
        return self._model

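    async def transcribe_file(
        self,
        audio_path: str,
        language: str = "de",
        model_name: str = "large-v3",
        device: str = "auto",
    ) -> str:
        def _run() -> str:
            # Model loading and decoding both block, so keep them off the event loop.
            model = self._get_model(model_name, device)
            segments, _ = model.transcribe(audio_path, language=language)
            # faster-whisper yields segments lazily: the decoding happens while
            # iterating, so the join has to run in the worker thread as well.
            return "".join(seg.text for seg in segments).strip()

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _run)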


engine = TranscriptionEngine()

Step 4: Run tests to verify they pass

pytest tests/test_transcription.py -v

Expected: PASS
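
One detail worth keeping in mind when calling faster-whisper from async code: transcribe() returns the segments as a generator, and the decoding only happens while that generator is iterated. The sketch below shows the effect with a hypothetical `FakeModel` (not the real WhisperModel) that records which thread does the work.

```python
import asyncio
import threading


class FakeModel:
    """Hypothetical stand-in for WhisperModel that yields segments lazily."""

    def __init__(self):
        self.decode_thread = None

    def transcribe(self, audio_path, language="de"):
        def segments():
            # The expensive part runs here, on whichever thread iterates.
            self.decode_thread = threading.current_thread().name
            yield " Hallo"
            yield " Welt"

        return segments(), {"language": language}


def transcribe_blocking(model, audio_path, language):
    segments, _info = model.transcribe(audio_path, language=language)
    # Consume the generator inside the worker so decoding stays off the event loop.
    return "".join(segments).strip()


async def main():
    model = FakeModel()
    loop = asyncio.get_running_loop()
    text = await loop.run_in_executor(None, transcribe_blocking, model, "test.wav", "de")
    return text, model.decode_thread


text, decode_thread = asyncio.run(main())
print(text)  # Hallo Welt
```

If only the transcribe() call were moved to the executor and the join stayed in the coroutine, the decoding would run on the event loop thread and stall the WebSocket and HTTP handlers for the duration of the transcription.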

Step 5: Commit

git add transcription.py tests/test_transcription.py
git commit -m "feat: transcription module — faster-whisper with ROCm auto-detect"

Task 6: Audio Module

Files:

  • Create: audio.py
  • Create: tests/test_audio.py

Step 1: Write failing tests

# tests/test_audio.py
import numpy as np
from unittest.mock import patch, MagicMock


def test_recorder_starts_and_stops():
    from audio import AudioRecorder
    # Patch the stream class so no real audio device is opened.
    with patch("sounddevice.InputStream") as MockStream:
        recorder = AudioRecorder(sample_rate=16000)
        assert not recorder.is_recording
        recorder.start()
        assert recorder.is_recording
        MockStream.return_value.start.assert_called_once()
        recorder.stop()
        assert not recorder.is_recording
        MockStream.return_value.stop.assert_called_once()


def test_recorder_save_wav(tmp_path):
    import wave
    from audio import AudioRecorder
    recorder = AudioRecorder(sample_rate=16000)
    recorder._buffer = [np.zeros(1600, dtype=np.int16)]
    out = str(tmp_path / "test.wav")
    recorder.save_wav(out)
    with wave.open(out) as wf:
        assert wf.getframerate() == 16000
        assert wf.getnchannels() == 1

Step 2: Run to verify failure

pytest tests/test_audio.py -v

Expected: FAIL

Step 3: Implement audio.py

import wave
import threading
import numpy as np


class AudioRecorder:
    def __init__(self, sample_rate: int = 16000):
        self.sample_rate = sample_rate
        self._buffer: list[np.ndarray] = []
        self._stream = None
        self.is_recording = False
        self._lock = threading.Lock()

    def _callback(self, indata, frames, time, status):
        if self.is_recording:
            with self._lock:
                self._buffer.append(indata[:, 0].copy().astype(np.int16))

    def start(self):
        import sounddevice as sd
        self._buffer = []
        self.is_recording = True
        self._stream = sd.InputStream(
            samplerate=self.sample_rate,
            channels=1,
            dtype="int16",
            callback=self._callback,
        )
        self._stream.start()

    def stop(self):
        self.is_recording = False
        if self._stream:
            self._stream.stop()
            self._stream.close()
            self._stream = None

    def save_wav(self, path: str) -> str:
        with self._lock:
            data = np.concatenate(self._buffer) if self._buffer else np.zeros(0, dtype=np.int16)
        with wave.open(path, "wb") as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)
            wf.setframerate(self.sample_rate)
            wf.writeframes(data.tobytes())
        return path

Step 4: Run tests to verify they pass

pytest tests/test_audio.py -v

Expected: PASS
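
As a quick sanity check of the WAV parameters used by save_wav (mono, 16-bit, 16 kHz), the following standard-library sketch writes a short silent file and derives its duration from the header. The file name `probe.wav` and the temporary directory are arbitrary choices for the illustration.

```python
import os
import tempfile
import wave

SAMPLE_RATE = 16000  # matches AudioRecorder's default

# 1600 silent 16-bit mono samples correspond to 0.1 s at 16 kHz.
pcm = b"\x00\x00" * 1600

path = os.path.join(tempfile.mkdtemp(), "probe.wav")
with wave.open(path, "wb") as wf:
    wf.setnchannels(1)
    wf.setsampwidth(2)  # 2 bytes per sample = int16
    wf.setframerate(SAMPLE_RATE)
    wf.writeframes(pcm)

with wave.open(path, "rb") as wf:
    n_frames = wf.getnframes()
    duration = n_frames / wf.getframerate()

print(duration)  # 0.1
```

The same calculation could be used to reject recordings that are too short to be worth transcribing, although the plan itself does not specify such a check.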

Step 5: Commit

git add audio.py tests/test_audio.py
git commit -m "feat: audio module — sounddevice recorder with WAV export"

Task 7: App State Module

Files:

  • Create: api/__init__.py
  • Create: api/state.py

Step 1: Implement

# api/__init__.py
# (empty)

# api/state.py
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable


class Status(str, Enum):
    IDLE = "idle"
    RECORDING = "recording"
    PROCESSING = "processing"
    ERROR = "error"


@dataclass
class AppState:
    status: Status = Status.IDLE
    instructions: str = ""
    last_transcript_path: str | None = None
    last_error: str | None = None
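    _listeners: list[Callable] = field(default_factory=list, repr=False)
    # Active AudioRecorder; set by POST /toggle and read by the pipeline.
    _recorder: object | None = field(default=None, repr=False)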

    def subscribe(self, callback: Callable):
        self._listeners.append(callback)

    async def notify(self):
        for cb in self._listeners:
            if asyncio.iscoroutinefunction(cb):
                await cb(self)
            else:
                cb(self)

    async def set_status(self, status: Status):
        self.status = status
        await self.notify()


state = AppState()
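
The subscriber pattern accepts both plain functions and coroutines as listeners. A minimal usage sketch, assuming a cut-down class named `MiniState` (hypothetical) and two made-up listeners; unlike api/state.py it checks the return value with inspect.isawaitable, which is a variation and not the plan's code.

```python
import asyncio
import inspect
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class MiniState:
    """Hypothetical cut-down AppState with the same subscribe/notify shape."""
    status: str = "idle"
    _listeners: list[Callable] = field(default_factory=list, repr=False)

    def subscribe(self, callback: Callable) -> None:
        self._listeners.append(callback)

    async def set_status(self, status: str) -> None:
        self.status = status
        for cb in self._listeners:
            result = cb(self)
            # Await only if the listener returned something awaitable.
            if inspect.isawaitable(result):
                await result


seen: list[str] = []


def update_tray(s: MiniState) -> None:  # sync listener, e.g. a tray icon update
    seen.append(f"tray:{s.status}")


async def push_ws(s: MiniState) -> None:  # async listener, e.g. a WebSocket broadcast
    seen.append(f"ws:{s.status}")


async def main() -> None:
    st = MiniState()
    st.subscribe(update_tray)
    st.subscribe(push_ws)
    await st.set_status("recording")


asyncio.run(main())
print(seen)  # ['tray:recording', 'ws:recording']
```

Listeners run in subscription order, and a slow or failing listener delays or aborts the ones after it, so they should stay small.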

Step 2: Commit

git add api/__init__.py api/state.py
git commit -m "feat: app state module with status enum and subscriber pattern"

Task 8: API Router + Pipeline

Files:

  • Create: api/router.py
  • Create: api/pipeline.py
  • Create: tests/test_api.py

Step 1: Write failing tests

# tests/test_api.py
from fastapi.testclient import TestClient


def make_app():
    from fastapi import FastAPI
    from api.router import router
    app = FastAPI()
    app.include_router(router)
    return app


def test_status_returns_idle():
    client = TestClient(make_app())
    r = client.get("/status")
    assert r.status_code == 200
    assert r.json()["status"] == "idle"


def test_config_get_returns_dict():
    client = TestClient(make_app())
    r = client.get("/config")
    assert r.status_code == 200
    assert "ollama" in r.json()


def test_transcripts_returns_list():
    client = TestClient(make_app())
    r = client.get("/transcripts")
    assert r.status_code == 200
    assert isinstance(r.json(), list)

Step 2: Run to verify failure

pytest tests/test_api.py -v

Expected: FAIL

Step 3: Implement api/router.py

# api/router.py
import asyncio
import os
from fastapi import APIRouter, WebSocket, WebSocketDisconnect

from api.state import state, Status
from config import load as load_config
from output import list_transcripts

router = APIRouter()
_ws_clients: list[WebSocket] = []


@router.get("/status")
async def get_status():
    return {"status": state.status, "instructions": state.instructions}


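# Strong references to running pipeline tasks; the event loop only keeps weak ones.
_pipeline_tasks: set[asyncio.Task] = set()


@router.post("/toggle")
async def toggle_recording():
    from api.pipeline import run_pipeline
    if state.status == Status.RECORDING:
        task = asyncio.create_task(run_pipeline())
        _pipeline_tasks.add(task)
        task.add_done_callback(_pipeline_tasks.discard)
        return {"action": "stopped"}
    # Also allow a fresh start after a failed run; otherwise ERROR is a dead end.
    if state.status in (Status.IDLE, Status.ERROR):
        from audio import AudioRecorder
        state._recorder = AudioRecorder()
        state._recorder.start()
        await state.set_status(Status.RECORDING)
        return {"action": "started"}
    return {"action": "busy", "status": state.status}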


@router.post("/instructions")
async def set_instructions(body: dict):
    state.instructions = body.get("instructions", "")
    return {"ok": True}


@router.get("/transcripts")
async def get_transcripts():
    cfg = load_config()
    return list_transcripts(cfg["output"]["path"])


@router.get("/config")
async def get_config():
    return load_config()


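@router.put("/config")
async def put_config(body: dict):
    # Deep-merge so that a partial section update keeps the section's other keys.
    # NOTE: the merged config is returned but not yet written back to disk.
    from config import _deep_merge
    return _deep_merge(load_config(), body)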


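@router.post("/open")
async def open_file(body: dict):
    import subprocess
    # Only open files inside the configured output directory, never arbitrary paths.
    output_dir = os.path.realpath(load_config()["output"]["path"])
    path = os.path.realpath(body.get("path", ""))
    if os.path.isfile(path) and os.path.commonpath([output_dir, path]) == output_dir:
        subprocess.Popen(["xdg-open", path])
        return {"ok": True}
    return {"ok": False}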


@router.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    _ws_clients.append(ws)
    try:
        while True:
            await ws.receive_text()
    except WebSocketDisconnect:
        if ws in _ws_clients:
            _ws_clients.remove(ws)


async def broadcast(message: dict):
    for ws in list(_ws_clients):
        try:
            await ws.send_json(message)
        except Exception:
            if ws in _ws_clients:
                _ws_clients.remove(ws)

Step 4: Implement api/pipeline.py

# api/pipeline.py
import os
import tempfile

from api.state import state, Status
from config import load as load_config
from transcription import engine as transcription_engine
from llm import OllamaClient
from output import save_transcript
from api.router import broadcast


async def run_pipeline():
    cfg = load_config()
    recorder = getattr(state, "_recorder", None)
    if recorder is None:
        return

    recorder.stop()
    await state.set_status(Status.PROCESSING)
    await broadcast({"event": "processing"})

    wav_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            wav_path = f.name
        recorder.save_wav(wav_path)

        raw_text = await transcription_engine.transcribe_file(
            wav_path,
            language=cfg["whisper"]["language"],
            model_name=cfg["whisper"]["model"],
            device=cfg["whisper"]["device"],
        )
        await broadcast({"event": "transcribed", "raw": raw_text})

        client = OllamaClient(base_url=cfg["ollama"]["base_url"])
        refined = await client.refine(
            raw_text=raw_text,
            instructions=state.instructions,
            model=cfg["ollama"]["model"],
        )
        await broadcast({"event": "refined", "markdown": refined})

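        # Use the model's H1 as the title and drop it from the body, because
        # save_transcript writes its own "# title" heading.
        title = "Diktat"
        body_lines = refined.splitlines()
        for i, line in enumerate(body_lines):
            if line.startswith("# "):
                title = line[2:].strip() or title
                del body_lines[i]
                break
        content = "\n".join(body_lines).strip()

        path = save_transcript(
            title=title,
            content=content,
            output_dir=cfg["output"]["path"],
        )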
        state.last_transcript_path = path
        await broadcast({"event": "saved", "path": path, "title": title})
        await state.set_status(Status.IDLE)

    except Exception as e:
        state.last_error = str(e)
        await state.set_status(Status.ERROR)
        await broadcast({"event": "error", "message": str(e)})
    finally:
        if wav_path:
            try:
                os.unlink(wav_path)
            except OSError:
                pass

Step 5: Run tests to verify they pass

pytest tests/test_api.py -v

Expected: PASS
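
The title handling in the pipeline is a good candidate for a small pure function that can be unit-tested without Whisper or Ollama. The sketch below uses the hypothetical name `split_title`; it returns the first H1 as the title and the remaining Markdown as the body, and falls back to "Diktat" when the model produced no heading.

```python
def split_title(markdown: str, fallback: str = "Diktat") -> tuple[str, str]:
    """Return (title, body): the first '# ' heading and the text without it."""
    lines = markdown.splitlines()
    for i, line in enumerate(lines):
        if line.startswith("# "):
            body = "\n".join(lines[:i] + lines[i + 1:]).strip()
            return line[2:].strip(), body
    return fallback, markdown.strip()


title, body = split_title("# Besprechung\n\nPunkt eins.\n\n## Details\nMehr Text.")
print(title)  # Besprechung
print(body.splitlines()[0])  # Punkt eins.
```

Since save_transcript writes its own "# title" line, passing only the body avoids a duplicated top-level heading in the saved file. Lines starting with "## " are left alone because the check requires a single "#" followed by a space.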

Step 6: Commit

git add api/router.py api/pipeline.py tests/test_api.py
git commit -m "feat: API router + pipeline — toggle, status, transcripts, WebSocket"

Task 9: Frontend

Files:

  • Create: frontend/index.html
  • Create: frontend/app.js

Step 1: Create frontend/index.html

<!DOCTYPE html>
<html lang="de">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>tüit Transkriptor</title>
  <link rel="preconnect" href="https://fonts.googleapis.com">
  <link href="https://fonts.googleapis.com/css2?family=Overpass:wght@300;400;600;700&display=swap" rel="stylesheet">
  <style>
    :root {
      --red: #DA251C;
      --yellow: #FFD802;
      --bg: #111;
      --surface: #1a1a1a;
      --surface2: #232323;
      --text: #e8e8e8;
      --muted: #888;
      --border: #2e2e2e;
    }
    * { box-sizing: border-box; margin: 0; padding: 0; }
    body {
      font-family: 'Overpass', system-ui, sans-serif;
      background: var(--bg);
      color: var(--text);
      min-height: 100vh;
      display: flex;
      flex-direction: column;
    }
    header {
      display: flex;
      align-items: center;
      gap: 12px;
      padding: 16px 24px;
      border-bottom: 1px solid var(--border);
    }
    .logo-dot { width: 12px; height: 12px; background: var(--red); border-radius: 50%; }
    header h1 { font-size: 1.1rem; font-weight: 600; letter-spacing: 0.04em; }
    header h1 span { color: var(--red); }
    .status-badge {
      margin-left: auto;
      font-size: 0.75rem;
      padding: 4px 10px;
      border-radius: 20px;
      background: var(--surface2);
      color: var(--muted);
      text-transform: uppercase;
      letter-spacing: 0.08em;
    }
    .status-badge.recording { background: var(--red); color: #fff; }
    .status-badge.processing { background: var(--yellow); color: #111; }
    main {
      flex: 1;
      display: flex;
      flex-direction: column;
      gap: 20px;
      padding: 24px;
      max-width: 800px;
      width: 100%;
      margin: 0 auto;
    }
    .record-section { display: flex; flex-direction: column; align-items: center; gap: 16px; }
    #record-btn {
      width: 96px; height: 96px; border-radius: 50%;
      background: var(--surface2); border: 3px solid var(--border);
      cursor: pointer; transition: all 0.15s ease;
      display: flex; align-items: center; justify-content: center;
      outline: none;
    }
    #record-btn:hover { border-color: var(--red); }
    #record-btn.recording { background: var(--red); border-color: var(--red); animation: pulse 1.4s infinite; }
    #record-btn.processing { background: var(--yellow); border-color: var(--yellow); cursor: default; }
    @keyframes pulse {
      0%,100% { box-shadow: 0 0 0 0 rgba(218,37,28,0.4); }
      50% { box-shadow: 0 0 0 16px rgba(218,37,28,0); }
    }
    .mic-icon { width: 36px; height: 36px; fill: var(--text); }
    #record-btn.recording .mic-icon { fill: #fff; }
    #record-btn.processing .mic-icon { fill: #111; }
    #status-text { font-size: 0.85rem; color: var(--muted); }
    .instructions-section { display: flex; flex-direction: column; gap: 8px; }
    label { font-size: 0.8rem; color: var(--muted); text-transform: uppercase; letter-spacing: 0.06em; }
    textarea {
      background: var(--surface); border: 1px solid var(--border);
      color: var(--text); border-radius: 8px; padding: 12px;
      font-family: inherit; font-size: 0.9rem; resize: vertical;
      min-height: 80px; outline: none; transition: border-color 0.15s;
    }
    textarea:focus { border-color: var(--yellow); }
    textarea::placeholder { color: var(--muted); }
    .preview-section { display: flex; flex-direction: column; gap: 8px; }
    #preview {
      background: var(--surface); border: 1px solid var(--border);
      border-radius: 8px; padding: 16px;
      font-size: 0.85rem; line-height: 1.6; color: var(--muted);
      min-height: 60px; white-space: pre-wrap; word-break: break-word;
    }
    #preview.has-content { color: var(--text); }
    .transcripts-section { display: flex; flex-direction: column; gap: 8px; }
    #transcript-list { display: flex; flex-direction: column; gap: 6px; }
    .transcript-item {
      background: var(--surface); border: 1px solid var(--border);
      border-radius: 6px; padding: 10px 14px;
      display: flex; align-items: center; justify-content: space-between;
      font-size: 0.82rem; cursor: pointer; transition: border-color 0.1s;
    }
    .transcript-item:hover { border-color: var(--red); }
  </style>
</head>
<body>
  <header>
    <div class="logo-dot"></div>
    <h1>tüit <span>Transkriptor</span></h1>
    <span class="status-badge" id="header-status">Bereit</span>
  </header>
  <main>
    <section class="record-section">
      <button id="record-btn" title="Aufnahme starten / stoppen">
        <svg class="mic-icon" viewBox="0 0 24 24" xmlns="http://www.w3.org/2000/svg">
          <path d="M12 1a4 4 0 0 1 4 4v6a4 4 0 0 1-8 0V5a4 4 0 0 1 4-4zm0 2a2 2 0 0 0-2 2v6a2 2 0 0 0 4 0V5a2 2 0 0 0-2-2zM6.5 10.5A5.5 5.5 0 0 0 12 16a5.5 5.5 0 0 0 5.5-5.5h2A7.5 7.5 0 0 1 13 17.93V21h2v2H9v-2h2v-3.07A7.5 7.5 0 0 1 4.5 10.5h2z"/>
        </svg>
      </button>
      <span id="status-text">Klicken zum Starten</span>
    </section>

    <section class="instructions-section">
      <label for="instructions">Instruktionen für den Sekretär</label>
      <textarea
        id="instructions"
        placeholder="z.B. &quot;Heb die wichtigsten Punkte hervor&quot; · &quot;Erstelle ein Ticket&quot; · &quot;Mach ein Angebot daraus&quot;"
      ></textarea>
    </section>

    <section class="preview-section">
      <label>Vorschau</label>
      <div id="preview">Noch keine Aufnahme verarbeitet.</div>
    </section>

    <section class="transcripts-section">
      <label>Letzte Transkripte</label>
      <div id="transcript-list"></div>
    </section>
  </main>
  <script src="/app.js"></script>
</body>
</html>

Step 2: Create frontend/app.js

Note: All DOM manipulation uses textContent and createElement — no innerHTML with untrusted data to prevent XSS.

const btn = document.getElementById('record-btn');
const statusText = document.getElementById('status-text');
const headerStatus = document.getElementById('header-status');
const preview = document.getElementById('preview');
const instructionsEl = document.getElementById('instructions');
const transcriptList = document.getElementById('transcript-list');

const STATUS_LABELS = {
  idle: 'Bereit',
  recording: 'Aufnahme läuft\u2026',
  processing: 'Wird verarbeitet\u2026',
  error: 'Fehler',
};

instructionsEl.addEventListener('input', async () => {
  await fetch('/instructions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ instructions: instructionsEl.value }),
  });
});

function setStatus(status) {
  btn.className = status;
  headerStatus.className = `status-badge ${status}`;
  const label = STATUS_LABELS[status] || status;
  statusText.textContent = label;
  headerStatus.textContent = label;
  btn.disabled = status === 'processing';
}

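btn.addEventListener('click', async () => {
  const r = await fetch('/toggle', { method: 'POST' });
  const data = await r.json();
  // The server sends no WebSocket event when recording starts,
  // so switch the UI to "recording" based on the HTTP response.
  if (data.action === 'started') setStatus('recording');
});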

function connectWs() {
  const ws = new WebSocket(`ws://${location.host}/ws`);
  ws.onmessage = (e) => {
    const msg = JSON.parse(e.data);
    if (msg.event === 'processing') setStatus('processing');
    if (msg.event === 'transcribed' || msg.event === 'refined') {
      const text = msg.raw || msg.markdown || '';
      preview.textContent = text;
      preview.classList.add('has-content');
    }
    if (msg.event === 'saved') {
      setStatus('idle');
      loadTranscripts();
    }
    if (msg.event === 'error') {
      setStatus('idle');
      preview.textContent = `Fehler: ${msg.message}`;
    }
  };
  ws.onclose = () => setTimeout(connectWs, 2000);
}

async function loadTranscripts() {
  const r = await fetch('/transcripts');
  const items = await r.json();

  // Build DOM nodes — no innerHTML with untrusted data
  transcriptList.replaceChildren(
    ...items.map((t) => {
      const div = document.createElement('div');
      div.className = 'transcript-item';

      const name = document.createElement('span');
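      // Strip only a trailing ".md"; a plain string replace would hit the first match anywhere in the name
      name.textContent = t.filename.replace(/\.md$/, '');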

      const meta = document.createElement('span');
      meta.className = 'meta';
      meta.textContent = `${Math.round(t.size / 1024 * 10) / 10} KB`;

      div.append(name, meta);
      div.addEventListener('click', () => {
        fetch('/open', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ path: t.path }),
        });
      });
      return div;
    })
  );
}

(async () => {
  const r = await fetch('/status');
  const data = await r.json();
  setStatus(data.status);
  instructionsEl.value = data.instructions || '';
  connectWs();
  loadTranscripts();
})();

Step 3: Commit

git add frontend/
git commit -m "feat: browser UI — tüit CI dark theme, XSS-safe DOM rendering"
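
The WebSocket handler in app.js implies a small event protocol: `processing`, `transcribed` (with `raw`), `refined` (with `markdown`), `saved`, and `error` (with `message`). As a rough aid for checking the server side against it, the same transitions can be sketched as a pure Python function. `UiState` and `apply_event` are hypothetical names used only for illustration; they are not part of the plan's modules.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class UiState:
    """Hypothetical stand-in for what the browser shows."""
    status: str = "idle"
    preview: str = ""


def apply_event(ui: UiState, msg: dict) -> UiState:
    """Mirror the ws.onmessage branches of app.js as a pure function."""
    event = msg.get("event")
    if event == "processing":
        return replace(ui, status="processing")
    if event in ("transcribed", "refined"):
        # app.js prefers `raw`, then `markdown`, then an empty string.
        return replace(ui, preview=msg.get("raw") or msg.get("markdown") or "")
    if event == "saved":
        return replace(ui, status="idle")
    if event == "error":
        return replace(ui, status="idle", preview=f"Fehler: {msg.get('message')}")
    return ui  # unknown events leave the UI unchanged


ui = UiState()
for message in (
    {"event": "processing"},
    {"event": "transcribed", "raw": "hallo welt"},
    {"event": "saved"},
):
    ui = apply_event(ui, message)
print(ui)
```

Note that none of these branches sets the `recording` status; in app.js that status is only applied from the initial `/status` fetch.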

Task 10: Main Entry Point + Tray + Signal Handler

Files:

  • Create: main.py

Step 1: Implement main.py

import asyncio
import os
import signal
import threading
import time
import webbrowser
from pathlib import Path

import uvicorn
from fastapi import FastAPI
from fastapi.responses import FileResponse
import pystray
from PIL import Image, ImageDraw

from api.router import router
from api.state import state, Status
from config import load as load_config

# ── FastAPI ────────────────────────────────────────────────────────────────────

app = FastAPI(title="tüit Transkriptor")
app.include_router(router)

FRONTEND_DIR = Path(__file__).parent / "frontend"


@app.get("/")
async def index():
    return FileResponse(str(FRONTEND_DIR / "index.html"))


@app.get("/app.js")
async def appjs():
    return FileResponse(str(FRONTEND_DIR / "app.js"))


# ── PID file ───────────────────────────────────────────────────────────────────

def write_pid():
    cfg = load_config()
    pid_path = cfg.get("pid_file", os.path.expanduser("~/.local/run/tueit-transcriber.pid"))
    os.makedirs(os.path.dirname(pid_path), exist_ok=True)
    Path(pid_path).write_text(str(os.getpid()))


def remove_pid():
    cfg = load_config()
    pid_path = cfg.get("pid_file", os.path.expanduser("~/.local/run/tueit-transcriber.pid"))
    try:
        os.unlink(pid_path)
    except FileNotFoundError:
        pass


# ── SIGUSR1 → toggle ──────────────────────────────────────────────────────────

_loop: asyncio.AbstractEventLoop | None = None


def _sigusr1_handler(signum, frame):
    if _loop:
        _loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_async_toggle()))


async def _async_toggle():
    from api.router import toggle_recording
    await toggle_recording()


# ── Tray ───────────────────────────────────────────────────────────────────────

def _make_icon(recording: bool = False) -> Image.Image:
    img = Image.new("RGBA", (64, 64), (0, 0, 0, 0))
    draw = ImageDraw.Draw(img)
    color = (218, 37, 28, 255) if recording else (80, 80, 80, 255)
    draw.ellipse([8, 8, 56, 56], fill=color)
    return img


def run_tray(port: int):
    icon = pystray.Icon(
        "tueit-transcriber",
        _make_icon(False),
        "tüit Transkriptor",
        menu=pystray.Menu(
            pystray.MenuItem("Aufnahme starten/stoppen", lambda i, it: (
                _loop and _loop.call_soon_threadsafe(
                    lambda: asyncio.ensure_future(_async_toggle())
                )
            ), default=True),
            pystray.MenuItem("Öffnen", lambda i, it: webbrowser.open(f"http://localhost:{port}")),
            pystray.MenuItem("Beenden", lambda i, it: (remove_pid(), icon.stop(), os._exit(0))),
        ),
    )

    def update_icon(s):
        icon.icon = _make_icon(s.status == Status.RECORDING)

    state.subscribe(update_icon)
    icon.run()


# ── Server ─────────────────────────────────────────────────────────────────────

def run_server(port: int):
    # Run uvicorn on an event loop created here and publish it as _loop, so the
    # SIGUSR1 handler and the tray menu schedule coroutines on the loop that is
    # actually running. (A loop that is created but never run would silently
    # drop every toggle.)
    global _loop
    _loop = asyncio.new_event_loop()
    asyncio.set_event_loop(_loop)
    config = uvicorn.Config(app, host="127.0.0.1", port=port, log_level="warning")
    _loop.run_until_complete(uvicorn.Server(config).serve())


# ── Entrypoint ─────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    cfg = load_config()
    port = cfg["server"]["port"]

    write_pid()
    # Python runs signal handlers in the main thread; the handler only forwards
    # the toggle to the server loop via call_soon_threadsafe.
    signal.signal(signal.SIGUSR1, _sigusr1_handler)

    server_thread = threading.Thread(target=run_server, args=(port,), daemon=True)
    server_thread.start()

    # Give uvicorn a moment to bind
    time.sleep(0.8)

    webbrowser.open(f"http://localhost:{port}")

    try:
        run_tray(port)
    finally:
        remove_pid()

Step 2: Commit

git add main.py
git commit -m "feat: main entry point — FastAPI + pystray tray + SIGUSR1 signal handler"
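
The SIGUSR1 handler and the tray menu both run outside the server thread, so they have to hand the toggle to an event loop that is actually running in another thread. A minimal, stdlib-only sketch of that hand-off follows. `toggle_recording` here is a stand-in coroutine and `start_loop_in_thread` is a hypothetical helper; neither is the project's real code.

```python
import asyncio
import threading

toggles = []  # records which thread executed the coroutine


async def toggle_recording():
    # Stand-in for the real toggle; it only notes where it ran.
    toggles.append(threading.current_thread().name)


def start_loop_in_thread():
    """Start an event loop in a background thread and return (loop, thread)."""
    loop = asyncio.new_event_loop()
    ready = threading.Event()

    def _run():
        asyncio.set_event_loop(loop)
        loop.call_soon(ready.set)  # fires once the loop is really running
        loop.run_forever()

    thread = threading.Thread(target=_run, name="server-loop", daemon=True)
    thread.start()
    ready.wait()
    return loop, thread


loop, thread = start_loop_in_thread()

# From the main thread (where signal handlers run), schedule the coroutine on
# the background loop and wait for it to finish.
asyncio.run_coroutine_threadsafe(toggle_recording(), loop).result(timeout=2)

loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=2)
print(toggles)
```

The coroutine runs on the loop's thread, not on the caller's. If the loop were never started, the scheduled call would simply never execute, which is why the loop reference handed to the signal handler has to be the one the server is running on.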

Task 11: install.sh + systemd User Service

Files:

  • Create: install.sh

Step 1: Create install.sh

#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SERVICE_NAME="tueit-transcriber"
SERVICE_FILE="$HOME/.config/systemd/user/${SERVICE_NAME}.service"

echo "=== tüit Transkriptor Installer ==="

command -v python3 >/dev/null 2>&1 || { echo "ERROR: python3 not found"; exit 1; }

if ! command -v ollama >/dev/null 2>&1; then
  echo "WARNING: ollama not found. Install from https://ollama.com"
  echo "  After install: ollama pull gemma3:12b"
fi

if command -v rocminfo >/dev/null 2>&1; then
  echo "ROCm detected — GPU acceleration available"
else
  echo "INFO: ROCm not found — Whisper will run on CPU (slower)"
  echo "  To enable GPU: sudo pacman -S rocm-hip-sdk"
fi

echo "Installing Python dependencies..."
pip install --user -r "$SCRIPT_DIR/requirements.txt"

mkdir -p "$HOME/.config/systemd/user"
cat > "$SERVICE_FILE" <<EOF
[Unit]
Description=tüit Transkriptor
After=graphical-session.target

[Service]
ExecStart=$(command -v python3) ${SCRIPT_DIR}/main.py
Restart=on-failure
RestartSec=5
Environment=DISPLAY=:0
Environment=DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/$(id -u)/bus

[Install]
WantedBy=default.target
EOF

systemctl --user daemon-reload
systemctl --user enable "$SERVICE_NAME"
systemctl --user start "$SERVICE_NAME"

echo ""
echo "=== Done ==="
echo "Status: systemctl --user status $SERVICE_NAME"
echo ""
echo "KDE Hotkey setup:"
echo "  System Settings → Shortcuts → Custom Shortcuts"
echo "  Command: pkill -USR1 -f main.py"
echo ""
echo "First run: ollama pull gemma3:12b"

Step 2: Commit

chmod +x install.sh
git add install.sh
git commit -m "feat: install.sh — ROCm check, systemd user service, hotkey instructions"
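
`pkill -USR1 -f main.py` signals every process whose command line contains `main.py`. Since main.py already writes a PID file, a narrower alternative is to signal only that PID. A minimal sketch, assuming the default PID path from main.py; `send_toggle` is a hypothetical helper, not part of the plan:

```python
import os
import signal
from pathlib import Path

# Default location used by write_pid() in main.py when no pid_file is configured.
DEFAULT_PID_FILE = Path("~/.local/run/tueit-transcriber.pid").expanduser()


def send_toggle(pid_file: Path = DEFAULT_PID_FILE) -> bool:
    """Send SIGUSR1 to the PID stored in pid_file.

    Returns False when the file is missing or unreadable as a PID, or when no
    process with that PID exists.
    """
    try:
        pid = int(pid_file.read_text().strip())
        os.kill(pid, signal.SIGUSR1)
    except (FileNotFoundError, ValueError, ProcessLookupError):
        return False
    return True
```

A hotkey could then call a small script that runs `send_toggle()` instead of `pkill`. If `pid_file` is overridden in the config, the same path has to be passed here.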

Task 12: Smoke Test + Gitea Remote

Step 1: Run full test suite

cd /home/templis/work/tueit_Transkriptor
pip install -r requirements.txt
pytest -v

Expected: All tests PASS

Step 2: Verify clean import

python -c "from main import app; print('OK')"

Expected: OK

Step 3: Push to Gitea

Use the tueit-gitea skill for repo creation. Namespace: thomas.kopp or a tools group.

git remote add origin git@git.tueit.de:thomas.kopp/tueit_Transkriptor.git
git push -u origin main

Step 4: Start and manually verify

python main.py
# Browser opens at http://localhost:8765
# Tray icon appears in system tray
# Test SIGUSR1:
pkill -USR1 -f main.py   # → red tray icon, "Aufnahme läuft…"
pkill -USR1 -f main.py   # → processing → idle
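
Before sending SIGUSR1 in the manual check, it can help to confirm that the server is really accepting connections. A minimal sketch using only the standard library; `wait_for_port` is a hypothetical helper and the timeout values are arbitrary assumptions:

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 5.0, interval: float = 0.1) -> bool:
    """Poll until a TCP connection to (host, port) succeeds or timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

For example, `wait_for_port("127.0.0.1", 8765)` returns True once the server has bound. The same helper could stand in for the fixed `time.sleep(0.8)` in main.py, so that the browser only opens after the port is reachable.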

Summary

| Task | Component | Tests |
|------|-----------|-------|
| 1 | Scaffold | - |
| 2 | Config (TOML) | tests/test_config.py |
| 3 | Output (Markdown writer) | tests/test_output.py |
| 4 | LLM (Ollama client) | tests/test_llm.py |
| 5 | Transcription (Whisper) | tests/test_transcription.py |
| 6 | Audio (sounddevice) | tests/test_audio.py |
| 7 | App state | - |
| 8 | API router + pipeline | tests/test_api.py |
| 9 | Frontend (HTML/JS, XSS-safe) | - |
| 10 | Main + tray + SIGUSR1 | - |
| 11 | install.sh + systemd | - |
| 12 | Smoke test + Gitea push | manual |