feat: AI-generated title+tldr, subfolder structure, backlinks in transkript/zusammenfassung

- llm: generate_title_and_tldr() returns concise title and 2-3 sentence summary
- output: index in root, transkript+zusammenfassung in {base}/ subdir with backlinks
- pipeline: call generate_title_and_tldr for both solo and meeting recordings
- router: mirror subdir structure when copying to Obsidian vault

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-02 12:07:54 +02:00
parent 1cfb9c127b
commit 336628341b
6 changed files with 1072 additions and 27 deletions
+6 -8
View File
@@ -81,13 +81,8 @@ async def _run_solo_pipeline(cfg, wav_path, output_dir, instructions):
) )
dt = datetime.now() dt = datetime.now()
paths = write_solo_docs(raw_text=raw_text, refined=refined, output_dir=output_dir, dt=dt) title, tldr = await client.generate_title_and_tldr(refined, model=cfg["ollama"]["model"])
paths = write_solo_docs(raw_text=raw_text, refined=refined, output_dir=output_dir, dt=dt, title=title, tldr=tldr)
title = "Diktat"
for line in refined.splitlines():
if line.startswith("# "):
title = line[2:].strip()
break
await state.set_status(Status.IDLE) await state.set_status(Status.IDLE)
await broadcast({ await broadcast({
@@ -152,6 +147,7 @@ async def _run_meeting_pipeline(cfg, wav_path, output_dir, instructions, diar_cf
transcript_text = "\n\n".join(f"**{spk}:** {txt}" for spk, txt in named_aligned) transcript_text = "\n\n".join(f"**{spk}:** {txt}" for spk, txt in named_aligned)
summary = await client.summarize(transcript_text, model=cfg["ollama"]["model"]) summary = await client.summarize(transcript_text, model=cfg["ollama"]["model"])
title, tldr = await client.generate_title_and_tldr(summary, model=cfg["ollama"]["model"])
dt = datetime.now() dt = datetime.now()
paths = write_meeting_docs( paths = write_meeting_docs(
@@ -161,13 +157,15 @@ async def _run_meeting_pipeline(cfg, wav_path, output_dir, instructions, diar_cf
duration_min=duration_min, duration_min=duration_min,
output_dir=output_dir, output_dir=output_dir,
dt=dt, dt=dt,
title=title,
tldr=tldr,
) )
await state.set_status(Status.IDLE) await state.set_status(Status.IDLE)
await broadcast({ await broadcast({
"event": "saved", "event": "saved",
"path": paths["index"], "path": paths["index"],
"title": f"Meeting {dt.strftime('%d.%m.%Y %H:%M')}", "title": title,
"meeting": True, "meeting": True,
"paths": paths, "paths": paths,
}) })
+9 -5
View File
@@ -223,22 +223,26 @@ async def open_file(body: dict, user: dict = Depends(current_user)):
from urllib.parse import quote from urllib.parse import quote
cfg = load_config() cfg = load_config()
vault = cfg.get("obsidian", {}).get("vault", "").strip() vault = cfg.get("obsidian", {}).get("vault", "").strip()
# If only the index was passed, also include sibling transkript/zusammenfassung # If only the index was passed, also include siblings from subdir
all_paths = list(paths) all_paths = list(paths)
for p in paths: for p in paths:
if p.endswith("-index.md"): if p.endswith("-index.md"):
base = p[: -len("-index.md")] base = os.path.basename(p)[: -len("-index.md")]
subdir = os.path.join(os.path.dirname(p), base)
for suffix in ("-transkript.md", "-zusammenfassung.md"): for suffix in ("-transkript.md", "-zusammenfassung.md"):
sibling = base + suffix sibling = os.path.join(subdir, base + suffix)
if os.path.exists(sibling) and sibling not in all_paths: if os.path.exists(sibling) and sibling not in all_paths:
all_paths.append(sibling) all_paths.append(sibling)
open_target = all_paths[0] open_target = all_paths[0]
if vault and os.path.isdir(vault): if vault and os.path.isdir(vault):
# Mirror directory structure: index → vault root, others → vault/{base}/
for p in all_paths: for p in all_paths:
dest = os.path.join(vault, os.path.basename(p)) rel = os.path.relpath(p, abs_user_dir)
dest = os.path.join(vault, rel)
os.makedirs(os.path.dirname(dest), exist_ok=True)
shutil.copy2(p, dest) shutil.copy2(p, dest)
open_target = os.path.join(vault, os.path.basename(all_paths[0])) open_target = os.path.join(vault, os.path.relpath(all_paths[0], abs_user_dir))
vault_name = os.path.basename(vault.rstrip("/")) if vault else "" vault_name = os.path.basename(vault.rstrip("/")) if vault else ""
file_name = os.path.basename(open_target) file_name = os.path.basename(open_target)
if vault_name: if vault_name:
@@ -0,0 +1,921 @@
# Settings Page & Remote Whisper Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Add a settings page with PipeWire audio device selection and remote Whisper/Ollama URL configuration, enabling clients to offload AI processing to Beastix.
**Architecture:** Config gains `audio.device` and `whisper.base_url`. `transcription.py` branches on `base_url`: local faster-whisper or remote OpenAI-compatible HTTP upload. A new `/settings` page (admin-only) lets users pick PipeWire sources via `pactl` and configure server URLs. PipeWire combined source is created on demand via `pactl load-module`.
**Tech Stack:** FastAPI, httpx (already in deps), pactl (PipeWire), sounddevice, faster-whisper, vanilla JS/CSS (tüit CI dark theme)
---
### Task 1: Extend config defaults
**Files:**
- Modify: `config.py`
- Test: `tests/test_config.py`
**Step 1: Write the failing test**
Add to `tests/test_config.py`:
```python
def test_config_has_audio_and_whisper_base_url():
    """Fresh config defaults must include audio.device and whisper.base_url (both empty)."""
    import tempfile
    import os
    from unittest.mock import patch
    import config

    with tempfile.TemporaryDirectory() as tmpdir:
        with patch("config.CONFIG_PATH", os.path.join(tmpdir, "config.toml")):
            loaded = config.load()
    assert "audio" in loaded
    assert loaded["audio"]["device"] == ""
    assert loaded["whisper"]["base_url"] == ""
```
**Step 2: Run to verify it fails**
```bash
pytest tests/test_config.py::test_config_has_audio_and_whisper_base_url -v
```
Expected: FAIL — KeyError or AssertionError
**Step 3: Update `config.py` DEFAULTS**
```python
# Baseline configuration; user-provided config.toml values are merged over these.
DEFAULTS = {
    "ollama": {
        "base_url": "http://localhost:11434",
        "model": "gemma3:12b",
    },
    "whisper": {
        "model": "large-v3",
        "language": "de",
        "device": "auto",
        "base_url": "",  # empty = local, else http://beastix:8000
    },
    "audio": {
        "device": "",  # empty = system default
    },
    "server": {
        "port": 8765,
    },
    "output": {
        # Default transcript output location (synced storage box).
        "path": os.path.expanduser("~/cloud.shron.de/Hetzner Storagebox/work"),
    },
    "network": {
        "host": "127.0.0.1",  # bind to loopback only by default
    },
    "pid_file": os.path.expanduser("~/.local/run/tueit-transcriber.pid"),
}
```
Update the fallback string writer in `_write_defaults`:
```python
f.write('[whisper]\nmodel = "large-v3"\nlanguage = "de"\ndevice = "auto"\nbase_url = ""\n\n')
f.write('[audio]\ndevice = ""\n\n')
```
**Step 4: Run tests**
```bash
pytest tests/test_config.py -v
```
Expected: all PASS
**Step 5: Commit**
```bash
git add config.py tests/test_config.py
git commit -m "feat: add audio.device and whisper.base_url to config defaults"
```
---
### Task 2: Remote Whisper in transcription.py
**Files:**
- Modify: `transcription.py`
- Modify: `api/pipeline.py`
- Test: `tests/test_transcription.py`
**Step 1: Write the failing test**
Add to `tests/test_transcription.py`:
```python
import pytest
@pytest.mark.asyncio
async def test_transcribe_uses_remote_when_base_url_set(tmp_path):
    """When base_url is set, transcription goes over HTTP instead of loading a model."""
    import struct
    import wave

    wav_path = tmp_path / "test.wav"
    with wave.open(str(wav_path), "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(16000)
        wf.writeframes(struct.pack("<100h", *([0] * 100)))

    import httpx
    import respx
    from transcription import TranscriptionEngine

    engine = TranscriptionEngine()
    with respx.mock:
        respx.post("http://beastix:8000/v1/audio/transcriptions").mock(
            return_value=httpx.Response(200, json={"text": "Hallo Welt"})
        )
        text = await engine.transcribe_file(
            str(wav_path),
            language="de",
            model_name="large-v3",
            device="auto",
            base_url="http://beastix:8000",
        )
    assert text == "Hallo Welt"
```
**Step 2: Run to verify it fails**
```bash
pytest tests/test_transcription.py::test_transcribe_uses_remote_when_base_url_set -v
```
Expected: FAIL — `transcribe_file` doesn't accept `base_url`
**Step 3: Rewrite `transcription.py`**
```python
import asyncio
import httpx


class TranscriptionEngine:
    """Transcribe WAV files either locally (faster-whisper) or via a remote
    OpenAI-compatible HTTP server, selected by ``base_url``."""

    def __init__(self):
        # Cache loaded models keyed by (model_name, device): the original
        # single-slot cache silently ignored later config changes to either.
        self._models: dict = {}

    def _get_model(self, model_name: str = "large-v3", device: str = "auto"):
        """Load (once per (model, device)) and return a local faster-whisper model."""
        key = (model_name, device)
        if key not in self._models:
            from faster_whisper import WhisperModel
            if device == "auto":
                # Prefer CUDA/float16; fall back to CPU/int8 when unavailable.
                try:
                    self._models[key] = WhisperModel(model_name, device="cuda", compute_type="float16")
                except Exception:
                    self._models[key] = WhisperModel(model_name, device="cpu", compute_type="int8")
            else:
                compute = "float16" if device in ("cuda", "rocm") else "int8"
                self._models[key] = WhisperModel(model_name, device=device, compute_type=compute)
        return self._models[key]

    async def transcribe_file(
        self,
        audio_path: str,
        language: str = "de",
        model_name: str = "large-v3",
        device: str = "auto",
        base_url: str = "",
    ) -> str:
        """Transcribe *audio_path* and return plain text.

        An empty ``base_url`` selects local faster-whisper; otherwise the file
        is uploaded to ``{base_url}/v1/audio/transcriptions``.
        """
        if base_url:
            return await self._transcribe_remote(audio_path, language, model_name, base_url)
        return await self._transcribe_local(audio_path, language, model_name, device)

    async def _transcribe_remote(
        self, audio_path: str, language: str, model_name: str, base_url: str
    ) -> str:
        """Upload the WAV to an OpenAI-compatible endpoint and return its text.

        Raises ``httpx.HTTPStatusError`` on a non-2xx response.
        """
        async with httpx.AsyncClient(timeout=300) as client:
            with open(audio_path, "rb") as f:
                r = await client.post(
                    f"{base_url}/v1/audio/transcriptions",
                    files={"file": ("audio.wav", f, "audio/wav")},
                    data={"model": model_name, "language": language},
                )
            r.raise_for_status()
            return r.json()["text"]

    async def _transcribe_local(
        self, audio_path: str, language: str, model_name: str, device: str
    ) -> str:
        """Run faster-whisper in a worker thread and return the joined text.

        ``model.transcribe`` returns a *lazy* generator — the actual decoding
        happens while iterating the segments, so the join must also run inside
        the executor; otherwise the heavy work would block the event loop.
        """
        loop = asyncio.get_running_loop()  # get_event_loop() is deprecated here
        model = self._get_model(model_name, device)

        def _decode() -> str:
            segments, _info = model.transcribe(audio_path, language=language)
            return "".join(seg.text for seg in segments).strip()

        return await loop.run_in_executor(None, _decode)


engine = TranscriptionEngine()
```
**Step 4: Update `api/pipeline.py` — pass base_url**
In `run_pipeline`, update the `transcribe_file` call:
```python
raw_text = await transcription_engine.transcribe_file(
wav_path,
language=cfg["whisper"]["language"],
model_name=cfg["whisper"]["model"],
device=cfg["whisper"]["device"],
base_url=cfg["whisper"].get("base_url", ""),
)
```
**Step 5: Run all transcription tests**
```bash
pytest tests/test_transcription.py -v
```
Expected: all PASS
**Step 6: Commit**
```bash
git add transcription.py api/pipeline.py tests/test_transcription.py
git commit -m "feat: remote Whisper via whisper.base_url — OpenAI-compatible upload"
```
---
### Task 3: Audio device in AudioRecorder
**Files:**
- Modify: `audio.py`
- Modify: `api/router.py` (toggle endpoint)
- Test: `tests/test_audio.py`
**Step 1: Write the failing test**
Add to `tests/test_audio.py`:
```python
def test_recorder_stores_device_param():
    """A non-empty device name is stored unchanged."""
    from audio import AudioRecorder
    recorder = AudioRecorder(device="my-pipewire-source")
    assert recorder.device == "my-pipewire-source"


def test_recorder_device_none_when_empty_string():
    """An empty device string means 'system default' and normalizes to None."""
    from audio import AudioRecorder
    recorder = AudioRecorder(device="")
    assert recorder.device is None
```
**Step 2: Run to verify they fail**
```bash
pytest tests/test_audio.py::test_recorder_stores_device_param tests/test_audio.py::test_recorder_device_none_when_empty_string -v
```
Expected: FAIL
**Step 3: Update `audio.py`**
```python
import wave
import threading
import numpy as np
class AudioRecorder:
def __init__(self, sample_rate: int = 16000, device: str | None = None):
self.sample_rate = sample_rate
self.device = device or None # empty string becomes None = system default
self._buffer: list[np.ndarray] = []
self._stream = None
self.is_recording = False
self._lock = threading.Lock()
def _callback(self, indata, frames, time, status):
if self.is_recording:
with self._lock:
self._buffer.append(indata[:, 0].copy().astype(np.int16))
def start(self):
import sounddevice as sd
self._buffer = []
self.is_recording = True
self._stream = sd.InputStream(
samplerate=self.sample_rate,
channels=1,
dtype="int16",
callback=self._callback,
device=self.device,
)
self._stream.start()
def stop(self):
self.is_recording = False
if self._stream:
self._stream.stop()
self._stream.close()
self._stream = None
def save_wav(self, path: str) -> str:
with self._lock:
data = np.concatenate(self._buffer) if self._buffer else np.zeros(0, dtype=np.int16)
with wave.open(path, "wb") as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(self.sample_rate)
wf.writeframes(data.tobytes())
return path
```
**Step 4: Pass device from config in `api/router.py` toggle endpoint**
In `toggle_recording`, update the `Status.IDLE` branch:
```python
if state.status == Status.IDLE:
from audio import AudioRecorder
audio_device = cfg.get("audio", {}).get("device") or None
state._recorder = AudioRecorder(device=audio_device)
...
```
Also load config at the top of toggle_recording (it's already imported):
```python
cfg = load_config()
```
**Step 5: Run tests**
```bash
pytest tests/test_audio.py -v
```
Expected: all PASS
**Step 6: Commit**
```bash
git add audio.py api/router.py tests/test_audio.py
git commit -m "feat: AudioRecorder accepts device param — reads audio.device from config"
```
---
### Task 4: API — GET /audio/devices and POST /audio/combined
**Files:**
- Modify: `api/router.py`
- Test: `tests/test_api.py`
**Step 1: Write the failing tests**
Add to `tests/test_api.py`:
```python
def test_audio_devices_returns_list(monkeypatch):
    """GET /audio/devices parses `pactl list sources short` output for admins."""
    import subprocess
    pactl_output = (
        "1\talsa_input.pci.analog-stereo\tPipeWire\ts32le 2ch 48000Hz\tRUNNING\n"
        "2\talsa_output.pci.analog-stereo.monitor\tPipeWire\ts32le 2ch 48000Hz\tIDLE\n"
    )
    monkeypatch.setattr(subprocess, "check_output", lambda *a, **kw: pactl_output.encode())
    from fastapi.testclient import TestClient
    from api.router import current_user
    from main import app
    # NOTE: unittest.mock.patch("api.router.current_user") has no effect —
    # FastAPI captures the dependency at route registration time.  The
    # supported mechanism is app.dependency_overrides.
    app.dependency_overrides[current_user] = lambda: {
        "username": "u", "output_dir": "/tmp", "is_admin": True}
    try:
        client = TestClient(app)
        r = client.get("/audio/devices", headers={"Authorization": "Bearer fake"})
        assert r.status_code == 200
        devices = r.json()
        assert len(devices) == 2
        assert devices[0]["name"] == "alsa_input.pci.analog-stereo"
    finally:
        app.dependency_overrides.pop(current_user, None)
def test_audio_devices_forbidden_for_non_admin():
    """Non-admin users get 403 from GET /audio/devices."""
    from fastapi.testclient import TestClient
    from api.router import current_user
    from main import app
    # NOTE: patching api.router.current_user does not override a FastAPI
    # dependency (it is resolved at route registration) — override via
    # app.dependency_overrides instead.
    app.dependency_overrides[current_user] = lambda: {
        "username": "u", "output_dir": "/tmp", "is_admin": False}
    try:
        client = TestClient(app)
        r = client.get("/audio/devices", headers={"Authorization": "Bearer fake"})
        assert r.status_code == 403
    finally:
        app.dependency_overrides.pop(current_user, None)
```
**Step 2: Run to verify they fail**
```bash
pytest tests/test_api.py::test_audio_devices_returns_list tests/test_api.py::test_audio_devices_forbidden_for_non_admin -v
```
Expected: FAIL — routes don't exist
**Step 3: Add endpoints to `api/router.py`**
```python
@router.get("/audio/devices")
async def list_audio_devices(user: dict = Depends(current_user)):
    """List PipeWire/PulseAudio sources via `pactl list sources short` (admin only)."""
    import subprocess
    if not user.get("is_admin"):
        raise HTTPException(status_code=403, detail="Nur Administratoren")
    try:
        out = subprocess.check_output(
            ["pactl", "list", "sources", "short"],
            stderr=subprocess.DEVNULL, timeout=5,
        ).decode()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"pactl fehlgeschlagen: {e}")
    # pactl short format is tab-separated: index, name, driver, sample spec, state.
    devices = []
    for row in out.strip().splitlines():
        cols = row.split("\t")
        if len(cols) < 2:
            continue
        devices.append({
            "index": cols[0],
            "name": cols[1],
            "state": cols[4] if len(cols) > 4 else "",
        })
    return devices
@router.post("/audio/combined")
async def create_combined_source(body: dict, user: dict = Depends(current_user)):
    """Create a PipeWire null sink plus two loopbacks combining mic and monitor.

    Returns the monitor name of the combined sink (usable as recording
    source) and the loaded module ids; ids are persisted to disk so the
    modules can be unloaded later.  Admin only.
    """
    import subprocess, json, pathlib
    if not user.get("is_admin"):
        raise HTTPException(status_code=403, detail="Nur Administratoren")
    mic = body.get("mic", "")
    monitor = body.get("monitor", "")
    if not mic or not monitor:
        raise HTTPException(status_code=400, detail="mic und monitor erforderlich")
    # Validate: names must come from pactl list — no shell injection via user input
    out = subprocess.check_output(
        ["pactl", "list", "sources", "short"], stderr=subprocess.DEVNULL, timeout=5
    ).decode()
    known = {line.split("\t")[1] for line in out.strip().splitlines() if "\t" in line}
    if mic not in known or monitor not in known:
        raise HTTPException(status_code=400, detail="Unbekanntes Audio-Device")
    sink_id = subprocess.check_output([
        "pactl", "load-module", "module-null-sink",
        "sink_name=transkriptor-combined",
        "sink_properties=device.description=Transkriptor Combined",
    ], timeout=5).decode().strip()
    mic_id = subprocess.check_output([
        "pactl", "load-module", "module-loopback",
        f"source={mic}", "sink=transkriptor-combined",
    ], timeout=5).decode().strip()
    mon_id = subprocess.check_output([
        "pactl", "load-module", "module-loopback",
        f"source={monitor}", "sink=transkriptor-combined",
    ], timeout=5).decode().strip()
    state_path = pathlib.Path(
        os.path.expanduser("~/.config/tueit-transcriber/pipewire-modules.json")
    )
    # Fix: on a fresh install the config directory may not exist yet —
    # write_text() would raise FileNotFoundError without this.
    state_path.parent.mkdir(parents=True, exist_ok=True)
    state_path.write_text(json.dumps({"ids": [int(sink_id), int(mic_id), int(mon_id)]}))
    return {"device": "transkriptor-combined.monitor", "module_ids": [sink_id, mic_id, mon_id]}
```
**Step 4: Run tests**
```bash
pytest tests/test_api.py::test_audio_devices_returns_list tests/test_api.py::test_audio_devices_forbidden_for_non_admin -v
```
Expected: PASS
**Step 5: Commit**
```bash
git add api/router.py tests/test_api.py
git commit -m "feat: GET /audio/devices, POST /audio/combined — PipeWire source management"
```
---
### Task 5: Fix PUT /config to deep-merge
**Files:**
- Modify: `api/router.py`
- Test: `tests/test_api.py`
Current `put_config` does a shallow `cfg.update(body)` — overwrites nested dicts. Must deep-merge.
**Step 1: Write the failing test**
Add to `tests/test_api.py`:
```python
def test_put_config_deep_merges(tmp_path, monkeypatch):
    """PUT /config must preserve sibling keys inside nested sections."""
    import config as cfg_mod
    monkeypatch.setattr(cfg_mod, "CONFIG_PATH",
                        str(tmp_path / "config.toml"))
    from fastapi.testclient import TestClient
    from api.router import current_user
    from main import app
    # NOTE: patching api.router.current_user does not override a FastAPI
    # dependency — use app.dependency_overrides.
    app.dependency_overrides[current_user] = lambda: {
        "username": "u", "output_dir": "/tmp", "is_admin": True}
    try:
        client = TestClient(app)
        r = client.put("/config",
                       json={"whisper": {"base_url": "http://beastix:8000"}},
                       headers={"Authorization": "Bearer fake"})
        assert r.status_code == 200
        data = r.json()
        # base_url updated, model preserved
        assert data["whisper"]["base_url"] == "http://beastix:8000"
        assert data["whisper"]["model"] == "large-v3"
    finally:
        app.dependency_overrides.pop(current_user, None)
```
**Step 2: Run to verify it fails**
```bash
pytest tests/test_api.py::test_put_config_deep_merges -v
```
Expected: FAIL — shallow update loses whisper.model
**Step 3: Fix `put_config` in `api/router.py`**
```python
@router.put("/config")
async def put_config(body: dict, user: dict = Depends(current_user)):
    """Deep-merge *body* into the stored config and persist it as TOML (admin only).

    Returns the merged config so the client can refresh its view.
    """
    if not user.get("is_admin"):
        raise HTTPException(status_code=403, detail="Nur Administratoren können die Config ändern")
    import tomli_w
    from config import _deep_merge, CONFIG_PATH
    cfg = load_config()
    merged = _deep_merge(cfg, body)
    # Guard: dirname is "" when CONFIG_PATH is a bare filename, and
    # os.makedirs("") raises FileNotFoundError.
    cfg_dir = os.path.dirname(CONFIG_PATH)
    if cfg_dir:
        os.makedirs(cfg_dir, exist_ok=True)
    with open(CONFIG_PATH, "wb") as f:
        tomli_w.dump(merged, f)
    return merged
```
**Step 4: Run tests**
```bash
pytest tests/test_api.py::test_put_config_deep_merges -v
```
Expected: PASS
**Step 5: Commit**
```bash
git add api/router.py tests/test_api.py
git commit -m "fix: PUT /config deep-merges nested config instead of shallow update"
```
---
### Task 6: GET /status returns is_admin
**Files:**
- Modify: `api/router.py`
- Test: `tests/test_api.py`
**Step 1: Write the failing test**
Add to `tests/test_api.py`:
```python
def test_status_includes_is_admin():
    """GET /status must report the caller's admin flag for the frontend."""
    from fastapi.testclient import TestClient
    from api.router import current_user
    from main import app
    # NOTE: patching api.router.current_user does not override a FastAPI
    # dependency (captured at route registration) — use dependency_overrides.
    app.dependency_overrides[current_user] = lambda: {
        "username": "u", "output_dir": "/tmp", "is_admin": True}
    try:
        client = TestClient(app)
        r = client.get("/status", headers={"Authorization": "Bearer fake"})
        assert r.status_code == 200
        assert r.json()["is_admin"] is True
    finally:
        app.dependency_overrides.pop(current_user, None)
```
**Step 2: Run to verify it fails**
```bash
pytest tests/test_api.py::test_status_includes_is_admin -v
```
Expected: FAIL
**Step 3: Update `get_status` in `api/router.py`**
```python
@router.get("/status")
async def get_status(user: dict = Depends(current_user)):
    """Report the recorder status plus caller identity for the frontend header."""
    payload = {"status": state.status}
    payload["username"] = user["username"]
    payload["is_admin"] = user.get("is_admin", False)
    return payload
```
**Step 4: Add gear icon in `frontend/app.js` init block**
In the `(async () => { ... })()` init, after `userChip.textContent = data.username`:
```javascript
if (data.is_admin) {
const gearLink = document.createElement('a');
gearLink.href = '/settings';
gearLink.className = 'back-btn';
gearLink.title = 'Einstellungen';
gearLink.textContent = '\u2699'; // ⚙ gear symbol
document.querySelector('.header-right').prepend(gearLink);
}
```
**Step 5: Run tests**
```bash
pytest tests/test_api.py::test_status_includes_is_admin -v
```
Expected: PASS
**Step 6: Commit**
```bash
git add api/router.py frontend/app.js tests/test_api.py
git commit -m "feat: status includes is_admin, gear icon in header for admins"
```
---
### Task 7: Settings page HTML + JS + routes
**Files:**
- Create: `frontend/settings.html`
- Create: `frontend/settings.js`
- Modify: `api/router.py`
- Modify: `main.py`
**Step 1: Add GET /settings to `api/router.py`**
```python
@router.get("/settings")
async def settings_page_route(user: dict = Depends(current_user)):
    """Serve the admin settings page; non-admins are redirected to the start page."""
    from pathlib import Path
    from fastapi.responses import FileResponse, RedirectResponse
    if not user.get("is_admin"):
        return RedirectResponse("/")
    page = Path(__file__).parent.parent / "frontend" / "settings.html"
    return FileResponse(str(page))
```
**Step 2: Add `/settings.js` route to `main.py`**
```python
@app.get("/settings.js")
async def settingsjs():
    # Serve the settings-page script; FRONTEND_DIR is defined at module level in main.py.
    return FileResponse(str(FRONTEND_DIR / "settings.js"))
```
**Step 3: Create `frontend/settings.html`**
```html
<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>tüit Transkriptor — Einstellungen</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link href="https://fonts.googleapis.com/css2?family=Overpass:wght@300;400;600;700&display=swap" rel="stylesheet">
<style>
:root { --red:#DA251C;--yellow:#FFD802;--bg:#111;--surface:#1a1a1a;--surface2:#232323;--text:#e8e8e8;--muted:#888;--border:#2e2e2e; }
*{box-sizing:border-box;margin:0;padding:0;}
body{font-family:'Overpass',system-ui,sans-serif;background:var(--bg);color:var(--text);min-height:100vh;display:flex;flex-direction:column;}
header{display:flex;align-items:center;gap:12px;padding:16px 24px;border-bottom:1px solid var(--border);}
.header-logo{height:28px;width:auto;display:block;}
.header-divider{width:1px;height:20px;background:var(--border);flex-shrink:0;}
.header-appname{font-size:1rem;font-weight:600;letter-spacing:.04em;color:var(--muted);}
.header-right{margin-left:auto;display:flex;align-items:center;gap:12px;}
.back-btn{font-size:.75rem;padding:4px 10px;border-radius:20px;background:none;border:1px solid var(--border);color:var(--muted);cursor:pointer;font-family:inherit;text-decoration:none;transition:border-color .15s,color .15s;}
.back-btn:hover{border-color:var(--red);color:var(--red);}
main{flex:1;display:flex;flex-direction:column;gap:24px;padding:24px;max-width:700px;width:100%;margin:0 auto;}
h2{font-size:.8rem;color:var(--muted);text-transform:uppercase;letter-spacing:.06em;margin-bottom:12px;padding-bottom:8px;border-bottom:1px solid var(--border);}
.field{display:flex;flex-direction:column;gap:6px;margin-bottom:14px;}
label{font-size:.78rem;color:var(--muted);letter-spacing:.04em;}
select,input[type=text]{background:var(--surface);border:1px solid var(--border);color:var(--text);border-radius:8px;padding:10px 12px;font-family:inherit;font-size:.9rem;outline:none;transition:border-color .15s;width:100%;}
select:focus,input[type=text]:focus{border-color:var(--yellow);}
.btn-row{display:flex;gap:10px;margin-top:4px;}
.btn{font-size:.82rem;padding:8px 16px;border-radius:8px;border:1px solid var(--border);background:var(--surface2);color:var(--text);cursor:pointer;font-family:inherit;transition:border-color .15s,background .15s;}
.btn:hover{border-color:var(--red);}
.btn.primary{background:var(--red);border-color:var(--red);color:#fff;}
.btn.primary:hover{background:#b81e16;border-color:#b81e16;}
.toast{position:fixed;bottom:24px;right:24px;background:var(--surface2);border:1px solid var(--border);border-radius:8px;padding:10px 16px;font-size:.85rem;opacity:0;transition:opacity .2s;pointer-events:none;}
.toast.show{opacity:1;}
.combined-form{display:none;flex-direction:column;gap:10px;margin-top:10px;padding:12px;background:var(--surface2);border-radius:8px;border:1px solid var(--border);}
.combined-form.visible{display:flex;}
</style>
</head>
<body>
<header>
<img src="/logo.svg" class="header-logo" alt="tüit">
<div class="header-divider"></div>
<span class="header-appname">Transkriptor — Einstellungen</span>
<div class="header-right">
<a href="/" class="back-btn">&#8592; Zurück</a>
</div>
</header>
<main>
<section>
<h2>Audio</h2>
<div class="field">
<label>Aufnahmequelle</label>
<select id="audio-device">
<option value="">Systemstandard</option>
</select>
</div>
<div class="btn-row">
<button class="btn" id="refresh-devices-btn">Geräte aktualisieren</button>
<button class="btn" id="create-combined-btn">Combined Source erstellen</button>
</div>
<div class="combined-form" id="combined-form">
<div class="field">
<label>Mikrofon</label>
<select id="combined-mic"></select>
</div>
<div class="field">
<label>System-Audio Monitor</label>
<select id="combined-monitor"></select>
</div>
<div class="btn-row">
<button class="btn primary" id="combined-confirm-btn">Erstellen</button>
<button class="btn" id="combined-cancel-btn">Abbrechen</button>
</div>
</div>
</section>
<section>
<h2>Verarbeitung</h2>
<div class="field">
<label>Whisper Server URL (leer = lokal)</label>
<input type="text" id="whisper-url" placeholder="http://beastix:8000">
</div>
<div class="field">
<label>Whisper Modell</label>
<input type="text" id="whisper-model" placeholder="large-v3">
</div>
<div class="field">
<label>Ollama Server URL</label>
<input type="text" id="ollama-url" placeholder="http://localhost:11434">
</div>
<div class="field">
<label>Ollama Modell</label>
<select id="ollama-model"></select>
</div>
<div class="btn-row">
<button class="btn primary" id="save-btn">Speichern</button>
</div>
</section>
</main>
<div class="toast" id="toast"></div>
<script src="/settings.js"></script>
</body>
</html>
```
**Step 4: Create `frontend/settings.js`**
```javascript
// Session token stored by the login page; all API calls send it as a Bearer header.
const token = sessionStorage.getItem('token');

// Build the Authorization header object (empty when not logged in).
function authHeaders() {
  return token ? { 'Authorization': 'Bearer ' + token } : {};
}

// fetch() wrapper that adds JSON content type and auth headers.
// Caller-supplied headers win over the defaults.
function apiFetch(url, options) {
  options = options || {};
  return fetch(url, Object.assign({}, options, {
    headers: Object.assign({'Content-Type': 'application/json'}, authHeaders(), options.headers || {}),
  }));
}

// Cached result of the last /audio/devices call.
let _devices = [];

// Show a transient toast message in the bottom-right corner.
function showToast(msg) {
  const t = document.getElementById('toast');
  t.textContent = msg;
  t.classList.add('show');
  setTimeout(function() { t.classList.remove('show'); }, 2500);
}
// Fetch PipeWire sources and repopulate the device dropdowns,
// preserving the currently selected recording source when possible.
async function loadDevices() {
  const r = await apiFetch('/audio/devices');
  if (!r.ok) return;
  _devices = await r.json();
  const sel = document.getElementById('audio-device');
  const current = sel.value;
  sel.replaceChildren(new Option('Systemstandard', ''));
  _devices.forEach(function(d) { sel.appendChild(new Option(d.name, d.name)); });
  if (current) sel.value = current;
  // The combined-source form gets the same device list (no "default" entry).
  ['combined-mic', 'combined-monitor'].forEach(function(id) {
    const el = document.getElementById(id);
    el.replaceChildren();
    _devices.forEach(function(d) { el.appendChild(new Option(d.name, d.name)); });
  });
}

// Query the Ollama server's /api/tags for available models and fill the
// model dropdown; failures are silently ignored (server may be offline).
async function loadOllamaModels(baseUrl, current) {
  try {
    const r = await fetch(baseUrl + '/api/tags');
    if (!r.ok) return;
    const data = await r.json();
    const sel = document.getElementById('ollama-model');
    sel.replaceChildren();
    (data.models || []).forEach(function(m) { sel.appendChild(new Option(m.name, m.name)); });
    if (current) sel.value = current;
  } catch(e) {}
}

// Load the saved config and populate all form fields, then load the
// Ollama model list from the configured server URL.
async function loadConfig() {
  const r = await apiFetch('/config');
  if (!r.ok) return;
  const cfg = await r.json();
  document.getElementById('audio-device').value = (cfg.audio && cfg.audio.device) || '';
  document.getElementById('whisper-url').value = (cfg.whisper && cfg.whisper.base_url) || '';
  document.getElementById('whisper-model').value = (cfg.whisper && cfg.whisper.model) || 'large-v3';
  const ollamaUrl = (cfg.ollama && cfg.ollama.base_url) || 'http://localhost:11434';
  document.getElementById('ollama-url').value = ollamaUrl;
  await loadOllamaModels(ollamaUrl, cfg.ollama && cfg.ollama.model);
}
// --- Event wiring -----------------------------------------------------------

document.getElementById('refresh-devices-btn').addEventListener('click', loadDevices);

// Toggle / hide the combined-source creation form.
document.getElementById('create-combined-btn').addEventListener('click', function() {
  document.getElementById('combined-form').classList.toggle('visible');
});
document.getElementById('combined-cancel-btn').addEventListener('click', function() {
  document.getElementById('combined-form').classList.remove('visible');
});

// Create the combined PipeWire source on the server, then refresh the
// device list and select the newly created monitor as recording source.
document.getElementById('combined-confirm-btn').addEventListener('click', async function() {
  const mic = document.getElementById('combined-mic').value;
  const monitor = document.getElementById('combined-monitor').value;
  const r = await apiFetch('/audio/combined', {
    method: 'POST',
    body: JSON.stringify({ mic: mic, monitor: monitor }),
  });
  if (!r.ok) { showToast('Fehler beim Erstellen'); return; }
  const data = await r.json();
  showToast('Erstellt: ' + data.device);
  document.getElementById('combined-form').classList.remove('visible');
  await loadDevices();
  document.getElementById('audio-device').value = data.device;
});

// Re-query the model list whenever the Ollama URL changes.
document.getElementById('ollama-url').addEventListener('change', function(e) {
  loadOllamaModels(e.target.value, document.getElementById('ollama-model').value);
});

// Persist all settings with a single PUT /config (server deep-merges).
document.getElementById('save-btn').addEventListener('click', async function() {
  const body = {
    audio: { device: document.getElementById('audio-device').value },
    whisper: {
      base_url: document.getElementById('whisper-url').value,
      model: document.getElementById('whisper-model').value,
    },
    ollama: {
      base_url: document.getElementById('ollama-url').value,
      model: document.getElementById('ollama-model').value,
    },
  };
  const r = await apiFetch('/config', { method: 'PUT', body: JSON.stringify(body) });
  if (r.ok) { showToast('Gespeichert'); } else { showToast('Fehler beim Speichern'); }
});

// Init: require a session token, then populate devices and config.
(async function() {
  if (!token) { location.href = '/login'; return; }
  await loadDevices();
  await loadConfig();
})();
```
**Step 5: Manual verification checklist**
Restart app, open browser as admin:
- [ ] Gear icon (⚙) sichtbar im Header
- [ ] Klick öffnet `/settings`
- [ ] Audio-Dropdown listet PipeWire-Sources
- [ ] "Geräte aktualisieren" lädt Liste neu
- [ ] "Combined Source erstellen" zeigt Mic/Monitor-Dropdowns
- [ ] Nach Erstellen: neues Device in der Liste wählbar
- [ ] Whisper-URL leer → lokale Verarbeitung
- [ ] Whisper-URL gesetzt → Transkript wird remote verarbeitet
- [ ] Ollama-Modelle laden aus konfiguriertem Ollama-Server
- [ ] Speichern → Toast, config.toml aktualisiert
- [ ] Aufnahme nutzt konfiguriertes Audio-Device
- [ ] Non-Admin sieht kein Gear-Icon, `/settings` leitet zu `/` um
**Step 6: Commit**
```bash
git add api/router.py main.py frontend/settings.html frontend/settings.js
git commit -m "feat: settings page — PipeWire audio device + remote Whisper/Ollama config"
```
---
### Task 8: Run full test suite + push
```bash
pytest -v
```
Expected: all tests pass.
```bash
git push
```
---
## Beastix Setup (einmalig, außerhalb App-Code)
```bash
pip install faster-whisper-server
uvicorn faster_whisper_server.main:app --host 0.0.0.0 --port 8000
```
Clients tragen ein:
```toml
[whisper]
base_url = "http://beastix:8000"
```
+34
View File
@@ -5,6 +5,13 @@ Ermittle, welche echten Namen den Sprechern zugeordnet werden können — z.B. d
Antworte NUR mit einem JSON-Objekt: {"SPEAKER_00": "Name oder null", "SPEAKER_01": "Name oder null"} Antworte NUR mit einem JSON-Objekt: {"SPEAKER_00": "Name oder null", "SPEAKER_01": "Name oder null"}
Kein weiterer Text, keine Erklärung.""" Kein weiterer Text, keine Erklärung."""
TITLE_TLDR_PROMPT = """Du bekommst einen aufbereiteten Transkript-Text.
Gib NUR ein JSON-Objekt zurück mit zwei Feldern:
- "title": ein prägnanter, aussagekräftiger Titel (max. 8 Wörter, kein Datum, kein "Diktat")
- "tldr": 2-3 Sätze, die den Inhalt des Transkripts konkret zusammenfassen
Kein weiterer Text, kein Kommentar, kein Markdown-Block."""
SUMMARIZE_PROMPT = """Du bist ein präziser Assistent für Business-Kommunikation. SUMMARIZE_PROMPT = """Du bist ein präziser Assistent für Business-Kommunikation.
Du bekommst ein Gesprächstranskript mit Sprecher-Labels. Du bekommst ein Gesprächstranskript mit Sprecher-Labels.
Erstelle eine strukturierte Zusammenfassung auf Deutsch mit: Erstelle eine strukturierte Zusammenfassung auf Deutsch mit:
@@ -66,6 +73,33 @@ class OllamaClient:
r.raise_for_status() r.raise_for_status()
return r.json()["response"] return r.json()["response"]
async def generate_title_and_tldr(
    self,
    text: str,
    model: str = "gemma3:12b",
) -> tuple[str, str]:
    """Ask the LLM for a concise title and a short TL;DR of *text*.

    Args:
        text: Refined transcript text; only the first 3000 characters are
            sent to keep the prompt small.
        model: Ollama model name to use.

    Returns:
        ``(title, tldr)``. Falls back to ``("Diktat", "Kein TL;DR verfügbar.")``
        when the model response is not the expected JSON object.

    Raises:
        httpx.HTTPStatusError: if the Ollama API returns an error status.
    """
    import json

    async with httpx.AsyncClient(timeout=60) as client:
        r = await client.post(
            f"{self.base_url}/api/generate",
            json={
                "model": model,
                # Truncate to bound prompt size; the head of the text is
                # enough for a title and TL;DR.
                "prompt": f"Text:\n{text[:3000]}",
                "system": TITLE_TLDR_PROMPT,
                "stream": False,
            },
        )
        r.raise_for_status()
        raw = r.json()["response"].strip()
        # Models sometimes wrap JSON in a ```json fence despite the prompt
        # forbidding it; strip the fence so parsing still succeeds.
        if raw.startswith("```"):
            raw = raw.strip("`")
            if raw.startswith("json"):
                raw = raw[4:]
            raw = raw.strip()
        try:
            data = json.loads(raw)
            if not isinstance(data, dict):
                # e.g. the model returned a bare string or a list.
                raise ValueError("expected a JSON object")
            title = str(data.get("title", "")).strip() or "Diktat"
            tldr = str(data.get("tldr", "")).strip() or "Kein TL;DR verfügbar."
            return title, tldr
        except (json.JSONDecodeError, ValueError):
            # Only parse/shape failures fall back; HTTP errors propagate above.
            return "Diktat", "Kein TL;DR verfügbar."
async def identify_speakers( async def identify_speakers(
self, self,
transcript_excerpt: str, transcript_excerpt: str,
+93 -13
View File
@@ -61,6 +61,73 @@ def list_transcripts(output_dir: str, limit: int = 20) -> list[dict]:
return result return result
def write_solo_docs(
    raw_text: str,
    refined: str,
    output_dir: str,
    dt: "datetime | None" = None,
    title: str = "",
    tldr: str = "",
) -> dict[str, str]:
    """Write index (in output_dir), transkript + zusammenfassung (in subdir)."""
    if dt is None:
        dt = datetime.now()
    os.makedirs(output_dir, exist_ok=True)

    # Fall back to the first Markdown H1 of the refined text for the title.
    if not title:
        title = next(
            (ln[2:].strip() for ln in refined.splitlines() if ln.startswith("# ")),
            "Diktat",
        )
    if not tldr:
        tldr = _extract_tldr(refined)

    base = f"{dt.strftime('%Y-%m-%d-%H%M')}-{slugify(title)[:50]}"
    date_str = dt.strftime("%d.%m.%Y %H:%M")
    frontmatter = f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript]\n---\n\n"
    index_filename = f"{base}-index.md"
    subdir = os.path.join(output_dir, base)
    os.makedirs(subdir, exist_ok=True)
    backlink = f"← [Index](../{index_filename})\n\n"

    def _ensure_nl(s: str) -> str:
        # Guarantee exactly one trailing newline is appended when missing.
        return s if s.endswith("\n") else s + "\n"

    # --- transkript (raw whisper output, in subdir) ---
    transkript_filename = f"{base}-transkript.md"
    transkript_path = os.path.join(subdir, transkript_filename)
    transkript_body = (
        frontmatter
        + f"# {title} — Rohtranskript\n\n"
        + backlink
        + _ensure_nl(raw_text)
    )
    with open(transkript_path, "w", encoding="utf-8") as fh:
        fh.write(transkript_body)

    # --- zusammenfassung (Ollama-polished, in subdir) ---
    zusammenfassung_filename = f"{base}-zusammenfassung.md"
    zusammenfassung_path = os.path.join(subdir, zusammenfassung_filename)
    with open(zusammenfassung_path, "w", encoding="utf-8") as fh:
        fh.write(frontmatter + backlink + _ensure_nl(refined))

    # --- index (in output_dir root) ---
    index_path = os.path.join(output_dir, index_filename)
    index_body = (
        f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript, index]\n---\n\n"
        f"# {title}\n\n"
        f"**Datum:** {date_str}\n\n"
        f"> {tldr}\n\n"
        f"- [Transkript]({base}/{transkript_filename})\n"
        f"- [Zusammenfassung]({base}/{zusammenfassung_filename})\n"
    )
    with open(index_path, "w", encoding="utf-8") as fh:
        fh.write(index_body)

    return {"index": index_path, "transkript": transkript_path, "zusammenfassung": zusammenfassung_path}
def write_meeting_docs( def write_meeting_docs(
aligned_segments: list[tuple[str, str]], aligned_segments: list[tuple[str, str]],
summary: str, summary: str,
@@ -68,49 +135,62 @@ def write_meeting_docs(
duration_min: int, duration_min: int,
output_dir: str, output_dir: str,
dt: "datetime | None" = None, dt: "datetime | None" = None,
title: str = "",
tldr: str = "",
) -> dict[str, str]: ) -> dict[str, str]:
"""Write index, transkript, and zusammenfassung. Returns {type: path}.""" """Write index (in output_dir), transkript + zusammenfassung (in subdir)."""
if dt is None: if dt is None:
dt = datetime.now() dt = datetime.now()
os.makedirs(output_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True)
base = dt.strftime("%Y-%m-%d-%H%M") + "-meeting"
if not title:
title = f"Meeting {dt.strftime('%d.%m.%Y %H:%M')}"
if not tldr:
tldr = _extract_tldr(summary)
base = dt.strftime("%Y-%m-%d-%H%M") + "-" + slugify(title)[:50]
date_str = dt.strftime("%d.%m.%Y %H:%M") date_str = dt.strftime("%d.%m.%Y %H:%M")
frontmatter_base = f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript, meeting]\n---\n\n" frontmatter_base = f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript, meeting]\n---\n\n"
# --- transkript --- index_filename = f"{base}-index.md"
subdir = os.path.join(output_dir, base)
os.makedirs(subdir, exist_ok=True)
# --- transkript (in subdir) ---
transcript_lines = [] transcript_lines = []
for speaker, text in aligned_segments: for speaker, text in aligned_segments:
transcript_lines.append(f"**{speaker}:** {text}\n") transcript_lines.append(f"**{speaker}:** {text}\n")
transcript_content = "\n".join(transcript_lines) transcript_content = "\n".join(transcript_lines)
transkript_filename = f"{base}-transkript.md" transkript_filename = f"{base}-transkript.md"
transkript_path = os.path.join(output_dir, transkript_filename) transkript_path = os.path.join(subdir, transkript_filename)
with open(transkript_path, "w", encoding="utf-8") as f: with open(transkript_path, "w", encoding="utf-8") as f:
f.write(frontmatter_base) f.write(frontmatter_base)
f.write(f"← [Index](../{index_filename})\n\n")
f.write(transcript_content) f.write(transcript_content)
if not transcript_content.endswith("\n"): if not transcript_content.endswith("\n"):
f.write("\n") f.write("\n")
# --- zusammenfassung --- # --- zusammenfassung (in subdir) ---
zusammenfassung_filename = f"{base}-zusammenfassung.md" zusammenfassung_filename = f"{base}-zusammenfassung.md"
zusammenfassung_path = os.path.join(output_dir, zusammenfassung_filename) zusammenfassung_path = os.path.join(subdir, zusammenfassung_filename)
with open(zusammenfassung_path, "w", encoding="utf-8") as f: with open(zusammenfassung_path, "w", encoding="utf-8") as f:
f.write(frontmatter_base) f.write(frontmatter_base)
f.write(f"← [Index](../{index_filename})\n\n")
f.write(summary) f.write(summary)
if not summary.endswith("\n"): if not summary.endswith("\n"):
f.write("\n") f.write("\n")
# --- index --- # --- index (in output_dir root) ---
speaker_str = ", ".join(speakers) if speakers else "Unbekannt" speaker_str = ", ".join(speakers) if speakers else "Unbekannt"
tl_dr = _extract_tldr(summary)
index_content = ( index_content = (
f"# Meeting — {date_str}\n\n" f"# {title}\n\n"
f"**Datum:** {date_str} \n"
f"**Sprecher:** {speaker_str} \n" f"**Sprecher:** {speaker_str} \n"
f"**Dauer:** {duration_min} min\n\n" f"**Dauer:** {duration_min} min\n\n"
f"> {tl_dr}\n\n" f"> {tldr}\n\n"
f"- [Transkript]({transkript_filename})\n" f"- [Transkript]({base}/{transkript_filename})\n"
f"- [Zusammenfassung]({zusammenfassung_filename})\n" f"- [Zusammenfassung]({base}/{zusammenfassung_filename})\n"
) )
index_filename = f"{base}-index.md"
index_path = os.path.join(output_dir, index_filename) index_path = os.path.join(output_dir, index_filename)
with open(index_path, "w", encoding="utf-8") as f: with open(index_path, "w", encoding="utf-8") as f:
f.write(f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript, meeting, index]\n---\n\n") f.write(f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript, meeting, index]\n---\n\n")
+9 -1
View File
@@ -68,13 +68,21 @@ def test_write_solo_docs_creates_three_files(tmp_path):
refined="# Projektstatus\n\nDas Projekt läuft gut.\n", refined="# Projektstatus\n\nDas Projekt läuft gut.\n",
output_dir=str(tmp_path), output_dir=str(tmp_path),
dt=datetime(2026, 4, 2, 15, 0), dt=datetime(2026, 4, 2, 15, 0),
title="Projektstatus Update",
tldr="Das Projekt läuft gut und ist im Zeitplan.",
) )
assert set(paths.keys()) == {"index", "transkript", "zusammenfassung"} assert set(paths.keys()) == {"index", "transkript", "zusammenfassung"}
assert all(os.path.exists(p) for p in paths.values()) assert all(os.path.exists(p) for p in paths.values())
index = open(paths["index"]).read() index = open(paths["index"]).read()
assert "Projektstatus" in index assert "Projektstatus Update" in index
assert "transkript" in index assert "transkript" in index
assert "zusammenfassung" in index assert "zusammenfassung" in index
# transkript and zusammenfassung are in a subdir
assert os.path.dirname(paths["transkript"]) != str(tmp_path)
assert os.path.dirname(paths["index"]) == str(tmp_path)
# backlinks present
assert "Index" in open(paths["transkript"]).read()
assert "Index" in open(paths["zusammenfassung"]).read()
assert "Das ist der rohe Text" in open(paths["transkript"]).read() assert "Das ist der rohe Text" in open(paths["transkript"]).read()
assert "Projekt läuft gut" in open(paths["zusammenfassung"]).read() assert "Projekt läuft gut" in open(paths["zusammenfassung"]).read()