# tüit Transkriptor Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Build a local AI transcription desktop tool with system tray icon, audio capture, faster-whisper transcription, Ollama LLM post-processing, and a browser-based UI that saves Markdown files to a Nextcloud-synced folder.
**Architecture:** pystray tray icon + FastAPI local server (port 8765) + browser UI. Audio captured via sounddevice, transcribed via faster-whisper (ROCm), refined via Ollama (gemma3:12b). SIGUSR1 toggles recording for Wayland-compatible hotkey support.
**Tech Stack:** Python 3.11+, FastAPI, uvicorn, pystray, Pillow, sounddevice, faster-whisper, httpx, tomllib (stdlib 3.11+)
---
### Task 1: Project Scaffold
**Files:**
- Create: `requirements.txt`
- Create: `.gitignore`
- Create: `pytest.ini`
- Create: `CLAUDE.md`
**Step 1: Create requirements.txt**
```
fastapi>=0.111
uvicorn[standard]>=0.29
pystray>=0.19
Pillow>=10.0
sounddevice>=0.4.6
faster-whisper>=1.0.3
httpx>=0.27
numpy>=1.26
tomli_w>=1.0
pytest>=8.0
pytest-asyncio>=0.23
```
**Step 2: Create .gitignore**
```
__pycache__/
*.pyc
*.pyo
.venv/
venv/
*.egg-info/
dist/
.env
data/
```
**Step 3: Create pytest.ini**
```ini
[pytest]
asyncio_mode = auto
```
**Step 4: Create CLAUDE.md**
````markdown
# CLAUDE.md — tüit Transkriptor

Desktop transcription tool. Python, no Docker.

## Key Commands

```bash
# Install dependencies
pip install -r requirements.txt

# Run
python main.py

# Run tests
pytest -v

# Trigger recording toggle via signal
pkill -USR1 -f main.py
```

## Architecture

See docs/plans/2026-04-01-desktop-transcription-design.md
````
**Step 5: Commit**
```bash
git add requirements.txt .gitignore CLAUDE.md pytest.ini
git commit -m "chore: project scaffold"
```
---
### Task 2: Config Module
**Files:**
- Create: `config.py`
- Create: `tests/__init__.py`
- Create: `tests/test_config.py`
**Step 1: Write failing tests**
```python
# tests/test_config.py
import os
import tempfile
from unittest.mock import patch
def test_config_loads_defaults():
    with tempfile.TemporaryDirectory() as tmpdir:
        cfg_path = os.path.join(tmpdir, "config.toml")
        # Reload first, then patch: reloading after patching would re-execute
        # the module and reset CONFIG_PATH to its default
        import importlib, config
        importlib.reload(config)
        with patch("config.CONFIG_PATH", cfg_path):
            cfg = config.load()
            assert cfg["ollama"]["model"] == "gemma3:12b"
            assert cfg["whisper"]["model"] == "large-v3"
            assert cfg["server"]["port"] == 8765


def test_config_creates_file_on_first_run():
    with tempfile.TemporaryDirectory() as tmpdir:
        cfg_path = os.path.join(tmpdir, "config.toml")
        import importlib, config
        importlib.reload(config)
        with patch("config.CONFIG_PATH", cfg_path):
            config.load()
            assert os.path.exists(cfg_path)
```
**Step 2: Run tests to verify they fail**
```bash
pytest tests/test_config.py -v
```
Expected: FAIL — `ModuleNotFoundError: No module named 'config'`
**Step 3: Implement config.py**
```python
import os
import tomllib

CONFIG_PATH = os.path.expanduser("~/.config/tueit-transcriber/config.toml")

# "pid_file" comes first: in TOML, top-level scalar keys must precede tables,
# otherwise the tomli_w dump in _write_defaults would produce an invalid file
DEFAULTS = {
    "pid_file": os.path.expanduser("~/.local/run/tueit-transcriber.pid"),
    "ollama": {
        "base_url": "http://localhost:11434",
        "model": "gemma3:12b",
    },
    "whisper": {
        "model": "large-v3",
        "language": "de",
        "device": "auto",  # "auto" = use GPU if ROCm available, else CPU
    },
    "server": {
        "port": 8765,
    },
    "output": {
        "path": os.path.expanduser(
            "~/cloud.shron.de/Hetzner Storagebox/work"
        ),
    },
}


def load() -> dict:
    os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True)
    if not os.path.exists(CONFIG_PATH):
        _write_defaults()
    with open(CONFIG_PATH, "rb") as f:
        on_disk = tomllib.load(f)
    return _deep_merge(DEFAULTS, on_disk)


def _deep_merge(base: dict, override: dict) -> dict:
    result = dict(base)
    for k, v in override.items():
        if k in result and isinstance(result[k], dict) and isinstance(v, dict):
            result[k] = _deep_merge(result[k], v)
        else:
            result[k] = v
    return result


def _write_defaults():
    try:
        import tomli_w
        with open(CONFIG_PATH, "wb") as f:
            tomli_w.dump(DEFAULTS, f)
    except ImportError:
        # Fallback without tomli_w: write the defaults by hand
        with open(CONFIG_PATH, "w") as f:
            f.write("# tüit Transkriptor config\n\n")
            f.write('[ollama]\nbase_url = "http://localhost:11434"\nmodel = "gemma3:12b"\n\n')
            f.write('[whisper]\nmodel = "large-v3"\nlanguage = "de"\ndevice = "auto"\n\n')
            f.write('[server]\nport = 8765\n\n')
            f.write(f'[output]\npath = "{DEFAULTS["output"]["path"]}"\n')
```
**Step 4: Run tests to verify they pass**
```bash
pytest tests/test_config.py -v
```
Expected: PASS
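The `_deep_merge` semantics are the point of this module: a user config that overrides a single key keeps every other default. A standalone sketch of the same merge logic (the function is duplicated here so the snippet runs without the module):

```python
def deep_merge(base: dict, override: dict) -> dict:
    # Recursive dict merge: override wins, nested dicts merge key by key
    result = dict(base)
    for k, v in override.items():
        if k in result and isinstance(result[k], dict) and isinstance(v, dict):
            result[k] = deep_merge(result[k], v)
        else:
            result[k] = v
    return result

defaults = {"ollama": {"base_url": "http://localhost:11434", "model": "gemma3:12b"},
            "server": {"port": 8765}}
user = {"ollama": {"model": "mistral:7b"}}  # user overrides only the model

merged = deep_merge(defaults, user)
print(merged["ollama"]["model"])     # mistral:7b (overridden)
print(merged["ollama"]["base_url"])  # http://localhost:11434 (kept)
print(merged["server"]["port"])      # 8765 (kept)
```

Note that `deep_merge` copies rather than mutates, so `DEFAULTS` stays pristine across repeated `load()` calls.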
**Step 5: Commit**
```bash
git add config.py tests/__init__.py tests/test_config.py
git commit -m "feat: config module with TOML defaults"
```
---
### Task 3: Output Module
**Files:**
- Create: `output.py`
- Create: `tests/test_output.py`
**Step 1: Write failing tests**
```python
# tests/test_output.py
import os
import tempfile
from datetime import datetime
def test_save_transcript_creates_file():
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript
        path = save_transcript(
            title="Test Aufnahme",
            content="Dies ist ein Test.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        assert os.path.exists(path)


def test_save_transcript_filename_format():
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript
        path = save_transcript(
            title="Mein erstes Diktat",
            content="Inhalt.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        assert os.path.basename(path) == "2026-04-01-1432-mein-erstes-diktat.md"


def test_save_transcript_contains_frontmatter():
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript
        path = save_transcript(
            title="Test",
            content="Inhalt.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        text = open(path).read()
        assert "---" in text
        assert "date:" in text
        assert "transkript" in text


def test_save_transcript_contains_content():
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript
        path = save_transcript(
            title="Test",
            content="Das ist der Inhalt.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        assert "Das ist der Inhalt." in open(path).read()


def test_slugify():
    from output import slugify
    assert slugify("Mein erstes Diktat") == "mein-erstes-diktat"
    assert slugify("test -- foo") == "test-foo"
```
**Step 2: Run to verify failure**
```bash
pytest tests/test_output.py -v
```
Expected: FAIL
**Step 3: Implement output.py**
```python
import os
import re
import unicodedata
from datetime import datetime


def slugify(text: str) -> str:
    # Transliterate German umlauts before the ASCII filter below strips them
    for src, dst in [("ä", "a"), ("ö", "o"), ("ü", "u"), ("Ä", "a"), ("Ö", "o"), ("Ü", "u"), ("ß", "ss")]:
        text = text.replace(src, dst)
    text = unicodedata.normalize("NFKD", text)
    text = "".join(c for c in text if unicodedata.category(c) != "Mn")
    text = text.lower()
    text = re.sub(r"[^a-z0-9]+", "-", text)
    return text.strip("-")


def save_transcript(
    title: str,
    content: str,
    output_dir: str,
    dt: datetime | None = None,
) -> str:
    if dt is None:
        dt = datetime.now()
    slug = slugify(title)[:60]
    filename = f"{dt.strftime('%Y-%m-%d-%H%M')}-{slug}.md"
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, filename)
    with open(path, "w", encoding="utf-8") as f:
        f.write(f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript]\n---\n\n")
        f.write(f"# {title}\n\n")
        f.write(content)
        if not content.endswith("\n"):
            f.write("\n")
    return path


def list_transcripts(output_dir: str, limit: int = 20) -> list[dict]:
    if not os.path.exists(output_dir):
        return []
    files = sorted(
        [f for f in os.listdir(output_dir) if f.endswith(".md")],
        reverse=True,
    )[:limit]
    result = []
    for f in files:
        full = os.path.join(output_dir, f)
        stat = os.stat(full)
        result.append({"filename": f, "path": full, "size": stat.st_size, "mtime": stat.st_mtime})
    return result
```
**Step 4: Run tests to verify they pass**
```bash
pytest tests/test_output.py -v
```
Expected: PASS
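A detail worth pinning down: umlauts are transliterated before the ASCII filter runs, so German titles survive slugification instead of collapsing to dashes. A quick self-contained check (the function is duplicated from `output.py` so the snippet runs on its own):

```python
import re
import unicodedata

def slugify(text: str) -> str:
    # Same logic as output.slugify: transliterate German umlauts first,
    # then strip remaining combining marks and non-alphanumerics
    for src, dst in [("ä", "a"), ("ö", "o"), ("ü", "u"), ("Ä", "a"), ("Ö", "o"), ("Ü", "u"), ("ß", "ss")]:
        text = text.replace(src, dst)
    text = unicodedata.normalize("NFKD", text)
    text = "".join(c for c in text if unicodedata.category(c) != "Mn")
    text = text.lower()
    text = re.sub(r"[^a-z0-9]+", "-", text)
    return text.strip("-")

print(slugify("Größenänderung prüfen"))  # grossenanderung-prufen
print(slugify("Café & Croissants!"))     # cafe-croissants
```

The NFKD pass catches accented characters the umlaut table does not list (é, à, …), so arbitrary titles still yield safe filenames.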
**Step 5: Commit**
```bash
git add output.py tests/test_output.py
git commit -m "feat: output module — Markdown file writer with slugified filenames"
```
---
### Task 4: LLM Module
**Files:**
- Create: `llm.py`
- Create: `tests/test_llm.py`
**Step 1: Write failing tests**
```python
# tests/test_llm.py
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
@pytest.mark.asyncio
async def test_refine_calls_ollama():
    from llm import OllamaClient
    mock_response = MagicMock()
    mock_response.json.return_value = {"response": "# Titel\n\nInhalt."}
    mock_response.raise_for_status = MagicMock()
    with patch("httpx.AsyncClient") as MockClient:
        instance = MockClient.return_value.__aenter__.return_value
        instance.post = AsyncMock(return_value=mock_response)
        client = OllamaClient(base_url="http://localhost:11434")
        result = await client.refine(
            raw_text="Das ist ein test.",
            instructions="Mach eine Zusammenfassung.",
            model="gemma3:12b",
        )
        assert "Inhalt" in result
        instance.post.assert_called_once()


@pytest.mark.asyncio
async def test_list_models_returns_list():
    from llm import OllamaClient
    mock_response = MagicMock()
    mock_response.json.return_value = {"models": [{"name": "gemma3:12b"}, {"name": "mistral:7b"}]}
    mock_response.raise_for_status = MagicMock()
    with patch("httpx.AsyncClient") as MockClient:
        instance = MockClient.return_value.__aenter__.return_value
        instance.get = AsyncMock(return_value=mock_response)
        client = OllamaClient(base_url="http://localhost:11434")
        models = await client.list_models()
        assert "gemma3:12b" in models
```
**Step 2: Run to verify failure**
```bash
pytest tests/test_llm.py -v
```
Expected: FAIL
**Step 3: Implement llm.py**
```python
import httpx

SYSTEM_PROMPT = """Du bist ein präziser Schreibassistent.
Du bekommst ein rohes Sprachtranskript und optionale Instruktionen des Nutzers.

Deine Aufgabe:
1. Bereinige den Text (Füllwörter, Wiederholungen, Tippfehler)
2. Strukturiere ihn mit Markdown-Überschriften wenn sinnvoll
3. Erzeuge einen passenden deutschen Titel als H1
4. Beachte Instruktionen des Nutzers wenn vorhanden
5. Antworte NUR mit dem fertigen Markdown — kein Kommentar, keine Erklärung

Format:
# Titel

Inhalt...
"""


class OllamaClient:
    def __init__(self, base_url: str = "http://localhost:11434"):
        self.base_url = base_url

    async def list_models(self) -> list[str]:
        async with httpx.AsyncClient() as client:
            r = await client.get(f"{self.base_url}/api/tags")
            r.raise_for_status()
            return [m["name"] for m in r.json().get("models", [])]

    async def refine(
        self,
        raw_text: str,
        instructions: str = "",
        model: str = "gemma3:12b",
    ) -> str:
        prompt = f"Transkript:\n{raw_text}"
        if instructions.strip():
            prompt += f"\n\nInstruktionen:\n{instructions.strip()}"
        # Generous timeout: local LLM generation can take well over a minute
        async with httpx.AsyncClient(timeout=120) as client:
            r = await client.post(
                f"{self.base_url}/api/generate",
                json={"model": model, "prompt": prompt, "system": SYSTEM_PROMPT, "stream": False},
            )
            r.raise_for_status()
            return r.json()["response"]
```
**Step 4: Run tests to verify they pass**
```bash
pytest tests/test_llm.py -v
```
Expected: PASS
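The prompt assembly in `refine` is small but worth pinning down: user instructions are appended only when non-empty, so whitespace-only input leaves the prompt untouched. The same logic in isolation (`build_prompt` is a hypothetical helper name, not part of the module):

```python
def build_prompt(raw_text: str, instructions: str = "") -> str:
    # Mirrors OllamaClient.refine: transcript first, optional instructions after
    prompt = f"Transkript:\n{raw_text}"
    if instructions.strip():
        prompt += f"\n\nInstruktionen:\n{instructions.strip()}"
    return prompt

print(build_prompt("Das ist ein Test."))
print(build_prompt("Das ist ein Test.", "Fasse zusammen."))
```

Keeping the transcript and the instructions in separate labeled sections makes it harder for spoken content to be misread as an instruction.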
**Step 5: Commit**
```bash
git add llm.py tests/test_llm.py
git commit -m "feat: LLM module — Ollama client with transcript refinement"
```
---
### Task 5: Transcription Module
**Files:**
- Create: `transcription.py`
- Create: `tests/test_transcription.py`
**Step 1: Write failing tests**
```python
# tests/test_transcription.py
import asyncio
from unittest.mock import MagicMock
def test_transcription_engine_is_singleton():
    from transcription import engine, TranscriptionEngine
    assert isinstance(engine, TranscriptionEngine)


def test_transcribe_file_calls_whisper(tmp_path):
    wav = tmp_path / "test.wav"
    wav.write_bytes(b"\x00" * 100)
    mock_model = MagicMock()
    mock_segment = MagicMock()
    mock_segment.text = " Hallo Welt"
    mock_model.transcribe.return_value = ([mock_segment], MagicMock())
    from transcription import TranscriptionEngine
    eng = TranscriptionEngine()
    eng._model = mock_model
    result = asyncio.run(eng.transcribe_file(str(wav), language="de"))
    assert result == "Hallo Welt"
    mock_model.transcribe.assert_called_once_with(str(wav), language="de")
```
**Step 2: Run to verify failure**
```bash
pytest tests/test_transcription.py -v
```
Expected: FAIL
**Step 3: Implement transcription.py**
```python
import asyncio


class TranscriptionEngine:
    _model = None

    def _get_model(self, model_name: str = "large-v3", device: str = "auto"):
        if self._model is None:
            from faster_whisper import WhisperModel
            if device == "auto":
                try:
                    # Try GPU first (the design targets ROCm; CTranslate2 uses
                    # the "cuda" device label), fall back to CPU
                    self._model = WhisperModel(model_name, device="cuda", compute_type="float16")
                except Exception:
                    self._model = WhisperModel(model_name, device="cpu", compute_type="int8")
            else:
                compute = "float16" if device in ("cuda", "rocm") else "int8"
                self._model = WhisperModel(model_name, device=device, compute_type=compute)
        return self._model

    async def transcribe_file(
        self,
        audio_path: str,
        language: str = "de",
        model_name: str = "large-v3",
        device: str = "auto",
    ) -> str:
        loop = asyncio.get_running_loop()
        model = self._get_model(model_name, device)

        def _work() -> str:
            # transcribe() returns a lazy generator; consume it inside the
            # worker thread so decoding does not block the event loop
            segments, _info = model.transcribe(audio_path, language=language)
            return "".join(seg.text for seg in segments).strip()

        return await loop.run_in_executor(None, _work)


engine = TranscriptionEngine()
```
**Step 4: Run tests to verify they pass**
```bash
pytest tests/test_transcription.py -v
```
Expected: PASS
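faster-whisper's `transcribe()` returns quickly with a lazy segment generator; the actual decoding happens while that generator is consumed, which is why consumption belongs inside `run_in_executor`. The offloading pattern in isolation, with a `time.sleep` stand-in for decoding:

```python
import asyncio
import time

def blocking_decode(chunks):
    # Stand-in for consuming faster-whisper's segment generator;
    # in the real module this is where decoding time is spent
    time.sleep(0.05)
    return "".join(chunks).strip()

async def main():
    loop = asyncio.get_running_loop()
    # run_in_executor keeps the event loop responsive while a thread works
    return await loop.run_in_executor(None, blocking_decode, [" Hallo", " Welt"])

print(asyncio.run(main()))  # Hallo Welt
```

The same event loop also serves HTTP and WebSocket traffic, so a multi-minute decode running on it directly would freeze the whole UI.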
**Step 5: Commit**
```bash
git add transcription.py tests/test_transcription.py
git commit -m "feat: transcription module — faster-whisper with ROCm auto-detect"
```
---
### Task 6: Audio Module
**Files:**
- Create: `audio.py`
- Create: `tests/test_audio.py`
**Step 1: Write failing tests**
```python
# tests/test_audio.py
import numpy as np
from unittest.mock import patch, MagicMock
def test_recorder_starts_and_stops():
    from audio import AudioRecorder
    with patch("sounddevice.InputStream") as MockStream:
        MockStream.return_value.start = MagicMock()
        MockStream.return_value.stop = MagicMock()
        MockStream.return_value.close = MagicMock()
        recorder = AudioRecorder(sample_rate=16000)
        assert not recorder.is_recording
        recorder._stream = MockStream.return_value
        recorder.is_recording = True
        recorder.stop()
        assert not recorder.is_recording


def test_recorder_save_wav(tmp_path):
    import wave
    from audio import AudioRecorder
    recorder = AudioRecorder(sample_rate=16000)
    recorder._buffer = [np.zeros(1600, dtype=np.int16)]
    out = str(tmp_path / "test.wav")
    recorder.save_wav(out)
    with wave.open(out) as wf:
        assert wf.getframerate() == 16000
        assert wf.getnchannels() == 1
```
**Step 2: Run to verify failure**
```bash
pytest tests/test_audio.py -v
```
Expected: FAIL
**Step 3: Implement audio.py**
```python
import threading
import wave

import numpy as np


class AudioRecorder:
    def __init__(self, sample_rate: int = 16000):
        self.sample_rate = sample_rate
        self._buffer: list[np.ndarray] = []
        self._stream = None
        self.is_recording = False
        self._lock = threading.Lock()

    def _callback(self, indata, frames, time, status):
        # Runs on sounddevice's audio thread, hence the lock
        if self.is_recording:
            with self._lock:
                self._buffer.append(indata[:, 0].copy().astype(np.int16))

    def start(self):
        import sounddevice as sd
        self._buffer = []
        self.is_recording = True
        self._stream = sd.InputStream(
            samplerate=self.sample_rate,
            channels=1,
            dtype="int16",
            callback=self._callback,
        )
        self._stream.start()

    def stop(self):
        self.is_recording = False
        if self._stream:
            self._stream.stop()
            self._stream.close()
            self._stream = None

    def save_wav(self, path: str) -> str:
        with self._lock:
            data = np.concatenate(self._buffer) if self._buffer else np.zeros(0, dtype=np.int16)
        with wave.open(path, "wb") as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)  # 16-bit samples
            wf.setframerate(self.sample_rate)
            wf.writeframes(data.tobytes())
        return path
```
**Step 4: Run tests to verify they pass**
```bash
pytest tests/test_audio.py -v
```
Expected: PASS
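Each callback appends one chunk to the buffer, so the recording length is simply total samples divided by sample rate. A tiny sketch of that relationship (`buffer_duration_seconds` is a hypothetical helper, not part of `audio.py`; plain lists stand in for the numpy chunks):

```python
def buffer_duration_seconds(buffer, sample_rate: int) -> float:
    # Works for any sequence of chunks: numpy arrays in audio.py, lists here
    total = sum(len(chunk) for chunk in buffer)
    return total / sample_rate

buf = [[0] * 1600 for _ in range(10)]  # 10 chunks of 100 ms at 16 kHz
print(buffer_duration_seconds(buf, 16000))  # 1.0
```

This kind of helper is handy if the UI later wants to display elapsed recording time without touching the audio thread.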
**Step 5: Commit**
```bash
git add audio.py tests/test_audio.py
git commit -m "feat: audio module — sounddevice recorder with WAV export"
```
---
### Task 7: App State Module
**Files:**
- Create: `api/__init__.py`
- Create: `api/state.py`
**Step 1: Implement**
```python
# api/__init__.py
# (empty)
```
```python
# api/state.py
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable


class Status(str, Enum):
    IDLE = "idle"
    RECORDING = "recording"
    PROCESSING = "processing"
    ERROR = "error"


@dataclass
class AppState:
    status: Status = Status.IDLE
    instructions: str = ""
    last_transcript_path: str | None = None
    last_error: str | None = None
    _listeners: list[Callable] = field(default_factory=list, repr=False)

    def subscribe(self, callback: Callable):
        self._listeners.append(callback)

    async def notify(self):
        for cb in self._listeners:
            if asyncio.iscoroutinefunction(cb):
                await cb(self)
            else:
                cb(self)

    async def set_status(self, status: Status):
        self.status = status
        await self.notify()


state = AppState()
```
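The subscriber pattern in miniature: every status change fires all listeners, whether sync (the tray icon) or async (WebSocket broadcasts). A trimmed-down standalone version, not the module itself:

```python
import asyncio

class MiniState:
    # Simplified stand-in for AppState: just status plus listeners
    def __init__(self):
        self.status = "idle"
        self._listeners = []

    def subscribe(self, cb):
        self._listeners.append(cb)

    async def set_status(self, status):
        self.status = status
        for cb in self._listeners:
            if asyncio.iscoroutinefunction(cb):
                await cb(self)
            else:
                cb(self)

seen = []
s = MiniState()
s.subscribe(lambda st: seen.append(f"sync:{st.status}"))

async def async_listener(st):
    seen.append(f"async:{st.status}")

s.subscribe(async_listener)
asyncio.run(s.set_status("recording"))
print(seen)  # ['sync:recording', 'async:recording']
```

Dispatching on `iscoroutinefunction` lets one subscription API serve both worlds without wrapper boilerplate at each call site.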
**Step 2: Commit**
```bash
git add api/__init__.py api/state.py
git commit -m "feat: app state module with status enum and subscriber pattern"
```
---
### Task 8: API Router + Pipeline
**Files:**
- Create: `api/router.py`
- Create: `api/pipeline.py`
- Create: `tests/test_api.py`
**Step 1: Write failing tests**
```python
# tests/test_api.py
from fastapi.testclient import TestClient
def make_app():
    from fastapi import FastAPI
    from api.router import router
    app = FastAPI()
    app.include_router(router)
    return app


def test_status_returns_idle():
    client = TestClient(make_app())
    r = client.get("/status")
    assert r.status_code == 200
    assert r.json()["status"] == "idle"


def test_config_get_returns_dict():
    client = TestClient(make_app())
    r = client.get("/config")
    assert r.status_code == 200
    assert "ollama" in r.json()


def test_transcripts_returns_list():
    client = TestClient(make_app())
    r = client.get("/transcripts")
    assert r.status_code == 200
    assert isinstance(r.json(), list)
```
**Step 2: Run to verify failure**
```bash
pytest tests/test_api.py -v
```
Expected: FAIL
**Step 3: Implement api/router.py**
```python
# api/router.py
import asyncio
import os
import subprocess
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from api.state import state, Status
from config import load as load_config
from output import list_transcripts

router = APIRouter()
_ws_clients: list[WebSocket] = []


@router.get("/status")
async def get_status():
    return {"status": state.status, "instructions": state.instructions}


@router.post("/toggle")
async def toggle_recording():
    from api.pipeline import run_pipeline  # local import avoids a circular import
    if state.status == Status.RECORDING:
        asyncio.create_task(run_pipeline())
        return {"action": "stopped"}
    if state.status == Status.IDLE:
        from audio import AudioRecorder
        state._recorder = AudioRecorder()
        state._recorder.start()
        await state.set_status(Status.RECORDING)
        return {"action": "started"}
    return {"action": "busy", "status": state.status}


@router.post("/instructions")
async def set_instructions(body: dict):
    state.instructions = body.get("instructions", "")
    return {"ok": True}


@router.get("/transcripts")
async def get_transcripts():
    cfg = load_config()
    return list_transcripts(cfg["output"]["path"])


@router.get("/config")
async def get_config():
    return load_config()


@router.put("/config")
async def put_config(body: dict):
    # Shallow merge, in-memory only; nothing is written back to config.toml here
    cfg = load_config()
    cfg.update(body)
    return cfg


@router.post("/open")
async def open_file(body: dict):
    path = body.get("path", "")
    if path and os.path.exists(path):
        subprocess.Popen(["xdg-open", path])
    return {"ok": True}


@router.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    _ws_clients.append(ws)
    try:
        while True:
            await ws.receive_text()
    except WebSocketDisconnect:
        if ws in _ws_clients:
            _ws_clients.remove(ws)


async def broadcast(message: dict):
    for ws in list(_ws_clients):
        try:
            await ws.send_json(message)
        except Exception:
            if ws in _ws_clients:
                _ws_clients.remove(ws)
```
**Step 4: Implement api/pipeline.py**
```python
# api/pipeline.py
import os
import tempfile
from api.state import state, Status
from config import load as load_config
from transcription import engine as transcription_engine
from llm import OllamaClient
from output import save_transcript
from api.router import broadcast


async def run_pipeline():
    cfg = load_config()
    recorder = getattr(state, "_recorder", None)
    if recorder is None:
        return
    recorder.stop()
    await state.set_status(Status.PROCESSING)
    await broadcast({"event": "processing"})
    wav_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            wav_path = f.name
        recorder.save_wav(wav_path)
        raw_text = await transcription_engine.transcribe_file(
            wav_path,
            language=cfg["whisper"]["language"],
            model_name=cfg["whisper"]["model"],
            device=cfg["whisper"]["device"],
        )
        await broadcast({"event": "transcribed", "raw": raw_text})
        client = OllamaClient(base_url=cfg["ollama"]["base_url"])
        refined = await client.refine(
            raw_text=raw_text,
            instructions=state.instructions,
            model=cfg["ollama"]["model"],
        )
        await broadcast({"event": "refined", "markdown": refined})
        # Use the first H1 of the refined Markdown as the title, else a fallback
        title = "Diktat"
        for line in refined.splitlines():
            if line.startswith("# "):
                title = line[2:].strip()
                break
        path = save_transcript(
            title=title,
            content=refined,
            output_dir=cfg["output"]["path"],
        )
        state.last_transcript_path = path
        await broadcast({"event": "saved", "path": path, "title": title})
        await state.set_status(Status.IDLE)
    except Exception as e:
        state.last_error = str(e)
        await state.set_status(Status.ERROR)
        await broadcast({"event": "error", "message": str(e)})
    finally:
        if wav_path:
            try:
                os.unlink(wav_path)
            except OSError:
                pass
```
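The pipeline's title extraction is a first-H1 scan with a fallback. Isolated for clarity (`extract_title` is a hypothetical helper name, not part of the module):

```python
def extract_title(markdown: str, fallback: str = "Diktat") -> str:
    # First ATX H1 ("# ...") wins; H2 and deeper are ignored
    for line in markdown.splitlines():
        if line.startswith("# "):
            return line[2:].strip()
    return fallback

print(extract_title("# Wochenbericht\n\nInhalt..."))  # Wochenbericht
print(extract_title("Kein Titel hier."))              # Diktat
```

Because the system prompt instructs the LLM to always emit an H1 title, the fallback should only trigger when the model ignores the format.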
**Step 5: Run tests to verify they pass**
```bash
pytest tests/test_api.py -v
```
Expected: PASS
**Step 6: Commit**
```bash
git add api/router.py api/pipeline.py tests/test_api.py
git commit -m "feat: API router + pipeline — toggle, status, transcripts, WebSocket"
```
---
### Task 9: Frontend
**Files:**
- Create: `frontend/index.html`
- Create: `frontend/app.js`
**Step 1: Create frontend/index.html**
```html
<!DOCTYPE html>
<html lang="de">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>tüit Transkriptor</title>
  <link rel="preconnect" href="https://fonts.googleapis.com">
  <link href="https://fonts.googleapis.com/css2?family=Overpass:wght@300;400;600;700&display=swap" rel="stylesheet">
  <style>
    :root {
      --red: #DA251C;
      --yellow: #FFD802;
      --bg: #111;
      --surface: #1a1a1a;
      --surface2: #232323;
      --text: #e8e8e8;
      --muted: #888;
      --border: #2e2e2e;
    }
    * { box-sizing: border-box; margin: 0; padding: 0; }
    body {
      font-family: 'Overpass', system-ui, sans-serif;
      background: var(--bg);
      color: var(--text);
      min-height: 100vh;
      display: flex;
      flex-direction: column;
    }
    header {
      display: flex;
      align-items: center;
      gap: 12px;
      padding: 16px 24px;
      border-bottom: 1px solid var(--border);
    }
    .logo-dot { width: 12px; height: 12px; background: var(--red); border-radius: 50%; }
    header h1 { font-size: 1.1rem; font-weight: 600; letter-spacing: 0.04em; }
    header h1 span { color: var(--red); }
    .status-badge {
      margin-left: auto;
      font-size: 0.75rem;
      padding: 4px 10px;
      border-radius: 20px;
      background: var(--surface2);
      color: var(--muted);
      text-transform: uppercase;
      letter-spacing: 0.08em;
    }
    .status-badge.recording { background: var(--red); color: #fff; }
    .status-badge.processing { background: var(--yellow); color: #111; }
    main {
      flex: 1;
      display: flex;
      flex-direction: column;
      gap: 20px;
      padding: 24px;
      max-width: 800px;
      width: 100%;
      margin: 0 auto;
    }
    .record-section { display: flex; flex-direction: column; align-items: center; gap: 16px; }
    #record-btn {
      width: 96px; height: 96px; border-radius: 50%;
      background: var(--surface2); border: 3px solid var(--border);
      cursor: pointer; transition: all 0.15s ease;
      display: flex; align-items: center; justify-content: center;
      outline: none;
    }
    #record-btn:hover { border-color: var(--red); }
    #record-btn.recording { background: var(--red); border-color: var(--red); animation: pulse 1.4s infinite; }
    #record-btn.processing { background: var(--yellow); border-color: var(--yellow); cursor: default; }
    @keyframes pulse {
      0%,100% { box-shadow: 0 0 0 0 rgba(218,37,28,0.4); }
      50% { box-shadow: 0 0 0 16px rgba(218,37,28,0); }
    }
    .mic-icon { width: 36px; height: 36px; fill: var(--text); }
    #record-btn.recording .mic-icon { fill: #fff; }
    #record-btn.processing .mic-icon { fill: #111; }
    #status-text { font-size: 0.85rem; color: var(--muted); }
    .instructions-section { display: flex; flex-direction: column; gap: 8px; }
    label { font-size: 0.8rem; color: var(--muted); text-transform: uppercase; letter-spacing: 0.06em; }
    textarea {
      background: var(--surface); border: 1px solid var(--border);
      color: var(--text); border-radius: 8px; padding: 12px;
      font-family: inherit; font-size: 0.9rem; resize: vertical;
      min-height: 80px; outline: none; transition: border-color 0.15s;
    }
    textarea:focus { border-color: var(--yellow); }
    textarea::placeholder { color: var(--muted); }
    .preview-section { display: flex; flex-direction: column; gap: 8px; }
    #preview {
      background: var(--surface); border: 1px solid var(--border);
      border-radius: 8px; padding: 16px;
      font-size: 0.85rem; line-height: 1.6; color: var(--muted);
      min-height: 60px; white-space: pre-wrap; word-break: break-word;
    }
    #preview.has-content { color: var(--text); }
    .transcripts-section { display: flex; flex-direction: column; gap: 8px; }
    #transcript-list { display: flex; flex-direction: column; gap: 6px; }
    .transcript-item {
      background: var(--surface); border: 1px solid var(--border);
      border-radius: 6px; padding: 10px 14px;
      display: flex; align-items: center; justify-content: space-between;
      font-size: 0.82rem; cursor: pointer; transition: border-color 0.1s;
    }
    .transcript-item:hover { border-color: var(--red); }
  </style>
</head>
<body>
  <header>
    <div class="logo-dot"></div>
    <h1>tüit <span>Transkriptor</span></h1>
    <span class="status-badge" id="header-status">Bereit</span>
  </header>
  <main>
    <section class="record-section">
      <button id="record-btn" title="Aufnahme starten / stoppen">
        <svg class="mic-icon" viewBox="0 0 24 24" xmlns="http://www.w3.org/2000/svg">
          <path d="M12 1a4 4 0 0 1 4 4v6a4 4 0 0 1-8 0V5a4 4 0 0 1 4-4zm0 2a2 2 0 0 0-2 2v6a2 2 0 0 0 4 0V5a2 2 0 0 0-2-2zM6.5 10.5A5.5 5.5 0 0 0 12 16a5.5 5.5 0 0 0 5.5-5.5h2A7.5 7.5 0 0 1 13 17.93V21h2v2H9v-2h2v-3.07A7.5 7.5 0 0 1 4.5 10.5h2z"/>
        </svg>
      </button>
      <span id="status-text">Klicken zum Starten</span>
    </section>
    <section class="instructions-section">
      <label for="instructions">Instruktionen für den Sekretär</label>
      <textarea
        id="instructions"
        placeholder="z.B. &quot;Heb die wichtigsten Punkte hervor&quot; · &quot;Erstelle ein Ticket&quot; · &quot;Mach ein Angebot daraus&quot;"
      ></textarea>
    </section>
    <section class="preview-section">
      <label>Vorschau</label>
      <div id="preview">Noch keine Aufnahme verarbeitet.</div>
    </section>
    <section class="transcripts-section">
      <label>Letzte Transkripte</label>
      <div id="transcript-list"></div>
    </section>
  </main>
  <script src="/app.js"></script>
</body>
</html>
```
**Step 2: Create frontend/app.js**
Note: All DOM manipulation uses `textContent` and `createElement` — no `innerHTML` with untrusted data to prevent XSS.
```javascript
const btn = document.getElementById('record-btn');
const statusText = document.getElementById('status-text');
const headerStatus = document.getElementById('header-status');
const preview = document.getElementById('preview');
const instructionsEl = document.getElementById('instructions');
const transcriptList = document.getElementById('transcript-list');

const STATUS_LABELS = {
  idle: 'Bereit',
  recording: 'Aufnahme läuft\u2026',
  processing: 'Wird verarbeitet\u2026',
  error: 'Fehler',
};

instructionsEl.addEventListener('input', async () => {
  await fetch('/instructions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ instructions: instructionsEl.value }),
  });
});

function setStatus(status) {
  btn.className = status;
  headerStatus.className = `status-badge ${status}`;
  const label = STATUS_LABELS[status] || status;
  statusText.textContent = label;
  headerStatus.textContent = label;
  btn.disabled = status === 'processing';
}

btn.addEventListener('click', () => fetch('/toggle', { method: 'POST' }));

function connectWs() {
  const ws = new WebSocket(`ws://${location.host}/ws`);
  ws.onmessage = (e) => {
    const msg = JSON.parse(e.data);
    if (msg.event === 'processing') setStatus('processing');
    if (msg.event === 'transcribed' || msg.event === 'refined') {
      const text = msg.raw || msg.markdown || '';
      preview.textContent = text;
      preview.classList.add('has-content');
    }
    if (msg.event === 'saved') {
      setStatus('idle');
      loadTranscripts();
    }
    if (msg.event === 'error') {
      setStatus('idle');
      preview.textContent = `Fehler: ${msg.message}`;
    }
  };
  ws.onclose = () => setTimeout(connectWs, 2000);
}

async function loadTranscripts() {
  const r = await fetch('/transcripts');
  const items = await r.json();
  // Build DOM nodes — no innerHTML with untrusted data
  transcriptList.replaceChildren(
    ...items.map((t) => {
      const div = document.createElement('div');
      div.className = 'transcript-item';
      const name = document.createElement('span');
      name.textContent = t.filename.replace('.md', '');
      const meta = document.createElement('span');
      meta.className = 'meta';
      meta.textContent = `${Math.round(t.size / 1024 * 10) / 10} KB`;
      div.append(name, meta);
      div.addEventListener('click', () => {
        fetch('/open', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ path: t.path }),
        });
      });
      return div;
    })
  );
}

(async () => {
  const r = await fetch('/status');
  const data = await r.json();
  setStatus(data.status);
  instructionsEl.value = data.instructions || '';
  connectWs();
  loadTranscripts();
})();
```
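The size label in the transcript list rounds to one decimal place. The same one-liner in isolation (`formatKb` is a hypothetical name; the app inlines the expression):

```javascript
// One-decimal KB formatting used for the transcript list entries
function formatKb(bytes) {
  return `${Math.round((bytes / 1024) * 10) / 10} KB`;
}

console.log(formatKb(2048));  // 2 KB
console.log(formatKb(1536));  // 1.5 KB
```

Multiplying by 10 before rounding and dividing after is the classic trick for one-decimal rounding without `toFixed`'s trailing-zero strings.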
**Step 3: Commit**
```bash
git add frontend/
git commit -m "feat: browser UI — tüit CI dark theme, XSS-safe DOM rendering"
```
---
### Task 10: Main Entry Point + Tray + Signal Handler
**Files:**
- Create: `main.py`
**Step 1: Implement main.py**
```python
import asyncio
import os
import signal
import threading
import time
import webbrowser
from pathlib import Path
import uvicorn
from fastapi import FastAPI
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
import pystray
from PIL import Image, ImageDraw
from api.router import router
from api.state import state, Status
from config import load as load_config
# ── FastAPI ────────────────────────────────────────────────────────────────────
app = FastAPI(title="tüit Transkriptor")
app.include_router(router)
FRONTEND_DIR = Path(__file__).parent / "frontend"
@app.get("/")
async def index():
return FileResponse(str(FRONTEND_DIR / "index.html"))
@app.get("/app.js")
async def appjs():
return FileResponse(str(FRONTEND_DIR / "app.js"))
# ── PID file ───────────────────────────────────────────────────────────────────
def write_pid():
cfg = load_config()
pid_path = cfg.get("pid_file", os.path.expanduser("~/.local/run/tueit-transcriber.pid"))
os.makedirs(os.path.dirname(pid_path), exist_ok=True)
Path(pid_path).write_text(str(os.getpid()))
def remove_pid():
cfg = load_config()
pid_path = cfg.get("pid_file", os.path.expanduser("~/.local/run/tueit-transcriber.pid"))
try:
os.unlink(pid_path)
except FileNotFoundError:
pass
# ── SIGUSR1 → toggle ──────────────────────────────────────────────────────────
_loop: asyncio.AbstractEventLoop | None = None
def _sigusr1_handler(signum, frame):
if _loop:
_loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_async_toggle()))
async def _async_toggle():
from api.router import toggle_recording
await toggle_recording()
# ── Tray ───────────────────────────────────────────────────────────────────────
def _make_icon(recording: bool = False) -> Image.Image:
img = Image.new("RGBA", (64, 64), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
color = (218, 37, 28, 255) if recording else (80, 80, 80, 255)
draw.ellipse([8, 8, 56, 56], fill=color)
return img
def run_tray(port: int):
icon = pystray.Icon(
"tueit-transcriber",
_make_icon(False),
"tüit Transkriptor",
menu=pystray.Menu(
pystray.MenuItem("Aufnahme starten/stoppen", lambda i, it: (
_loop and _loop.call_soon_threadsafe(
lambda: asyncio.ensure_future(_async_toggle())
)
), default=True),
pystray.MenuItem("Öffnen", lambda i, it: webbrowser.open(f"http://localhost:{port}")),
pystray.MenuItem("Beenden", lambda i, it: (remove_pid(), icon.stop(), os._exit(0))),
),
)
def update_icon(s):
icon.icon = _make_icon(s.status == Status.RECORDING)
state.subscribe(update_icon)
icon.run()
# ── Server ─────────────────────────────────────────────────────────────────────
def run_server(port: int):
uvicorn.run(app, host="127.0.0.1", port=port, log_level="warning")
# ── Entrypoint ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
cfg = load_config()
port = cfg["server"]["port"]
write_pid()
signal.signal(signal.SIGUSR1, _sigusr1_handler)
    server_thread = threading.Thread(target=run_server, args=(port,), daemon=True)
    server_thread.start()
    # Give uvicorn a moment to bind
    time.sleep(0.8)
    # The SIGUSR1 handler needs a *running* loop it can hand coroutines to.
    # uvicorn's loop lives inside the server thread, so run a dedicated loop
    # in its own daemon thread for signal dispatch — a loop that is created
    # but never run would silently swallow the queued callbacks.
    _loop = asyncio.new_event_loop()
    threading.Thread(target=_loop.run_forever, daemon=True).start()
webbrowser.open(f"http://localhost:{port}")
try:
run_tray(port)
finally:
remove_pid()
```
**Step 2: Commit**
```bash
git add main.py
git commit -m "feat: main entry point — FastAPI + pystray tray + SIGUSR1 signal handler"
```
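Task 10's trickiest piece is handing a coroutine from a synchronous POSIX signal handler to an asyncio loop. A minimal standalone sketch of that pattern (the names here are stand-ins, not project code — `_async_toggle` mimics the real `toggle_recording()` coroutine):

```python
import asyncio
import os
import signal
import threading

toggled = threading.Event()

async def _async_toggle():
    # Stand-in for the real toggle_recording() coroutine.
    toggled.set()

# A dedicated loop kept running in a daemon thread, so that callbacks
# queued from other threads actually execute.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

def _sigusr1_handler(signum, frame):
    # Signal handlers run synchronously in the main thread; hand the
    # coroutine over to the loop thread instead of awaiting it here.
    loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_async_toggle()))

signal.signal(signal.SIGUSR1, _sigusr1_handler)
os.kill(os.getpid(), signal.SIGUSR1)  # simulate `pkill -USR1 -f main.py`
print("toggled:", toggled.wait(timeout=2))
```

`call_soon_threadsafe` is the only documented thread-safe way to schedule work on a loop owned by another thread; calling `asyncio.ensure_future` directly from the signal handler would fail because no loop runs in the main thread.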
---
### Task 11: install.sh + systemd User Service
**Files:**
- Create: `install.sh`
**Step 1: Create install.sh**
```bash
#!/usr/bin/env bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SERVICE_NAME="tueit-transcriber"
SERVICE_FILE="$HOME/.config/systemd/user/${SERVICE_NAME}.service"
echo "=== tüit Transkriptor Installer ==="
command -v python3 >/dev/null 2>&1 || { echo "ERROR: python3 not found"; exit 1; }
if ! command -v ollama >/dev/null 2>&1; then
echo "WARNING: ollama not found. Install from https://ollama.com"
echo " After install: ollama pull gemma3:12b"
fi
if command -v rocminfo >/dev/null 2>&1; then
echo "ROCm detected — GPU acceleration available"
else
echo "INFO: ROCm not found — Whisper will run on CPU (slower)"
echo " To enable GPU: sudo pacman -S rocm-hip-sdk"
fi
echo "Installing Python dependencies..."
# On Arch, the system Python may be externally managed (PEP 668);
# use a virtualenv if this --user install is rejected.
pip install --user -r "$SCRIPT_DIR/requirements.txt"
mkdir -p "$HOME/.config/systemd/user"
cat > "$SERVICE_FILE" <<EOF
[Unit]
Description=tüit Transkriptor
After=graphical-session.target
[Service]
ExecStart=$(command -v python3) ${SCRIPT_DIR}/main.py
Restart=on-failure
RestartSec=5
Environment=DISPLAY=:0
Environment=DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/$(id -u)/bus
[Install]
WantedBy=default.target
EOF
systemctl --user daemon-reload
systemctl --user enable "$SERVICE_NAME"
systemctl --user start "$SERVICE_NAME"
echo ""
echo "=== Done ==="
echo "Status: systemctl --user status $SERVICE_NAME"
echo ""
echo "KDE Hotkey setup:"
echo " System Settings → Shortcuts → Custom Shortcuts"
echo " Command: pkill -USR1 -f main.py"
echo ""
echo "First run: ollama pull gemma3:12b"
```
**Step 2: Commit**
```bash
chmod +x install.sh
git add install.sh
git commit -m "feat: install.sh — ROCm check, systemd user service, hotkey instructions"
```
---
### Task 12: Smoke Test + Gitea Remote
**Step 1: Run full test suite**
```bash
cd /home/templis/work/tueit_Transkriptor
pip install -r requirements.txt
pytest -v
```
Expected: All tests PASS
**Step 2: Verify clean import**
```bash
python -c "from main import app; print('OK')"
```
Expected: `OK`
**Step 3: Push to Gitea**
Use the `tueit-gitea` skill for repo creation. Namespace: `thomas.kopp` or a `tools` group.
```bash
git remote add origin git@git.tueit.de:thomas.kopp/tueit_Transkriptor.git
git push -u origin main
```
**Step 4: Start and manually verify**
```bash
python main.py
# Browser opens at http://localhost:8765
# Tray icon appears in system tray
# Test SIGUSR1:
pkill -USR1 -f main.py # → red tray icon, "Aufnahme läuft…"
pkill -USR1 -f main.py # → processing → idle
```
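Note that `pkill -USR1 -f main.py` matches any process whose command line contains `main.py`. A more precise alternative, sketched here as a hypothetical helper (not one of the plan's files), signals the exact PID recorded by `write_pid()`:

```python
import os
import signal
from pathlib import Path

def toggle_via_pidfile(pid_path: Path) -> int:
    """Send SIGUSR1 to the PID stored in the given pid file.

    Assumes the file holds a single integer, as written by main.py's
    write_pid() (default: ~/.local/run/tueit-transcriber.pid).
    """
    pid = int(pid_path.read_text().strip())
    os.kill(pid, signal.SIGUSR1)
    return pid
```

Wired into a KDE custom shortcut via `python3 -c …`, this avoids accidentally toggling an unrelated `main.py` process.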
---
## Summary
| Task | Component | Tests |
|------|-----------|-------|
| 1 | Scaffold | — |
| 2 | Config (TOML) | `tests/test_config.py` |
| 3 | Output (Markdown writer) | `tests/test_output.py` |
| 4 | LLM (Ollama client) | `tests/test_llm.py` |
| 5 | Transcription (Whisper) | `tests/test_transcription.py` |
| 6 | Audio (sounddevice) | `tests/test_audio.py` |
| 7 | App state | — |
| 8 | API router + pipeline | `tests/test_api.py` |
| 9 | Frontend (HTML/JS, XSS-safe) | — |
| 10 | Main + tray + SIGUSR1 | — |
| 11 | install.sh + systemd | — |
| 12 | Smoke test + Gitea push | manual |