# Speaker Diarization & Name Identification Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Add speaker diarization (pyannote.audio) and automatic name identification (Ollama) to the pipeline, producing three documents per meeting: an index, a raw transcript with speaker labels, and a polished summary.

**Architecture:** After recording, Whisper and pyannote run on the same WAV file; their timestamp-based segments are aligned to produce speaker-annotated text. Ollama tries to identify speaker names from context; if it can't, the frontend shows a speaker-naming card with excerpt navigation. The pipeline produces three linked markdown files per meeting.

**Tech Stack:** pyannote.audio 3.x, faster-whisper (already present), httpx, asyncio.Event for pipeline pause, vanilla JS for the speaker card

---

### Task 1: Add diarization config defaults

**Files:**
- Modify: `config.py`
- Test: `tests/test_config.py`

**Step 1: Write the failing test**

Add to `tests/test_config.py`:

```python
def test_config_has_diarization_defaults():
    from unittest.mock import patch
    import tempfile, os

    with tempfile.TemporaryDirectory() as tmpdir:
        cfg_path = os.path.join(tmpdir, "config.toml")
        with patch("config.CONFIG_PATH", cfg_path):
            import config
            cfg = config.load()
            assert "diarization" in cfg
            assert cfg["diarization"]["enabled"] is False
            assert cfg["diarization"]["hf_token"] == ""
```

**Step 2: Run to verify it fails**

```bash
cd /home/templis/work/tueit_Transkriptor && .venv/bin/pytest tests/test_config.py::test_config_has_diarization_defaults -v
```

Expected: FAIL — KeyError

**Step 3: Add to `config.py` DEFAULTS**

```python
"diarization": {
    "enabled": False,
    "hf_token": "",
},
```

Also add to the `_write_defaults` fallback string:

```python
f.write('[diarization]\nenabled = false\nhf_token = ""\n\n')
```

**Step 4: Run all config tests**

```bash
.venv/bin/pytest tests/test_config.py -v
```

Expected: all PASS
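Later tasks gate the meeting pipeline on both fields together (enabled flag AND a non-empty token). As a quick illustration, a minimal sketch of that check; the helper name is hypothetical, not part of the plan:

```python
def diarization_active(cfg: dict) -> bool:
    """Diarization runs only when it is enabled AND an HF token is configured."""
    diar = cfg.get("diarization", {})
    return bool(diar.get("enabled")) and bool(diar.get("hf_token"))
```

With the defaults above (`enabled = false`, empty token), this evaluates to `False`, so existing installs keep the old single-document behavior.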
**Step 5: Commit**

```bash
git add config.py tests/test_config.py
git commit -m "feat: add diarization config defaults (enabled=false, hf_token)"
```

---

### Task 2: Extend transcription.py to return segments with timestamps

**Files:**
- Modify: `transcription.py`
- Test: `tests/test_transcription.py`

**Context:** The pipeline needs timestamps to align Whisper segments with pyannote speaker segments. Add `with_segments: bool = False` — when True, return `list[dict]` with `{start, end, text}` instead of a plain string. Backward compatible: the default False keeps existing callers working.

**Step 1: Write the failing tests**

Add to `tests/test_transcription.py`:

```python
def test_transcribe_file_returns_segments_when_requested(tmp_path):
    wav = tmp_path / "test.wav"
    wav.write_bytes(b"\x00" * 100)
    mock_model = MagicMock()
    mock_seg = MagicMock()
    mock_seg.text = " Hallo Welt"
    mock_seg.start = 0.0
    mock_seg.end = 1.5
    mock_model.transcribe.return_value = ([mock_seg], MagicMock())

    from transcription import TranscriptionEngine
    eng = TranscriptionEngine()
    eng._model = mock_model
    result = asyncio.run(eng.transcribe_file(str(wav), language="de", with_segments=True))
    assert isinstance(result, list)
    assert result[0]["text"] == "Hallo Welt"
    assert result[0]["start"] == 0.0
    assert result[0]["end"] == 1.5


@pytest.mark.asyncio
async def test_transcribe_remote_returns_segments_when_requested(tmp_path):
    import wave, struct
    wav = tmp_path / "test.wav"
    with wave.open(str(wav), "wb") as wf:
        wf.setnchannels(1); wf.setsampwidth(2); wf.setframerate(16000)
        wf.writeframes(struct.pack("<100h", *([0] * 100)))

    import respx, httpx
    from transcription import TranscriptionEngine
    eng = TranscriptionEngine()
    with respx.mock:
        respx.post("http://beastix:8000/v1/audio/transcriptions").mock(
            return_value=httpx.Response(200, json={
                "text": "Hallo Welt",
                "segments": [{"start": 0.0, "end": 1.5, "text": " Hallo Welt"}],
            })
        )
        result = await eng.transcribe_file(
            str(wav),
            language="de",
            model_name="large-v3",
            device="auto",
            base_url="http://beastix:8000",
            with_segments=True,
        )
    assert isinstance(result, list)
    assert result[0]["text"] == "Hallo Welt"
```

**Step 2: Run to verify they fail**

```bash
.venv/bin/pytest tests/test_transcription.py::test_transcribe_file_returns_segments_when_requested tests/test_transcription.py::test_transcribe_remote_returns_segments_when_requested -v
```

**Step 3: Update `transcription.py`**

Replace the entire file:

```python
import asyncio
from typing import Union

import httpx


class TranscriptionEngine:
    _model = None

    def _get_model(self, model_name: str = "large-v3", device: str = "auto"):
        if self._model is None:
            from faster_whisper import WhisperModel
            if device == "auto":
                try:
                    self._model = WhisperModel(model_name, device="cuda", compute_type="float16")
                except Exception:
                    self._model = WhisperModel(model_name, device="cpu", compute_type="int8")
            else:
                compute = "float16" if device in ("cuda", "rocm") else "int8"
                self._model = WhisperModel(model_name, device=device, compute_type=compute)
        return self._model

    async def transcribe_file(
        self,
        audio_path: str,
        language: str = "de",
        model_name: str = "large-v3",
        device: str = "auto",
        base_url: str = "",
        with_segments: bool = False,
    ) -> Union[str, list[dict]]:
        if base_url:
            return await self._transcribe_remote(
                audio_path, language, model_name, base_url, with_segments
            )
        return await self._transcribe_local(
            audio_path, language, model_name, device, with_segments
        )

    async def _transcribe_remote(
        self,
        audio_path: str,
        language: str,
        model_name: str,
        base_url: str,
        with_segments: bool,
    ) -> Union[str, list[dict]]:
        async with httpx.AsyncClient(timeout=300) as client:
            with open(audio_path, "rb") as f:
                data = {"model": model_name, "language": language}
                if with_segments:
                    data["timestamp_granularities[]"] = "segment"
                    data["response_format"] = "verbose_json"
                r = await client.post(
                    f"{base_url}/v1/audio/transcriptions",
                    files={"file": ("audio.wav", f, "audio/wav")},
                    data=data,
                )
            r.raise_for_status()
            body = r.json()
        if not with_segments:
            return body["text"]
        raw_segs = body.get("segments") or []
        if raw_segs:
            return [
                {"start": s["start"], "end": s["end"], "text": s["text"].strip()}
                for s in raw_segs
            ]
        # fallback: single segment covering the whole file
        return [{"start": 0.0, "end": 9999.0, "text": body["text"].strip()}]

    async def _transcribe_local(
        self,
        audio_path: str,
        language: str,
        model_name: str,
        device: str,
        with_segments: bool,
    ) -> Union[str, list[dict]]:
        loop = asyncio.get_running_loop()
        model = self._get_model(model_name, device)
        segments, _ = await loop.run_in_executor(
            None,
            lambda: model.transcribe(audio_path, language=language),
        )
        segments = list(segments)
        if not with_segments:
            return "".join(seg.text for seg in segments).strip()
        return [
            {"start": seg.start, "end": seg.end, "text": seg.text.strip()}
            for seg in segments
            if seg.text.strip()
        ]


engine = TranscriptionEngine()
```

**Step 4: Run all transcription tests**

```bash
.venv/bin/pytest tests/test_transcription.py -v
```

Expected: all PASS

**Step 5: Commit**

```bash
git add transcription.py tests/test_transcription.py
git commit -m "feat: transcribe_file returns timestamped segments when with_segments=True"
```

---

### Task 3: diarization.py — Diarizer class

**Files:**
- Create: `diarization.py`
- Create: `tests/test_diarization.py`

**Context:** Wraps pyannote.audio. Returns `list[tuple[float, float, str]]` — each entry is `(start_sec, end_sec, speaker_label)`. Loaded lazily. Runs in an executor to avoid blocking.
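To make that return contract concrete, a small sketch with invented sample values, showing how a consumer can iterate the tuples (here: summing talk time per speaker):

```python
from collections import defaultdict

# Hypothetical diarize() result for a short two-speaker exchange:
# (start_sec, end_sec, speaker_label)
speaker_segs = [
    (0.0, 2.5, "SPEAKER_00"),
    (2.6, 5.0, "SPEAKER_01"),
]

talk_time = defaultdict(float)  # seconds of detected speech per label
for start, end, label in speaker_segs:
    talk_time[label] += end - start
```

Labels follow pyannote's `SPEAKER_00`, `SPEAKER_01`, … convention; the alignment step in Task 4 consumes exactly this tuple shape.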
**Step 1: Install pyannote.audio**

```bash
cd /home/templis/work/tueit_Transkriptor && .venv/bin/pip install pyannote.audio
```

Add to `requirements.txt`:

```
pyannote.audio>=3.3
```

**Step 2: Write the failing test**

Create `tests/test_diarization.py`:

```python
from unittest.mock import MagicMock, patch

import pytest


def test_diarizer_returns_list_of_tuples(tmp_path):
    """Diarizer.diarize() returns [(start, end, speaker), ...]"""
    wav = tmp_path / "test.wav"
    wav.write_bytes(b"\x00" * 100)

    mock_turn_1 = MagicMock()
    mock_turn_1.start = 0.0
    mock_turn_1.end = 2.5
    mock_track_1 = "A"
    mock_label_1 = "SPEAKER_00"
    mock_turn_2 = MagicMock()
    mock_turn_2.start = 2.6
    mock_turn_2.end = 5.0
    mock_track_2 = "B"
    mock_label_2 = "SPEAKER_01"
    mock_annotation = MagicMock()
    mock_annotation.itertracks.return_value = [
        (mock_turn_1, mock_track_1, mock_label_1),
        (mock_turn_2, mock_track_2, mock_label_2),
    ]
    mock_pipeline = MagicMock(return_value=mock_annotation)

    import asyncio
    from diarization import Diarizer
    d = Diarizer.__new__(Diarizer)
    d._pipeline = mock_pipeline
    result = asyncio.run(d.diarize(str(wav)))
    assert result == [(0.0, 2.5, "SPEAKER_00"), (2.6, 5.0, "SPEAKER_01")]


def test_diarizer_requires_hf_token():
    from diarization import Diarizer
    with pytest.raises(ValueError, match="hf_token"):
        Diarizer(hf_token="")
```

**Step 3: Run to verify it fails**

```bash
.venv/bin/pytest tests/test_diarization.py -v
```

Expected: FAIL — `diarization` module not found

**Step 4: Create `diarization.py`**

```python
import asyncio


class Diarizer:
    def __init__(self, hf_token: str):
        if not hf_token:
            raise ValueError("hf_token is required for diarization")
        self._hf_token = hf_token
        self._pipeline = None

    def _load_pipeline(self):
        if self._pipeline is None:
            from pyannote.audio import Pipeline
            self._pipeline = Pipeline.from_pretrained(
                "pyannote/speaker-diarization-3.1",
                use_auth_token=self._hf_token,
            )
        return self._pipeline

    async def diarize(self, wav_path: str) -> list[tuple[float, float, str]]:
        loop = asyncio.get_running_loop()
        pipeline = await loop.run_in_executor(None, self._load_pipeline)
        annotation = await loop.run_in_executor(None, lambda: pipeline(wav_path))
        return [
            (turn.start, turn.end, speaker)
            for turn, _, speaker in annotation.itertracks(yield_label=True)
        ]
```

**Step 5: Run tests**

```bash
.venv/bin/pytest tests/test_diarization.py -v
```

Expected: all PASS

**Step 6: Commit**

```bash
git add diarization.py tests/test_diarization.py requirements.txt
git commit -m "feat: Diarizer class wrapping pyannote/speaker-diarization-3.1"
```

---

### Task 4: Alignment — align Whisper segments to pyannote speakers

**Files:**
- Create: `alignment.py`
- Create: `tests/test_alignment.py`

**Context:** For each Whisper segment, find the pyannote speaker with the greatest time overlap. Merge consecutive same-speaker segments into one block. Return `list[tuple[str, str]]` — `(speaker_label, text)`.

**Step 1: Write the failing tests**

Create `tests/test_alignment.py`:

```python
def test_align_assigns_speaker_by_overlap():
    from alignment import align_segments
    whisper = [
        {"start": 0.0, "end": 2.0, "text": "Hallo"},
        {"start": 2.1, "end": 4.0, "text": "Wie geht es"},
    ]
    speakers = [
        (0.0, 2.5, "SPEAKER_00"),
        (2.5, 5.0, "SPEAKER_01"),
    ]
    result = align_segments(whisper, speakers)
    assert result[0] == ("SPEAKER_00", "Hallo")
    assert result[1] == ("SPEAKER_01", "Wie geht es")


def test_align_merges_consecutive_same_speaker():
    from alignment import align_segments
    whisper = [
        {"start": 0.0, "end": 1.0, "text": "Hallo"},
        {"start": 1.1, "end": 2.0, "text": "Welt"},
    ]
    speakers = [(0.0, 3.0, "SPEAKER_00")]
    result = align_segments(whisper, speakers)
    assert len(result) == 1
    assert result[0] == ("SPEAKER_00", "Hallo Welt")


def test_align_fallback_when_no_speaker_overlap():
    from alignment import align_segments
    whisper = [{"start": 0.0, "end": 1.0, "text": "Hallo"}]
    speakers = []
    result = align_segments(whisper, speakers)
    assert result[0][0] == "SPEAKER_00"
```

**Step 2: Run to verify they fail**

```bash
.venv/bin/pytest tests/test_alignment.py -v
```

**Step 3: Create `alignment.py`**

```python
def align_segments(
    whisper_segs: list[dict],
    speaker_segs: list[tuple[float, float, str]],
) -> list[tuple[str, str]]:
    """Assign each Whisper segment to the speaker with the greatest time overlap.

    Consecutive segments from the same speaker are merged into one block."""
    result: list[tuple[str, str]] = []
    for seg in whisper_segs:
        speaker = _best_speaker(seg["start"], seg["end"], speaker_segs)
        text = seg["text"].strip()
        if not text:
            continue
        if result and result[-1][0] == speaker:
            result[-1] = (speaker, result[-1][1] + " " + text)
        else:
            result.append((speaker, text))
    return result


def _best_speaker(
    start: float,
    end: float,
    speaker_segs: list[tuple[float, float, str]],
) -> str:
    best_label = "SPEAKER_00"
    best_overlap = 0.0
    for s_start, s_end, label in speaker_segs:
        overlap = max(0.0, min(end, s_end) - max(start, s_start))
        if overlap > best_overlap:
            best_overlap = overlap
            best_label = label
    return best_label
```

**Step 4: Run tests**

```bash
.venv/bin/pytest tests/test_alignment.py -v
```

Expected: all PASS

**Step 5: Commit**

```bash
git add alignment.py tests/test_alignment.py
git commit -m "feat: align_segments() — map Whisper timestamps to pyannote speakers"
```

---

### Task 5: llm.py — identify_speakers() and summarize()

**Files:**
- Modify: `llm.py`
- Test: `tests/test_llm.py`

**Step 1: Write the failing tests**

Add to `tests/test_llm.py`:

```python
@pytest.mark.asyncio
async def test_identify_speakers_returns_dict():
    import respx, httpx, json
    from llm import OllamaClient
    client = OllamaClient()
    mapping = {"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"}
    transcript_excerpt = "SPEAKER_00: Gut, Herr Möller.\nSPEAKER_01: Danke, Thomas."
    with respx.mock:
        respx.post("http://localhost:11434/api/generate").mock(
            return_value=httpx.Response(200, json={"response": json.dumps(mapping)})
        )
        result = await client.identify_speakers(transcript_excerpt)
    assert result == {"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"}


@pytest.mark.asyncio
async def test_identify_speakers_returns_empty_on_parse_failure():
    import respx, httpx
    from llm import OllamaClient
    client = OllamaClient()
    with respx.mock:
        respx.post("http://localhost:11434/api/generate").mock(
            return_value=httpx.Response(200, json={"response": "kein json hier"})
        )
        result = await client.identify_speakers("irgendwas")
    assert result == {}


@pytest.mark.asyncio
async def test_summarize_returns_string():
    import respx, httpx
    from llm import OllamaClient
    client = OllamaClient()
    with respx.mock:
        respx.post("http://localhost:11434/api/generate").mock(
            return_value=httpx.Response(200, json={"response": "# Zusammenfassung\n\nKurzer Text."})
        )
        result = await client.summarize("Thomas: Hallo.\nMöller: Hi.", model="gemma3:12b")
    assert "Zusammenfassung" in result
```

**Step 2: Run to verify they fail**

```bash
.venv/bin/pytest tests/test_llm.py::test_identify_speakers_returns_dict tests/test_llm.py::test_identify_speakers_returns_empty_on_parse_failure tests/test_llm.py::test_summarize_returns_string -v
```

**Step 3: Add methods to `llm.py`**

```python
IDENTIFY_SPEAKERS_PROMPT = """Du bekommst den Anfang eines Gesprächstranskripts mit Sprecher-Labels (SPEAKER_00, SPEAKER_01, ...).
Ermittle, welche echten Namen den Sprechern zugeordnet werden können — z.B. durch direkte Anrede ("Herr Möller", "Frank").
Antworte NUR mit einem JSON-Objekt:
{"SPEAKER_00": "Name oder null", "SPEAKER_01": "Name oder null"}
Kein weiterer Text, keine Erklärung."""

SUMMARIZE_PROMPT = """Du bist ein präziser Assistent für Business-Kommunikation.
Du bekommst ein Gesprächstranskript mit Sprecher-Labels.
Erstelle eine strukturierte Zusammenfassung auf Deutsch mit:
1. Einem passenden H1-Titel
2. ## Wichtigste Punkte (Aufzählung)
3. ## Offene Fragen (Aufzählung, falls vorhanden)
4. ## Nächste Schritte / Ideen (Aufzählung, falls vorhanden)
Antworte NUR mit dem fertigen Markdown."""
```

Add to `OllamaClient`:

```python
    async def identify_speakers(
        self,
        transcript_excerpt: str,
        model: str = "gemma3:12b",
    ) -> dict[str, str]:
        """Try to map SPEAKER_XX labels to real names. Returns {} on failure."""
        import json
        async with httpx.AsyncClient(timeout=60) as client:
            r = await client.post(
                f"{self.base_url}/api/generate",
                json={
                    "model": model,
                    "prompt": f"Transkript-Anfang:\n{transcript_excerpt[:2000]}",
                    "system": IDENTIFY_SPEAKERS_PROMPT,
                    "stream": False,
                },
            )
            r.raise_for_status()
            raw = r.json()["response"].strip()
        try:
            data = json.loads(raw)
            if not isinstance(data, dict):
                return {}
            return {k: v for k, v in data.items() if v}
        except json.JSONDecodeError:
            return {}

    async def summarize(
        self,
        annotated_transcript: str,
        model: str = "gemma3:12b",
    ) -> str:
        async with httpx.AsyncClient(timeout=180) as client:
            r = await client.post(
                f"{self.base_url}/api/generate",
                json={
                    "model": model,
                    "prompt": f"Transkript:\n{annotated_transcript}",
                    "system": SUMMARIZE_PROMPT,
                    "stream": False,
                },
            )
            r.raise_for_status()
            return r.json()["response"].strip()
```

**Step 4: Run all llm tests**

```bash
.venv/bin/pytest tests/test_llm.py -v
```

Expected: all PASS

**Step 5: Commit**

```bash
git add llm.py tests/test_llm.py
git commit -m "feat: OllamaClient.identify_speakers() and summarize() for diarization pipeline"
```

---

### Task 6: output.py — write_meeting_docs()

**Files:**
- Modify: `output.py`
- Test: `tests/test_output.py`

**Context:** Writes three files: `{base}-index.md`, `{base}-transkript.md`, `{base}-zusammenfassung.md`. Returns all three paths.
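The shared base name is derived from the meeting start time (see the `strftime` call in Step 3 below), so the three files sort together and can link to each other by relative filename. A small worked example of the scheme:

```python
from datetime import datetime

# One timestamped base yields three sibling documents.
dt = datetime(2026, 4, 2, 14, 30)
base = dt.strftime("%Y-%m-%d-%H%M") + "-meeting"
paths = {kind: f"{base}-{kind}.md" for kind in ("index", "transkript", "zusammenfassung")}
```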
**Step 1: Write the failing test**

Add to `tests/test_output.py`:

```python
def test_write_meeting_docs_creates_three_files(tmp_path):
    from output import write_meeting_docs
    from datetime import datetime
    aligned = [("Thomas", "Gut, dann fangen wir an."), ("Möller", "Ich hab das vorbereitet.")]
    paths = write_meeting_docs(
        aligned_segments=aligned,
        summary="# Meeting\n\n## Wichtigste Punkte\n- Budget besprochen",
        speakers=["Thomas", "Möller"],
        duration_min=5,
        output_dir=str(tmp_path),
        dt=datetime(2026, 4, 2, 14, 30),
    )
    assert len(paths) == 3

    index_content = open(paths["index"]).read()
    assert "Thomas" in index_content
    assert "transkript" in index_content

    transcript_content = open(paths["transkript"]).read()
    assert "**Thomas:**" in transcript_content
    assert "Gut, dann fangen wir an." in transcript_content

    summary_content = open(paths["zusammenfassung"]).read()
    assert "Budget besprochen" in summary_content
```

**Step 2: Run to verify it fails**

```bash
.venv/bin/pytest tests/test_output.py::test_write_meeting_docs_creates_three_files -v
```

**Step 3: Add to `output.py`**

```python
def write_meeting_docs(
    aligned_segments: list[tuple[str, str]],
    summary: str,
    speakers: list[str],
    duration_min: int,
    output_dir: str,
    dt: "datetime | None" = None,
) -> dict[str, str]:
    """Write index, transkript, and zusammenfassung. Returns {type: path}."""
    from datetime import datetime
    if dt is None:
        dt = datetime.now()
    os.makedirs(output_dir, exist_ok=True)

    base = dt.strftime("%Y-%m-%d-%H%M") + "-meeting"
    date_str = dt.strftime("%d.%m.%Y %H:%M")
    frontmatter_base = f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript, meeting]\n---\n\n"

    # --- transkript ---
    transcript_lines = []
    for speaker, text in aligned_segments:
        transcript_lines.append(f"**{speaker}:** {text}\n")
    transcript_content = "\n".join(transcript_lines)
    transkript_filename = f"{base}-transkript.md"
    transkript_path = os.path.join(output_dir, transkript_filename)
    with open(transkript_path, "w", encoding="utf-8") as f:
        f.write(frontmatter_base)
        f.write(transcript_content)
        if not transcript_content.endswith("\n"):
            f.write("\n")

    # --- zusammenfassung ---
    zusammenfassung_filename = f"{base}-zusammenfassung.md"
    zusammenfassung_path = os.path.join(output_dir, zusammenfassung_filename)
    with open(zusammenfassung_path, "w", encoding="utf-8") as f:
        f.write(frontmatter_base)
        f.write(summary)
        if not summary.endswith("\n"):
            f.write("\n")

    # --- index ---
    speaker_str = ", ".join(speakers) if speakers else "Unbekannt"
    tl_dr = _extract_tldr(summary)
    index_content = (
        f"# Meeting — {date_str}\n\n"
        f"**Sprecher:** {speaker_str}  \n"
        f"**Dauer:** {duration_min} min\n\n"
        f"> {tl_dr}\n\n"
        f"- [Transkript]({transkript_filename})\n"
        f"- [Zusammenfassung]({zusammenfassung_filename})\n"
    )
    index_filename = f"{base}-index.md"
    index_path = os.path.join(output_dir, index_filename)
    with open(index_path, "w", encoding="utf-8") as f:
        f.write(f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript, meeting, index]\n---\n\n")
        f.write(index_content)

    return {"index": index_path, "transkript": transkript_path, "zusammenfassung": zusammenfassung_path}


def _extract_tldr(summary: str) -> str:
    """Return the first non-heading, non-empty line from the summary as TL;DR."""
    for line in summary.splitlines():
        stripped = line.strip()
        if stripped and not stripped.startswith("#"):
            return stripped[:200]
    return "Kein TL;DR verfügbar."
```

**Step 4: Run all output tests**

```bash
.venv/bin/pytest tests/test_output.py -v
```

Expected: all PASS

**Step 5: Commit**

```bash
git add output.py tests/test_output.py
git commit -m "feat: write_meeting_docs() — creates index, transkript, zusammenfassung"
```

---

### Task 7: api/state.py — add pending speaker state

**Files:**
- Modify: `api/state.py`
- Test: `tests/test_api.py`

**Context:** The pipeline pauses after alignment, waiting for speaker names. It uses an `asyncio.Event` stored in state. The `/speakers` endpoint sets names and signals the event.

**Step 1: Write the failing test**

Add to `tests/test_api.py`:

```python
def test_state_has_speaker_fields():
    from api.state import AppState
    s = AppState()
    assert hasattr(s, "_speakers_event")
    assert hasattr(s, "_pending_aligned_segments")
    assert hasattr(s, "_speaker_names")
    assert s._speakers_event is None
    assert s._pending_aligned_segments is None
    assert s._speaker_names is None
```

**Step 2: Run to verify it fails**

```bash
.venv/bin/pytest tests/test_api.py::test_state_has_speaker_fields -v
```

**Step 3: Update `api/state.py`**

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable


class Status(str, Enum):
    IDLE = "idle"
    RECORDING = "recording"
    PROCESSING = "processing"
    AWAITING_SPEAKERS = "awaiting_speakers"
    ERROR = "error"


@dataclass
class AppState:
    status: Status = Status.IDLE
    recording_user: str | None = None
    last_error: str | None = None
    _listeners: list[Callable] = field(default_factory=list, repr=False)

    # Diarization pipeline pause
    _speakers_event: asyncio.Event | None = None
    _pending_aligned_segments: list[tuple[str, str]] | None = None
    _speaker_names: dict[str, str] | None = None

    def subscribe(self, callback: Callable):
        self._listeners.append(callback)

    async def notify(self):
        for cb in self._listeners:
            if asyncio.iscoroutinefunction(cb):
                await cb(self)
            else:
                cb(self)

    async def set_status(self, status: Status):
        self.status = status
        await self.notify()


state = AppState()
```

Note: the `AWAITING_SPEAKERS` status is added so the UI can show a distinct state.

**Step 4: Run tests**

```bash
.venv/bin/pytest tests/test_api.py::test_state_has_speaker_fields -v
```

**Step 5: Commit**

```bash
git add api/state.py tests/test_api.py
git commit -m "feat: AppState gains speaker pause fields and AWAITING_SPEAKERS status"
```

---

### Task 8: api/router.py — POST /speakers endpoint

**Files:**
- Modify: `api/router.py`
- Test: `tests/test_api.py`

**Step 1: Write the failing test**

Add to `tests/test_api.py`:

```python
def test_post_speakers_resolves_pipeline_pause():
    from main import app
    from api.router import current_user
    from api.state import state
    import asyncio

    # Simulate pipeline waiting for speakers
    state._speakers_event = asyncio.Event()
    state._speaker_names = None
    app.dependency_overrides[current_user] = lambda: {"username": "u", "output_dir": "/tmp", "is_admin": False}
    try:
        from fastapi.testclient import TestClient
        client = TestClient(app)
        r = client.post("/speakers", json={"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"})
        assert r.status_code == 200
        assert state._speaker_names == {"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"}
        assert state._speakers_event.is_set()
    finally:
        app.dependency_overrides.pop(current_user, None)
        state._speakers_event = None
        state._speaker_names = None
```

**Step 2: Run to verify it fails**

```bash
.venv/bin/pytest tests/test_api.py::test_post_speakers_resolves_pipeline_pause -v
```

**Step 3: Add endpoint to `api/router.py`**

Add after the existing endpoints (before the websocket):

```python
@router.post("/speakers")
async def post_speakers(body: dict, user: dict = Depends(current_user)):
    if state._speakers_event is None:
        raise HTTPException(status_code=409, detail="Keine ausstehende Sprecher-Zuordnung")
    state._speaker_names = {k: v for k, v in body.items() if isinstance(k, str)}
    state._speakers_event.set()
    return {"ok": True}
```

**Step 4: Run tests**

```bash
.venv/bin/pytest tests/test_api.py::test_post_speakers_resolves_pipeline_pause -v
```

**Step 5: Commit**

```bash
git add api/router.py tests/test_api.py
git commit -m "feat: POST /speakers — resolves pipeline pause with speaker name mapping"
```

---

### Task 9: api/pipeline.py — extend with diarization path

**Files:**
- Modify: `api/pipeline.py`

**Context:** When `diarization.enabled` is true and `hf_token` is set, run Whisper (with segments) and pyannote in parallel, align, try Ollama name identification, emit `speakers_unknown` if needed, then write three documents. If diarization is disabled, run the old single-document path unchanged.

**Step 1: Read current `api/pipeline.py`** — already read above.

**Step 2: No new test here** — the pipeline is tested through integration. The individual components (alignment, diarization, llm) are tested separately.

**Step 3: Rewrite `api/pipeline.py`**

```python
import asyncio
import logging
import os
import tempfile
import traceback
from datetime import datetime

from api.state import state, Status
from api.router import broadcast
from config import load as load_config
from transcription import engine as transcription_engine
from llm import OllamaClient
from output import save_transcript, write_meeting_docs

logger = logging.getLogger(__name__)


async def run_pipeline():
    cfg = load_config()
    recorder = getattr(state, "_recorder", None)
    if recorder is None:
        return
    output_dir = getattr(state, "_recording_output_dir", cfg["output"]["path"])
    instructions = getattr(state, "_recording_instructions", "")
    diar_cfg = cfg.get("diarization", {})
    use_diarization = diar_cfg.get("enabled") and diar_cfg.get("hf_token")

    recorder.stop()
    await state.set_status(Status.PROCESSING)
    await broadcast({"event": "processing"})

    wav_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            wav_path = f.name
        recorder.save_wav(wav_path)

        if use_diarization:
            await _run_meeting_pipeline(cfg, wav_path, output_dir, instructions, diar_cfg)
        else:
            await _run_solo_pipeline(cfg, wav_path, output_dir, instructions)
    except Exception as e:
        tb = traceback.format_exc()
        logger.error("Pipeline error:\n%s", tb)
        state.last_error = str(e)
        await state.set_status(Status.ERROR)
        await broadcast({"event": "error", "message": str(e)})
    finally:
        state.recording_user = None
        state._recording_output_dir = None
        state._recording_instructions = ""
        state._speakers_event = None
        state._pending_aligned_segments = None
        state._speaker_names = None
        if wav_path:
            try:
                os.unlink(wav_path)
            except OSError:
                pass


async def _run_solo_pipeline(cfg, wav_path, output_dir, instructions):
    """Original single-document pipeline (no diarization)."""
    raw_text = await transcription_engine.transcribe_file(
        wav_path,
        language=cfg["whisper"]["language"],
        model_name=cfg["whisper"]["model"],
        device=cfg["whisper"]["device"],
        base_url=cfg["whisper"].get("base_url", ""),
    )
    await broadcast({"event": "transcribed", "raw": raw_text})

    client = OllamaClient(base_url=cfg["ollama"]["base_url"])
    refined = await client.refine(
        raw_text=raw_text,
        instructions=instructions,
        model=cfg["ollama"]["model"],
    )

    title = "Diktat"
    for line in refined.splitlines():
        if line.startswith("# "):
            title = line[2:].strip()
            break

    path = save_transcript(title=title, content=refined, output_dir=output_dir)
    await broadcast({"event": "saved", "path": path, "title": title})
    await state.set_status(Status.IDLE)


async def _run_meeting_pipeline(cfg, wav_path, output_dir, instructions, diar_cfg):
    """Diarization pipeline: 3 documents, speaker identification."""
    from diarization import Diarizer
    from alignment import align_segments

    # Run Whisper and pyannote in parallel
    diarizer = Diarizer(hf_token=diar_cfg["hf_token"])
    whisper_task = asyncio.create_task(
        transcription_engine.transcribe_file(
            wav_path,
            language=cfg["whisper"]["language"],
            model_name=cfg["whisper"]["model"],
            device=cfg["whisper"]["device"],
            base_url=cfg["whisper"].get("base_url", ""),
            with_segments=True,
        )
    )
    diar_task = asyncio.create_task(diarizer.diarize(wav_path))
    whisper_segs, speaker_segs = await asyncio.gather(whisper_task, diar_task)

    # Align
    aligned = align_segments(whisper_segs, speaker_segs)
    await broadcast({"event": "transcribed", "raw": " ".join(t for _, t in aligned)})

    # Try Ollama name identification
    excerpt = "\n".join(f"{s}: {t}" for s, t in aligned[:20])
    client = OllamaClient(base_url=cfg["ollama"]["base_url"])
    name_map = await client.identify_speakers(excerpt, model=cfg["ollama"]["model"])

    if not name_map:
        # Pause and ask the user
        excerpts_per_speaker = _build_excerpts(aligned)
        state._speakers_event = asyncio.Event()
        state._pending_aligned_segments = aligned
        await state.set_status(Status.AWAITING_SPEAKERS)
        await broadcast({"event": "speakers_unknown", "speakers": [
            {"id": spk, "excerpts": exs}
            for spk, exs in excerpts_per_speaker.items()
        ]})
        await state._speakers_event.wait()
        name_map = state._speaker_names or {}

    # Apply names
    def resolve(label):
        return name_map.get(label) or label.replace("SPEAKER_0", "Sprecher ").replace("SPEAKER_", "Sprecher ")

    named_aligned = [(resolve(spk), text) for spk, text in aligned]
    speakers = sorted({spk for spk, _ in named_aligned})

    # Duration
    total_secs = sum(s["end"] - s["start"] for s in whisper_segs) if whisper_segs else 0
    duration_min = max(1, round(total_secs / 60))

    # Full transcript text for summarization
    transcript_text = "\n\n".join(f"**{spk}:** {txt}" for spk, txt in named_aligned)

    # Summarize
    summary = await client.summarize(transcript_text, model=cfg["ollama"]["model"])

    # Write three documents
    dt = datetime.now()
    paths = write_meeting_docs(
        aligned_segments=named_aligned,
        summary=summary,
        speakers=speakers,
        duration_min=duration_min,
        output_dir=output_dir,
        dt=dt,
    )
    await state.set_status(Status.IDLE)
    await broadcast({
        "event": "saved",
        "path": paths["index"],
        "title": f"Meeting {dt.strftime('%d.%m.%Y %H:%M')}",
        "meeting": True,
        "paths": paths,
    })
```
def _build_excerpts(aligned: list[tuple[str, str]], max_per_speaker: int = 4) -> dict[str, list[str]]:
    """Collect up to `max_per_speaker` excerpts per speaker, each truncated to 200 chars."""
    from collections import defaultdict

    buckets: dict[str, list[str]] = defaultdict(list)
    for spk, text in aligned:
        if len(buckets[spk]) < max_per_speaker:
            buckets[spk].append(text[:200])
    return dict(buckets)
```

**Step 4: Run the full test suite to check nothing broke**

```bash
.venv/bin/pytest -v 2>&1 | tail -20
```

Expected: all PASS

**Step 5: Commit**

```bash
git add api/pipeline.py
git commit -m "feat: meeting pipeline — diarization, speaker ID, 3-doc output"
```

---

### Task 10: Frontend — speaker naming card

**Files:**
- Modify: `frontend/index.html` (CSS + HTML)
- Modify: `frontend/app.js` (WS handler + card logic)

**Step 1: Add CSS to `frontend/index.html`**

Add inside ``):

```css
.speaker-card { background: var(--surface); border: 1px solid var(--yellow); border-radius: 10px; padding: 20px; display: flex; flex-direction: column; gap: 16px; }
.speaker-card.hidden { display: none; }
.speaker-card h3 { font-size: .8rem; color: var(--yellow); text-transform: uppercase; letter-spacing: .08em; margin: 0; }
.speaker-row { display: flex; flex-direction: column; gap: 8px; }
.speaker-excerpt { font-size: .82rem; color: var(--muted); background: var(--surface2); border-radius: 6px; padding: 8px 12px; min-height: 48px; }
.excerpt-nav { display: flex; align-items: center; gap: 8px; }
.excerpt-nav button { background: none; border: 1px solid var(--border); color: var(--muted); border-radius: 4px; width: 28px; height: 28px; cursor: pointer; font-size: 1rem; display: flex; align-items: center; justify-content: center; transition: border-color .15s, color .15s; }
.excerpt-nav button:hover { border-color: var(--yellow); color: var(--yellow); }
.excerpt-counter { font-size: .75rem; color: var(--muted); min-width: 30px; text-align: center; }
.speaker-name-input { background: var(--surface2); border: 1px solid var(--border); color: var(--text); border-radius: 6px; padding: 8px 12px; font-family: inherit; font-size: .9rem; outline: none; width: 100%; transition: border-color .15s; }
.speaker-name-input:focus { border-color: var(--yellow); }
.speaker-card-actions { display: flex; gap: 10px; }
.card-btn { font-size: .82rem; padding: 8px 16px; border-radius: 8px; border: 1px solid var(--border); background: var(--surface2); color: var(--text); cursor: pointer; font-family: inherit; transition: border-color .15s; }
.card-btn:hover { border-color: var(--red); }
.card-btn.primary { background: var(--yellow); border-color: var(--yellow); color: #111; }
.card-btn.primary:hover { background: #e6c200; border-color: #e6c200; }
```

**Step 2: Add HTML to `frontend/index.html`**

Add just before `
`:

```html
<!-- Note: the card markup was lost from this plan chunk during extraction; this
     block is a reconstruction from the IDs and classes referenced by the CSS
     above and the JS below. Button labels are assumptions (the confirm label
     matches the smoke-test checklist in Task 13). -->
<div id="speaker-card" class="speaker-card hidden">
  <h3>Sprecher zuordnen</h3>
  <div id="speaker-rows"></div>
  <div class="speaker-card-actions">
    <button id="speaker-confirm-btn" class="card-btn primary">Übernehmen</button>
    <button id="speaker-anon-btn" class="card-btn">Anonym lassen</button>
  </div>
</div>
```

**Step 3: Add JS to `frontend/app.js`**

Add after the existing constants at the top:

```javascript
const speakerCard = document.getElementById('speaker-card');
const speakerRows = document.getElementById('speaker-rows');
const speakerConfirmBtn = document.getElementById('speaker-confirm-btn');
const speakerAnonBtn = document.getElementById('speaker-anon-btn');
let _speakerData = []; // [{id, input}, ...]
```

Add to the `STATUS_LABELS`:

```javascript
awaiting_speakers: 'Sprecher zuordnen\u2026',
```

Replace the `ws.onmessage` handler — add handling for `speakers_unknown`:

```javascript
ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (msg.event === 'processing') setStatus('processing');
  if (msg.event === 'saved') { setStatus('idle'); hideSpeakerCard(); loadTranscripts(); }
  if (msg.event === 'error') setStatus('error');
  if (msg.event === 'speakers_unknown') showSpeakerCard(msg.speakers);
};
```

Add functions for the speaker card:

```javascript
function showSpeakerCard(speakers) {
  _speakerData = [];
  speakerRows.replaceChildren();
  speakers.forEach(function(s) {
    const row = document.createElement('div');
    row.className = 'speaker-row';

    const excerptEl = document.createElement('div');
    excerptEl.className = 'speaker-excerpt';
    excerptEl.textContent = s.excerpts[0] || '';

    const counter = document.createElement('span');
    counter.className = 'excerpt-counter';
    counter.textContent = s.excerpts.length > 1 ? '1/' + s.excerpts.length : '';

    let idx = 0;
    const prev = document.createElement('button');
    prev.textContent = '\u2039';
    const next = document.createElement('button');
    next.textContent = '\u203a';
    function updateExcerpt() {
      excerptEl.textContent = s.excerpts[idx] || '';
      counter.textContent = s.excerpts.length > 1 ? (idx + 1) + '/' + s.excerpts.length : '';
    }
    prev.addEventListener('click', function() { if (idx > 0) { idx--; updateExcerpt(); } });
    next.addEventListener('click', function() { if (idx < s.excerpts.length - 1) { idx++; updateExcerpt(); } });

    const nav = document.createElement('div');
    nav.className = 'excerpt-nav';
    if (s.excerpts.length > 1) { nav.append(prev, counter, next); }

    const input = document.createElement('input');
    input.type = 'text';
    input.className = 'speaker-name-input';
    input.placeholder = s.id.replace('SPEAKER_0', 'Sprecher ').replace('SPEAKER_', 'Sprecher ');

    row.append(excerptEl, nav, input);
    speakerRows.appendChild(row);
    _speakerData.push({ id: s.id, input: input });
  });
  speakerCard.classList.remove('hidden');
  setStatus('awaiting_speakers');
}

function hideSpeakerCard() {
  speakerCard.classList.add('hidden');
  _speakerData = [];
}

async function submitSpeakers(useNames) {
  const body = {};
  _speakerData.forEach(function(s) { body[s.id] = useNames ? s.input.value.trim() : ''; });
  await apiFetch('/speakers', { method: 'POST', body: JSON.stringify(body) });
}

speakerConfirmBtn.addEventListener('click', function() { submitSpeakers(true); });
speakerAnonBtn.addEventListener('click', function() { submitSpeakers(false); });
```

**Step 4: Run the full suite (no automated test for the UI; it is checked visually in Task 13's smoke test)**

```bash
.venv/bin/pytest -v 2>&1 | tail -15
```

Expected: all PASS (no test for UI JS)

**Step 5: Commit**

```bash
git add frontend/index.html frontend/app.js
git commit -m "feat: speaker naming card with excerpt navigator in main UI"
```

---

### Task 11: Settings page — diarization section

**Files:**
- Modify: `frontend/settings.html`
- Modify: `frontend/settings.js`

**Step 1: Add HTML section to `frontend/settings.html`**

After the `
` of the Processing section, add:

```html
<!-- Note: the original markup of this snippet was lost from the plan during
     extraction; reconstructed from the IDs used in settings.js and the surviving
     text. Match the wrapper element and classes of the existing settings sections. -->
<section>
  <h2>Diarisierung</h2>
  <label>
    <input type="checkbox" id="diar-enabled">
    Sprecher-Diarisierung aktivieren
  </label>
  <label>
    HuggingFace Token
    <input type="password" id="diar-token" placeholder="hf_...">
  </label>
  <p class="hint">Einmalig: pyannote-Modell freischalten und Token mit Read-Berechtigung erstellen.</p>
</section>
```

**Step 2: Add to `frontend/settings.js`**

In `loadConfig()`, add after the ollama lines:

```javascript
document.getElementById('diar-enabled').checked = !!(cfg.diarization && cfg.diarization.enabled);
document.getElementById('diar-token').value = (cfg.diarization && cfg.diarization.hf_token) || '';
```

In the save button handler, add to `body`:

```javascript
diarization: {
  enabled: document.getElementById('diar-enabled').checked,
  hf_token: document.getElementById('diar-token').value,
},
```

**Step 3: Run full suite**

```bash
.venv/bin/pytest -v 2>&1 | tail -15
```

Expected: all PASS

**Step 4: Commit**

```bash
git add frontend/settings.html frontend/settings.js
git commit -m "feat: diarization section in settings — hf_token and enabled toggle"
```

---

### Task 12: Update SETUP.md with HuggingFace instructions

**Files:**
- Modify: `docs/SETUP.md`

**Step 1: Add section to `docs/SETUP.md`**

Add after the "Firewall" section:

```markdown
### 5. HuggingFace — pyannote-Modell freischalten (für Diarisierung)

1. Account erstellen auf [huggingface.co](https://huggingface.co)
2. Modell-Seite öffnen: https://huggingface.co/pyannote/speaker-diarization-3.1 → **"Access repository"** klicken und Nutzungsbedingungen bestätigen
3. Token erstellen: https://huggingface.co/settings/tokens → **New token** → Typ: **Read** → Token kopieren
4. Im Transkriptor: Einstellungen → Diarisierung → Token einfügen + aktivieren
```

**Step 2: Commit**

```bash
git add docs/SETUP.md
git commit -m "docs: HuggingFace setup instructions for pyannote diarization"
```

---

### Task 13: Full test suite + push

**Step 1: Run the full test suite**

```bash
cd /home/templis/work/tueit_Transkriptor && .venv/bin/pytest -v
```

Expected: all tests PASS

**Step 2: Manual smoke test checklist**

Restart the app (`pkill -f main.py && .venv/bin/python main.py &`), then:

- [ ] Einstellungen → Diarisierung: enter the token, enable, save
- [ ] Start a recording and hold a short conversation
- [ ] Status switches to "Sprecher zuordnen…" and the speaker card appears
- [ ] Page through the excerpts (‹ ›)
- [ ] Enter names → "Übernehmen"
- [ ] Three entries appear in the transcript list (index, transkript, zusammenfassung)
- [ ] The index shows a TL;DR plus links to the other two documents
- [ ] The transcript shows `**Thomas:** …` paragraphs
- [ ] The summary has `##` sections
- [ ] With diarization disabled, normal dictation behavior is unchanged

**Step 3: Push**

```bash
git push
```