# Speaker Diarization & Name Identification Implementation Plan

**For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Add speaker diarization (pyannote.audio) and automatic name identification (Ollama) to the pipeline, producing three documents per meeting: an index, a raw transcript with speaker labels, and a polished summary.

**Architecture:** After recording, Whisper and pyannote run on the same WAV file; their timestamp-based segments are aligned to produce speaker-annotated text. Ollama tries to identify speaker names from context; if it can't, the frontend shows a speaker-naming card with excerpt navigation. The pipeline produces three linked markdown files per meeting.

**Tech Stack:** pyannote.audio 3.x, faster-whisper (already present), httpx, asyncio.Event for pipeline pause, vanilla JS for the speaker card
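
For orientation, a sketch of the data shapes the stages hand to each other (values illustrative; the exact APIs are defined in the tasks below):

```python
# Illustrative only: the shapes each pipeline stage produces/consumes in this plan.
whisper_segments = [           # transcribe_file(..., with_segments=True), Task 2
    {"start": 0.0, "end": 2.1, "text": "Gut, dann fangen wir an."},
]
speaker_segments = [           # Diarizer.diarize(), Task 3
    (0.0, 2.5, "SPEAKER_00"),  # (start_sec, end_sec, speaker_label)
]
aligned = [                    # align_segments(whisper_segments, speaker_segments), Task 4
    ("SPEAKER_00", "Gut, dann fangen wir an."),
]
```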
## Task 1: Add diarization config defaults

**Files:**
- Modify: `config.py`
- Test: `tests/test_config.py`

**Step 1: Write the failing test**

Add to `tests/test_config.py`:

```python
def test_config_has_diarization_defaults():
    from unittest.mock import patch
    import tempfile, os

    with tempfile.TemporaryDirectory() as tmpdir:
        cfg_path = os.path.join(tmpdir, "config.toml")
        with patch("config.CONFIG_PATH", cfg_path):
            import config
            cfg = config.load()
            assert "diarization" in cfg
            assert cfg["diarization"]["enabled"] is False
            assert cfg["diarization"]["hf_token"] == ""
```

**Step 2: Run to verify it fails**

```bash
cd /home/templis/work/tueit_Transkriptor && .venv/bin/pytest tests/test_config.py::test_config_has_diarization_defaults -v
```

Expected: FAIL — KeyError

**Step 3: Add to `config.py` DEFAULTS**

```python
"diarization": {
    "enabled": False,
    "hf_token": "",
},
```

Also add to the `_write_defaults` fallback string:

```python
f.write('[diarization]\nenabled = false\nhf_token = ""\n\n')
```

**Step 4: Run all config tests**

```bash
.venv/bin/pytest tests/test_config.py -v
```

Expected: all PASS

**Step 5: Commit**

```bash
git add config.py tests/test_config.py
git commit -m "feat: add diarization config defaults (enabled=false, hf_token)"
```
## Task 2: Extend transcription.py to return segments with timestamps

**Files:**
- Modify: `transcription.py`
- Test: `tests/test_transcription.py`

Context: The pipeline needs timestamps to align Whisper segments with pyannote speaker segments. Add `with_segments: bool = False` — when True, return `list[dict]` with `{start, end, text}` instead of a plain string. Backward compatible: the default `False` keeps existing callers working.
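
A sketch of the intended call-site contract once this task lands (`engine` stands for the module-level instance; illustrative, not part of the change):

```python
# Contract sketch: how callers are expected to use the new flag after Step 3.
async def show_contract(engine) -> None:
    # Default (unchanged): a plain string, as existing callers expect.
    text = await engine.transcribe_file("meeting.wav", language="de")
    assert isinstance(text, str)
    # New: timestamped segments for speaker alignment.
    segs = await engine.transcribe_file("meeting.wav", language="de", with_segments=True)
    assert segs and {"start", "end", "text"} <= set(segs[0])
```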
**Step 1: Write the failing tests**

Add to `tests/test_transcription.py`:

```python
def test_transcribe_file_returns_segments_when_requested(tmp_path):
    wav = tmp_path / "test.wav"
    wav.write_bytes(b"\x00" * 100)
    mock_model = MagicMock()
    mock_seg = MagicMock()
    mock_seg.text = " Hallo Welt"
    mock_seg.start = 0.0
    mock_seg.end = 1.5
    mock_model.transcribe.return_value = ([mock_seg], MagicMock())
    from transcription import TranscriptionEngine
    eng = TranscriptionEngine()
    eng._model = mock_model
    result = asyncio.run(eng.transcribe_file(str(wav), language="de", with_segments=True))
    assert isinstance(result, list)
    assert result[0]["text"] == "Hallo Welt"
    assert result[0]["start"] == 0.0
    assert result[0]["end"] == 1.5


@pytest.mark.asyncio
async def test_transcribe_remote_returns_segments_when_requested(tmp_path):
    import wave, struct
    wav = tmp_path / "test.wav"
    with wave.open(str(wav), "wb") as wf:
        wf.setnchannels(1); wf.setsampwidth(2); wf.setframerate(16000)
        wf.writeframes(struct.pack("<100h", *([0] * 100)))
    import respx, httpx
    from transcription import TranscriptionEngine
    eng = TranscriptionEngine()
    with respx.mock:
        respx.post("http://beastix:8000/v1/audio/transcriptions").mock(
            return_value=httpx.Response(200, json={
                "text": "Hallo Welt",
                "segments": [{"start": 0.0, "end": 1.5, "text": " Hallo Welt"}],
            })
        )
        result = await eng.transcribe_file(
            str(wav), language="de", model_name="large-v3",
            device="auto", base_url="http://beastix:8000", with_segments=True,
        )
    assert isinstance(result, list)
    assert result[0]["text"] == "Hallo Welt"
```

**Step 2: Run to verify they fail**

```bash
.venv/bin/pytest tests/test_transcription.py::test_transcribe_file_returns_segments_when_requested tests/test_transcription.py::test_transcribe_remote_returns_segments_when_requested -v
```

**Step 3: Update transcription.py**

Replace the entire file:

```python
import asyncio
from typing import Union

import httpx


class TranscriptionEngine:
    _model = None

    def _get_model(self, model_name: str = "large-v3", device: str = "auto"):
        if self._model is None:
            from faster_whisper import WhisperModel
            if device == "auto":
                try:
                    self._model = WhisperModel(model_name, device="cuda", compute_type="float16")
                except Exception:
                    self._model = WhisperModel(model_name, device="cpu", compute_type="int8")
            else:
                compute = "float16" if device in ("cuda", "rocm") else "int8"
                self._model = WhisperModel(model_name, device=device, compute_type=compute)
        return self._model

    async def transcribe_file(
        self,
        audio_path: str,
        language: str = "de",
        model_name: str = "large-v3",
        device: str = "auto",
        base_url: str = "",
        with_segments: bool = False,
    ) -> Union[str, list[dict]]:
        if base_url:
            return await self._transcribe_remote(
                audio_path, language, model_name, base_url, with_segments
            )
        return await self._transcribe_local(
            audio_path, language, model_name, device, with_segments
        )

    async def _transcribe_remote(
        self,
        audio_path: str,
        language: str,
        model_name: str,
        base_url: str,
        with_segments: bool,
    ) -> Union[str, list[dict]]:
        async with httpx.AsyncClient(timeout=300) as client:
            with open(audio_path, "rb") as f:
                data = {"model": model_name, "language": language}
                if with_segments:
                    data["timestamp_granularities[]"] = "segment"
                    data["response_format"] = "verbose_json"
                r = await client.post(
                    f"{base_url}/v1/audio/transcriptions",
                    files={"file": ("audio.wav", f, "audio/wav")},
                    data=data,
                )
        r.raise_for_status()
        body = r.json()
        if not with_segments:
            return body["text"]
        raw_segs = body.get("segments") or []
        if raw_segs:
            return [
                {"start": s["start"], "end": s["end"], "text": s["text"].strip()}
                for s in raw_segs
            ]
        # fallback: single segment covering the whole file
        return [{"start": 0.0, "end": 9999.0, "text": body["text"].strip()}]

    async def _transcribe_local(
        self,
        audio_path: str,
        language: str,
        model_name: str,
        device: str,
        with_segments: bool,
    ) -> Union[str, list[dict]]:
        loop = asyncio.get_running_loop()
        model = self._get_model(model_name, device)

        def _run() -> list:
            # faster-whisper returns a lazy generator; the actual decoding
            # happens during iteration, so materialize it inside the executor
            # to avoid blocking the event loop.
            segments, _ = model.transcribe(audio_path, language=language)
            return list(segments)

        segments = await loop.run_in_executor(None, _run)
        if not with_segments:
            return "".join(seg.text for seg in segments).strip()
        return [
            {"start": seg.start, "end": seg.end, "text": seg.text.strip()}
            for seg in segments
            if seg.text.strip()
        ]


engine = TranscriptionEngine()
```

**Step 4: Run all transcription tests**

```bash
.venv/bin/pytest tests/test_transcription.py -v
```

Expected: all PASS

**Step 5: Commit**

```bash
git add transcription.py tests/test_transcription.py
git commit -m "feat: transcribe_file returns timestamped segments when with_segments=True"
```
## Task 3: diarization.py — Diarizer class

**Files:**
- Create: `diarization.py`
- Create: `tests/test_diarization.py`

Context: Wraps pyannote.audio. Returns `list[tuple[float, float, str]]` — each entry is `(start_sec, end_sec, speaker_label)`. The pipeline is loaded lazily and runs in an executor to avoid blocking.
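
A usage sketch, assuming the Task 1 config holds a valid token and a real `meeting.wav` exists (the first call downloads the model from HuggingFace):

```python
import asyncio
from config import load
from diarization import Diarizer

async def main() -> None:
    cfg = load()
    d = Diarizer(hf_token=cfg["diarization"]["hf_token"])
    turns = await d.diarize("meeting.wav")
    # e.g. [(0.0, 2.5, "SPEAKER_00"), (2.6, 5.0, "SPEAKER_01")]
    for start, end, speaker in turns:
        print(f"{speaker}: {start:.1f}s to {end:.1f}s")

# asyncio.run(main())  # requires token + WAV file
```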
**Step 1: Install pyannote.audio**

```bash
cd /home/templis/work/tueit_Transkriptor && .venv/bin/pip install pyannote.audio
```

Add to `requirements.txt`:

```
pyannote.audio>=3.3
```

**Step 2: Write the failing test**

Create `tests/test_diarization.py`:

```python
from unittest.mock import MagicMock, patch

import pytest


def test_diarizer_returns_list_of_tuples(tmp_path):
    """Diarizer.diarize() returns [(start, end, speaker), ...]"""
    wav = tmp_path / "test.wav"
    wav.write_bytes(b"\x00" * 100)
    mock_turn_1 = MagicMock()
    mock_turn_1.start = 0.0
    mock_turn_1.end = 2.5
    mock_track_1 = "A"
    mock_label_1 = "SPEAKER_00"
    mock_turn_2 = MagicMock()
    mock_turn_2.start = 2.6
    mock_turn_2.end = 5.0
    mock_track_2 = "B"
    mock_label_2 = "SPEAKER_01"
    mock_annotation = MagicMock()
    mock_annotation.itertracks.return_value = [
        (mock_turn_1, mock_track_1, mock_label_1),
        (mock_turn_2, mock_track_2, mock_label_2),
    ]
    mock_pipeline = MagicMock(return_value=mock_annotation)

    import asyncio
    from diarization import Diarizer
    d = Diarizer.__new__(Diarizer)
    d._pipeline = mock_pipeline
    result = asyncio.run(d.diarize(str(wav)))
    assert result == [(0.0, 2.5, "SPEAKER_00"), (2.6, 5.0, "SPEAKER_01")]


def test_diarizer_requires_hf_token():
    from diarization import Diarizer
    with pytest.raises(ValueError, match="hf_token"):
        Diarizer(hf_token="")
```

**Step 3: Run to verify it fails**

```bash
.venv/bin/pytest tests/test_diarization.py -v
```

Expected: FAIL — diarization module not found

**Step 4: Create diarization.py**

```python
import asyncio


class Diarizer:
    def __init__(self, hf_token: str):
        if not hf_token:
            raise ValueError("hf_token is required for diarization")
        self._hf_token = hf_token
        self._pipeline = None

    def _load_pipeline(self):
        if self._pipeline is None:
            from pyannote.audio import Pipeline
            self._pipeline = Pipeline.from_pretrained(
                "pyannote/speaker-diarization-3.1",
                use_auth_token=self._hf_token,
            )
        return self._pipeline

    async def diarize(self, wav_path: str) -> list[tuple[float, float, str]]:
        loop = asyncio.get_running_loop()
        pipeline = await loop.run_in_executor(None, self._load_pipeline)
        annotation = await loop.run_in_executor(None, lambda: pipeline(wav_path))
        return [
            (turn.start, turn.end, speaker)
            for turn, _, speaker in annotation.itertracks(yield_label=True)
        ]
```

**Step 5: Run tests**

```bash
.venv/bin/pytest tests/test_diarization.py -v
```

Expected: all PASS

**Step 6: Commit**

```bash
git add diarization.py tests/test_diarization.py requirements.txt
git commit -m "feat: Diarizer class wrapping pyannote/speaker-diarization-3.1"
```
## Task 4: Alignment — align Whisper segments to pyannote speakers

**Files:**
- Create: `alignment.py`
- Create: `tests/test_alignment.py`

Context: For each Whisper segment, find the pyannote speaker with the greatest time overlap. Merge consecutive same-speaker segments into one block. Return `list[tuple[str, str]]` — `(speaker_label, text)`.
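
The overlap rule is plain interval intersection; a self-contained example using the numbers from the first test below:

```python
def overlap(a_start: float, a_end: float, b_start: float, b_end: float) -> float:
    """Length of the intersection of two time intervals (0.0 if disjoint)."""
    return max(0.0, min(a_end, b_end) - max(a_start, b_start))

# Whisper segment 2.1-4.0 vs. the two speaker turns from the first test below:
print(overlap(2.1, 4.0, 0.0, 2.5))  # 0.4 s with SPEAKER_00
print(overlap(2.1, 4.0, 2.5, 5.0))  # 1.5 s with SPEAKER_01 -> SPEAKER_01 wins
```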
**Step 1: Write the failing tests**

Create `tests/test_alignment.py`:

```python
def test_align_assigns_speaker_by_overlap():
    from alignment import align_segments
    whisper = [
        {"start": 0.0, "end": 2.0, "text": "Hallo"},
        {"start": 2.1, "end": 4.0, "text": "Wie geht es"},
    ]
    speakers = [
        (0.0, 2.5, "SPEAKER_00"),
        (2.5, 5.0, "SPEAKER_01"),
    ]
    result = align_segments(whisper, speakers)
    assert result[0] == ("SPEAKER_00", "Hallo")
    assert result[1] == ("SPEAKER_01", "Wie geht es")


def test_align_merges_consecutive_same_speaker():
    from alignment import align_segments
    whisper = [
        {"start": 0.0, "end": 1.0, "text": "Hallo"},
        {"start": 1.1, "end": 2.0, "text": "Welt"},
    ]
    speakers = [(0.0, 3.0, "SPEAKER_00")]
    result = align_segments(whisper, speakers)
    assert len(result) == 1
    assert result[0] == ("SPEAKER_00", "Hallo Welt")


def test_align_fallback_when_no_speaker_overlap():
    from alignment import align_segments
    whisper = [{"start": 0.0, "end": 1.0, "text": "Hallo"}]
    speakers = []
    result = align_segments(whisper, speakers)
    assert result[0][0] == "SPEAKER_00"
```

**Step 2: Run to verify they fail**

```bash
.venv/bin/pytest tests/test_alignment.py -v
```

**Step 3: Create alignment.py**

```python
def align_segments(
    whisper_segs: list[dict],
    speaker_segs: list[tuple[float, float, str]],
) -> list[tuple[str, str]]:
    """Assign each Whisper segment to the speaker with the greatest time overlap.

    Consecutive segments from the same speaker are merged into one block."""
    result: list[tuple[str, str]] = []
    for seg in whisper_segs:
        speaker = _best_speaker(seg["start"], seg["end"], speaker_segs)
        text = seg["text"].strip()
        if not text:
            continue
        if result and result[-1][0] == speaker:
            result[-1] = (speaker, result[-1][1] + " " + text)
        else:
            result.append((speaker, text))
    return result


def _best_speaker(
    start: float,
    end: float,
    speaker_segs: list[tuple[float, float, str]],
) -> str:
    best_label = "SPEAKER_00"
    best_overlap = 0.0
    for s_start, s_end, label in speaker_segs:
        overlap = max(0.0, min(end, s_end) - max(start, s_start))
        if overlap > best_overlap:
            best_overlap = overlap
            best_label = label
    return best_label
```

**Step 4: Run tests**

```bash
.venv/bin/pytest tests/test_alignment.py -v
```

Expected: all PASS

**Step 5: Commit**

```bash
git add alignment.py tests/test_alignment.py
git commit -m "feat: align_segments() — map Whisper timestamps to pyannote speakers"
```
## Task 5: llm.py — identify_speakers() and summarize()

**Files:**
- Modify: `llm.py`
- Test: `tests/test_llm.py`

**Step 1: Write the failing tests**

Add to `tests/test_llm.py`:

```python
@pytest.mark.asyncio
async def test_identify_speakers_returns_dict():
    import respx, httpx, json
    from llm import OllamaClient
    client = OllamaClient()
    mapping = {"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"}
    transcript_excerpt = "SPEAKER_00: Gut, Herr Möller.\nSPEAKER_01: Danke, Thomas."
    with respx.mock:
        respx.post("http://localhost:11434/api/generate").mock(
            return_value=httpx.Response(200, json={"response": json.dumps(mapping)})
        )
        result = await client.identify_speakers(transcript_excerpt)
    assert result == {"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"}


@pytest.mark.asyncio
async def test_identify_speakers_returns_empty_on_parse_failure():
    import respx, httpx
    from llm import OllamaClient
    client = OllamaClient()
    with respx.mock:
        respx.post("http://localhost:11434/api/generate").mock(
            return_value=httpx.Response(200, json={"response": "kein json hier"})
        )
        result = await client.identify_speakers("irgendwas")
    assert result == {}


@pytest.mark.asyncio
async def test_summarize_returns_string():
    import respx, httpx
    from llm import OllamaClient
    client = OllamaClient()
    with respx.mock:
        respx.post("http://localhost:11434/api/generate").mock(
            return_value=httpx.Response(200, json={"response": "# Zusammenfassung\n\nKurzer Text."})
        )
        result = await client.summarize("Thomas: Hallo.\nMöller: Hi.", model="gemma3:12b")
    assert "Zusammenfassung" in result
```

**Step 2: Run to verify they fail**

```bash
.venv/bin/pytest tests/test_llm.py::test_identify_speakers_returns_dict tests/test_llm.py::test_identify_speakers_returns_empty_on_parse_failure tests/test_llm.py::test_summarize_returns_string -v
```

**Step 3: Add the prompts and methods to llm.py**

```python
IDENTIFY_SPEAKERS_PROMPT = """Du bekommst den Anfang eines Gesprächstranskripts mit Sprecher-Labels (SPEAKER_00, SPEAKER_01, ...).
Ermittle, welche echten Namen den Sprechern zugeordnet werden können — z.B. durch direkte Anrede ("Herr Möller", "Frank").
Antworte NUR mit einem JSON-Objekt: {"SPEAKER_00": "Name oder null", "SPEAKER_01": "Name oder null"}
Kein weiterer Text, keine Erklärung."""

SUMMARIZE_PROMPT = """Du bist ein präziser Assistent für Business-Kommunikation.
Du bekommst ein Gesprächstranskript mit Sprecher-Labels.
Erstelle eine strukturierte Zusammenfassung auf Deutsch mit:
1. Einem passenden H1-Titel
2. ## Wichtigste Punkte (Aufzählung)
3. ## Offene Fragen (Aufzählung, falls vorhanden)
4. ## Nächste Schritte / Ideen (Aufzählung, falls vorhanden)
Antworte NUR mit dem fertigen Markdown."""
```

Add to OllamaClient:

```python
    async def identify_speakers(
        self,
        transcript_excerpt: str,
        model: str = "gemma3:12b",
    ) -> dict[str, str]:
        """Try to map SPEAKER_XX labels to real names. Returns {} on failure."""
        import json
        async with httpx.AsyncClient(timeout=60) as client:
            r = await client.post(
                f"{self.base_url}/api/generate",
                json={
                    "model": model,
                    "prompt": f"Transkript-Anfang:\n{transcript_excerpt[:2000]}",
                    "system": IDENTIFY_SPEAKERS_PROMPT,
                    "stream": False,
                },
            )
        r.raise_for_status()
        raw = r.json()["response"].strip()
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            return {}
        if not isinstance(data, dict):
            return {}
        return {k: v for k, v in data.items() if v}

    async def summarize(
        self,
        annotated_transcript: str,
        model: str = "gemma3:12b",
    ) -> str:
        async with httpx.AsyncClient(timeout=180) as client:
            r = await client.post(
                f"{self.base_url}/api/generate",
                json={
                    "model": model,
                    "prompt": f"Transkript:\n{annotated_transcript}",
                    "system": SUMMARIZE_PROMPT,
                    "stream": False,
                },
            )
        r.raise_for_status()
        return r.json()["response"].strip()
```

**Step 4: Run all llm tests**

```bash
.venv/bin/pytest tests/test_llm.py -v
```

Expected: all PASS

**Step 5: Commit**

```bash
git add llm.py tests/test_llm.py
git commit -m "feat: OllamaClient.identify_speakers() and summarize() for diarization pipeline"
```
## Task 6: output.py — write_meeting_docs()

**Files:**
- Modify: `output.py`
- Test: `tests/test_output.py`

Context: Writes three files: `{base}-index.md`, `{base}-transkript.md`, `{base}-zusammenfassung.md`. Returns all three paths.
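
The naming scheme the test below pins down, shown concretely (sanity-check sketch):

```python
# Expected file names for a given timestamp (illustration of the naming scheme):
from datetime import datetime

dt = datetime(2026, 4, 2, 14, 30)
base = dt.strftime("%Y-%m-%d-%H%M") + "-meeting"
print(f"{base}-index.md")            # 2026-04-02-1430-meeting-index.md
print(f"{base}-transkript.md")       # 2026-04-02-1430-meeting-transkript.md
print(f"{base}-zusammenfassung.md")  # 2026-04-02-1430-meeting-zusammenfassung.md
```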
**Step 1: Write the failing test**

Add to `tests/test_output.py`:

```python
def test_write_meeting_docs_creates_three_files(tmp_path):
    from output import write_meeting_docs
    from datetime import datetime
    aligned = [("Thomas", "Gut, dann fangen wir an."), ("Möller", "Ich hab das vorbereitet.")]
    paths = write_meeting_docs(
        aligned_segments=aligned,
        summary="# Meeting\n\n## Wichtigste Punkte\n- Budget besprochen",
        speakers=["Thomas", "Möller"],
        duration_min=5,
        output_dir=str(tmp_path),
        dt=datetime(2026, 4, 2, 14, 30),
    )
    assert len(paths) == 3
    index_content = open(paths["index"]).read()
    assert "Thomas" in index_content
    assert "transkript" in index_content
    transcript_content = open(paths["transkript"]).read()
    assert "**Thomas:**" in transcript_content
    assert "Gut, dann fangen wir an." in transcript_content
    summary_content = open(paths["zusammenfassung"]).read()
    assert "Budget besprochen" in summary_content
```

**Step 2: Run to verify it fails**

```bash
.venv/bin/pytest tests/test_output.py::test_write_meeting_docs_creates_three_files -v
```

**Step 3: Add to output.py**

```python
def write_meeting_docs(
    aligned_segments: list[tuple[str, str]],
    summary: str,
    speakers: list[str],
    duration_min: int,
    output_dir: str,
    dt: "datetime | None" = None,
) -> dict[str, str]:
    """Write index, transkript, and zusammenfassung. Returns {type: path}."""
    from datetime import datetime
    if dt is None:
        dt = datetime.now()
    os.makedirs(output_dir, exist_ok=True)
    base = dt.strftime("%Y-%m-%d-%H%M") + "-meeting"
    date_str = dt.strftime("%d.%m.%Y %H:%M")
    frontmatter_base = f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript, meeting]\n---\n\n"

    # --- transkript ---
    transcript_lines = []
    for speaker, text in aligned_segments:
        transcript_lines.append(f"**{speaker}:** {text}\n")
    transcript_content = "\n".join(transcript_lines)
    transkript_filename = f"{base}-transkript.md"
    transkript_path = os.path.join(output_dir, transkript_filename)
    with open(transkript_path, "w", encoding="utf-8") as f:
        f.write(frontmatter_base)
        f.write(transcript_content)
        if not transcript_content.endswith("\n"):
            f.write("\n")

    # --- zusammenfassung ---
    zusammenfassung_filename = f"{base}-zusammenfassung.md"
    zusammenfassung_path = os.path.join(output_dir, zusammenfassung_filename)
    with open(zusammenfassung_path, "w", encoding="utf-8") as f:
        f.write(frontmatter_base)
        f.write(summary)
        if not summary.endswith("\n"):
            f.write("\n")

    # --- index ---
    speaker_str = ", ".join(speakers) if speakers else "Unbekannt"
    tl_dr = _extract_tldr(summary)
    index_content = (
        f"# Meeting — {date_str}\n\n"
        f"**Sprecher:** {speaker_str}  \n"
        f"**Dauer:** {duration_min} min\n\n"
        f"> {tl_dr}\n\n"
        f"- [Transkript]({transkript_filename})\n"
        f"- [Zusammenfassung]({zusammenfassung_filename})\n"
    )
    index_filename = f"{base}-index.md"
    index_path = os.path.join(output_dir, index_filename)
    with open(index_path, "w", encoding="utf-8") as f:
        f.write(f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript, meeting, index]\n---\n\n")
        f.write(index_content)

    return {"index": index_path, "transkript": transkript_path, "zusammenfassung": zusammenfassung_path}


def _extract_tldr(summary: str) -> str:
    """Return the first non-heading, non-empty line from the summary as TL;DR."""
    for line in summary.splitlines():
        stripped = line.strip()
        if stripped and not stripped.startswith("#"):
            return stripped[:200]
    return "Kein TL;DR verfügbar."
```

**Step 4: Run all output tests**

```bash
.venv/bin/pytest tests/test_output.py -v
```

Expected: all PASS

**Step 5: Commit**

```bash
git add output.py tests/test_output.py
git commit -m "feat: write_meeting_docs() — creates index, transkript, zusammenfassung"
```
## Task 7: api/state.py — add pending speaker state

**Files:**
- Modify: `api/state.py`
- Test: `tests/test_api.py`

Context: The pipeline pauses after alignment, waiting for speaker names. It uses an asyncio.Event stored in state. The /speakers endpoint sets the names and signals the event.
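
The pause/resume mechanics in isolation: a minimal, self-contained sketch of how the pipeline coroutine and the endpoint hand off via asyncio.Event (names here are illustrative):

```python
import asyncio

async def pipeline(event: asyncio.Event, inbox: dict) -> None:
    print("awaiting speaker names ...")
    await event.wait()                    # pipeline suspends here; the loop stays free
    print("resumed with:", inbox["names"])

async def main() -> None:
    event, inbox = asyncio.Event(), {}
    task = asyncio.create_task(pipeline(event, inbox))
    await asyncio.sleep(0.1)              # meanwhile the POST /speakers handler ...
    inbox["names"] = {"SPEAKER_00": "Thomas"}
    event.set()                           # ... stores the names and signals the event
    await task

asyncio.run(main())
```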
**Step 1: Write the failing test**

Add to `tests/test_api.py`:

```python
def test_state_has_speaker_fields():
    from api.state import AppState
    s = AppState()
    assert hasattr(s, "_speakers_event")
    assert hasattr(s, "_pending_aligned_segments")
    assert hasattr(s, "_speaker_names")
    assert s._speakers_event is None
    assert s._pending_aligned_segments is None
    assert s._speaker_names is None
```

**Step 2: Run to verify it fails**

```bash
.venv/bin/pytest tests/test_api.py::test_state_has_speaker_fields -v
```

**Step 3: Update api/state.py**

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable


class Status(str, Enum):
    IDLE = "idle"
    RECORDING = "recording"
    PROCESSING = "processing"
    AWAITING_SPEAKERS = "awaiting_speakers"
    ERROR = "error"


@dataclass
class AppState:
    status: Status = Status.IDLE
    recording_user: str | None = None
    last_error: str | None = None
    _listeners: list[Callable] = field(default_factory=list, repr=False)
    # Diarization pipeline pause
    _speakers_event: asyncio.Event | None = None
    _pending_aligned_segments: list[tuple[str, str]] | None = None
    _speaker_names: dict[str, str] | None = None

    def subscribe(self, callback: Callable):
        self._listeners.append(callback)

    async def notify(self):
        for cb in self._listeners:
            if asyncio.iscoroutinefunction(cb):
                await cb(self)
            else:
                cb(self)

    async def set_status(self, status: Status):
        self.status = status
        await self.notify()


state = AppState()
```

Note: the AWAITING_SPEAKERS status is added so the UI can show a distinct state.

**Step 4: Run tests**

```bash
.venv/bin/pytest tests/test_api.py::test_state_has_speaker_fields -v
```

**Step 5: Commit**

```bash
git add api/state.py tests/test_api.py
git commit -m "feat: AppState gains speaker pause fields and AWAITING_SPEAKERS status"
```
## Task 8: api/router.py — POST /speakers endpoint

**Files:**
- Modify: `api/router.py`
- Test: `tests/test_api.py`

**Step 1: Write the failing test**

Add to `tests/test_api.py`:

```python
def test_post_speakers_resolves_pipeline_pause():
    import asyncio
    from main import app
    from api.router import current_user
    from api.state import state
    # Simulate the pipeline waiting for speakers
    state._speakers_event = asyncio.Event()
    state._speaker_names = None
    app.dependency_overrides[current_user] = lambda: {"username": "u", "output_dir": "/tmp", "is_admin": False}
    try:
        from fastapi.testclient import TestClient
        client = TestClient(app)
        r = client.post("/speakers", json={"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"})
        assert r.status_code == 200
        assert state._speaker_names == {"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"}
        assert state._speakers_event.is_set()
    finally:
        app.dependency_overrides.pop(current_user, None)
        state._speakers_event = None
        state._speaker_names = None
```

**Step 2: Run to verify it fails**

```bash
.venv/bin/pytest tests/test_api.py::test_post_speakers_resolves_pipeline_pause -v
```

**Step 3: Add the endpoint to api/router.py**

Add after the existing endpoints (before the websocket):

```python
@router.post("/speakers")
async def post_speakers(body: dict, user: dict = Depends(current_user)):
    if state._speakers_event is None:
        raise HTTPException(status_code=409, detail="Keine ausstehende Sprecher-Zuordnung")
    state._speaker_names = {k: v for k, v in body.items() if isinstance(k, str)}
    state._speakers_event.set()
    return {"ok": True}
```

**Step 4: Run tests**

```bash
.venv/bin/pytest tests/test_api.py::test_post_speakers_resolves_pipeline_pause -v
```

**Step 5: Commit**

```bash
git add api/router.py tests/test_api.py
git commit -m "feat: POST /speakers — resolves pipeline pause with speaker name mapping"
```
## Task 9: api/pipeline.py — extend with diarization path

**Files:**
- Modify: `api/pipeline.py`

Context: When diarization.enabled is true and hf_token is set, run Whisper (with segments) and pyannote in parallel, align, try Ollama name identification, emit speakers_unknown if needed, then write three documents. If diarization is disabled, run the old single-document path unchanged.
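
For reference, the `speakers_unknown` payload the meeting pipeline broadcasts (Step 3 below) and the frontend consumes in Task 10 is shaped like this; the excerpt strings are illustrative:

```python
# Illustrative payload for the "speakers_unknown" broadcast:
payload = {
    "event": "speakers_unknown",
    "speakers": [
        {"id": "SPEAKER_00", "excerpts": ["Gut, dann fangen wir an.", "Zum Budget ..."]},
        {"id": "SPEAKER_01", "excerpts": ["Ich hab das vorbereitet."]},
    ],
}
```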
**Step 1: Read the current api/pipeline.py** — already read above.

**Step 2: No new test here** — the pipeline is tested through integration. The individual components (alignment, diarization, llm) are tested separately.

**Step 3: Rewrite api/pipeline.py**

```python
import asyncio
import logging
import os
import tempfile
import traceback
from datetime import datetime

from api.state import state, Status
from api.router import broadcast
from config import load as load_config
from transcription import engine as transcription_engine
from llm import OllamaClient
from output import save_transcript, write_meeting_docs

logger = logging.getLogger(__name__)


async def run_pipeline():
    cfg = load_config()
    recorder = getattr(state, "_recorder", None)
    if recorder is None:
        return
    output_dir = getattr(state, "_recording_output_dir", cfg["output"]["path"])
    instructions = getattr(state, "_recording_instructions", "")
    diar_cfg = cfg.get("diarization", {})
    use_diarization = diar_cfg.get("enabled") and diar_cfg.get("hf_token")

    recorder.stop()
    await state.set_status(Status.PROCESSING)
    await broadcast({"event": "processing"})

    wav_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            wav_path = f.name
        recorder.save_wav(wav_path)
        if use_diarization:
            await _run_meeting_pipeline(cfg, wav_path, output_dir, instructions, diar_cfg)
        else:
            await _run_solo_pipeline(cfg, wav_path, output_dir, instructions)
    except Exception as e:
        tb = traceback.format_exc()
        logger.error("Pipeline error:\n%s", tb)
        state.last_error = str(e)
        await state.set_status(Status.ERROR)
        await broadcast({"event": "error", "message": str(e)})
    finally:
        state.recording_user = None
        state._recording_output_dir = None
        state._recording_instructions = ""
        state._speakers_event = None
        state._pending_aligned_segments = None
        state._speaker_names = None
        if wav_path:
            try:
                os.unlink(wav_path)
            except OSError:
                pass


async def _run_solo_pipeline(cfg, wav_path, output_dir, instructions):
    """Original single-document pipeline (no diarization)."""
    raw_text = await transcription_engine.transcribe_file(
        wav_path,
        language=cfg["whisper"]["language"],
        model_name=cfg["whisper"]["model"],
        device=cfg["whisper"]["device"],
        base_url=cfg["whisper"].get("base_url", ""),
    )
    await broadcast({"event": "transcribed", "raw": raw_text})
    client = OllamaClient(base_url=cfg["ollama"]["base_url"])
    refined = await client.refine(
        raw_text=raw_text,
        instructions=instructions,
        model=cfg["ollama"]["model"],
    )
    title = "Diktat"
    for line in refined.splitlines():
        if line.startswith("# "):
            title = line[2:].strip()
            break
    path = save_transcript(title=title, content=refined, output_dir=output_dir)
    await broadcast({"event": "saved", "path": path, "title": title})
    await state.set_status(Status.IDLE)


async def _run_meeting_pipeline(cfg, wav_path, output_dir, instructions, diar_cfg):
    """Diarization pipeline: 3 documents, speaker identification."""
    from diarization import Diarizer
    from alignment import align_segments

    # Run Whisper and pyannote in parallel
    diarizer = Diarizer(hf_token=diar_cfg["hf_token"])
    whisper_task = asyncio.create_task(
        transcription_engine.transcribe_file(
            wav_path,
            language=cfg["whisper"]["language"],
            model_name=cfg["whisper"]["model"],
            device=cfg["whisper"]["device"],
            base_url=cfg["whisper"].get("base_url", ""),
            with_segments=True,
        )
    )
    diar_task = asyncio.create_task(diarizer.diarize(wav_path))
    whisper_segs, speaker_segs = await asyncio.gather(whisper_task, diar_task)

    # Align
    aligned = align_segments(whisper_segs, speaker_segs)
    await broadcast({"event": "transcribed", "raw": " ".join(t for _, t in aligned)})

    # Try Ollama name identification
    excerpt = "\n".join(f"{s}: {t}" for s, t in aligned[:20])
    client = OllamaClient(base_url=cfg["ollama"]["base_url"])
    name_map = await client.identify_speakers(excerpt, model=cfg["ollama"]["model"])

    if not name_map:
        # Pause and ask the user
        excerpts_per_speaker = _build_excerpts(aligned)
        state._speakers_event = asyncio.Event()
        state._pending_aligned_segments = aligned
        await state.set_status(Status.AWAITING_SPEAKERS)
        await broadcast({"event": "speakers_unknown", "speakers": [
            {"id": spk, "excerpts": exs}
            for spk, exs in excerpts_per_speaker.items()
        ]})
        await state._speakers_event.wait()
        name_map = state._speaker_names or {}

    # Apply names
    def resolve(label):
        return name_map.get(label) or label.replace("SPEAKER_0", "Sprecher ").replace("SPEAKER_", "Sprecher ")

    named_aligned = [(resolve(spk), text) for spk, text in aligned]
    speakers = sorted({spk for spk, _ in named_aligned})

    # Duration: end of the last Whisper segment (wall-clock length);
    # summing segment lengths would measure speech time, not meeting length
    total_secs = max((s["end"] for s in whisper_segs), default=0.0)
    duration_min = max(1, round(total_secs / 60))

    # Full transcript text for summarization
    transcript_text = "\n\n".join(f"**{spk}:** {txt}" for spk, txt in named_aligned)

    # Summarize
    summary = await client.summarize(transcript_text, model=cfg["ollama"]["model"])

    # Write three documents
    dt = datetime.now()
    paths = write_meeting_docs(
        aligned_segments=named_aligned,
        summary=summary,
        speakers=speakers,
        duration_min=duration_min,
        output_dir=output_dir,
        dt=dt,
    )
    await state.set_status(Status.IDLE)
    await broadcast({
        "event": "saved",
        "path": paths["index"],
        "title": f"Meeting {dt.strftime('%d.%m.%Y %H:%M')}",
        "meeting": True,
        "paths": paths,
    })


def _build_excerpts(aligned: list[tuple[str, str]], max_per_speaker: int = 4) -> dict[str, list[str]]:
    """Build a dict of speaker → up to max_per_speaker excerpts (each truncated to 200 chars)."""
    from collections import defaultdict
    buckets: dict[str, list[str]] = defaultdict(list)
    for spk, text in aligned:
        if len(buckets[spk]) < max_per_speaker:
            buckets[spk].append(text[:200])
    return dict(buckets)
```

**Step 4: Run the full test suite to check nothing broke**

```bash
.venv/bin/pytest -v 2>&1 | tail -20
```

Expected: all PASS

**Step 5: Commit**

```bash
git add api/pipeline.py
git commit -m "feat: meeting pipeline — diarization, speaker ID, 3-doc output"
```
## Task 10: Frontend — speaker naming card

**Files:**
- Modify: `frontend/index.html` (CSS + HTML)
- Modify: `frontend/app.js` (WS handler + card logic)

**Step 1: Add CSS to frontend/index.html**

Add inside `<style>` (before `</style>`):

```css
.speaker-card {
  background: var(--surface); border: 1px solid var(--yellow);
  border-radius: 10px; padding: 20px; display: flex; flex-direction: column; gap: 16px;
}
.speaker-card.hidden { display: none; }
.speaker-card h3 { font-size: .8rem; color: var(--yellow); text-transform: uppercase; letter-spacing: .08em; margin: 0; }
.speaker-row { display: flex; flex-direction: column; gap: 8px; }
.speaker-excerpt {
  font-size: .82rem; color: var(--muted); background: var(--surface2);
  border-radius: 6px; padding: 8px 12px; min-height: 48px;
}
.excerpt-nav { display: flex; align-items: center; gap: 8px; }
.excerpt-nav button {
  background: none; border: 1px solid var(--border); color: var(--muted);
  border-radius: 4px; width: 28px; height: 28px; cursor: pointer;
  font-size: 1rem; display: flex; align-items: center; justify-content: center;
  transition: border-color .15s, color .15s;
}
.excerpt-nav button:hover { border-color: var(--yellow); color: var(--yellow); }
.excerpt-counter { font-size: .75rem; color: var(--muted); min-width: 30px; text-align: center; }
.speaker-name-input {
  background: var(--surface2); border: 1px solid var(--border); color: var(--text);
  border-radius: 6px; padding: 8px 12px; font-family: inherit; font-size: .9rem;
  outline: none; width: 100%; transition: border-color .15s;
}
.speaker-name-input:focus { border-color: var(--yellow); }
.speaker-card-actions { display: flex; gap: 10px; }
.card-btn {
  font-size: .82rem; padding: 8px 16px; border-radius: 8px;
  border: 1px solid var(--border); background: var(--surface2); color: var(--text);
  cursor: pointer; font-family: inherit; transition: border-color .15s;
}
.card-btn:hover { border-color: var(--red); }
.card-btn.primary { background: var(--yellow); border-color: var(--yellow); color: #111; }
.card-btn.primary:hover { background: #e6c200; border-color: #e6c200; }
```

**Step 2: Add HTML to frontend/index.html**

Add just before `<section class="record-section">`:

```html
<div id="speaker-card" class="speaker-card hidden">
  <h3>Wer hat gesprochen?</h3>
  <div id="speaker-rows"></div>
  <div class="speaker-card-actions">
    <button class="card-btn primary" id="speaker-confirm-btn">Übernehmen</button>
    <button class="card-btn" id="speaker-anon-btn">Anonym lassen</button>
  </div>
</div>
```

**Step 3: Add JS to frontend/app.js**

Add after the existing constants at the top:

```javascript
const speakerCard = document.getElementById('speaker-card');
const speakerRows = document.getElementById('speaker-rows');
const speakerConfirmBtn = document.getElementById('speaker-confirm-btn');
const speakerAnonBtn = document.getElementById('speaker-anon-btn');
let _speakerData = []; // [{id, input}, ...]
```

Add to STATUS_LABELS:

```javascript
awaiting_speakers: 'Sprecher zuordnen\u2026',
```

Replace the ws.onmessage handler — add handling for speakers_unknown:

```javascript
ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (msg.event === 'processing') setStatus('processing');
  if (msg.event === 'saved') {
    setStatus('idle');
    hideSpeakerCard();
    loadTranscripts();
  }
  if (msg.event === 'error') setStatus('error');
  if (msg.event === 'speakers_unknown') showSpeakerCard(msg.speakers);
};
```

Add functions for the speaker card:

```javascript
function showSpeakerCard(speakers) {
  _speakerData = [];
  speakerRows.replaceChildren();
  speakers.forEach(function (s) {
    const row = document.createElement('div');
    row.className = 'speaker-row';
    const excerptEl = document.createElement('div');
    excerptEl.className = 'speaker-excerpt';
    excerptEl.textContent = s.excerpts[0] || '';
    const counter = document.createElement('span');
    counter.className = 'excerpt-counter';
    counter.textContent = s.excerpts.length > 1 ? '1/' + s.excerpts.length : '';
    let idx = 0;
    const prev = document.createElement('button');
    prev.textContent = '\u2039';
    const next = document.createElement('button');
    next.textContent = '\u203a';
    function updateExcerpt() {
      excerptEl.textContent = s.excerpts[idx] || '';
      counter.textContent = s.excerpts.length > 1 ? (idx + 1) + '/' + s.excerpts.length : '';
    }
    prev.addEventListener('click', function () {
      if (idx > 0) { idx--; updateExcerpt(); }
    });
    next.addEventListener('click', function () {
      if (idx < s.excerpts.length - 1) { idx++; updateExcerpt(); }
    });
    const nav = document.createElement('div');
    nav.className = 'excerpt-nav';
    if (s.excerpts.length > 1) { nav.append(prev, counter, next); }
    const input = document.createElement('input');
    input.type = 'text';
    input.className = 'speaker-name-input';
    input.placeholder = s.id.replace('SPEAKER_0', 'Sprecher ').replace('SPEAKER_', 'Sprecher ');
    row.append(excerptEl, nav, input);
    speakerRows.appendChild(row);
    _speakerData.push({ id: s.id, input: input });
  });
  speakerCard.classList.remove('hidden');
  setStatus('awaiting_speakers');
}

function hideSpeakerCard() {
  speakerCard.classList.add('hidden');
  _speakerData = [];
}

async function submitSpeakers(useNames) {
  const body = {};
  _speakerData.forEach(function (s) {
    body[s.id] = useNames ? s.input.value.trim() : '';
  });
  await apiFetch('/speakers', { method: 'POST', body: JSON.stringify(body) });
}

speakerConfirmBtn.addEventListener('click', function () { submitSpeakers(true); });
speakerAnonBtn.addEventListener('click', function () { submitSpeakers(false); });
```

**Step 4: Run the full suite** (no automated test for the UI; the visual check happens in the Task 13 smoke test)

```bash
.venv/bin/pytest -v 2>&1 | tail -15
```

Expected: all PASS (no test for UI JS)

**Step 5: Commit**

```bash
git add frontend/index.html frontend/app.js
git commit -m "feat: speaker naming card with excerpt navigator in main UI"
```
## Task 11: Settings page — diarization section

**Files:**
- Modify: `frontend/settings.html`
- Modify: `frontend/settings.js`

**Step 1: Add HTML section to frontend/settings.html**

After the `</section>` of the Processing section, add:

```html
<section>
  <h2>Diarisierung</h2>
  <div class="field">
    <label style="display:flex;align-items:center;gap:10px;cursor:pointer;">
      <input type="checkbox" id="diar-enabled" style="width:auto;">
      Sprecher-Erkennung aktivieren
    </label>
  </div>
  <div class="field">
    <label>HuggingFace Token</label>
    <input type="text" id="diar-token" placeholder="hf_...">
  </div>
  <p style="font-size:.78rem;color:var(--muted);margin-top:4px;">
    Einmalig: <a href="https://huggingface.co/pyannote/speaker-diarization-3.1"
    target="_blank" style="color:var(--muted);">pyannote-Modell freischalten</a>
    und Token mit <strong>Read</strong>-Berechtigung erstellen.
  </p>
</section>
```

**Step 2: Add to frontend/settings.js**

In loadConfig(), add after the ollama lines:

```javascript
document.getElementById('diar-enabled').checked = !!(cfg.diarization && cfg.diarization.enabled);
document.getElementById('diar-token').value = (cfg.diarization && cfg.diarization.hf_token) || '';
```

In the save button handler, add to the body:

```javascript
diarization: {
  enabled: document.getElementById('diar-enabled').checked,
  hf_token: document.getElementById('diar-token').value,
},
```

**Step 3: Run the full suite**

```bash
.venv/bin/pytest -v 2>&1 | tail -15
```

Expected: all PASS

**Step 4: Commit**

```bash
git add frontend/settings.html frontend/settings.js
git commit -m "feat: diarization section in settings — hf_token and enabled toggle"
```
## Task 12: Update SETUP.md with HuggingFace instructions

**Files:**
- Modify: `docs/SETUP.md`

**Step 1: Add section to docs/SETUP.md**

Add after the "Firewall" section:

```markdown
### 5. HuggingFace — pyannote-Modell freischalten (für Diarisierung)

1. Account erstellen auf [huggingface.co](https://huggingface.co)
2. Modell-Seite öffnen: https://huggingface.co/pyannote/speaker-diarization-3.1
   → **"Access repository"** klicken und Nutzungsbedingungen bestätigen
3. Token erstellen: https://huggingface.co/settings/tokens
   → **New token** → Typ: **Read** → Token kopieren
4. Im Transkriptor: Einstellungen → Diarisierung → Token einfügen + aktivieren
```

**Step 2: Commit**

```bash
git add docs/SETUP.md
git commit -m "docs: HuggingFace setup instructions for pyannote diarization"
```
## Task 13: Full test suite + push

**Step 1: Run the full test suite**

```bash
cd /home/templis/work/tueit_Transkriptor && .venv/bin/pytest -v
```

Expected: all tests PASS

**Step 2: Manual smoke test checklist**

Restart the app (`pkill -f main.py && .venv/bin/python main.py &`), then:

- Settings → Diarisierung: enter the token, enable, save
- Start a recording and hold a short conversation
- Status switches to "Sprecher zuordnen…" and the speaker card appears
- Page through the excerpts (‹ ›)
- Enter names → Übernehmen
- Three entries appear in the transcript list (index, transkript, zusammenfassung)
- The index shows the TL;DR plus links to the other two documents
- The transcript shows **Thomas:** … paragraphs
- The summary has ## sections
- With diarization disabled → normal dictation behavior as before

**Step 3: Push**

```bash
git push
```