# Speaker Diarization & Name Identification Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Add speaker diarization (pyannote.audio) and automatic name identification (Ollama) to the pipeline, producing three documents per meeting: an index, a raw transcript with speaker labels, and a polished summary.

**Architecture:** After recording, Whisper and pyannote run on the same WAV file; their timestamp-based segments are aligned to produce speaker-annotated text. Ollama tries to identify speaker names from context; if it can't, the frontend shows a speaker-naming card with excerpt navigation. The pipeline produces three linked markdown files per meeting.

**Tech Stack:** pyannote.audio 3.x, faster-whisper (already present), httpx, asyncio.Event for pipeline pause, vanilla JS for speaker card
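The three intermediate shapes the pipeline passes around, as a minimal sketch (field names match the tasks below; values are illustrative):

```python
# Illustrative data shapes only — values are made up, names match the plan.
whisper_segs = [  # from transcribe_file(..., with_segments=True), Task 2
    {"start": 0.0, "end": 2.0, "text": "Hallo"},
    {"start": 2.1, "end": 4.0, "text": "Wie geht es"},
]
speaker_segs = [  # from Diarizer.diarize(), Task 3
    (0.0, 2.5, "SPEAKER_00"),
    (2.5, 5.0, "SPEAKER_01"),
]
aligned = [  # from align_segments(), Task 4 — same-speaker runs merged
    ("SPEAKER_00", "Hallo"),
    ("SPEAKER_01", "Wie geht es"),
]
```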
---
### Task 1: Add diarization config defaults
**Files:**
- Modify: `config.py`
- Test: `tests/test_config.py`
**Step 1: Write the failing test**
Add to `tests/test_config.py`:
```python
def test_config_has_diarization_defaults():
    from unittest.mock import patch
    import tempfile, os

    with tempfile.TemporaryDirectory() as tmpdir:
        cfg_path = os.path.join(tmpdir, "config.toml")
        with patch("config.CONFIG_PATH", cfg_path):
            import config
            cfg = config.load()
            assert "diarization" in cfg
            assert cfg["diarization"]["enabled"] is False
            assert cfg["diarization"]["hf_token"] == ""
```
**Step 2: Run to verify it fails**
```bash
cd /home/templis/work/tueit_Transkriptor && .venv/bin/pytest tests/test_config.py::test_config_has_diarization_defaults -v
```
Expected: FAIL — KeyError
**Step 3: Add to `config.py` DEFAULTS**
```python
"diarization": {
"enabled": False,
"hf_token": "",
},
```
Also add to the `_write_defaults` fallback string:
```python
f.write('[diarization]\nenabled = false\nhf_token = ""\n\n')
```
**Step 4: Run all config tests**
```bash
.venv/bin/pytest tests/test_config.py -v
```
Expected: all PASS
**Step 5: Commit**
```bash
git add config.py tests/test_config.py
git commit -m "feat: add diarization config defaults (enabled=false, hf_token)"
```
---
### Task 2: Extend transcription.py to return segments with timestamps
**Files:**
- Modify: `transcription.py`
- Test: `tests/test_transcription.py`
**Context:** The pipeline needs timestamps to align Whisper segments with pyannote speaker segments. Add `with_segments: bool = False` — when True, return `list[dict]` with `{start, end, text}` instead of a plain string. Backward compatible: default False keeps existing callers working.
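A quick sketch of the contract (assumes the module-level `engine` singleton from Step 3 and an async caller; return values are illustrative):

```python
# Sketch of the two call shapes added in this task.
text = await engine.transcribe_file("meeting.wav", language="de")
# -> "Hallo Welt ..."  (plain string, unchanged behavior)

segs = await engine.transcribe_file("meeting.wav", language="de", with_segments=True)
# -> [{"start": 0.0, "end": 1.5, "text": "Hallo Welt"}, ...]
```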
**Step 1: Write the failing tests**
Add to `tests/test_transcription.py`:
```python
def test_transcribe_file_returns_segments_when_requested(tmp_path):
    wav = tmp_path / "test.wav"
    wav.write_bytes(b"\x00" * 100)
    mock_model = MagicMock()
    mock_seg = MagicMock()
    mock_seg.text = " Hallo Welt"
    mock_seg.start = 0.0
    mock_seg.end = 1.5
    mock_model.transcribe.return_value = ([mock_seg], MagicMock())
    from transcription import TranscriptionEngine
    eng = TranscriptionEngine()
    eng._model = mock_model
    result = asyncio.run(eng.transcribe_file(str(wav), language="de", with_segments=True))
    assert isinstance(result, list)
    assert result[0]["text"] == "Hallo Welt"
    assert result[0]["start"] == 0.0
    assert result[0]["end"] == 1.5


@pytest.mark.asyncio
async def test_transcribe_remote_returns_segments_when_requested(tmp_path):
    import wave, struct
    wav = tmp_path / "test.wav"
    with wave.open(str(wav), "wb") as wf:
        wf.setnchannels(1); wf.setsampwidth(2); wf.setframerate(16000)
        wf.writeframes(struct.pack("<100h", *([0] * 100)))
    import respx, httpx
    from transcription import TranscriptionEngine
    eng = TranscriptionEngine()
    with respx.mock:
        respx.post("http://beastix:8000/v1/audio/transcriptions").mock(
            return_value=httpx.Response(200, json={
                "text": "Hallo Welt",
                "segments": [{"start": 0.0, "end": 1.5, "text": " Hallo Welt"}],
            })
        )
        result = await eng.transcribe_file(
            str(wav), language="de", model_name="large-v3",
            device="auto", base_url="http://beastix:8000", with_segments=True,
        )
    assert isinstance(result, list)
    assert result[0]["text"] == "Hallo Welt"
```
**Step 2: Run to verify they fail**
```bash
.venv/bin/pytest tests/test_transcription.py::test_transcribe_file_returns_segments_when_requested tests/test_transcription.py::test_transcribe_remote_returns_segments_when_requested -v
```
**Step 3: Update `transcription.py`**
Replace the entire file:
```python
import asyncio
import httpx
from typing import Union


class TranscriptionEngine:
    _model = None

    def _get_model(self, model_name: str = "large-v3", device: str = "auto"):
        if self._model is None:
            from faster_whisper import WhisperModel
            if device == "auto":
                try:
                    self._model = WhisperModel(model_name, device="cuda", compute_type="float16")
                except Exception:
                    self._model = WhisperModel(model_name, device="cpu", compute_type="int8")
            else:
                compute = "float16" if device in ("cuda", "rocm") else "int8"
                self._model = WhisperModel(model_name, device=device, compute_type=compute)
        return self._model

    async def transcribe_file(
        self,
        audio_path: str,
        language: str = "de",
        model_name: str = "large-v3",
        device: str = "auto",
        base_url: str = "",
        with_segments: bool = False,
    ) -> Union[str, list[dict]]:
        if base_url:
            return await self._transcribe_remote(
                audio_path, language, model_name, base_url, with_segments
            )
        return await self._transcribe_local(
            audio_path, language, model_name, device, with_segments
        )

    async def _transcribe_remote(
        self,
        audio_path: str,
        language: str,
        model_name: str,
        base_url: str,
        with_segments: bool,
    ) -> Union[str, list[dict]]:
        async with httpx.AsyncClient(timeout=300) as client:
            with open(audio_path, "rb") as f:
                data = {"model": model_name, "language": language}
                if with_segments:
                    data["timestamp_granularities[]"] = "segment"
                    data["response_format"] = "verbose_json"
                r = await client.post(
                    f"{base_url}/v1/audio/transcriptions",
                    files={"file": ("audio.wav", f, "audio/wav")},
                    data=data,
                )
        r.raise_for_status()
        body = r.json()
        if not with_segments:
            return body["text"]
        raw_segs = body.get("segments") or []
        if raw_segs:
            return [
                {"start": s["start"], "end": s["end"], "text": s["text"].strip()}
                for s in raw_segs
            ]
        # fallback: single segment covering the whole file
        return [{"start": 0.0, "end": 9999.0, "text": body["text"].strip()}]

    async def _transcribe_local(
        self,
        audio_path: str,
        language: str,
        model_name: str,
        device: str,
        with_segments: bool,
    ) -> Union[str, list[dict]]:
        loop = asyncio.get_running_loop()
        model = self._get_model(model_name, device)
        segments, _ = await loop.run_in_executor(
            None,
            lambda: model.transcribe(audio_path, language=language),
        )
        segments = list(segments)
        if not with_segments:
            return "".join(seg.text for seg in segments).strip()
        return [
            {"start": seg.start, "end": seg.end, "text": seg.text.strip()}
            for seg in segments
            if seg.text.strip()
        ]


engine = TranscriptionEngine()
```
**Step 4: Run all transcription tests**
```bash
.venv/bin/pytest tests/test_transcription.py -v
```
Expected: all PASS
**Step 5: Commit**
```bash
git add transcription.py tests/test_transcription.py
git commit -m "feat: transcribe_file returns timestamped segments when with_segments=True"
```
---
### Task 3: diarization.py — Diarizer class
**Files:**
- Create: `diarization.py`
- Create: `tests/test_diarization.py`
**Context:** Wraps pyannote.audio. Returns `list[tuple[float, float, str]]` — each entry is `(start_sec, end_sec, speaker_label)`. Loaded lazily. Runs in executor to avoid blocking.
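Intended usage, as a sketch (assumes a valid HF token and an async caller):

```python
# Usage sketch — assumes the Diarizer from Step 4 and a valid HF token.
from diarization import Diarizer

async def who_spoke_when(wav_path: str, hf_token: str) -> list[tuple[float, float, str]]:
    diarizer = Diarizer(hf_token=hf_token)
    turns = await diarizer.diarize(wav_path)
    # turns looks like [(0.0, 2.5, "SPEAKER_00"), (2.6, 5.0, "SPEAKER_01")]
    return turns
```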
**Step 1: Install pyannote.audio**
```bash
cd /home/templis/work/tueit_Transkriptor && .venv/bin/pip install pyannote.audio
```
Add to `requirements.txt`:
```
pyannote.audio>=3.3
```
**Step 2: Write the failing test**
Create `tests/test_diarization.py`:
```python
from unittest.mock import MagicMock, patch

import pytest


def test_diarizer_returns_list_of_tuples(tmp_path):
    """Diarizer.diarize() returns [(start, end, speaker), ...]"""
    wav = tmp_path / "test.wav"
    wav.write_bytes(b"\x00" * 100)
    mock_turn_1 = MagicMock()
    mock_turn_1.start = 0.0
    mock_turn_1.end = 2.5
    mock_track_1 = "A"
    mock_label_1 = "SPEAKER_00"
    mock_turn_2 = MagicMock()
    mock_turn_2.start = 2.6
    mock_turn_2.end = 5.0
    mock_track_2 = "B"
    mock_label_2 = "SPEAKER_01"
    mock_annotation = MagicMock()
    mock_annotation.itertracks.return_value = [
        (mock_turn_1, mock_track_1, mock_label_1),
        (mock_turn_2, mock_track_2, mock_label_2),
    ]
    mock_pipeline = MagicMock(return_value=mock_annotation)
    import asyncio
    from diarization import Diarizer
    d = Diarizer.__new__(Diarizer)
    d._pipeline = mock_pipeline
    result = asyncio.run(d.diarize(str(wav)))
    assert result == [(0.0, 2.5, "SPEAKER_00"), (2.6, 5.0, "SPEAKER_01")]


def test_diarizer_requires_hf_token():
    from diarization import Diarizer
    with pytest.raises(ValueError, match="hf_token"):
        Diarizer(hf_token="")
```
**Step 3: Run to verify it fails**
```bash
.venv/bin/pytest tests/test_diarization.py -v
```
Expected: FAIL — `diarization` module not found
**Step 4: Create `diarization.py`**
```python
import asyncio


class Diarizer:
    def __init__(self, hf_token: str):
        if not hf_token:
            raise ValueError("hf_token is required for diarization")
        self._hf_token = hf_token
        self._pipeline = None

    def _load_pipeline(self):
        if self._pipeline is None:
            from pyannote.audio import Pipeline
            self._pipeline = Pipeline.from_pretrained(
                "pyannote/speaker-diarization-3.1",
                use_auth_token=self._hf_token,
            )
        return self._pipeline

    async def diarize(self, wav_path: str) -> list[tuple[float, float, str]]:
        loop = asyncio.get_running_loop()
        pipeline = await loop.run_in_executor(None, self._load_pipeline)
        annotation = await loop.run_in_executor(None, lambda: pipeline(wav_path))
        return [
            (turn.start, turn.end, speaker)
            for turn, _, speaker in annotation.itertracks(yield_label=True)
        ]
```
**Step 5: Run tests**
```bash
.venv/bin/pytest tests/test_diarization.py -v
```
Expected: all PASS
**Step 6: Commit**
```bash
git add diarization.py tests/test_diarization.py requirements.txt
git commit -m "feat: Diarizer class wrapping pyannote/speaker-diarization-3.1"
```
---
### Task 4: Alignment — align Whisper segments to pyannote speakers
**Files:**
- Create: `alignment.py`
- Create: `tests/test_alignment.py`
**Context:** For each Whisper segment, find the pyannote speaker with the greatest time overlap. Merge consecutive same-speaker segments into one block. Return `list[tuple[str, str]]` — each entry is `(speaker_label, text)`.
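The assignment rule is plain interval intersection; a worked example with the numbers from the first test below:

```python
# Interval intersection, the same formula _best_speaker() uses in Step 3.
def overlap(a_start: float, a_end: float, b_start: float, b_end: float) -> float:
    return max(0.0, min(a_end, b_end) - max(a_start, b_start))

# Whisper segment 2.1-4.0 s vs. SPEAKER_00 (0.0-2.5) and SPEAKER_01 (2.5-5.0):
assert round(overlap(2.1, 4.0, 0.0, 2.5), 3) == 0.4  # 0.4 s with SPEAKER_00
assert round(overlap(2.1, 4.0, 2.5, 5.0), 3) == 1.5  # 1.5 s with SPEAKER_01 -> wins
```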
**Step 1: Write the failing tests**
Create `tests/test_alignment.py`:
```python
def test_align_assigns_speaker_by_overlap():
    from alignment import align_segments
    whisper = [
        {"start": 0.0, "end": 2.0, "text": "Hallo"},
        {"start": 2.1, "end": 4.0, "text": "Wie geht es"},
    ]
    speakers = [
        (0.0, 2.5, "SPEAKER_00"),
        (2.5, 5.0, "SPEAKER_01"),
    ]
    result = align_segments(whisper, speakers)
    assert result[0] == ("SPEAKER_00", "Hallo")
    assert result[1] == ("SPEAKER_01", "Wie geht es")


def test_align_merges_consecutive_same_speaker():
    from alignment import align_segments
    whisper = [
        {"start": 0.0, "end": 1.0, "text": "Hallo"},
        {"start": 1.1, "end": 2.0, "text": "Welt"},
    ]
    speakers = [(0.0, 3.0, "SPEAKER_00")]
    result = align_segments(whisper, speakers)
    assert len(result) == 1
    assert result[0] == ("SPEAKER_00", "Hallo Welt")


def test_align_fallback_when_no_speaker_overlap():
    from alignment import align_segments
    whisper = [{"start": 0.0, "end": 1.0, "text": "Hallo"}]
    speakers = []
    result = align_segments(whisper, speakers)
    assert result[0][0] == "SPEAKER_00"
```
**Step 2: Run to verify they fail**
```bash
.venv/bin/pytest tests/test_alignment.py -v
```
**Step 3: Create `alignment.py`**
```python
def align_segments(
    whisper_segs: list[dict],
    speaker_segs: list[tuple[float, float, str]],
) -> list[tuple[str, str]]:
    """Assign each Whisper segment to the speaker with the greatest time overlap.

    Consecutive segments from the same speaker are merged into one block."""
    result: list[tuple[str, str]] = []
    for seg in whisper_segs:
        speaker = _best_speaker(seg["start"], seg["end"], speaker_segs)
        text = seg["text"].strip()
        if not text:
            continue
        if result and result[-1][0] == speaker:
            result[-1] = (speaker, result[-1][1] + " " + text)
        else:
            result.append((speaker, text))
    return result


def _best_speaker(
    start: float,
    end: float,
    speaker_segs: list[tuple[float, float, str]],
) -> str:
    best_label = "SPEAKER_00"
    best_overlap = 0.0
    for s_start, s_end, label in speaker_segs:
        overlap = max(0.0, min(end, s_end) - max(start, s_start))
        if overlap > best_overlap:
            best_overlap = overlap
            best_label = label
    return best_label
```
**Step 4: Run tests**
```bash
.venv/bin/pytest tests/test_alignment.py -v
```
Expected: all PASS
**Step 5: Commit**
```bash
git add alignment.py tests/test_alignment.py
git commit -m "feat: align_segments() — map Whisper timestamps to pyannote speakers"
```
---
### Task 5: llm.py — identify_speakers() and summarize()
**Files:**
- Modify: `llm.py`
- Test: `tests/test_llm.py`
**Step 1: Write the failing tests**
Add to `tests/test_llm.py`:
```python
@pytest.mark.asyncio
async def test_identify_speakers_returns_dict():
    import respx, httpx, json
    from llm import OllamaClient
    client = OllamaClient()
    mapping = {"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"}
    transcript_excerpt = "SPEAKER_00: Gut, Herr Möller.\nSPEAKER_01: Danke, Thomas."
    with respx.mock:
        respx.post("http://localhost:11434/api/generate").mock(
            return_value=httpx.Response(200, json={"response": json.dumps(mapping)})
        )
        result = await client.identify_speakers(transcript_excerpt)
    assert result == {"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"}


@pytest.mark.asyncio
async def test_identify_speakers_returns_empty_on_parse_failure():
    import respx, httpx
    from llm import OllamaClient
    client = OllamaClient()
    with respx.mock:
        respx.post("http://localhost:11434/api/generate").mock(
            return_value=httpx.Response(200, json={"response": "kein json hier"})
        )
        result = await client.identify_speakers("irgendwas")
    assert result == {}


@pytest.mark.asyncio
async def test_summarize_returns_string():
    import respx, httpx
    from llm import OllamaClient
    client = OllamaClient()
    with respx.mock:
        respx.post("http://localhost:11434/api/generate").mock(
            return_value=httpx.Response(200, json={"response": "# Zusammenfassung\n\nKurzer Text."})
        )
        result = await client.summarize("Thomas: Hallo.\nMöller: Hi.", model="gemma3:12b")
    assert "Zusammenfassung" in result
```
**Step 2: Run to verify they fail**
```bash
.venv/bin/pytest tests/test_llm.py::test_identify_speakers_returns_dict tests/test_llm.py::test_identify_speakers_returns_empty_on_parse_failure tests/test_llm.py::test_summarize_returns_string -v
```
**Step 3: Add methods to `llm.py`**
```python
IDENTIFY_SPEAKERS_PROMPT = """Du bekommst den Anfang eines Gesprächstranskripts mit Sprecher-Labels (SPEAKER_00, SPEAKER_01, ...).
Ermittle, welche echten Namen den Sprechern zugeordnet werden können — z.B. durch direkte Anrede ("Herr Möller", "Frank").
Antworte NUR mit einem JSON-Objekt: {"SPEAKER_00": "Name oder null", "SPEAKER_01": "Name oder null"}
Kein weiterer Text, keine Erklärung."""
SUMMARIZE_PROMPT = """Du bist ein präziser Assistent für Business-Kommunikation.
Du bekommst ein Gesprächstranskript mit Sprecher-Labels.
Erstelle eine strukturierte Zusammenfassung auf Deutsch mit:
1. Einem passenden H1-Titel
2. ## Wichtigste Punkte (Aufzählung)
3. ## Offene Fragen (Aufzählung, falls vorhanden)
4. ## Nächste Schritte / Ideen (Aufzählung, falls vorhanden)
Antworte NUR mit dem fertigen Markdown."""
```
Add to `OllamaClient`:
```python
async def identify_speakers(
    self,
    transcript_excerpt: str,
    model: str = "gemma3:12b",
) -> dict[str, str]:
    """Try to map SPEAKER_XX labels to real names. Returns {} on failure."""
    import json
    async with httpx.AsyncClient(timeout=60) as client:
        r = await client.post(
            f"{self.base_url}/api/generate",
            json={
                "model": model,
                "prompt": f"Transkript-Anfang:\n{transcript_excerpt[:2000]}",
                "system": IDENTIFY_SPEAKERS_PROMPT,
                "stream": False,
            },
        )
    r.raise_for_status()
    raw = r.json()["response"].strip()
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return {}
    if not isinstance(data, dict):
        return {}
    # drop null/empty values so unresolved speakers keep their labels
    return {k: v for k, v in data.items() if v}


async def summarize(
    self,
    annotated_transcript: str,
    model: str = "gemma3:12b",
) -> str:
    async with httpx.AsyncClient(timeout=180) as client:
        r = await client.post(
            f"{self.base_url}/api/generate",
            json={
                "model": model,
                "prompt": f"Transkript:\n{annotated_transcript}",
                "system": SUMMARIZE_PROMPT,
                "stream": False,
            },
        )
    r.raise_for_status()
    return r.json()["response"].strip()
```
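A quick usage sketch (assumes a local Ollama on the default port; the output shape follows the JSON contract in `IDENTIFY_SPEAKERS_PROMPT` above):

```python
import asyncio
from llm import OllamaClient

async def demo():
    client = OllamaClient()
    excerpt = "SPEAKER_00: Gut, Herr Möller.\nSPEAKER_01: Danke, Thomas."
    names = await client.identify_speakers(excerpt, model="gemma3:12b")
    # {"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"} on success,
    # {} if the model's reply was not valid JSON
    print(names)

asyncio.run(demo())
```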
**Step 4: Run all llm tests**
```bash
.venv/bin/pytest tests/test_llm.py -v
```
Expected: all PASS
**Step 5: Commit**
```bash
git add llm.py tests/test_llm.py
git commit -m "feat: OllamaClient.identify_speakers() and summarize() for diarization pipeline"
```
---
### Task 6: output.py — write_meeting_docs()
**Files:**
- Modify: `output.py`
- Test: `tests/test_output.py`
**Context:** Writes three files: `{base}-index.md`, `{base}-transkript.md`, `{base}-zusammenfassung.md`. Returns all three paths.
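For example, `dt=datetime(2026, 4, 2, 14, 30)` yields the stem `2026-04-02-1430-meeting` (per the strftime in Step 3):

```python
from datetime import datetime

base = datetime(2026, 4, 2, 14, 30).strftime("%Y-%m-%d-%H%M") + "-meeting"
# base == "2026-04-02-1430-meeting", so the three files are:
#   2026-04-02-1430-meeting-index.md
#   2026-04-02-1430-meeting-transkript.md
#   2026-04-02-1430-meeting-zusammenfassung.md
```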
**Step 1: Write the failing test**
Add to `tests/test_output.py`:
```python
def test_write_meeting_docs_creates_three_files(tmp_path):
    from output import write_meeting_docs
    from datetime import datetime
    aligned = [("Thomas", "Gut, dann fangen wir an."), ("Möller", "Ich hab das vorbereitet.")]
    paths = write_meeting_docs(
        aligned_segments=aligned,
        summary="# Meeting\n\n## Wichtigste Punkte\n- Budget besprochen",
        speakers=["Thomas", "Möller"],
        duration_min=5,
        output_dir=str(tmp_path),
        dt=datetime(2026, 4, 2, 14, 30),
    )
    assert len(paths) == 3
    index_content = open(paths["index"]).read()
    assert "Thomas" in index_content
    assert "transkript" in index_content
    transcript_content = open(paths["transkript"]).read()
    assert "**Thomas:**" in transcript_content
    assert "Gut, dann fangen wir an." in transcript_content
    summary_content = open(paths["zusammenfassung"]).read()
    assert "Budget besprochen" in summary_content
```
**Step 2: Run to verify it fails**
```bash
.venv/bin/pytest tests/test_output.py::test_write_meeting_docs_creates_three_files -v
```
**Step 3: Add to `output.py`**
```python
def write_meeting_docs(
    aligned_segments: list[tuple[str, str]],
    summary: str,
    speakers: list[str],
    duration_min: int,
    output_dir: str,
    dt: "datetime | None" = None,
) -> dict[str, str]:
    """Write index, transkript, and zusammenfassung. Returns {type: path}."""
    from datetime import datetime
    if dt is None:
        dt = datetime.now()
    os.makedirs(output_dir, exist_ok=True)
    base = dt.strftime("%Y-%m-%d-%H%M") + "-meeting"
    date_str = dt.strftime("%d.%m.%Y %H:%M")
    frontmatter_base = f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript, meeting]\n---\n\n"
    # --- transkript ---
    transcript_lines = []
    for speaker, text in aligned_segments:
        transcript_lines.append(f"**{speaker}:** {text}\n")
    transcript_content = "\n".join(transcript_lines)
    transkript_filename = f"{base}-transkript.md"
    transkript_path = os.path.join(output_dir, transkript_filename)
    with open(transkript_path, "w", encoding="utf-8") as f:
        f.write(frontmatter_base)
        f.write(transcript_content)
        if not transcript_content.endswith("\n"):
            f.write("\n")
    # --- zusammenfassung ---
    zusammenfassung_filename = f"{base}-zusammenfassung.md"
    zusammenfassung_path = os.path.join(output_dir, zusammenfassung_filename)
    with open(zusammenfassung_path, "w", encoding="utf-8") as f:
        f.write(frontmatter_base)
        f.write(summary)
        if not summary.endswith("\n"):
            f.write("\n")
    # --- index ---
    speaker_str = ", ".join(speakers) if speakers else "Unbekannt"
    tl_dr = _extract_tldr(summary)
    index_content = (
        f"# Meeting — {date_str}\n\n"
        f"**Sprecher:** {speaker_str}  \n"
        f"**Dauer:** {duration_min} min\n\n"
        f"> {tl_dr}\n\n"
        f"- [Transkript]({transkript_filename})\n"
        f"- [Zusammenfassung]({zusammenfassung_filename})\n"
    )
    index_filename = f"{base}-index.md"
    index_path = os.path.join(output_dir, index_filename)
    with open(index_path, "w", encoding="utf-8") as f:
        f.write(f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript, meeting, index]\n---\n\n")
        f.write(index_content)
    return {"index": index_path, "transkript": transkript_path, "zusammenfassung": zusammenfassung_path}


def _extract_tldr(summary: str) -> str:
    """Return the first non-heading, non-empty line from the summary as TL;DR."""
    for line in summary.splitlines():
        stripped = line.strip()
        if stripped and not stripped.startswith("#"):
            return stripped[:200]
    return "Kein TL;DR verfügbar."
```
**Step 4: Run all output tests**
```bash
.venv/bin/pytest tests/test_output.py -v
```
Expected: all PASS
**Step 5: Commit**
```bash
git add output.py tests/test_output.py
git commit -m "feat: write_meeting_docs() — creates index, transkript, zusammenfassung"
```
---
### Task 7: api/state.py — add pending speaker state
**Files:**
- Modify: `api/state.py`
- Test: `tests/test_api.py`
**Context:** The pipeline pauses after alignment, waiting for speaker names. It uses an `asyncio.Event` stored in state. The `/speakers` endpoint sets names and signals the event.
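The handshake in miniature (a sketch only; field names match the state fields added below):

```python
# Pause/resume handshake sketch — not the real pipeline code.
import asyncio

async def pipeline_waits(state):
    state._speakers_event = asyncio.Event()
    # broadcast "speakers_unknown" here, then block until /speakers fires:
    await state._speakers_event.wait()
    return state._speaker_names or {}

def endpoint_resolves(state, names: dict[str, str]):
    state._speaker_names = names
    state._speakers_event.set()  # wakes the waiting pipeline task
```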
**Step 1: Write the failing test**
Add to `tests/test_api.py`:
```python
def test_state_has_speaker_fields():
    from api.state import AppState
    s = AppState()
    assert hasattr(s, "_speakers_event")
    assert hasattr(s, "_pending_aligned_segments")
    assert hasattr(s, "_speaker_names")
    assert s._speakers_event is None
    assert s._pending_aligned_segments is None
    assert s._speaker_names is None
```
**Step 2: Run to verify it fails**
```bash
.venv/bin/pytest tests/test_api.py::test_state_has_speaker_fields -v
```
**Step 3: Update `api/state.py`**
```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable


class Status(str, Enum):
    IDLE = "idle"
    RECORDING = "recording"
    PROCESSING = "processing"
    AWAITING_SPEAKERS = "awaiting_speakers"
    ERROR = "error"


@dataclass
class AppState:
    status: Status = Status.IDLE
    recording_user: str | None = None
    last_error: str | None = None
    _listeners: list[Callable] = field(default_factory=list, repr=False)
    # Diarization pipeline pause
    _speakers_event: asyncio.Event | None = None
    _pending_aligned_segments: list[tuple[str, str]] | None = None
    _speaker_names: dict[str, str] | None = None

    def subscribe(self, callback: Callable):
        self._listeners.append(callback)

    async def notify(self):
        for cb in self._listeners:
            if asyncio.iscoroutinefunction(cb):
                await cb(self)
            else:
                cb(self)

    async def set_status(self, status: Status):
        self.status = status
        await self.notify()


state = AppState()
```
Note: `AWAITING_SPEAKERS` status is added so the UI can show a distinct state.
**Step 4: Run tests**
```bash
.venv/bin/pytest tests/test_api.py::test_state_has_speaker_fields -v
```
**Step 5: Commit**
```bash
git add api/state.py tests/test_api.py
git commit -m "feat: AppState gains speaker pause fields and AWAITING_SPEAKERS status"
```
---
### Task 8: api/router.py — POST /speakers endpoint
**Files:**
- Modify: `api/router.py`
- Test: `tests/test_api.py`
**Step 1: Write the failing test**
Add to `tests/test_api.py`:
```python
def test_post_speakers_resolves_pipeline_pause():
    from main import app
    from api.router import current_user
    from api.state import state
    import asyncio

    # Simulate the pipeline waiting for speakers
    state._speakers_event = asyncio.Event()
    state._speaker_names = None
    app.dependency_overrides[current_user] = lambda: {"username": "u", "output_dir": "/tmp", "is_admin": False}
    try:
        from fastapi.testclient import TestClient
        client = TestClient(app)
        r = client.post("/speakers", json={"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"})
        assert r.status_code == 200
        assert state._speaker_names == {"SPEAKER_00": "Thomas", "SPEAKER_01": "Möller"}
        assert state._speakers_event.is_set()
    finally:
        app.dependency_overrides.pop(current_user, None)
        state._speakers_event = None
        state._speaker_names = None
```
**Step 2: Run to verify it fails**
```bash
.venv/bin/pytest tests/test_api.py::test_post_speakers_resolves_pipeline_pause -v
```
**Step 3: Add endpoint to `api/router.py`**
Add after the existing endpoints (before the websocket):
```python
@router.post("/speakers")
async def post_speakers(body: dict, user: dict = Depends(current_user)):
if state._speakers_event is None:
raise HTTPException(status_code=409, detail="Keine ausstehende Sprecher-Zuordnung")
state._speaker_names = {k: v for k, v in body.items() if isinstance(k, str)}
state._speakers_event.set()
return {"ok": True}
```
**Step 4: Run tests**
```bash
.venv/bin/pytest tests/test_api.py::test_post_speakers_resolves_pipeline_pause -v
```
**Step 5: Commit**
```bash
git add api/router.py tests/test_api.py
git commit -m "feat: POST /speakers — resolves pipeline pause with speaker name mapping"
```
---
### Task 9: api/pipeline.py — extend with diarization path
**Files:**
- Modify: `api/pipeline.py`
**Context:** When `diarization.enabled` is true and `hf_token` is set, run Whisper (with segments) and pyannote in parallel, align, try Ollama name identification, emit `speakers_unknown` if needed, then write three documents. If diarization is disabled, run the old single-document path unchanged.
**Step 1: Read current `api/pipeline.py`** — already read above.
**Step 2: No new test here** — the pipeline is tested through integration. The individual components (alignment, diarization, llm) are tested separately.
**Step 3: Rewrite `api/pipeline.py`**
```python
import asyncio
import logging
import os
import tempfile
import traceback
from datetime import datetime

from api.state import state, Status
from api.router import broadcast
from config import load as load_config
from transcription import engine as transcription_engine
from llm import OllamaClient
from output import save_transcript, write_meeting_docs

logger = logging.getLogger(__name__)


async def run_pipeline():
    cfg = load_config()
    recorder = getattr(state, "_recorder", None)
    if recorder is None:
        return
    output_dir = getattr(state, "_recording_output_dir", cfg["output"]["path"])
    instructions = getattr(state, "_recording_instructions", "")
    diar_cfg = cfg.get("diarization", {})
    use_diarization = diar_cfg.get("enabled") and diar_cfg.get("hf_token")
    recorder.stop()
    await state.set_status(Status.PROCESSING)
    await broadcast({"event": "processing"})
    wav_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            wav_path = f.name
        recorder.save_wav(wav_path)
        if use_diarization:
            await _run_meeting_pipeline(cfg, wav_path, output_dir, instructions, diar_cfg)
        else:
            await _run_solo_pipeline(cfg, wav_path, output_dir, instructions)
    except Exception as e:
        tb = traceback.format_exc()
        logger.error("Pipeline error:\n%s", tb)
        state.last_error = str(e)
        await state.set_status(Status.ERROR)
        await broadcast({"event": "error", "message": str(e)})
    finally:
        state.recording_user = None
        state._recording_output_dir = None
        state._recording_instructions = ""
        state._speakers_event = None
        state._pending_aligned_segments = None
        state._speaker_names = None
        if wav_path:
            try:
                os.unlink(wav_path)
            except OSError:
                pass


async def _run_solo_pipeline(cfg, wav_path, output_dir, instructions):
    """Original single-document pipeline (no diarization)."""
    raw_text = await transcription_engine.transcribe_file(
        wav_path,
        language=cfg["whisper"]["language"],
        model_name=cfg["whisper"]["model"],
        device=cfg["whisper"]["device"],
        base_url=cfg["whisper"].get("base_url", ""),
    )
    await broadcast({"event": "transcribed", "raw": raw_text})
    client = OllamaClient(base_url=cfg["ollama"]["base_url"])
    refined = await client.refine(
        raw_text=raw_text,
        instructions=instructions,
        model=cfg["ollama"]["model"],
    )
    title = "Diktat"
    for line in refined.splitlines():
        if line.startswith("# "):
            title = line[2:].strip()
            break
    path = save_transcript(title=title, content=refined, output_dir=output_dir)
    await broadcast({"event": "saved", "path": path, "title": title})
    await state.set_status(Status.IDLE)


async def _run_meeting_pipeline(cfg, wav_path, output_dir, instructions, diar_cfg):
    """Diarization pipeline: 3 documents, speaker identification."""
    from diarization import Diarizer
    from alignment import align_segments

    # Run Whisper and pyannote in parallel
    diarizer = Diarizer(hf_token=diar_cfg["hf_token"])
    whisper_task = asyncio.create_task(
        transcription_engine.transcribe_file(
            wav_path,
            language=cfg["whisper"]["language"],
            model_name=cfg["whisper"]["model"],
            device=cfg["whisper"]["device"],
            base_url=cfg["whisper"].get("base_url", ""),
            with_segments=True,
        )
    )
    diar_task = asyncio.create_task(diarizer.diarize(wav_path))
    whisper_segs, speaker_segs = await asyncio.gather(whisper_task, diar_task)

    # Align
    aligned = align_segments(whisper_segs, speaker_segs)
    await broadcast({"event": "transcribed", "raw": " ".join(t for _, t in aligned)})

    # Try Ollama name identification
    excerpt = "\n".join(f"{s}: {t}" for s, t in aligned[:20])
    client = OllamaClient(base_url=cfg["ollama"]["base_url"])
    name_map = await client.identify_speakers(excerpt, model=cfg["ollama"]["model"])
    if not name_map:
        # Pause and ask the user
        excerpts_per_speaker = _build_excerpts(aligned)
        state._speakers_event = asyncio.Event()
        state._pending_aligned_segments = aligned
        await state.set_status(Status.AWAITING_SPEAKERS)
        await broadcast({"event": "speakers_unknown", "speakers": [
            {"id": spk, "excerpts": exs}
            for spk, exs in excerpts_per_speaker.items()
        ]})
        await state._speakers_event.wait()
        name_map = state._speaker_names or {}

    # Apply names; unresolved labels fall back to "Sprecher N"
    def resolve(label):
        return name_map.get(label) or label.replace("SPEAKER_0", "Sprecher ").replace("SPEAKER_", "Sprecher ")

    named_aligned = [(resolve(spk), text) for spk, text in aligned]
    speakers = sorted({spk for spk, _ in named_aligned})

    # Duration (speech time, summed over Whisper segments)
    total_secs = sum(s["end"] - s["start"] for s in whisper_segs) if whisper_segs else 0
    duration_min = max(1, round(total_secs / 60))

    # Full transcript text for summarization
    transcript_text = "\n\n".join(f"**{spk}:** {txt}" for spk, txt in named_aligned)

    # Summarize
    summary = await client.summarize(transcript_text, model=cfg["ollama"]["model"])

    # Write three documents
    dt = datetime.now()
    paths = write_meeting_docs(
        aligned_segments=named_aligned,
        summary=summary,
        speakers=speakers,
        duration_min=duration_min,
        output_dir=output_dir,
        dt=dt,
    )
    await state.set_status(Status.IDLE)
    await broadcast({
        "event": "saved",
        "path": paths["index"],
        "title": f"Meeting {dt.strftime('%d.%m.%Y %H:%M')}",
        "meeting": True,
        "paths": paths,
    })


def _build_excerpts(aligned: list[tuple[str, str]], max_per_speaker: int = 4) -> dict[str, list[str]]:
    """Build a dict of speaker -> text excerpts (up to max_per_speaker, 200 chars each)."""
    from collections import defaultdict
    buckets: dict[str, list[str]] = defaultdict(list)
    for spk, text in aligned:
        if len(buckets[spk]) < max_per_speaker:
            buckets[spk].append(text[:200])
    return dict(buckets)
```
**Step 4: Run full test suite to check nothing broke**
```bash
.venv/bin/pytest -v 2>&1 | tail -20
```
Expected: all PASS
**Step 5: Commit**
```bash
git add api/pipeline.py
git commit -m "feat: meeting pipeline — diarization, speaker ID, 3-doc output"
```
---
### Task 10: Frontend — speaker naming card
**Files:**
- Modify: `frontend/index.html` (CSS + HTML)
- Modify: `frontend/app.js` (WS handler + card logic)
**Step 1: Add CSS to `frontend/index.html`**
Add inside `<style>` (before `</style>`):
```css
.speaker-card {
background: var(--surface); border: 1px solid var(--yellow);
border-radius: 10px; padding: 20px; display: flex; flex-direction: column; gap: 16px;
}
.speaker-card.hidden { display: none; }
.speaker-card h3 { font-size: .8rem; color: var(--yellow); text-transform: uppercase; letter-spacing: .08em; margin: 0; }
.speaker-row { display: flex; flex-direction: column; gap: 8px; }
.speaker-excerpt {
font-size: .82rem; color: var(--muted); background: var(--surface2);
border-radius: 6px; padding: 8px 12px; min-height: 48px;
}
.excerpt-nav { display: flex; align-items: center; gap: 8px; }
.excerpt-nav button {
background: none; border: 1px solid var(--border); color: var(--muted);
border-radius: 4px; width: 28px; height: 28px; cursor: pointer;
font-size: 1rem; display: flex; align-items: center; justify-content: center;
transition: border-color .15s, color .15s;
}
.excerpt-nav button:hover { border-color: var(--yellow); color: var(--yellow); }
.excerpt-counter { font-size: .75rem; color: var(--muted); min-width: 30px; text-align: center; }
.speaker-name-input {
background: var(--surface2); border: 1px solid var(--border); color: var(--text);
border-radius: 6px; padding: 8px 12px; font-family: inherit; font-size: .9rem;
outline: none; width: 100%; transition: border-color .15s;
}
.speaker-name-input:focus { border-color: var(--yellow); }
.speaker-card-actions { display: flex; gap: 10px; }
.card-btn {
font-size: .82rem; padding: 8px 16px; border-radius: 8px;
border: 1px solid var(--border); background: var(--surface2); color: var(--text);
cursor: pointer; font-family: inherit; transition: border-color .15s;
}
.card-btn:hover { border-color: var(--red); }
.card-btn.primary { background: var(--yellow); border-color: var(--yellow); color: #111; }
.card-btn.primary:hover { background: #e6c200; border-color: #e6c200; }
```
**Step 2: Add HTML to `frontend/index.html`**
Add just before `<section class="record-section">`:
```html
<div id="speaker-card" class="speaker-card hidden">
<h3>Wer hat gesprochen?</h3>
<div id="speaker-rows"></div>
<div class="speaker-card-actions">
<button class="card-btn primary" id="speaker-confirm-btn">Übernehmen</button>
<button class="card-btn" id="speaker-anon-btn">Anonym lassen</button>
</div>
</div>
```
**Step 3: Add JS to `frontend/app.js`**
Add after the existing constants at the top:
```javascript
const speakerCard = document.getElementById('speaker-card');
const speakerRows = document.getElementById('speaker-rows');
const speakerConfirmBtn = document.getElementById('speaker-confirm-btn');
const speakerAnonBtn = document.getElementById('speaker-anon-btn');
let _speakerData = []; // [{id, excerpts, inputEl, currentIdx}, ...]
```
Add to the `STATUS_LABELS`:
```javascript
awaiting_speakers: 'Sprecher zuordnen\u2026',
```
Replace the `ws.onmessage` handler — add handling for `speakers_unknown`:
```javascript
ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (msg.event === 'processing') setStatus('processing');
  if (msg.event === 'saved') {
    setStatus('idle');
    hideSpeakerCard();
    loadTranscripts();
  }
  if (msg.event === 'error') setStatus('error');
  if (msg.event === 'speakers_unknown') showSpeakerCard(msg.speakers);
};
```
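For reference, the `speakers_unknown` payload built by `_build_excerpts()` in Task 9 has this shape (values illustrative, written here as a Python literal):

```python
# Shape of the WebSocket message the speaker card consumes.
msg = {
    "event": "speakers_unknown",
    "speakers": [
        {"id": "SPEAKER_00", "excerpts": ["Gut, dann fangen wir an.", "..."]},
        {"id": "SPEAKER_01", "excerpts": ["Ich hab das vorbereitet."]},
    ],
}
```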
Add functions for the speaker card:
```javascript
function showSpeakerCard(speakers) {
  _speakerData = [];
  speakerRows.replaceChildren();
  speakers.forEach(function(s) {
    const row = document.createElement('div');
    row.className = 'speaker-row';
    const excerptEl = document.createElement('div');
    excerptEl.className = 'speaker-excerpt';
    excerptEl.textContent = s.excerpts[0] || '';
    const counter = document.createElement('span');
    counter.className = 'excerpt-counter';
    counter.textContent = s.excerpts.length > 1 ? '1/' + s.excerpts.length : '';
    let idx = 0;
    const prev = document.createElement('button');
    prev.textContent = '\u2039';
    const next = document.createElement('button');
    next.textContent = '\u203a';
    function updateExcerpt() {
      excerptEl.textContent = s.excerpts[idx] || '';
      counter.textContent = s.excerpts.length > 1 ? (idx + 1) + '/' + s.excerpts.length : '';
    }
    prev.addEventListener('click', function() {
      if (idx > 0) { idx--; updateExcerpt(); }
    });
    next.addEventListener('click', function() {
      if (idx < s.excerpts.length - 1) { idx++; updateExcerpt(); }
    });
    const nav = document.createElement('div');
    nav.className = 'excerpt-nav';
    if (s.excerpts.length > 1) { nav.append(prev, counter, next); }
    const input = document.createElement('input');
    input.type = 'text';
    input.className = 'speaker-name-input';
    input.placeholder = s.id.replace('SPEAKER_0', 'Sprecher ').replace('SPEAKER_', 'Sprecher ');
    row.append(excerptEl, nav, input);
    speakerRows.appendChild(row);
    _speakerData.push({ id: s.id, input: input });
  });
  speakerCard.classList.remove('hidden');
  setStatus('awaiting_speakers');
}

function hideSpeakerCard() {
  speakerCard.classList.add('hidden');
  _speakerData = [];
}

async function submitSpeakers(useNames) {
  const body = {};
  _speakerData.forEach(function(s) {
    body[s.id] = useNames ? s.input.value.trim() : '';
  });
  await apiFetch('/speakers', { method: 'POST', body: JSON.stringify(body) });
}

speakerConfirmBtn.addEventListener('click', function() { submitSpeakers(true); });
speakerAnonBtn.addEventListener('click', function() { submitSpeakers(false); });
```
**Step 4: Run full suite (no automated test for UI, visual check at step 5)**
```bash
.venv/bin/pytest -v 2>&1 | tail -15
```
Expected: all PASS (no test for UI JS)
**Step 5: Commit**
```bash
git add frontend/index.html frontend/app.js
git commit -m "feat: speaker naming card with excerpt navigator in main UI"
```
---
### Task 11: Settings page — diarization section
**Files:**
- Modify: `frontend/settings.html`
- Modify: `frontend/settings.js`
**Step 1: Add HTML section to `frontend/settings.html`**
After the `</section>` of the Processing section, add:
```html
<section>
  <h2>Diarisierung</h2>
  <div class="field">
    <label style="display:flex;align-items:center;gap:10px;cursor:pointer;">
      <input type="checkbox" id="diar-enabled" style="width:auto;">
      Sprecher-Erkennung aktivieren
    </label>
  </div>
  <div class="field">
    <label>HuggingFace Token</label>
    <input type="text" id="diar-token" placeholder="hf_...">
  </div>
  <p style="font-size:.78rem;color:var(--muted);margin-top:4px;">
    Einmalig: <a href="https://huggingface.co/pyannote/speaker-diarization-3.1"
    target="_blank" style="color:var(--muted);">pyannote-Modell freischalten</a>
    und Token mit <strong>Read</strong>-Berechtigung erstellen.
  </p>
</section>
```
**Step 2: Add to `frontend/settings.js`**
In `loadConfig()`, add after the ollama lines:
```javascript
document.getElementById('diar-enabled').checked = !!(cfg.diarization && cfg.diarization.enabled);
document.getElementById('diar-token').value = (cfg.diarization && cfg.diarization.hf_token) || '';
```
In the save button handler, add to `body`:
```javascript
diarization: {
  enabled: document.getElementById('diar-enabled').checked,
  hf_token: document.getElementById('diar-token').value,
},
```
**Step 3: Run full suite**
```bash
.venv/bin/pytest -v 2>&1 | tail -15
```
Expected: all PASS
**Step 4: Commit**
```bash
git add frontend/settings.html frontend/settings.js
git commit -m "feat: diarization section in settings — hf_token and enabled toggle"
```
---
### Task 12: Update SETUP.md with HuggingFace instructions
**Files:**
- Modify: `docs/SETUP.md`
**Step 1: Add section to `docs/SETUP.md`**
Add after the "Firewall" section:
```markdown
### 5. HuggingFace — pyannote-Modell freischalten (für Diarisierung)

1. Account erstellen auf [huggingface.co](https://huggingface.co)
2. Modell-Seite öffnen: https://huggingface.co/pyannote/speaker-diarization-3.1
   **"Access repository"** klicken und Nutzungsbedingungen bestätigen
3. Token erstellen: https://huggingface.co/settings/tokens
   **New token** → Typ: **Read** → Token kopieren
4. Im Transkriptor: Einstellungen → Diarisierung → Token einfügen + aktivieren
**Step 2: Commit**
```bash
git add docs/SETUP.md
git commit -m "docs: HuggingFace setup instructions for pyannote diarization"
```
---
### Task 13: Full test suite + push
**Step 1: Run full test suite**
```bash
cd /home/templis/work/tueit_Transkriptor && .venv/bin/pytest -v
```
Expected: all tests PASS
**Step 2: Manual smoke test checklist**
Restart app (`pkill -f main.py && .venv/bin/python main.py &`), then:
- [ ] Open Einstellungen → Diarisierung: enter the token, enable, save
- [ ] Start a recording and hold a short conversation
- [ ] Status switches to "Sprecher zuordnen…" and the speaker card appears
- [ ] Page through the excerpts with the ‹ / › buttons
- [ ] Enter names → Übernehmen
- [ ] Three entries appear in the transcript list (index, transkript, zusammenfassung)
- [ ] The index shows the TL;DR plus links to the other two documents
- [ ] The transcript shows `**Thomas:** …` paragraphs
- [ ] The summary has `##` sections
- [ ] With diarization disabled, normal dictation behavior is unchanged
**Step 3: Push**
```bash
git push
```