# tüit Transkriptor Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Build a local AI transcription desktop tool with a system tray icon, audio capture, faster-whisper transcription, Ollama LLM post-processing, and a browser-based UI that saves Markdown files to a Nextcloud-synced folder.

**Architecture:** pystray tray icon + FastAPI local server (port 8765) + browser UI. Audio is captured via sounddevice, transcribed via faster-whisper (ROCm), and refined via Ollama (gemma3:12b). SIGUSR1 toggles recording for Wayland-compatible hotkey support.

**Tech Stack:** Python 3.11+, FastAPI, uvicorn, pystray, Pillow, sounddevice, faster-whisper, httpx, tomllib (stdlib in 3.11+)

---

### Task 1: Project Scaffold

**Files:**
- Create: `requirements.txt`
- Create: `.gitignore`
- Create: `pytest.ini`
- Create: `CLAUDE.md`

**Step 1: Create requirements.txt**

```
fastapi>=0.111
uvicorn[standard]>=0.29
pystray>=0.19
Pillow>=10.0
sounddevice>=0.4.6
faster-whisper>=1.0.3
httpx>=0.27
numpy>=1.26
tomli_w>=1.0
pytest>=8.0
pytest-asyncio>=0.23
```

**Step 2: Create .gitignore**

```
__pycache__/
*.pyc
*.pyo
.venv/
venv/
*.egg-info/
dist/
.env
data/
```

**Step 3: Create pytest.ini**

```ini
[pytest]
asyncio_mode = auto
```

**Step 4: Create CLAUDE.md**

```markdown
# CLAUDE.md — tüit Transkriptor

Desktop transcription tool. Python, no Docker.

## Key Commands

# Install dependencies
pip install -r requirements.txt

# Run
python main.py

# Run tests
pytest -v

# Trigger recording toggle via signal
pkill -USR1 -f main.py

## Architecture

See docs/plans/2026-04-01-desktop-transcription-design.md
```

**Step 5: Commit**

```bash
git add requirements.txt .gitignore CLAUDE.md pytest.ini
git commit -m "chore: project scaffold"
```

---

### Task 2: Config Module

**Files:**
- Create: `config.py`
- Create: `tests/__init__.py`
- Create: `tests/test_config.py`

**Step 1: Write failing tests**

```python
# tests/test_config.py
import os
import tempfile
from unittest.mock import patch


def test_config_loads_defaults():
    with tempfile.TemporaryDirectory() as tmpdir:
        cfg_path = os.path.join(tmpdir, "config.toml")
        import importlib, config
        # Reload first: reloading inside the patch context would reset
        # CONFIG_PATH and silently undo the patch.
        importlib.reload(config)
        with patch("config.CONFIG_PATH", cfg_path):
            cfg = config.load()
        assert cfg["ollama"]["model"] == "gemma3:12b"
        assert cfg["whisper"]["model"] == "large-v3"
        assert cfg["server"]["port"] == 8765


def test_config_creates_file_on_first_run():
    with tempfile.TemporaryDirectory() as tmpdir:
        cfg_path = os.path.join(tmpdir, "config.toml")
        import importlib, config
        importlib.reload(config)
        with patch("config.CONFIG_PATH", cfg_path):
            config.load()
        assert os.path.exists(cfg_path)
```

**Step 2: Run tests to verify they fail**

```bash
pytest tests/test_config.py -v
```

Expected: FAIL — `ModuleNotFoundError: No module named 'config'`

**Step 3: Implement config.py**

```python
import os
import tomllib

CONFIG_PATH = os.path.expanduser("~/.config/tueit-transcriber/config.toml")

DEFAULTS = {
    "ollama": {
        "base_url": "http://localhost:11434",
        "model": "gemma3:12b",
    },
    "whisper": {
        "model": "large-v3",
        "language": "de",
        "device": "auto",  # "auto" = use GPU if ROCm available, else CPU
    },
    "server": {
        "port": 8765,
    },
    "output": {
        "path": os.path.expanduser(
            "~/cloud.shron.de/Hetzner Storagebox/work"
        ),
    },
    "pid_file": os.path.expanduser("~/.local/run/tueit-transcriber.pid"),
}


def load() -> dict:
    os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True)
    if not os.path.exists(CONFIG_PATH):
        _write_defaults()
    with open(CONFIG_PATH, "rb") as f:
        on_disk = tomllib.load(f)
    return _deep_merge(DEFAULTS, on_disk)


def _deep_merge(base: dict, override: dict) -> dict:
    result = dict(base)
    for k, v in override.items():
        if k in result and isinstance(result[k], dict) and isinstance(v, dict):
            result[k] = _deep_merge(result[k], v)
        else:
            result[k] = v
    return result


def _write_defaults():
    try:
        import tomli_w
        with open(CONFIG_PATH, "wb") as f:
            tomli_w.dump(DEFAULTS, f)
    except ImportError:
        # Fallback: hand-write the TOML if tomli_w is not installed.
        with open(CONFIG_PATH, "w") as f:
            f.write("# tüit Transkriptor config\n\n")
            f.write('[ollama]\nbase_url = "http://localhost:11434"\nmodel = "gemma3:12b"\n\n')
            f.write('[whisper]\nmodel = "large-v3"\nlanguage = "de"\ndevice = "auto"\n\n')
            f.write('[server]\nport = 8765\n\n')
            f.write(f'[output]\npath = "{DEFAULTS["output"]["path"]}"\n')
```

**Step 4: Run tests to verify they pass**

```bash
pytest tests/test_config.py -v
```

Expected: PASS

**Step 5: Commit**

```bash
git add config.py tests/__init__.py tests/test_config.py
git commit -m "feat: config module with TOML defaults"
```

---

### Task 3: Output Module

**Files:**
- Create: `output.py`
- Create: `tests/test_output.py`

**Step 1: Write failing tests**

```python
# tests/test_output.py
import os
import tempfile
from datetime import datetime


def test_save_transcript_creates_file():
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript
        path = save_transcript(
            title="Test Aufnahme",
            content="Dies ist ein Test.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        assert os.path.exists(path)


def test_save_transcript_filename_format():
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript
        path = save_transcript(
            title="Mein erstes Diktat",
            content="Inhalt.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        assert os.path.basename(path) == "2026-04-01-1432-mein-erstes-diktat.md"


def test_save_transcript_contains_frontmatter():
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript
        path = save_transcript(
            title="Test",
            content="Inhalt.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        text = open(path).read()
        assert "---" in text
        assert "date:" in text
        assert "transkript" in text


def test_save_transcript_contains_content():
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript
        path = save_transcript(
            title="Test",
            content="Das ist der Inhalt.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        assert "Das ist der Inhalt." in open(path).read()


def test_slugify():
    from output import slugify
    assert slugify("Mein erstes Diktat") == "mein-erstes-diktat"
    assert slugify("test -- foo") == "test-foo"
```

**Step 2: Run to verify failure**

```bash
pytest tests/test_output.py -v
```

Expected: FAIL

**Step 3: Implement output.py**

```python
import os
import re
import unicodedata
from datetime import datetime


def slugify(text: str) -> str:
    for src, dst in [("ä", "a"), ("ö", "o"), ("ü", "u"), ("Ä", "a"), ("Ö", "o"), ("Ü", "u"), ("ß", "ss")]:
        text = text.replace(src, dst)
    text = unicodedata.normalize("NFKD", text)
    text = "".join(c for c in text if unicodedata.category(c) != "Mn")
    text = text.lower()
    text = re.sub(r"[^a-z0-9]+", "-", text)
    return text.strip("-")


def save_transcript(
    title: str,
    content: str,
    output_dir: str,
    dt: datetime | None = None,
) -> str:
    if dt is None:
        dt = datetime.now()
    slug = slugify(title)[:60]
    filename = f"{dt.strftime('%Y-%m-%d-%H%M')}-{slug}.md"
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, filename)
    with open(path, "w", encoding="utf-8") as f:
        f.write(f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript]\n---\n\n")
        f.write(f"# {title}\n\n")
        f.write(content)
        if not content.endswith("\n"):
            f.write("\n")
    return path


def list_transcripts(output_dir: str, limit: int = 20) -> list[dict]:
    if not os.path.exists(output_dir):
        return []
    files = sorted(
        [f for f in os.listdir(output_dir) if f.endswith(".md")],
        reverse=True,
    )[:limit]
    result = []
    for f in files:
        full = os.path.join(output_dir, f)
        stat = os.stat(full)
        result.append({"filename": f, "path": full, "size": stat.st_size, "mtime": stat.st_mtime})
    return result
```

**Step 4: Run tests to verify they pass**

```bash
pytest tests/test_output.py -v
```

Expected: PASS

**Step 5: Commit**

```bash
git add output.py tests/test_output.py
git commit -m "feat: output module — Markdown file writer with slugified filenames"
```

---

### Task 4: LLM Module

**Files:**
- Create: `llm.py`
- Create: `tests/test_llm.py`

**Step 1: Write failing tests**

```python
# tests/test_llm.py
import pytest
from unittest.mock import AsyncMock, patch, MagicMock


@pytest.mark.asyncio
async def test_refine_calls_ollama():
    from llm import OllamaClient
    mock_response = MagicMock()
    mock_response.json.return_value = {"response": "# Titel\n\nInhalt."}
    mock_response.raise_for_status = MagicMock()
    with patch("httpx.AsyncClient") as MockClient:
        instance = MockClient.return_value.__aenter__.return_value
        instance.post = AsyncMock(return_value=mock_response)
        client = OllamaClient(base_url="http://localhost:11434")
        result = await client.refine(
            raw_text="Das ist ein test.",
            instructions="Mach eine Zusammenfassung.",
            model="gemma3:12b",
        )
        assert "Inhalt" in result
        instance.post.assert_called_once()


@pytest.mark.asyncio
async def test_list_models_returns_list():
    from llm import OllamaClient
    mock_response = MagicMock()
    mock_response.json.return_value = {"models": [{"name": "gemma3:12b"}, {"name": "mistral:7b"}]}
    mock_response.raise_for_status = MagicMock()
    with patch("httpx.AsyncClient") as MockClient:
        instance = MockClient.return_value.__aenter__.return_value
        instance.get = AsyncMock(return_value=mock_response)
        client = OllamaClient(base_url="http://localhost:11434")
        models = await client.list_models()
        assert "gemma3:12b" in models
```

**Step 2: Run to verify failure**

```bash
pytest tests/test_llm.py -v
```

Expected: FAIL

**Step 3: Implement llm.py**

```python
import httpx

SYSTEM_PROMPT = """Du bist ein präziser Schreibassistent.
Du bekommst ein rohes Sprachtranskript und optionale Instruktionen des Nutzers.

Deine Aufgabe:
1. Bereinige den Text (Füllwörter, Wiederholungen, Tippfehler)
2. Strukturiere ihn mit Markdown-Überschriften wenn sinnvoll
3. Erzeuge einen passenden deutschen Titel als H1
4. Beachte Instruktionen des Nutzers wenn vorhanden
5. Antworte NUR mit dem fertigen Markdown — kein Kommentar, keine Erklärung

Format:
# Titel

Inhalt...
"""


class OllamaClient:
    def __init__(self, base_url: str = "http://localhost:11434"):
        self.base_url = base_url

    async def list_models(self) -> list[str]:
        async with httpx.AsyncClient() as client:
            r = await client.get(f"{self.base_url}/api/tags")
            r.raise_for_status()
            return [m["name"] for m in r.json().get("models", [])]

    async def refine(
        self,
        raw_text: str,
        instructions: str = "",
        model: str = "gemma3:12b",
    ) -> str:
        prompt = f"Transkript:\n{raw_text}"
        if instructions.strip():
            prompt += f"\n\nInstruktionen:\n{instructions.strip()}"
        async with httpx.AsyncClient(timeout=120) as client:
            r = await client.post(
                f"{self.base_url}/api/generate",
                json={"model": model, "prompt": prompt, "system": SYSTEM_PROMPT, "stream": False},
            )
            r.raise_for_status()
            return r.json()["response"]
```

**Step 4: Run tests to verify they pass**

```bash
pytest tests/test_llm.py -v
```

Expected: PASS

**Step 5: Commit**

```bash
git add llm.py tests/test_llm.py
git commit -m "feat: LLM module — Ollama client with transcript refinement"
```

---

### Task 5: Transcription Module

**Files:**
- Create: `transcription.py`
- Create: `tests/test_transcription.py`

**Step 1: Write failing tests**

```python
# tests/test_transcription.py
import asyncio
from unittest.mock import MagicMock


def test_transcription_engine_is_singleton():
    from transcription import engine, TranscriptionEngine
    assert isinstance(engine, TranscriptionEngine)


def test_transcribe_file_calls_whisper(tmp_path):
    wav = tmp_path / "test.wav"
    wav.write_bytes(b"\x00" * 100)
    mock_model = MagicMock()
    mock_segment = MagicMock()
    mock_segment.text = " Hallo Welt"
    mock_model.transcribe.return_value = ([mock_segment], MagicMock())
    from transcription import TranscriptionEngine
    eng = TranscriptionEngine()
    eng._model = mock_model
    result = asyncio.run(eng.transcribe_file(str(wav), language="de"))
    assert result == "Hallo Welt"
    mock_model.transcribe.assert_called_once_with(str(wav), language="de")
```

**Step 2: Run to verify failure**

```bash
pytest tests/test_transcription.py -v
```

Expected: FAIL

**Step 3: Implement transcription.py**

```python
import asyncio


class TranscriptionEngine:
    _model = None

    def _get_model(self, model_name: str = "large-v3", device: str = "auto"):
        # The model is loaded once and cached; later calls reuse it.
        if self._model is None:
            from faster_whisper import WhisperModel
            if device == "auto":
                try:
                    # ROCm builds of CTranslate2 also expose the GPU as "cuda".
                    self._model = WhisperModel(model_name, device="cuda", compute_type="float16")
                except Exception:
                    self._model = WhisperModel(model_name, device="cpu", compute_type="int8")
            else:
                compute = "float16" if device in ("cuda", "rocm") else "int8"
                self._model = WhisperModel(model_name, device=device, compute_type=compute)
        return self._model

    async def transcribe_file(
        self,
        audio_path: str,
        language: str = "de",
        model_name: str = "large-v3",
        device: str = "auto",
    ) -> str:
        loop = asyncio.get_running_loop()
        model = self._get_model(model_name, device)

        def _run() -> str:
            # transcribe() returns a lazy generator; decoding happens while
            # iterating, so both the call and the join must run in the executor
            # to keep the blocking work off the event loop.
            segments, _ = model.transcribe(audio_path, language=language)
            return "".join(seg.text for seg in segments).strip()

        return await loop.run_in_executor(None, _run)


engine = TranscriptionEngine()
```

**Step 4: Run tests to verify they pass**

```bash
pytest tests/test_transcription.py -v
```

Expected: PASS

**Step 5: Commit**

```bash
git add transcription.py tests/test_transcription.py
git commit -m "feat: transcription module — faster-whisper with ROCm auto-detect"
```

---

### Task 6: Audio Module

**Files:**
- Create: `audio.py`
- Create: `tests/test_audio.py`
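The audio module below and the transcription engine from Task 5 meet at one contract: a 16 kHz, mono, 16-bit PCM WAV file. As a reference point, that container can be produced with the stdlib alone; this silent-file helper is illustrative only and is not part of the plan's modules:

```python
import array
import tempfile
import wave

SAMPLE_RATE = 16000  # Whisper's native input rate


def write_silent_wav(path: str, seconds: float = 1.0) -> None:
    """Write `seconds` of silence in the exact format the recorder must produce."""
    samples = array.array("h", [0] * int(SAMPLE_RATE * seconds))  # "h" = int16
    with wave.open(path, "wb") as wf:
        wf.setnchannels(1)            # mono
        wf.setsampwidth(2)            # 16-bit PCM = 2 bytes per sample
        wf.setframerate(SAMPLE_RATE)
        wf.writeframes(samples.tobytes())


with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
    write_silent_wav(f.name)

with wave.open(f.name) as wf:
    info = (wf.getnchannels(), wf.getframerate(), wf.getnframes())
print(info)  # (1, 16000, 16000)
```

The same three `wave` setters appear in `save_wav()` in Step 3 below; keeping the recorder at 16 kHz mono avoids any resampling step before transcription.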
**Step 1: Write failing tests**

```python
# tests/test_audio.py
import numpy as np
from unittest.mock import patch, MagicMock


def test_recorder_starts_and_stops():
    from audio import AudioRecorder
    with patch("sounddevice.InputStream") as MockStream:
        mock_stream = MagicMock()
        MockStream.return_value.start = MagicMock()
        MockStream.return_value.stop = MagicMock()
        MockStream.return_value.close = MagicMock()
        recorder = AudioRecorder(sample_rate=16000)
        assert not recorder.is_recording
        recorder._stream = MockStream.return_value
        recorder.is_recording = True
        recorder.stop()
        assert not recorder.is_recording


def test_recorder_save_wav(tmp_path):
    import wave
    from audio import AudioRecorder
    recorder = AudioRecorder(sample_rate=16000)
    recorder._buffer = [np.zeros(1600, dtype=np.int16)]
    out = str(tmp_path / "test.wav")
    recorder.save_wav(out)
    with wave.open(out) as wf:
        assert wf.getframerate() == 16000
        assert wf.getnchannels() == 1
```

**Step 2: Run to verify failure**

```bash
pytest tests/test_audio.py -v
```

Expected: FAIL

**Step 3: Implement audio.py**

```python
import threading
import wave

import numpy as np


class AudioRecorder:
    def __init__(self, sample_rate: int = 16000):
        self.sample_rate = sample_rate
        self._buffer: list[np.ndarray] = []
        self._stream = None
        self.is_recording = False
        self._lock = threading.Lock()

    def _callback(self, indata, frames, time, status):
        # Runs on the PortAudio thread; copy channel 0 out of the shared buffer.
        if self.is_recording:
            with self._lock:
                self._buffer.append(indata[:, 0].copy().astype(np.int16))

    def start(self):
        import sounddevice as sd
        self._buffer = []
        self.is_recording = True
        self._stream = sd.InputStream(
            samplerate=self.sample_rate,
            channels=1,
            dtype="int16",
            callback=self._callback,
        )
        self._stream.start()

    def stop(self):
        self.is_recording = False
        if self._stream:
            self._stream.stop()
            self._stream.close()
            self._stream = None

    def save_wav(self, path: str) -> str:
        with self._lock:
            data = np.concatenate(self._buffer) if self._buffer else np.zeros(0, dtype=np.int16)
        with wave.open(path, "wb") as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)
            wf.setframerate(self.sample_rate)
            wf.writeframes(data.tobytes())
        return path
```

**Step 4: Run tests to verify they pass**

```bash
pytest tests/test_audio.py -v
```

Expected: PASS

**Step 5: Commit**

```bash
git add audio.py tests/test_audio.py
git commit -m "feat: audio module — sounddevice recorder with WAV export"
```

---

### Task 7: App State Module

**Files:**
- Create: `api/__init__.py`
- Create: `api/state.py`

**Step 1: Implement**

```python
# api/__init__.py
# (empty)
```

```python
# api/state.py
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable


class Status(str, Enum):
    IDLE = "idle"
    RECORDING = "recording"
    PROCESSING = "processing"
    ERROR = "error"


@dataclass
class AppState:
    status: Status = Status.IDLE
    instructions: str = ""
    last_transcript_path: str | None = None
    last_error: str | None = None
    _listeners: list[Callable] = field(default_factory=list, repr=False)

    def subscribe(self, callback: Callable):
        self._listeners.append(callback)

    async def notify(self):
        for cb in self._listeners:
            if asyncio.iscoroutinefunction(cb):
                await cb(self)
            else:
                cb(self)

    async def set_status(self, status: Status):
        self.status = status
        await self.notify()


state = AppState()
```

**Step 2: Commit**

```bash
git add api/__init__.py api/state.py
git commit -m "feat: app state module with status enum and subscriber pattern"
```

---

### Task 8: API Router + Pipeline

**Files:**
- Create: `api/router.py`
- Create: `api/pipeline.py`
- Create: `tests/test_api.py`

**Step 1: Write failing tests**

```python
# tests/test_api.py
from fastapi.testclient import TestClient


def make_app():
    from fastapi import FastAPI
    from api.router import router
    app = FastAPI()
    app.include_router(router)
    return app


def test_status_returns_idle():
    client = TestClient(make_app())
    r = client.get("/status")
    assert r.status_code == 200
    assert r.json()["status"] == "idle"


def test_config_get_returns_dict():
    client = TestClient(make_app())
    r = client.get("/config")
    assert r.status_code == 200
    assert "ollama" in r.json()


def test_transcripts_returns_list():
    client = TestClient(make_app())
    r = client.get("/transcripts")
    assert r.status_code == 200
    assert isinstance(r.json(), list)
```

**Step 2: Run to verify failure**

```bash
pytest tests/test_api.py -v
```

Expected: FAIL

**Step 3: Implement api/router.py**

```python
# api/router.py
import asyncio
import os

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

from api.state import state, Status
from config import load as load_config
from output import list_transcripts

router = APIRouter()
_ws_clients: list[WebSocket] = []


@router.get("/status")
async def get_status():
    return {"status": state.status, "instructions": state.instructions}


@router.post("/toggle")
async def toggle_recording():
    from api.pipeline import run_pipeline
    if state.status == Status.RECORDING:
        asyncio.create_task(run_pipeline())
        return {"action": "stopped"}
    if state.status in (Status.IDLE, Status.ERROR):
        # ERROR counts as startable — otherwise a failed pipeline run would
        # block all further toggles.
        from audio import AudioRecorder
        state._recorder = AudioRecorder()
        state._recorder.start()
        await state.set_status(Status.RECORDING)
        return {"action": "started"}
    return {"action": "busy", "status": state.status}


@router.post("/instructions")
async def set_instructions(body: dict):
    state.instructions = body.get("instructions", "")
    return {"ok": True}


@router.get("/transcripts")
async def get_transcripts():
    cfg = load_config()
    return list_transcripts(cfg["output"]["path"])


@router.get("/config")
async def get_config():
    return load_config()


@router.put("/config")
async def put_config(body: dict):
    cfg = load_config()
    cfg.update(body)  # shallow in-memory merge; config.toml on disk is not rewritten
    return cfg


@router.post("/open")
async def open_file(body: dict):
    import subprocess
    path = body.get("path", "")
    if path and os.path.exists(path):
        subprocess.Popen(["xdg-open", path])
    return {"ok": True}


@router.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    _ws_clients.append(ws)
    try:
        while True:
            await ws.receive_text()
    except WebSocketDisconnect:
        if ws in _ws_clients:
            _ws_clients.remove(ws)


async def broadcast(message: dict):
    for ws in list(_ws_clients):
        try:
            await ws.send_json(message)
        except Exception:
            if ws in _ws_clients:
                _ws_clients.remove(ws)
```

**Step 4: Implement api/pipeline.py**

```python
# api/pipeline.py
import os
import tempfile

from api.state import state, Status
from config import load as load_config
from transcription import engine as transcription_engine
from llm import OllamaClient
from output import save_transcript
from api.router import broadcast


async def run_pipeline():
    cfg = load_config()
    recorder = getattr(state, "_recorder", None)
    if recorder is None:
        return
    recorder.stop()
    await state.set_status(Status.PROCESSING)
    await broadcast({"event": "processing"})
    wav_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            wav_path = f.name
        recorder.save_wav(wav_path)
        raw_text = await transcription_engine.transcribe_file(
            wav_path,
            language=cfg["whisper"]["language"],
            model_name=cfg["whisper"]["model"],
            device=cfg["whisper"]["device"],
        )
        await broadcast({"event": "transcribed", "raw": raw_text})
        client = OllamaClient(base_url=cfg["ollama"]["base_url"])
        refined = await client.refine(
            raw_text=raw_text,
            instructions=state.instructions,
            model=cfg["ollama"]["model"],
        )
        await broadcast({"event": "refined", "markdown": refined})
        title = "Diktat"
        for line in refined.splitlines():
            if line.startswith("# "):
                title = line[2:].strip()
                break
        path = save_transcript(
            title=title,
            content=refined,
            output_dir=cfg["output"]["path"],
        )
        state.last_transcript_path = path
        await broadcast({"event": "saved", "path": path, "title": title})
        await state.set_status(Status.IDLE)
    except Exception as e:
        state.last_error = str(e)
        await state.set_status(Status.ERROR)
        await broadcast({"event": "error", "message": str(e)})
    finally:
        if wav_path:
            try:
                os.unlink(wav_path)
            except OSError:
                pass
```

**Step 5: Run tests to verify they pass**

```bash
pytest tests/test_api.py -v
```

Expected: PASS

**Step 6: Commit**

```bash
git add api/router.py api/pipeline.py tests/test_api.py
git commit -m "feat: API router + pipeline — toggle, status, transcripts, WebSocket"
```

---

### Task 9: Frontend

**Files:**
- Create: `frontend/index.html`
- Create: `frontend/app.js`

**Step 1: Create frontend/index.html**

Minimal skeleton; element IDs must match those referenced in `app.js`. Styling (tüit CI dark theme) goes in a `<style>` block or separate stylesheet.

```html
<!DOCTYPE html>
<html lang="de">
<head>
  <meta charset="utf-8">
  <title>tüit Transkriptor</title>
</head>
<body>
  <header>
    <h1>tüit Transkriptor</h1>
    <span id="header-status" class="status-badge idle">Bereit</span>
  </header>
  <main>
    <button id="record-btn" class="idle"></button>
    <p id="status-text">Klicken zum Starten</p>
    <!-- free-text instructions passed to the LLM refinement step -->
    <textarea id="instructions"></textarea>
    <pre id="preview">Noch keine Aufnahme verarbeitet.</pre>
    <div id="transcript-list"></div>
  </main>
  <script src="/app.js"></script>
</body>
</html>
```

**Step 2: Create frontend/app.js**

Note: All DOM manipulation uses `textContent` and `createElement` — no `innerHTML` with untrusted data, to prevent XSS.

```javascript
const btn = document.getElementById('record-btn');
const statusText = document.getElementById('status-text');
const headerStatus = document.getElementById('header-status');
const preview = document.getElementById('preview');
const instructionsEl = document.getElementById('instructions');
const transcriptList = document.getElementById('transcript-list');

const STATUS_LABELS = {
  idle: 'Bereit',
  recording: 'Aufnahme läuft\u2026',
  processing: 'Wird verarbeitet\u2026',
  error: 'Fehler',
};

instructionsEl.addEventListener('input', async () => {
  await fetch('/instructions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ instructions: instructionsEl.value }),
  });
});

function setStatus(status) {
  btn.className = status;
  headerStatus.className = `status-badge ${status}`;
  const label = STATUS_LABELS[status] || status;
  statusText.textContent = label;
  headerStatus.textContent = label;
  btn.disabled = status === 'processing';
}

btn.addEventListener('click', () => fetch('/toggle', { method: 'POST' }));

function connectWs() {
  const ws = new WebSocket(`ws://${location.host}/ws`);
  ws.onmessage = (e) => {
    const msg = JSON.parse(e.data);
    if (msg.event === 'processing') setStatus('processing');
    if (msg.event === 'transcribed' || msg.event === 'refined') {
      const text = msg.raw || msg.markdown || '';
      preview.textContent = text;
      preview.classList.add('has-content');
    }
    if (msg.event === 'saved') {
      setStatus('idle');
      loadTranscripts();
    }
    if (msg.event === 'error') {
      setStatus('idle');
      preview.textContent = `Fehler: ${msg.message}`;
    }
  };
  ws.onclose = () => setTimeout(connectWs, 2000);
}

async function loadTranscripts() {
  const r = await fetch('/transcripts');
  const items = await r.json();
  // Build DOM nodes — no innerHTML with untrusted data
  transcriptList.replaceChildren(
    ...items.map((t) => {
      const div = document.createElement('div');
      div.className = 'transcript-item';
      const name = document.createElement('span');
      name.textContent = t.filename.replace('.md', '');
      const meta = document.createElement('span');
      meta.className = 'meta';
      meta.textContent = `${Math.round(t.size / 1024 * 10) / 10} KB`;
      div.append(name, meta);
      div.addEventListener('click', () => {
        fetch('/open', {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ path: t.path }),
        });
      });
      return div;
    })
  );
}

(async () => {
  const r = await fetch('/status');
  const data = await r.json();
  setStatus(data.status);
  instructionsEl.value = data.instructions || '';
  connectWs();
  loadTranscripts();
})();
```

**Step 3: Commit**

```bash
git add frontend/
git commit -m "feat: browser UI — tüit CI dark theme, XSS-safe DOM rendering"
```

---

### Task 10: Main Entry Point + Tray + Signal Handler

**Files:**
- Create: `main.py`

**Step 1: Implement main.py**

```python
import asyncio
import os
import signal
import threading
import time
import webbrowser
from pathlib import Path

import uvicorn
from fastapi import FastAPI
from fastapi.responses import FileResponse
import pystray
from PIL import Image, ImageDraw

from api.router import router
from api.state import state, Status
from config import load as load_config

# ── FastAPI ───────────────────────────────────────────────────────────────────

app = FastAPI(title="tüit Transkriptor")
app.include_router(router)

FRONTEND_DIR = Path(__file__).parent / "frontend"


@app.get("/")
async def index():
    return FileResponse(str(FRONTEND_DIR / "index.html"))


@app.get("/app.js")
async def appjs():
    return FileResponse(str(FRONTEND_DIR / "app.js"))


# ── PID file ──────────────────────────────────────────────────────────────────

def write_pid():
    cfg = load_config()
    pid_path = cfg.get("pid_file", os.path.expanduser("~/.local/run/tueit-transcriber.pid"))
    os.makedirs(os.path.dirname(pid_path), exist_ok=True)
    Path(pid_path).write_text(str(os.getpid()))


def remove_pid():
    cfg = load_config()
    pid_path = cfg.get("pid_file", os.path.expanduser("~/.local/run/tueit-transcriber.pid"))
    try:
        os.unlink(pid_path)
    except FileNotFoundError:
        pass


# ── SIGUSR1 → toggle ──────────────────────────────────────────────────────────

_loop: asyncio.AbstractEventLoop | None = None


def _sigusr1_handler(signum, frame):
    if _loop:
        _loop.call_soon_threadsafe(lambda: asyncio.ensure_future(_async_toggle()))


async def _async_toggle():
    from api.router import toggle_recording
    await toggle_recording()


# ── Tray ──────────────────────────────────────────────────────────────────────

def _make_icon(recording: bool = False) -> Image.Image:
    img = Image.new("RGBA", (64, 64), (0, 0, 0, 0))
    draw = ImageDraw.Draw(img)
    color = (218, 37, 28, 255) if recording else (80, 80, 80, 255)
    draw.ellipse([8, 8, 56, 56], fill=color)
    return img


def run_tray(port: int):
    icon = pystray.Icon(
        "tueit-transcriber",
        _make_icon(False),
        "tüit Transkriptor",
        menu=pystray.Menu(
            pystray.MenuItem("Aufnahme starten/stoppen", lambda i, it: (
                _loop and _loop.call_soon_threadsafe(
                    lambda: asyncio.ensure_future(_async_toggle())
                )
            ), default=True),
            pystray.MenuItem("Öffnen", lambda i, it: webbrowser.open(f"http://localhost:{port}")),
            pystray.MenuItem("Beenden", lambda i, it: (remove_pid(), icon.stop(), os._exit(0))),
        ),
    )

    def update_icon(s):
        icon.icon = _make_icon(s.status == Status.RECORDING)

    state.subscribe(update_icon)
    icon.run()


# ── Server ────────────────────────────────────────────────────────────────────

def run_server(port: int):
    uvicorn.run(app, host="127.0.0.1", port=port, log_level="warning")


# ── Entrypoint ────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    cfg = load_config()
    port = cfg["server"]["port"]
    write_pid()
    signal.signal(signal.SIGUSR1, _sigusr1_handler)

    server_thread = threading.Thread(target=run_server, args=(port,), daemon=True)
    server_thread.start()
    time.sleep(0.8)  # give uvicorn a moment to bind

    # Dedicated event loop for signal- and tray-triggered toggles. uvicorn runs
    # its own loop in the server thread and pystray blocks the main thread, so
    # this loop must actually run in a third thread; SIGUSR1 is delivered to
    # the main thread and schedules onto it via call_soon_threadsafe.
    _loop = asyncio.new_event_loop()
    threading.Thread(target=_loop.run_forever, daemon=True).start()

    webbrowser.open(f"http://localhost:{port}")
    try:
        run_tray(port)
    finally:
        remove_pid()
```

**Step 2: Commit**

```bash
git add main.py
git commit -m "feat: main entry point — FastAPI + pystray tray + SIGUSR1 signal handler"
```

---

### Task 11: install.sh + systemd User Service

**Files:**
- Create: `install.sh`

**Step 1: Create install.sh**

```bash
#!/usr/bin/env bash
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SERVICE_NAME="tueit-transcriber"
SERVICE_FILE="$HOME/.config/systemd/user/${SERVICE_NAME}.service"

echo "=== tüit Transkriptor Installer ==="

command -v python3 >/dev/null 2>&1 || { echo "ERROR: python3 not found"; exit 1; }

if ! command -v ollama >/dev/null 2>&1; then
  echo "WARNING: ollama not found. Install from https://ollama.com"
  echo "         After install: ollama pull gemma3:12b"
fi

if command -v rocminfo >/dev/null 2>&1; then
  echo "ROCm detected — GPU acceleration available"
else
  echo "INFO: ROCm not found — Whisper will run on CPU (slower)"
  echo "      To enable GPU: sudo pacman -S rocm-hip-sdk"
fi

echo "Installing Python dependencies..."
pip install --user -r "$SCRIPT_DIR/requirements.txt"

mkdir -p "$HOME/.config/systemd/user"
cat > "$SERVICE_FILE" <