Speaker Diarization & Name Identification Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Add speaker diarization (pyannote.audio) and automatic name identification (Ollama) to the pipeline, producing three documents per meeting: an index, a raw transcript with speaker labels, and a polished summary.

Architecture: After recording, Whisper and pyannote run on the same WAV file; their timestamp-based segments are aligned to produce speaker-annotated text. Ollama tries to identify speaker names from context; if it can't, the frontend shows a speaker-naming card with excerpt navigation. The pipeline produces three linked markdown files per meeting.
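
A rough sketch of the data flow through the new modules (names match the tasks below; illustrative only, not runnable):

recording.wav ──→ Whisper  → [{"start": 0.0, "end": 1.5, "text": "Hallo Welt"}, ...]
             └──→ pyannote → [(0.0, 2.5, "SPEAKER_00"), ...]
align_segments(whisper, speakers) → [("SPEAKER_00", "Hallo Welt ..."), ...]
identify_speakers(excerpt)        → {"SPEAKER_00": "Thomas"}, or a pause to ask the user
write_meeting_docs(...)           → {base}-index.md, {base}-transkript.md, {base}-zusammenfassung.md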

Tech Stack: pyannote.audio 3.x, faster-whisper (already present), httpx, asyncio.Event for pipeline pause, vanilla JS for speaker card


Task 1: Add diarization config defaults

Files:

  • Modify: config.py
  • Test: tests/test_config.py

Step 1: Write the failing test

Add to tests/test_config.py:

def test_config_has_diarization_defaults():
    from unittest.mock import patch
    import tempfile, os
    with tempfile.TemporaryDirectory() as tmpdir:
        cfg_path = os.path.join(tmpdir, "config.toml")
        with patch("config.CONFIG_PATH", cfg_path):
            import config
            cfg = config.load()
            assert "diarization" in cfg
            assert cfg["diarization"]["enabled"] is False
            assert cfg["diarization"]["hf_token"] == ""

Step 2: Run to verify it fails

cd /home/templis/work/tueit_Transkriptor && .venv/bin/pytest tests/test_config.py::test_config_has_diarization_defaults -v

Expected: FAIL — AssertionError (the defaults have no "diarization" key yet)

Step 3: Add to config.py DEFAULTS

"diarization": {
    "enabled": False,
    "hf_token": "",
},

Also add to the _write_defaults fallback string:

f.write('[diarization]\nenabled = false\nhf_token = ""\n\n')

Step 4: Run all config tests

.venv/bin/pytest tests/test_config.py -v

Expected: all PASS

Step 5: Commit

git add config.py tests/test_config.py
git commit -m "feat: add diarization config defaults (enabled=false, hf_token)"

Task 2: Extend transcription.py to return segments with timestamps

Files:

  • Modify: transcription.py
  • Test: tests/test_transcription.py

Context: The pipeline needs timestamps to align Whisper segments with pyannote speaker segments. Add with_segments: bool = False — when True, return list[dict] with {start, end, text} instead of a plain string. Backward compatible: default False keeps existing callers working.
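
For orientation, the two return shapes side by side (a sketch, assuming the final signature below):

text = await engine.transcribe_file("a.wav")                      # "Hallo Welt ..." (string, as before)
segs = await engine.transcribe_file("a.wav", with_segments=True)  # [{"start": 0.0, "end": 1.5, "text": "Hallo Welt"}, ...]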

Step 1: Write the failing tests

Add to tests/test_transcription.py:

def test_transcribe_file_returns_segments_when_requested(tmp_path):
    wav = tmp_path / "test.wav"
    wav.write_bytes(b"\x00" * 100)

    mock_model = MagicMock()
    mock_seg = MagicMock()
    mock_seg.text = " Hallo Welt"
    mock_seg.start = 0.0
    mock_seg.end = 1.5
    mock_model.transcribe.return_value = ([mock_seg], MagicMock())

    from transcription import TranscriptionEngine
    eng = TranscriptionEngine()
    eng._model = mock_model

    result = asyncio.run(eng.transcribe_file(str(wav), language="de", with_segments=True))
    assert isinstance(result, list)
    assert result[0]["text"] == "Hallo Welt"
    assert result[0]["start"] == 0.0
    assert result[0]["end"] == 1.5


@pytest.mark.asyncio
async def test_transcribe_remote_returns_segments_when_requested(tmp_path):
    import wave, struct
    wav = tmp_path / "test.wav"
    with wave.open(str(wav), "wb") as wf:
        wf.setnchannels(1); wf.setsampwidth(2); wf.setframerate(16000)
        wf.writeframes(struct.pack("<100h", *([0] * 100)))

    import respx, httpx
    from transcription import TranscriptionEngine
    eng = TranscriptionEngine()

    with respx.mock:
        respx.post("http://beastix:8000/v1/audio/transcriptions").mock(
            return_value=httpx.Response(200, json={
                "text": "Hallo Welt",
                "segments": [{"start": 0.0, "end": 1.5, "text": " Hallo Welt"}],
            })
        )
        result = await eng.transcribe_file(
            str(wav), language="de", model_name="large-v3",
            device="auto", base_url="http://beastix:8000", with_segments=True,
        )
    assert isinstance(result, list)
    assert result[0]["text"] == "Hallo Welt"

Step 2: Run to verify they fail

.venv/bin/pytest tests/test_transcription.py::test_transcribe_file_returns_segments_when_requested tests/test_transcription.py::test_transcribe_remote_returns_segments_when_requested -v

Step 3: Update transcription.py

Replace the entire file:

import asyncio
import httpx
from typing import Union


class TranscriptionEngine:
    _model = None

    def _get_model(self, model_name: str = "large-v3", device: str = "auto"):
        if self._model is None:
            from faster_whisper import WhisperModel
            if device == "auto":
                try:
                    self._model = WhisperModel(model_name, device="cuda", compute_type="float16")
                except Exception:
                    self._model = WhisperModel(model_name, device="cpu", compute_type="int8")
            else:
                compute = "float16" if device in ("cuda", "rocm") else "int8"
                self._model = WhisperModel(model_name, device=device, compute_type=compute)
        return self._model

    async def transcribe_file(
        self,
        audio_path: str,
        language: str = "de",
        model_name: str = "large-v3",
        device: str = "auto",
        base_url: str = "",
        with_segments: bool = False,
    ) -> Union[str, list[dict]]:
        if base_url:
            return await self._transcribe_remote(
                audio_path, language, model_name, base_url, with_segments
            )
        return await self._transcribe_local(
            audio_path, language, model_name, device, with_segments
        )

    async def _transcribe_remote(
        self,
        audio_path: str,
        language: str,
        model_name: str,
        base_url: str,
        with_segments: bool,
    ) -> Union[str, list[dict]]:
        async with httpx.AsyncClient(timeout=300) as client:
            with open(audio_path, "rb") as f:
                data = {"model": model_name, "language": language}
                if with_segments:
                    data["timestamp_granularities[]"] = "segment"
                    data["response_format"] = "verbose_json"
                r = await client.post(
                    f"{base_url}/v1/audio/transcriptions",
                    files={"file": ("audio.wav", f, "audio/wav")},
                    data=data,
                )
            r.raise_for_status()
            body = r.json()
        if not with_segments:
            return body["text"]
        raw_segs = body.get("segments") or []
        if raw_segs:
            return [
                {"start": s["start"], "end": s["end"], "text": s["text"].strip()}
                for s in raw_segs
            ]
        # fallback: single segment covering whole file
        return [{"start": 0.0, "end": 9999.0, "text": body["text"].strip()}]

    async def _transcribe_local(
        self,
        audio_path: str,
        language: str,
        model_name: str,
        device: str,
        with_segments: bool,
    ) -> Union[str, list[dict]]:
        loop = asyncio.get_running_loop()
        model = self._get_model(model_name, device)
        segments, _ = await loop.run_in_executor(
            None,
            lambda: model.transcribe(audio_path, language=language),
        )
        segments = list(segments)
        if not with_segments:
            return "".join(seg.text for seg in segments).strip()
        return [
            {"start": seg.start, "end": seg.end, "text": seg.text.strip()}
            for seg in segments
            if seg.text.strip()
        ]


engine = TranscriptionEngine()

Step 4: Run all transcription tests

.venv/bin/pytest tests/test_transcription.py -v

Expected: all PASS

Step 5: Commit

git add transcription.py tests/test_transcription.py
git commit -m "feat: transcribe_file returns timestamped segments when with_segments=True"

Task 3: diarization.py — Diarizer class

Files:

  • Create: diarization.py
  • Create: tests/test_diarization.py

Context: Wraps pyannote.audio. Returns list[tuple[float, float, str]] — each entry is (start_sec, end_sec, speaker_label). Loaded lazily. Runs in executor to avoid blocking.
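
Intended usage, sketched (cfg here is the loaded config from Task 1; the pyannote weights are downloaded into the HF cache on the first call):

d = Diarizer(hf_token=cfg["diarization"]["hf_token"])
turns = await d.diarize("/tmp/meeting.wav")
# → [(0.0, 2.5, "SPEAKER_00"), (2.6, 5.0, "SPEAKER_01"), ...]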

Step 1: Install pyannote.audio

cd /home/templis/work/tueit_Transkriptor && .venv/bin/pip install pyannote.audio

Add to requirements.txt:

pyannote.audio>=3.3

Step 2: Write the failing test

Create tests/test_diarization.py:

from unittest.mock import MagicMock, patch
import pytest


def test_diarizer_returns_list_of_tuples(tmp_path):
    """Diarizer.diarize() returns [(start, end, speaker), ...]"""
    wav = tmp_path / "test.wav"
    wav.write_bytes(b"\x00" * 100)

    mock_turn_1 = MagicMock()
    mock_turn_1.start = 0.0
    mock_turn_1.end = 2.5
    mock_track_1 = "A"
    mock_label_1 = "SPEAKER_00"

    mock_turn_2 = MagicMock()
    mock_turn_2.start = 2.6
    mock_turn_2.end = 5.0
    mock_track_2 = "B"
    mock_label_2 = "SPEAKER_01"

    mock_annotation = MagicMock()
    mock_annotation.itertracks.return_value = [
        (mock_turn_1, mock_track_1, mock_label_1),
        (mock_turn_2, mock_track_2, mock_label_2),
    ]

    mock_pipeline = MagicMock(return_value=mock_annotation)

    import asyncio
    from diarization import Diarizer
    d = Diarizer.__new__(Diarizer)
    d._pipeline = mock_pipeline

    result = asyncio.run(d.diarize(str(wav)))
    assert result == [(0.0, 2.5, "SPEAKER_00"), (2.6, 5.0, "SPEAKER_01")]


def test_diarizer_requires_hf_token():
    from diarization import Diarizer
    with pytest.raises(ValueError, match="hf_token"):
        Diarizer(hf_token="")

Step 3: Run to verify it fails

.venv/bin/pytest tests/test_diarization.py -v

Expected: FAIL — ModuleNotFoundError: No module named 'diarization'

Step 4: Create diarization.py

import asyncio


class Diarizer:
    def __init__(self, hf_token: str):
        if not hf_token:
            raise ValueError("hf_token is required for diarization")
        self._hf_token = hf_token
        self._pipeline = None

    def _load_pipeline(self):
        if self._pipeline is None:
            from pyannote.audio import Pipeline
            self._pipeline = Pipeline.from_pretrained(
                "pyannote/speaker-diarization-3.1",
                use_auth_token=self._hf_token,
            )
        return self._pipeline

    async def diarize(self, wav_path: str) -> list[tuple[float, float, str]]:
        loop = asyncio.get_running_loop()
        pipeline = await loop.run_in_executor(None, self._load_pipeline)
        annotation = await loop.run_in_executor(None, lambda: pipeline(wav_path))
        return [
            (turn.start, turn.end, speaker)
            for turn, _, speaker in annotation.itertracks(yield_label=True)
        ]

Step 5: Run tests

.venv/bin/pytest tests/test_diarization.py -v

Expected: all PASS

Step 6: Commit

git add diarization.py tests/test_diarization.py requirements.txt
git commit -m "feat: Diarizer class wrapping pyannote/speaker-diarization-3.1"

Task 4: Alignment — align Whisper segments to pyannote speakers

Files:

  • Create: alignment.py
  • Create: tests/test_alignment.py

Context: For each Whisper segment, find the pyannote speaker with the greatest time overlap. Merge consecutive same-speaker segments into one block. Return list[tuple[str, str]] of (speaker_label, text) pairs.
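
Worked example of the overlap rule, using the numbers from the first test below: the Whisper segment (2.1, 4.0) overlaps SPEAKER_00 (0.0, 2.5) by min(4.0, 2.5) - max(2.1, 0.0) = 0.4 s, and SPEAKER_01 (2.5, 5.0) by min(4.0, 5.0) - max(2.1, 2.5) = 1.5 s, so the segment goes to SPEAKER_01.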

Step 1: Write the failing tests

Create tests/test_alignment.py:

def test_align_assigns_speaker_by_overlap():
    from alignment import align_segments
    whisper = [
        {"start": 0.0, "end": 2.0, "text": "Hallo"},
        {"start": 2.1, "end": 4.0, "text": "Wie geht es"},
    ]
    speakers = [
        (0.0, 2.5, "SPEAKER_00"),
        (2.5, 5.0, "SPEAKER_01"),
    ]
    result = align_segments(whisper, speakers)
    assert result[0] == ("SPEAKER_00", "Hallo")
    assert result[1] == ("SPEAKER_01", "Wie geht es")


def test_align_merges_consecutive_same_speaker():
    from alignment import align_segments
    whisper = [
        {"start": 0.0, "end": 1.0, "text": "Hallo"},
        {"start": 1.1, "end": 2.0, "text": "Welt"},
    ]
    speakers = [(0.0, 3.0, "SPEAKER_00")]
    result = align_segments(whisper, speakers)
    assert len(result) == 1
    assert result[0] == ("SPEAKER_00", "Hallo Welt")


def test_align_fallback_when_no_speaker_overlap():
    from alignment import align_segments
    whisper = [{"start": 0.0, "end": 1.0, "text": "Hallo"}]
    speakers = []
    result = align_segments(whisper, speakers)
    assert result[0][0] == "SPEAKER_00"

Step 2: Run to verify they fail

.venv/bin/pytest tests/test_alignment.py -v

Step 3: Create alignment.py

def align_segments(
    whisper_segs: list[dict],
    speaker_segs: list[tuple[float, float, str]],
) -> list[tuple[str, str]]:
    """Assign each Whisper segment to the speaker with the greatest time overlap.
    Consecutive segments from the same speaker are merged into one block."""
    result: list[tuple[str, str]] = []
    for seg in whisper_segs:
        speaker = _best_speaker(seg["start"], seg["end"], speaker_segs)
        text = seg["text"].strip()
        if not text:
            continue
        if result and result[-1][0] == speaker:
            result[-1] = (speaker, result[-1][1] + " " + text)
        else:
            result.append((speaker, text))
    return result


def _best_speaker(
    start: float,
    end: float,
    speaker_segs: list[tuple[float, float, str]],
) -> str:
    best_label = "SPEAKER_00"
    best_overlap = 0.0
    for s_start, s_end, label in speaker_segs:
        overlap = max(0.0, min(end, s_end) - max(start, s_start))
        if overlap > best_overlap:
            best_overlap = overlap
            best_label = label
    return best_label

Step 4: Run tests

.venv/bin/pytest tests/test_alignment.py -v

Expected: all PASS

Step 5: Commit

git add alignment.py tests/test_alignment.py
git commit -m "feat: align_segments() — map Whisper timestamps to pyannote speakers"

Task 5: llm.py — identify_speakers() and summarize()

Files:

  • Modify: llm.py
  • Test: tests/test_llm.py

Step 1: Write the failing tests

Add to tests/test_llm.py:

@pytest.mark.asyncio
async def test_identify_speakers_returns_dict():
    import respx, httpx, json
    from llm import OllamaClient
    client = OllamaClient()
    mapping = {"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"}
    transcript_excerpt = "SPEAKER_00: Gut, Herr Möller.\nSPEAKER_01: Danke, Thomas."

    with respx.mock:
        respx.post("http://localhost:11434/api/generate").mock(
            return_value=httpx.Response(200, json={"response": json.dumps(mapping)})
        )
        result = await client.identify_speakers(transcript_excerpt)
    assert result == {"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"}


@pytest.mark.asyncio
async def test_identify_speakers_returns_empty_on_parse_failure():
    import respx, httpx
    from llm import OllamaClient
    client = OllamaClient()

    with respx.mock:
        respx.post("http://localhost:11434/api/generate").mock(
            return_value=httpx.Response(200, json={"response": "kein json hier"})
        )
        result = await client.identify_speakers("irgendwas")
    assert result == {}


@pytest.mark.asyncio
async def test_summarize_returns_string():
    import respx, httpx
    from llm import OllamaClient
    client = OllamaClient()

    with respx.mock:
        respx.post("http://localhost:11434/api/generate").mock(
            return_value=httpx.Response(200, json={"response": "# Zusammenfassung\n\nKurzer Text."})
        )
        result = await client.summarize("Thomas: Hallo.\nMöller: Hi.", model="gemma3:12b")
    assert "Zusammenfassung" in result

Step 2: Run to verify they fail

.venv/bin/pytest tests/test_llm.py::test_identify_speakers_returns_dict tests/test_llm.py::test_identify_speakers_returns_empty_on_parse_failure tests/test_llm.py::test_summarize_returns_string -v

Step 3: Add methods to llm.py

IDENTIFY_SPEAKERS_PROMPT = """Du bekommst den Anfang eines Gesprächstranskripts mit Sprecher-Labels (SPEAKER_00, SPEAKER_01, ...).
Ermittle, welche echten Namen den Sprechern zugeordnet werden können — z.B. durch direkte Anrede ("Herr Möller", "Frank").
Antworte NUR mit einem JSON-Objekt: {"SPEAKER_00": "Name oder null", "SPEAKER_01": "Name oder null"}
Kein weiterer Text, keine Erklärung."""

SUMMARIZE_PROMPT = """Du bist ein präziser Assistent für Business-Kommunikation.
Du bekommst ein Gesprächstranskript mit Sprecher-Labels.
Erstelle eine strukturierte Zusammenfassung auf Deutsch mit:
1. Einem passenden H1-Titel
2. ## Wichtigste Punkte (Aufzählung)
3. ## Offene Fragen (Aufzählung, falls vorhanden)
4. ## Nächste Schritte / Ideen (Aufzählung, falls vorhanden)
Antworte NUR mit dem fertigen Markdown."""

Add to OllamaClient:

async def identify_speakers(
    self,
    transcript_excerpt: str,
    model: str = "gemma3:12b",
) -> dict[str, str]:
    """Try to map SPEAKER_XX labels to real names. Returns {} on failure."""
    import json
    async with httpx.AsyncClient(timeout=60) as client:
        r = await client.post(
            f"{self.base_url}/api/generate",
            json={
                "model": model,
                "prompt": f"Transkript-Anfang:\n{transcript_excerpt[:2000]}",
                "system": IDENTIFY_SPEAKERS_PROMPT,
                "stream": False,
            },
        )
        r.raise_for_status()
        raw = r.json()["response"].strip()
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return {}
    if not isinstance(data, dict):
        return {}
    return {k: v for k, v in data.items() if v}

async def summarize(
    self,
    annotated_transcript: str,
    model: str = "gemma3:12b",
) -> str:
    async with httpx.AsyncClient(timeout=180) as client:
        r = await client.post(
            f"{self.base_url}/api/generate",
            json={
                "model": model,
                "prompt": f"Transkript:\n{annotated_transcript}",
                "system": SUMMARIZE_PROMPT,
                "stream": False,
            },
        )
        r.raise_for_status()
        return r.json()["response"].strip()

Step 4: Run all llm tests

.venv/bin/pytest tests/test_llm.py -v

Expected: all PASS

Step 5: Commit

git add llm.py tests/test_llm.py
git commit -m "feat: OllamaClient.identify_speakers() and summarize() for diarization pipeline"

Task 6: output.py — write_meeting_docs()

Files:

  • Modify: output.py
  • Test: tests/test_output.py

Context: Writes three files: {base}-index.md, {base}-transkript.md, {base}-zusammenfassung.md. Returns all three paths.
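
With the naming scheme below and dt = 2026-04-02 14:30 (as in the test), the three files are:

2026-04-02-1430-meeting-index.md
2026-04-02-1430-meeting-transkript.md
2026-04-02-1430-meeting-zusammenfassung.md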

Step 1: Write the failing test

Add to tests/test_output.py:

def test_write_meeting_docs_creates_three_files(tmp_path):
    from output import write_meeting_docs
    from datetime import datetime
    aligned = [("Thomas", "Gut, dann fangen wir an."), ("Möller", "Ich hab das vorbereitet.")]
    paths = write_meeting_docs(
        aligned_segments=aligned,
        summary="# Meeting\n\n## Wichtigste Punkte\n- Budget besprochen",
        speakers=["Thomas", "Möller"],
        duration_min=5,
        output_dir=str(tmp_path),
        dt=datetime(2026, 4, 2, 14, 30),
    )
    assert len(paths) == 3
    index_content = open(paths["index"]).read()
    assert "Thomas" in index_content
    assert "transkript" in index_content
    transcript_content = open(paths["transkript"]).read()
    assert "**Thomas:**" in transcript_content
    assert "Gut, dann fangen wir an." in transcript_content
    summary_content = open(paths["zusammenfassung"]).read()
    assert "Budget besprochen" in summary_content

Step 2: Run to verify it fails

.venv/bin/pytest tests/test_output.py::test_write_meeting_docs_creates_three_files -v

Step 3: Add to output.py

def write_meeting_docs(
    aligned_segments: list[tuple[str, str]],
    summary: str,
    speakers: list[str],
    duration_min: int,
    output_dir: str,
    dt: "datetime | None" = None,
) -> dict[str, str]:
    """Write index, transkript, and zusammenfassung. Returns {type: path}."""
    from datetime import datetime
    if dt is None:
        dt = datetime.now()
    os.makedirs(output_dir, exist_ok=True)
    base = dt.strftime("%Y-%m-%d-%H%M") + "-meeting"
    date_str = dt.strftime("%d.%m.%Y %H:%M")
    frontmatter_base = f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript, meeting]\n---\n\n"

    # --- transkript ---
    transcript_lines = []
    for speaker, text in aligned_segments:
        transcript_lines.append(f"**{speaker}:** {text}\n")
    transcript_content = "\n".join(transcript_lines)
    transkript_filename = f"{base}-transkript.md"
    transkript_path = os.path.join(output_dir, transkript_filename)
    with open(transkript_path, "w", encoding="utf-8") as f:
        f.write(frontmatter_base)
        f.write(transcript_content)
        if not transcript_content.endswith("\n"):
            f.write("\n")

    # --- zusammenfassung ---
    zusammenfassung_filename = f"{base}-zusammenfassung.md"
    zusammenfassung_path = os.path.join(output_dir, zusammenfassung_filename)
    with open(zusammenfassung_path, "w", encoding="utf-8") as f:
        f.write(frontmatter_base)
        f.write(summary)
        if not summary.endswith("\n"):
            f.write("\n")

    # --- index ---
    speaker_str = ", ".join(speakers) if speakers else "Unbekannt"
    tl_dr = _extract_tldr(summary)
    index_content = (
        f"# Meeting — {date_str}\n\n"
        f"**Sprecher:** {speaker_str}  \n"
        f"**Dauer:** {duration_min} min\n\n"
        f"> {tl_dr}\n\n"
        f"- [Transkript]({transkript_filename})\n"
        f"- [Zusammenfassung]({zusammenfassung_filename})\n"
    )
    index_filename = f"{base}-index.md"
    index_path = os.path.join(output_dir, index_filename)
    with open(index_path, "w", encoding="utf-8") as f:
        f.write(f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript, meeting, index]\n---\n\n")
        f.write(index_content)

    return {"index": index_path, "transkript": transkript_path, "zusammenfassung": zusammenfassung_path}


def _extract_tldr(summary: str) -> str:
    """Return the first non-heading, non-empty line from the summary as TL;DR."""
    for line in summary.splitlines():
        stripped = line.strip()
        if stripped and not stripped.startswith("#"):
            return stripped[:200]
    return "Kein TL;DR verfügbar."

Step 4: Run all output tests

.venv/bin/pytest tests/test_output.py -v

Expected: all PASS

Step 5: Commit

git add output.py tests/test_output.py
git commit -m "feat: write_meeting_docs() — creates index, transkript, zusammenfassung"

Task 7: api/state.py — add pending speaker state

Files:

  • Modify: api/state.py
  • Test: tests/test_api.py

Context: The pipeline pauses after alignment, waiting for speaker names. It uses an asyncio.Event stored in state. The /speakers endpoint sets names and signals the event.
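
The pause/resume handshake between pipeline (Task 9) and endpoint (Task 8), sketched:

# pipeline side (Task 9):
state._speakers_event = asyncio.Event()
await state._speakers_event.wait()               # blocks until POST /speakers arrives
# endpoint side (Task 8):
state._speaker_names = {"SPEAKER_00": "Thomas"}  # taken from the request body
state._speakers_event.set()                      # unblocks the pipeline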

Step 1: Write the failing test

Add to tests/test_api.py:

def test_state_has_speaker_fields():
    from api.state import AppState
    s = AppState()
    assert hasattr(s, "_speakers_event")
    assert hasattr(s, "_pending_aligned_segments")
    assert hasattr(s, "_speaker_names")
    assert s._speakers_event is None
    assert s._pending_aligned_segments is None
    assert s._speaker_names is None

Step 2: Run to verify it fails

.venv/bin/pytest tests/test_api.py::test_state_has_speaker_fields -v

Step 3: Update api/state.py

import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable


class Status(str, Enum):
    IDLE = "idle"
    RECORDING = "recording"
    PROCESSING = "processing"
    AWAITING_SPEAKERS = "awaiting_speakers"
    ERROR = "error"


@dataclass
class AppState:
    status: Status = Status.IDLE
    recording_user: str | None = None
    last_error: str | None = None
    _listeners: list[Callable] = field(default_factory=list, repr=False)
    # Diarization pipeline pause
    _speakers_event: asyncio.Event | None = None
    _pending_aligned_segments: list[tuple[str, str]] | None = None
    _speaker_names: dict[str, str] | None = None

    def subscribe(self, callback: Callable):
        self._listeners.append(callback)

    async def notify(self):
        for cb in self._listeners:
            if asyncio.iscoroutinefunction(cb):
                await cb(self)
            else:
                cb(self)

    async def set_status(self, status: Status):
        self.status = status
        await self.notify()


state = AppState()

Note: AWAITING_SPEAKERS status is added so the UI can show a distinct state.

Step 4: Run tests

.venv/bin/pytest tests/test_api.py::test_state_has_speaker_fields -v

Step 5: Commit

git add api/state.py tests/test_api.py
git commit -m "feat: AppState gains speaker pause fields and AWAITING_SPEAKERS status"

Task 8: api/router.py — POST /speakers endpoint

Files:

  • Modify: api/router.py
  • Test: tests/test_api.py

Step 1: Write the failing test

Add to tests/test_api.py:

def test_post_speakers_resolves_pipeline_pause():
    from main import app
    from api.router import current_user
    from api.state import state
    import asyncio

    # Simulate pipeline waiting for speakers
    state._speakers_event = asyncio.Event()
    state._speaker_names = None

    app.dependency_overrides[current_user] = lambda: {"username": "u", "output_dir": "/tmp", "is_admin": False}
    try:
        from fastapi.testclient import TestClient
        client = TestClient(app)
        r = client.post("/speakers", json={"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"})
        assert r.status_code == 200
        assert state._speaker_names == {"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"}
        assert state._speakers_event.is_set()
    finally:
        app.dependency_overrides.pop(current_user, None)
        state._speakers_event = None
        state._speaker_names = None

Step 2: Run to verify it fails

.venv/bin/pytest tests/test_api.py::test_post_speakers_resolves_pipeline_pause -v

Step 3: Add endpoint to api/router.py

Add after the existing endpoints (before the websocket):

@router.post("/speakers")
async def post_speakers(body: dict, user: dict = Depends(current_user)):
    if state._speakers_event is None:
        raise HTTPException(status_code=409, detail="Keine ausstehende Sprecher-Zuordnung")
    state._speaker_names = {k: v for k, v in body.items() if isinstance(v, str)}
    state._speakers_event.set()
    return {"ok": True}
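
For a quick manual probe (hypothetical sketch: host/port and whatever auth current_user enforces are deployment-specific):

import httpx
# only succeeds while the pipeline is actually waiting for a mapping
r = httpx.post("http://localhost:8000/speakers",
               json={"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"})
print(r.status_code, r.json())  # 200 {"ok": true}, or 409 if nothing is pending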

Step 4: Run tests

.venv/bin/pytest tests/test_api.py::test_post_speakers_resolves_pipeline_pause -v

Step 5: Commit

git add api/router.py tests/test_api.py
git commit -m "feat: POST /speakers — resolves pipeline pause with speaker name mapping"

Task 9: api/pipeline.py — extend with diarization path

Files:

  • Modify: api/pipeline.py

Context: When diarization.enabled is true and hf_token is set, run Whisper (with segments) and pyannote in parallel, align, try Ollama name identification, emit speakers_unknown if needed, then write three documents. If diarization is disabled, run the old single-document path unchanged.
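
The speakers_unknown payload that the frontend consumes in Task 10, sketched as a Python literal (excerpt texts are placeholders):

{"event": "speakers_unknown", "speakers": [
    {"id": "SPEAKER_00", "excerpts": ["Gut, dann fangen wir an.", "..."]},
    {"id": "SPEAKER_01", "excerpts": ["Ich hab das vorbereitet."]},
]}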

Step 1: Read the current api/pipeline.py to refresh the existing flow before rewriting it.

Step 2: No new test here — the pipeline is tested through integration. The individual components (alignment, diarization, llm) are tested separately.

Step 3: Rewrite api/pipeline.py

import asyncio
import logging
import os
import tempfile
import traceback
from datetime import datetime

from api.state import state, Status
from api.router import broadcast
from config import load as load_config
from transcription import engine as transcription_engine
from llm import OllamaClient
from output import save_transcript, write_meeting_docs

logger = logging.getLogger(__name__)


async def run_pipeline():
    cfg = load_config()
    recorder = getattr(state, "_recorder", None)
    if recorder is None:
        return

    output_dir = getattr(state, "_recording_output_dir", None) or cfg["output"]["path"]  # the attr may exist but be None after a previous run
    instructions = getattr(state, "_recording_instructions", "")
    diar_cfg = cfg.get("diarization", {})
    use_diarization = diar_cfg.get("enabled") and diar_cfg.get("hf_token")

    recorder.stop()
    await state.set_status(Status.PROCESSING)
    await broadcast({"event": "processing"})

    wav_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            wav_path = f.name
        recorder.save_wav(wav_path)

        if use_diarization:
            await _run_meeting_pipeline(cfg, wav_path, output_dir, instructions, diar_cfg)
        else:
            await _run_solo_pipeline(cfg, wav_path, output_dir, instructions)

    except Exception as e:
        tb = traceback.format_exc()
        logger.error("Pipeline error:\n%s", tb)
        state.last_error = str(e)
        await state.set_status(Status.ERROR)
        await broadcast({"event": "error", "message": str(e)})
    finally:
        state.recording_user = None
        state._recording_output_dir = None
        state._recording_instructions = ""
        state._speakers_event = None
        state._pending_aligned_segments = None
        state._speaker_names = None
        if wav_path:
            try:
                os.unlink(wav_path)
            except OSError:
                pass


async def _run_solo_pipeline(cfg, wav_path, output_dir, instructions):
    """Original single-document pipeline (no diarization)."""
    raw_text = await transcription_engine.transcribe_file(
        wav_path,
        language=cfg["whisper"]["language"],
        model_name=cfg["whisper"]["model"],
        device=cfg["whisper"]["device"],
        base_url=cfg["whisper"].get("base_url", ""),
    )
    await broadcast({"event": "transcribed", "raw": raw_text})

    client = OllamaClient(base_url=cfg["ollama"]["base_url"])
    refined = await client.refine(
        raw_text=raw_text,
        instructions=instructions,
        model=cfg["ollama"]["model"],
    )

    title = "Diktat"
    for line in refined.splitlines():
        if line.startswith("# "):
            title = line[2:].strip()
            break

    path = save_transcript(title=title, content=refined, output_dir=output_dir)
    await broadcast({"event": "saved", "path": path, "title": title})
    await state.set_status(Status.IDLE)


async def _run_meeting_pipeline(cfg, wav_path, output_dir, instructions, diar_cfg):
    """Diarization pipeline: 3 documents, speaker identification."""
    from diarization import Diarizer
    from alignment import align_segments

    # Run Whisper and pyannote in parallel
    diarizer = Diarizer(hf_token=diar_cfg["hf_token"])
    whisper_task = asyncio.create_task(
        transcription_engine.transcribe_file(
            wav_path,
            language=cfg["whisper"]["language"],
            model_name=cfg["whisper"]["model"],
            device=cfg["whisper"]["device"],
            base_url=cfg["whisper"].get("base_url", ""),
            with_segments=True,
        )
    )
    diar_task = asyncio.create_task(diarizer.diarize(wav_path))
    whisper_segs, speaker_segs = await asyncio.gather(whisper_task, diar_task)

    # Align
    aligned = align_segments(whisper_segs, speaker_segs)
    await broadcast({"event": "transcribed", "raw": " ".join(t for _, t in aligned)})

    # Try Ollama name identification
    excerpt = "\n".join(f"{s}: {t}" for s, t in aligned[:20])
    client = OllamaClient(base_url=cfg["ollama"]["base_url"])
    name_map = await client.identify_speakers(excerpt, model=cfg["ollama"]["model"])

    if not name_map:
        # Pause and ask user
        excerpts_per_speaker = _build_excerpts(aligned)
        state._speakers_event = asyncio.Event()
        state._pending_aligned_segments = aligned
        await state.set_status(Status.AWAITING_SPEAKERS)
        await broadcast({"event": "speakers_unknown", "speakers": [
            {"id": spk, "excerpts": exs}
            for spk, exs in excerpts_per_speaker.items()
        ]})
        await state._speakers_event.wait()
        name_map = state._speaker_names or {}

    # Apply names
    def resolve(label):
        return name_map.get(label) or label.replace("SPEAKER_0", "Sprecher ").replace("SPEAKER_", "Sprecher ")

    named_aligned = [(resolve(spk), text) for spk, text in aligned]
    speakers = sorted({spk for spk, _ in named_aligned})

    # Duration: the end of the last Whisper segment approximates wall-clock length
    # (summing per-segment durations would drop the pauses between turns)
    total_secs = whisper_segs[-1]["end"] if whisper_segs else 0
    duration_min = max(1, round(total_secs / 60))

    # Full transcript text for summarization
    transcript_text = "\n\n".join(f"**{spk}:** {txt}" for spk, txt in named_aligned)

    # Summarize
    summary = await client.summarize(transcript_text, model=cfg["ollama"]["model"])

    # Write three documents
    dt = datetime.now()
    paths = write_meeting_docs(
        aligned_segments=named_aligned,
        summary=summary,
        speakers=speakers,
        duration_min=duration_min,
        output_dir=output_dir,
        dt=dt,
    )

    await state.set_status(Status.IDLE)
    await broadcast({
        "event": "saved",
        "path": paths["index"],
        "title": f"Meeting {dt.strftime('%d.%m.%Y %H:%M')}",
        "meeting": True,
        "paths": paths,
    })


def _build_excerpts(aligned: list[tuple[str, str]], max_per_speaker: int = 4) -> dict[str, list[str]]:
    """Build a dict of speaker → list of text excerpts (3-4 sentences each)."""
    from collections import defaultdict
    buckets: dict[str, list[str]] = defaultdict(list)
    for spk, text in aligned:
        if len(buckets[spk]) < max_per_speaker:
            buckets[spk].append(text[:200])
    return dict(buckets)

Step 4: Run full test suite to check nothing broke

.venv/bin/pytest -v 2>&1 | tail -20

Expected: all PASS

Step 5: Commit

git add api/pipeline.py
git commit -m "feat: meeting pipeline — diarization, speaker ID, 3-doc output"

Task 10: Frontend — speaker naming card

Files:

  • Modify: frontend/index.html (CSS + HTML)
  • Modify: frontend/app.js (WS handler + card logic)

Step 1: Add CSS to frontend/index.html

Add inside <style> (before </style>):

.speaker-card {
  background: var(--surface); border: 1px solid var(--yellow);
  border-radius: 10px; padding: 20px; display: flex; flex-direction: column; gap: 16px;
}
.speaker-card.hidden { display: none; }
.speaker-card h3 { font-size: .8rem; color: var(--yellow); text-transform: uppercase; letter-spacing: .08em; margin: 0; }
.speaker-row { display: flex; flex-direction: column; gap: 8px; }
.speaker-excerpt {
  font-size: .82rem; color: var(--muted); background: var(--surface2);
  border-radius: 6px; padding: 8px 12px; min-height: 48px;
}
.excerpt-nav { display: flex; align-items: center; gap: 8px; }
.excerpt-nav button {
  background: none; border: 1px solid var(--border); color: var(--muted);
  border-radius: 4px; width: 28px; height: 28px; cursor: pointer;
  font-size: 1rem; display: flex; align-items: center; justify-content: center;
  transition: border-color .15s, color .15s;
}
.excerpt-nav button:hover { border-color: var(--yellow); color: var(--yellow); }
.excerpt-counter { font-size: .75rem; color: var(--muted); min-width: 30px; text-align: center; }
.speaker-name-input {
  background: var(--surface2); border: 1px solid var(--border); color: var(--text);
  border-radius: 6px; padding: 8px 12px; font-family: inherit; font-size: .9rem;
  outline: none; width: 100%; transition: border-color .15s;
}
.speaker-name-input:focus { border-color: var(--yellow); }
.speaker-card-actions { display: flex; gap: 10px; }
.card-btn {
  font-size: .82rem; padding: 8px 16px; border-radius: 8px;
  border: 1px solid var(--border); background: var(--surface2); color: var(--text);
  cursor: pointer; font-family: inherit; transition: border-color .15s;
}
.card-btn:hover { border-color: var(--red); }
.card-btn.primary { background: var(--yellow); border-color: var(--yellow); color: #111; }
.card-btn.primary:hover { background: #e6c200; border-color: #e6c200; }

Step 2: Add HTML to frontend/index.html

Add just before <section class="record-section">:

    <div id="speaker-card" class="speaker-card hidden">
      <h3>Wer hat gesprochen?</h3>
      <div id="speaker-rows"></div>
      <div class="speaker-card-actions">
        <button class="card-btn primary" id="speaker-confirm-btn">Übernehmen</button>
        <button class="card-btn" id="speaker-anon-btn">Anonym lassen</button>
      </div>
    </div>

Step 3: Add JS to frontend/app.js

Add after the existing constants at the top:

const speakerCard = document.getElementById('speaker-card');
const speakerRows = document.getElementById('speaker-rows');
const speakerConfirmBtn = document.getElementById('speaker-confirm-btn');
const speakerAnonBtn = document.getElementById('speaker-anon-btn');
let _speakerData = [];  // [{id, input}, ...]

Add to the STATUS_LABELS:

  awaiting_speakers: 'Sprecher zuordnen\u2026',

Replace the ws.onmessage handler — add handling for speakers_unknown:

  ws.onmessage = (e) => {
    const msg = JSON.parse(e.data);
    if (msg.event === 'processing') setStatus('processing');
    if (msg.event === 'saved') {
      setStatus('idle');
      hideSpeakerCard();
      loadTranscripts();
    }
    if (msg.event === 'error') setStatus('error');
    if (msg.event === 'speakers_unknown') showSpeakerCard(msg.speakers);
  };

Add functions for the speaker card:

function showSpeakerCard(speakers) {
  _speakerData = [];
  speakerRows.replaceChildren();
  speakers.forEach(function(s) {
    const row = document.createElement('div');
    row.className = 'speaker-row';

    const excerptEl = document.createElement('div');
    excerptEl.className = 'speaker-excerpt';
    excerptEl.textContent = s.excerpts[0] || '';

    const counter = document.createElement('span');
    counter.className = 'excerpt-counter';
    counter.textContent = s.excerpts.length > 1 ? '1/' + s.excerpts.length : '';

    let idx = 0;
    const prev = document.createElement('button');
    prev.textContent = '\u2039';
    const next = document.createElement('button');
    next.textContent = '\u203a';

    function updateExcerpt() {
      excerptEl.textContent = s.excerpts[idx] || '';
      counter.textContent = s.excerpts.length > 1 ? (idx + 1) + '/' + s.excerpts.length : '';
    }
    prev.addEventListener('click', function() {
      if (idx > 0) { idx--; updateExcerpt(); }
    });
    next.addEventListener('click', function() {
      if (idx < s.excerpts.length - 1) { idx++; updateExcerpt(); }
    });

    const nav = document.createElement('div');
    nav.className = 'excerpt-nav';
    if (s.excerpts.length > 1) { nav.append(prev, counter, next); }

    const input = document.createElement('input');
    input.type = 'text';
    input.className = 'speaker-name-input';
    input.placeholder = s.id.replace('SPEAKER_0', 'Sprecher ').replace('SPEAKER_', 'Sprecher ');

    row.append(excerptEl, nav, input);
    speakerRows.appendChild(row);
    _speakerData.push({ id: s.id, input: input });
  });
  speakerCard.classList.remove('hidden');
  setStatus('awaiting_speakers');
}

function hideSpeakerCard() {
  speakerCard.classList.add('hidden');
  _speakerData = [];
}

async function submitSpeakers(useNames) {
  const body = {};
  _speakerData.forEach(function(s) {
    body[s.id] = useNames ? s.input.value.trim() : '';
  });
  await apiFetch('/speakers', { method: 'POST', body: JSON.stringify(body) });
}

speakerConfirmBtn.addEventListener('click', function() { submitSpeakers(true); });
speakerAnonBtn.addEventListener('click', function() { submitSpeakers(false); });

Step 4: Run full suite (no automated test for UI, visual check at step 5)

.venv/bin/pytest -v 2>&1 | tail -15

Expected: all PASS (no test for UI JS)

Step 5: Commit

git add frontend/index.html frontend/app.js
git commit -m "feat: speaker naming card with excerpt navigator in main UI"

Task 11: Settings page — diarization section

Files:

  • Modify: frontend/settings.html
  • Modify: frontend/settings.js

Step 1: Add HTML section to frontend/settings.html

After the </section> of the Processing section, add:

    <section>
      <h2>Diarisierung</h2>
      <div class="field">
        <label style="display:flex;align-items:center;gap:10px;cursor:pointer;">
          <input type="checkbox" id="diar-enabled" style="width:auto;">
          Sprecher-Erkennung aktivieren
        </label>
      </div>
      <div class="field">
        <label>HuggingFace Token</label>
        <input type="text" id="diar-token" placeholder="hf_...">
      </div>
      <p style="font-size:.78rem;color:var(--muted);margin-top:4px;">
        Einmalig: <a href="https://huggingface.co/pyannote/speaker-diarization-3.1"
          target="_blank" style="color:var(--muted);">pyannote-Modell freischalten</a>
        und Token mit <strong>Read</strong>-Berechtigung erstellen.
      </p>
    </section>

Step 2: Add to frontend/settings.js

In loadConfig(), add after the ollama lines:

  document.getElementById('diar-enabled').checked = !!(cfg.diarization && cfg.diarization.enabled);
  document.getElementById('diar-token').value = (cfg.diarization && cfg.diarization.hf_token) || '';

In the save button handler, add to body:

    diarization: {
      enabled: document.getElementById('diar-enabled').checked,
      hf_token: document.getElementById('diar-token').value,
    },

Step 3: Run full suite

.venv/bin/pytest -v 2>&1 | tail -15

Expected: all PASS

Step 4: Commit

git add frontend/settings.html frontend/settings.js
git commit -m "feat: diarization section in settings — hf_token and enabled toggle"

Task 12: Update SETUP.md with HuggingFace instructions

Files:

  • Modify: docs/SETUP.md

Step 1: Add section to docs/SETUP.md

Add after the "Firewall" section:

### 5. HuggingFace — pyannote-Modell freischalten (für Diarisierung)

1. Account erstellen auf [huggingface.co](https://huggingface.co)
2. Modell-Seite öffnen: https://huggingface.co/pyannote/speaker-diarization-3.1
   → **"Access repository"** klicken und Nutzungsbedingungen bestätigen
3. Token erstellen: https://huggingface.co/settings/tokens
   → **New token** → Typ: **Read** → Token kopieren
4. Im Transkriptor: Einstellungen → Diarisierung → Token einfügen + aktivieren

Step 2: Commit

git add docs/SETUP.md
git commit -m "docs: HuggingFace setup instructions for pyannote diarization"

Task 13: Full test suite + push

Step 1: Run full test suite

cd /home/templis/work/tueit_Transkriptor && .venv/bin/pytest -v

Expected: all tests PASS

Step 2: Manual smoke test checklist

Restart app (pkill -f main.py && .venv/bin/python main.py &), then:

  • Einstellungen → Diarisierung: enter the token, enable, save
  • Start a recording and hold a short conversation
  • Status switches to "Sprecher zuordnen…", the speaker card appears
  • Page through the excerpts with the ‹ › buttons
  • Enter names → Übernehmen
  • Three entries appear in the transcript list (index, transkript, zusammenfassung)
  • The index shows the TL;DR plus links to the other two documents
  • The transcript shows **Thomas:** … paragraphs
  • The summary has ## sections
  • With diarization disabled, the old single-document dictation behavior is unchanged

Step 3: Push

git push