feat: merge implement/transkriptor — full tüit Transkriptor implementation

This commit is contained in:
2026-04-01 11:47:32 +02:00
27 changed files with 1679 additions and 0 deletions
+9
View File
@@ -1 +1,10 @@
.worktrees/ .worktrees/
__pycache__/
*.pyc
*.pyo
.venv/
venv/
*.egg-info/
dist/
.env
data/
+21
View File
@@ -0,0 +1,21 @@
# CLAUDE.md — tüit Transkriptor
Desktop transcription tool. Python, no Docker.
## Key Commands
# Install dependencies
pip install -r requirements.txt
# Run
python main.py
# Run tests
pytest -v
# Trigger recording toggle via signal
pkill -USR1 -f main.py
## Architecture
See docs/plans/2026-04-01-desktop-transcription-design.md
View File
+73
View File
@@ -0,0 +1,73 @@
import os
import tempfile
from api.state import state, Status
from config import load as load_config
from transcription import engine as transcription_engine
from llm import OllamaClient
from output import save_transcript
from api.router import broadcast
async def run_pipeline():
    """Stop the active recording and turn it into a saved Markdown transcript.

    Steps: stop the recorder, dump the audio to a temporary WAV file,
    transcribe it, let the LLM refine the raw text, derive the title from the
    first ``# `` heading (default ``"Diktat"``), save the result and broadcast
    an event after every stage. Any failure is stored in ``state.last_error``,
    published as an ``error`` event and leaves the app in ``Status.ERROR``.
    Does nothing when no recorder is attached to the shared state.
    """
    cfg = load_config()
    recorder = getattr(state, "_recorder", None)
    if recorder is None:
        return

    # The per-recording attributes are reset to None / "" after every run, so
    # the configured defaults must also apply to falsy values, not only to
    # attributes that were never set.
    output_dir = getattr(state, "_recording_output_dir", None) or cfg["output"]["path"]
    instructions = getattr(state, "_recording_instructions", "") or ""

    wav_path = None
    try:
        # Inside the try block so that a failing audio backend ends up in the
        # error path instead of leaving the app stuck in RECORDING.
        recorder.stop()
        await state.set_status(Status.PROCESSING)
        await broadcast({"event": "processing"})

        # delete=False: the file must survive the context manager because the
        # recorder and the transcription engine reopen it by path.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as f:
            wav_path = f.name
        recorder.save_wav(wav_path)

        raw_text = await transcription_engine.transcribe_file(
            wav_path,
            language=cfg["whisper"]["language"],
            model_name=cfg["whisper"]["model"],
            device=cfg["whisper"]["device"],
        )
        await broadcast({"event": "transcribed", "raw": raw_text})

        client = OllamaClient(base_url=cfg["ollama"]["base_url"])
        refined = await client.refine(
            raw_text=raw_text,
            instructions=instructions,
            model=cfg["ollama"]["model"],
        )
        await broadcast({"event": "refined", "markdown": refined})

        # The first level-1 heading of the refined Markdown becomes the title.
        title = "Diktat"
        for line in refined.splitlines():
            if line.startswith("# "):
                title = line[2:].strip()
                break

        path = save_transcript(
            title=title,
            content=refined,
            output_dir=output_dir,
        )
        await broadcast({"event": "saved", "path": path, "title": title})
        await state.set_status(Status.IDLE)
    except Exception as e:
        # Task boundary: report every failure to the clients instead of
        # letting the background task die silently.
        state.last_error = str(e)
        await state.set_status(Status.ERROR)
        await broadcast({"event": "error", "message": str(e)})
    finally:
        # Only reset the per-recording fields if they still belong to this
        # run; a recording started in the meantime has its own recorder.
        if getattr(state, "_recorder", None) is recorder:
            state._recorder = None
            state.recording_user = None
            state._recording_output_dir = None
            state._recording_instructions = ""
        if wav_path:
            try:
                os.unlink(wav_path)
            except OSError:
                # Best effort: a leftover temp file must not mask the result.
                pass
+144
View File
@@ -0,0 +1,144 @@
import asyncio
import os
from typing import Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, HTTPException, Header
from api.state import state, Status
from config import load as load_config
from output import list_transcripts
# Router that carries every HTTP and WebSocket endpoint defined in this module.
router = APIRouter()
# WebSocket connections that were accepted by websocket_endpoint(); broadcast()
# sends every event to all of them.
_ws_clients: list[WebSocket] = []
# ---------------------------------------------------------------------------
# Auth dependency
# ---------------------------------------------------------------------------
async def current_user(authorization: Optional[str] = Header(None)) -> dict:
    """FastAPI dependency: resolve the ``Authorization: Bearer <token>`` header to a user.

    Raises:
        HTTPException: 401 when the header is missing, not a Bearer header,
            carries an empty token, or the token is unknown to the auth module.
    """
    from auth import get_user_for_token

    # "Bearer <token>" -> ("Bearer", " ", "<token>"); anything else fails below.
    scheme, _, credentials = (authorization or "").partition(" ")
    if scheme != "Bearer" or not credentials:
        raise HTTPException(status_code=401, detail="Nicht angemeldet")

    account = get_user_for_token(credentials)
    if account:
        return account
    raise HTTPException(status_code=401, detail="Ungültiger oder abgelaufener Token")
# ---------------------------------------------------------------------------
# Auth endpoints (no current_user dependency — these are unauthenticated)
# ---------------------------------------------------------------------------
@router.post("/login")
async def login(body: dict):
    """Exchange username/password for a session token.

    Args:
        body: JSON object with the keys ``username`` and ``password``.

    Returns:
        ``{"token": ..., "username": ...}`` on success.

    Raises:
        HTTPException: 400 when either value is missing, empty or not a
            string; 401 when the credentials are rejected.
    """
    from auth import authenticate

    username = body.get("username", "")
    password = body.get("password", "")
    # JSON values can be numbers, lists, ...; passing those on would crash in
    # the password hashing (``.encode()``) and surface as a 500.
    if not isinstance(username, str) or not isinstance(password, str):
        raise HTTPException(status_code=400, detail="Benutzername und Passwort erforderlich")
    if not username or not password:
        raise HTTPException(status_code=400, detail="Benutzername und Passwort erforderlich")

    token = authenticate(username, password)
    if not token:
        raise HTTPException(status_code=401, detail="Ungültige Anmeldedaten")
    return {"token": token, "username": username}
@router.post("/logout")
async def logout(authorization: Optional[str] = Header(None)):
    """Invalidate the bearer token of the request, if one was sent.

    Always answers ``{"ok": True}``, also for missing or malformed headers.
    """
    from auth import invalidate_token

    prefix = "Bearer "
    if authorization is not None and authorization.startswith(prefix):
        invalidate_token(authorization[len(prefix):])
    return {"ok": True}
# ---------------------------------------------------------------------------
# Protected endpoints
# ---------------------------------------------------------------------------
@router.get("/status")
async def get_status(user: dict = Depends(current_user)):
    """Return the current application status together with the caller's username."""
    current = state.status
    return {"status": current, "username": user["username"]}
# Refinement instructions per username. current_user() builds a fresh dict for
# every request, so values written into that dict would be lost immediately.
_user_instructions: dict[str, str] = {}

# Strong references to running pipeline tasks: the event loop itself only
# keeps weak references, so an unreferenced task may be garbage-collected.
_pipeline_tasks: set[asyncio.Task] = set()


@router.post("/toggle")
async def toggle_recording(user: dict = Depends(current_user)):
    """Start a recording, or stop the running one and process it.

    Returns:
        ``{"action": "stopped"}`` when a recording was stopped and the
        pipeline was scheduled, ``{"action": "started"}`` when a new recording
        began, or ``{"action": "busy", "status": ...}`` while processing.
    """
    from api.pipeline import run_pipeline

    if state.status == Status.RECORDING:
        # Flip the status before yielding to the event loop so that a second
        # toggle arriving before the task runs is answered with "busy"
        # instead of scheduling a second pipeline for the same recording.
        state.status = Status.PROCESSING
        task = asyncio.create_task(run_pipeline())
        _pipeline_tasks.add(task)
        task.add_done_callback(_pipeline_tasks.discard)
        return {"action": "stopped"}

    # ERROR is a resting state as well; otherwise one failed run would block
    # every further recording until the process is restarted.
    if state.status in (Status.IDLE, Status.ERROR):
        from audio import AudioRecorder

        recorder = AudioRecorder()
        recorder.start()  # publish the recorder only once it really runs
        state._recorder = recorder
        state.last_error = None
        state.recording_user = user["username"]
        state._recording_output_dir = os.path.join(user["output_dir"], user["username"])
        state._recording_instructions = _user_instructions.get(
            user["username"], user.get("instructions", "")
        )
        await state.set_status(Status.RECORDING)
        return {"action": "started"}

    return {"action": "busy", "status": state.status}


@router.post("/instructions")
async def set_instructions(body: dict, user: dict = Depends(current_user)):
    """Remember the caller's refinement instructions for their next recording.

    Raises:
        HTTPException: 400 when ``instructions`` is present but not a string.
    """
    text = body.get("instructions", "")
    if not isinstance(text, str):
        raise HTTPException(status_code=400, detail="Instruktionen müssen ein Text sein")
    _user_instructions[user["username"]] = text
    return {"ok": True}
@router.get("/transcripts")
async def get_transcripts(user: dict = Depends(current_user)):
    """List the transcripts stored in the caller's personal output directory."""
    # Transcripts of a user live in <output_dir>/<username>.
    return list_transcripts(os.path.join(user["output_dir"], user["username"]))
@router.get("/config")
async def get_config(user: dict = Depends(current_user)):
    """Return the effective configuration as produced by ``config.load``."""
    effective = load_config()
    return effective
@router.put("/config")
async def put_config(body: dict, user: dict = Depends(current_user)):
    """Merge ``body`` into the loaded configuration and return the result.

    Nested sections are merged key by key, so sending
    ``{"ollama": {"model": "x"}}`` keeps ``ollama.base_url``.

    Raises:
        HTTPException: 403 when the caller is not an administrator.
    """
    if not user.get("is_admin"):
        raise HTTPException(status_code=403, detail="Nur Administratoren können die Config ändern")

    from config import _deep_merge

    # NOTE(review): the merged configuration is only returned, it is not
    # written back to the config file — confirm whether that is intended.
    return _deep_merge(load_config(), body)
@router.post("/open")
async def open_file(body: dict, user: dict = Depends(current_user)):
    """Open a transcript with the desktop's default application (``xdg-open``).

    Only paths that resolve to a location inside the caller's own output
    directory are opened; everything else is ignored. The response is
    ``{"ok": True}`` in both cases.
    """
    import subprocess

    path = body.get("path", "")
    if not isinstance(path, str) or not path:
        return {"ok": True}

    # Resolve symlinks and ".." on both sides and compare whole path
    # components. A plain string-prefix test would also accept sibling
    # directories such as ".../alice2" for the user directory ".../alice".
    user_dir = os.path.realpath(os.path.join(user["output_dir"], user["username"]))
    target = os.path.realpath(path)
    try:
        inside = os.path.commonpath([user_dir, target]) == user_dir
    except ValueError:
        # Raised e.g. for paths on different drives.
        inside = False

    if inside and os.path.exists(target):
        subprocess.Popen(["xdg-open", target])
    return {"ok": True}
@router.websocket("/ws")
async def websocket_endpoint(ws: WebSocket, token: str = ""):
    """Register an authenticated client for pipeline events.

    The session token is passed as the ``token`` query parameter. Connections
    without a valid token are closed with code 4001. Incoming messages are
    read and discarded; the loop only exists to notice the disconnect.
    """
    from auth import get_user_for_token

    user = get_user_for_token(token)
    if not user:
        await ws.close(code=4001)
        return

    await ws.accept()
    _ws_clients.append(ws)
    try:
        while True:
            await ws.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        # Deregister on every exit path (disconnect, cancellation, protocol
        # errors) so that no dead socket stays in the broadcast list.
        if ws in _ws_clients:
            _ws_clients.remove(ws)
async def broadcast(message: dict):
    """Send ``message`` as JSON to every registered WebSocket client.

    Clients whose send fails are dropped from the registry; the failure
    itself is not propagated.
    """
    # Iterate over a snapshot because the registry is modified in the loop.
    for client in tuple(_ws_clients):
        try:
            await client.send_json(message)
        except Exception:
            if client in _ws_clients:
                _ws_clients.remove(client)
+36
View File
@@ -0,0 +1,36 @@
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable
class Status(str, Enum):
    """Application states; members are plain strings, so they serialise to their value."""

    IDLE = "idle"              # nothing is running
    RECORDING = "recording"    # audio is being captured
    PROCESSING = "processing"  # the pipeline is working on a recording
    ERROR = "error"            # the last pipeline run failed
@dataclass
class AppState:
    """Process-wide application state with change notification.

    Listeners registered through ``subscribe`` are called with the state
    object every time ``set_status`` runs.
    """

    status: Status = Status.IDLE
    recording_user: str | None = None  # which user triggered the current recording
    last_error: str | None = None  # message of the most recent pipeline failure
    _listeners: list[Callable] = field(default_factory=list, repr=False)

    def subscribe(self, callback: Callable):
        """Register ``callback(state)``; it may be a plain or an async callable."""
        self._listeners.append(callback)

    async def notify(self):
        """Call every listener with this state, awaiting awaitable results."""
        import inspect

        # Snapshot: a listener may subscribe further listeners while running.
        for cb in list(self._listeners):
            result = cb(self)
            # Checking the result instead of the callable also covers
            # callables that merely return an awaitable (objects with an
            # async ``__call__``, wrappers, lambdas).
            if inspect.isawaitable(result):
                await result

    async def set_status(self, status: Status):
        """Store ``status`` and notify all listeners."""
        self.status = status
        await self.notify()


# Shared singleton used by the API, the pipeline and the tray icon.
state = AppState()
+46
View File
@@ -0,0 +1,46 @@
import wave
import threading
import numpy as np
class AudioRecorder:
    """Record mono 16-bit audio from the default input device into memory.

    Audio chunks delivered by the ``sounddevice`` callback are collected in a
    list and can be written to a WAV file with ``save_wav``.
    """

    def __init__(self, sample_rate: int = 16000):
        self.sample_rate = sample_rate
        self._buffer: list[np.ndarray] = []
        self._stream = None
        self.is_recording = False
        # The stream callback and save_wav() both touch the buffer.
        self._lock = threading.Lock()

    def _callback(self, indata, frames, time, status):
        """Stream callback: keep the first channel of ``indata`` while recording."""
        if self.is_recording:
            with self._lock:
                # astype() already returns a new array, so the chunk is
                # detached from the buffer the stream reuses.
                self._buffer.append(indata[:, 0].astype(np.int16))

    def start(self):
        """Open the input stream and start collecting audio.

        Any exception raised while opening or starting the stream is
        re-raised after the recorder has been reset to the stopped state.
        """
        import sounddevice as sd

        # Starting twice must not leak the previous stream.
        if self._stream is not None:
            self.stop()

        with self._lock:
            self._buffer = []
        self.is_recording = True
        try:
            self._stream = sd.InputStream(
                samplerate=self.sample_rate,
                channels=1,
                dtype="int16",
                callback=self._callback,
            )
            self._stream.start()
        except Exception:
            # Do not report "recording" when the device could not be opened.
            self.is_recording = False
            if self._stream is not None:
                self._stream.close()
                self._stream = None
            raise

    def stop(self):
        """Stop collecting audio and release the input stream, if any."""
        self.is_recording = False
        if self._stream:
            self._stream.stop()
            self._stream.close()
            self._stream = None

    def save_wav(self, path: str) -> str:
        """Write everything recorded so far to ``path`` as a WAV file.

        An empty recording produces a valid WAV file without frames.

        Returns:
            ``path``, unchanged.
        """
        with self._lock:
            if self._buffer:
                data = np.concatenate(self._buffer)
            else:
                data = np.zeros(0, dtype=np.int16)
        with wave.open(path, "wb") as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)  # 16-bit samples
            wf.setframerate(self.sample_rate)
            wf.writeframes(data.tobytes())
        return path
+128
View File
@@ -0,0 +1,128 @@
import getpass
import hashlib
import os
import secrets
import tomllib
from typing import Optional
import tomli_w
# TOML file holding all user records (password hash, output directory, admin flag).
USERS_PATH = os.path.expanduser("~/.config/tueit-transcriber/users.toml")
# In-memory session store: token → username
# Users must re-login after server restart — acceptable for a desktop app.
_sessions: dict[str, str] = {}
def _hash_password(password: str) -> str:
salt = secrets.token_hex(16)
key = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 200_000).hex()
return f"{salt}:{key}"
def _verify_password(password: str, stored: str) -> bool:
try:
salt, key = stored.split(":", 1)
except ValueError:
return False
new_key = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 200_000).hex()
return secrets.compare_digest(new_key, key)
# ── User store ─────────────────────────────────────────────────────────────────
def has_users() -> bool:
    """Tell whether the user store contains at least one user."""
    return len(_load_users()) > 0
def _load_users() -> dict:
    """Read the ``users`` table from the users file; ``{}`` when there is none."""
    if os.path.exists(USERS_PATH):
        with open(USERS_PATH, "rb") as handle:
            document = tomllib.load(handle)
        return document.get("users", {})
    return {}
def _save_users(users: dict):
    """Write ``users`` to the users file as the ``users`` TOML table.

    The file contains password hashes, so it is created readable and
    writable by the owner only. It is written to a temporary file first and
    then moved into place, so an interrupted write cannot leave a truncated
    user store behind.
    """
    os.makedirs(os.path.dirname(USERS_PATH), exist_ok=True)
    tmp_path = USERS_PATH + ".tmp"
    fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    try:
        with os.fdopen(fd, "wb") as f:
            tomli_w.dump({"users": users}, f)
        # The mode passed to os.open() only applies to newly created files.
        os.chmod(tmp_path, 0o600)
        os.replace(tmp_path, USERS_PATH)
    except BaseException:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
def create_user(username: str, password: str, output_dir: str, is_admin: bool = False):
    """Store a user record under ``username``, replacing any existing one.

    Only the hash of ``password`` is stored.
    """
    record = {
        "password_hash": _hash_password(password),
        "output_dir": output_dir,
        "is_admin": is_admin,
    }
    registry = _load_users()
    registry[username] = record
    _save_users(registry)
# ── Session management ─────────────────────────────────────────────────────────
def authenticate(username: str, password: str) -> Optional[str]:
    """Check the credentials and open a session.

    Returns:
        A new session token, or ``None`` when the user does not exist or the
        password does not match.
    """
    record = _load_users().get(username)
    if record and _verify_password(password, record["password_hash"]):
        session_token = secrets.token_urlsafe(32)
        _sessions[session_token] = username
        return session_token
    return None
def get_user_for_token(token: str) -> Optional[dict]:
    """Look up the user behind a session token.

    Returns:
        A dict with ``username``, ``output_dir`` and ``is_admin``, or ``None``
        when the token is unknown or its user no longer exists.
    """
    username = _sessions.get(token)
    # The users file is only read for tokens that belong to a session.
    record = _load_users().get(username) if username else None
    if not record:
        return None
    info = {"username": username}
    info["output_dir"] = record["output_dir"]
    info["is_admin"] = record.get("is_admin", False)
    return info
def invalidate_token(token: str):
    """End the session belonging to ``token``; unknown tokens are ignored."""
    if token in _sessions:
        del _sessions[token]
# ── First-run setup wizard ─────────────────────────────────────────────────────
def setup_wizard():
    """Interactive console setup. Runs when no users exist yet.

    Asks for a username, a password (entered twice, at least 6 characters)
    and the transcript directory, then creates that user as administrator.
    """
    print("\n=== tüit Transkriptor — Ersteinrichtung ===\n")
    print("Bitte richte den ersten Nutzer ein (wird Administrator).\n")

    while True:
        username = input("Benutzername: ").strip()
        if username:
            break
        print("Benutzername darf nicht leer sein.")

    while True:
        password = getpass.getpass("Passwort: ")
        confirm = getpass.getpass("Passwort bestätigen: ")
        if password != confirm:
            print("Passwörter stimmen nicht überein.")
            continue
        if len(password) < 6:
            print("Passwort muss mindestens 6 Zeichen lang sein.")
            continue
        break

    default_dir = os.path.expanduser(f"~/Transkripte/{username}")
    answer = input(f"Transkripte speichern unter [{default_dir}]: ").strip()
    # Expand "~" in typed paths too; otherwise an answer such as "~/Notizen"
    # would be stored literally and create a directory named "~".
    output_dir = os.path.expanduser(answer) if answer else default_dir

    create_user(username, password, output_dir, is_admin=True)
    print(f"\nNutzer '{username}' wurde angelegt.")
    print(f"Transkripte werden gespeichert unter: {output_dir}")
    print("\nWeitere Nutzer können später über die Web-Oberfläche hinzugefügt werden.\n")
+61
View File
@@ -0,0 +1,61 @@
import os
import tomllib
# Location of the user's TOML configuration file.
CONFIG_PATH = os.path.expanduser("~/.config/tueit-transcriber/config.toml")
# Built-in configuration; load() merges the values from CONFIG_PATH over it.
DEFAULTS = {
    # Ollama server and model used to refine the raw transcript.
    "ollama": {
        "base_url": "http://localhost:11434",
        "model": "gemma3:12b",
    },
    # Whisper transcription settings.
    "whisper": {
        "model": "large-v3",
        "language": "de",
        "device": "auto",  # "auto" = use GPU if ROCm available, else CPU
    },
    "server": {
        "port": 8765,
    },
    # Default directory for saved transcripts.
    "output": {
        "path": os.path.expanduser(
            "~/cloud.shron.de/Hetzner Storagebox/work"
        ),
    },
    "network": {
        "host": "127.0.0.1",
    },
    "pid_file": os.path.expanduser("~/.local/run/tueit-transcriber.pid"),
}
def load() -> dict:
    """Return the effective configuration: ``DEFAULTS`` overlaid with the config file.

    Creates the configuration directory and, if the file does not exist yet,
    a file containing the defaults.
    """
    config_dir = os.path.dirname(CONFIG_PATH)
    os.makedirs(config_dir, exist_ok=True)

    config_missing = not os.path.exists(CONFIG_PATH)
    if config_missing:
        _write_defaults()

    with open(CONFIG_PATH, "rb") as handle:
        user_values = tomllib.load(handle)
    return _deep_merge(DEFAULTS, user_values)
def _deep_merge(base: dict, override: dict) -> dict:
result = dict(base)
for k, v in override.items():
if k in result and isinstance(result[k], dict) and isinstance(v, dict):
result[k] = _deep_merge(result[k], v)
else:
result[k] = v
return result
def _write_defaults():
    """Create the config file from the built-in defaults.

    Uses ``tomli_w`` when it is installed. Otherwise a reduced file with the
    ``ollama``, ``whisper``, ``server`` and ``output`` sections is written by
    hand.
    """
    try:
        import tomli_w
    except ImportError:
        tomli_w = None

    if tomli_w is not None:
        with open(CONFIG_PATH, "wb") as f:
            tomli_w.dump(DEFAULTS, f)
        return

    import json

    # TOML files must be UTF-8; the platform default encoding may differ and
    # the header comment contains a non-ASCII character.
    with open(CONFIG_PATH, "w", encoding="utf-8") as f:
        f.write("# tüit Transkriptor config\n\n")
        f.write('[ollama]\nbase_url = "http://localhost:11434"\nmodel = "gemma3:12b"\n\n')
        f.write('[whisper]\nmodel = "large-v3"\nlanguage = "de"\ndevice = "auto"\n\n')
        f.write('[server]\nport = 8765\n\n')
        # json.dumps() yields a quoted string whose escapes (\" and \\) are
        # also valid in a TOML basic string, so unusual paths cannot break
        # the file.
        output_path = json.dumps(DEFAULTS["output"]["path"], ensure_ascii=False)
        f.write(f"[output]\npath = {output_path}\n")
+122
View File
@@ -0,0 +1,122 @@
// DOM elements the script works with.
const btn = document.getElementById('record-btn');
const statusText = document.getElementById('status-text');
const headerStatus = document.getElementById('header-status');
const preview = document.getElementById('preview');
const instructionsEl = document.getElementById('instructions');
const transcriptList = document.getElementById('transcript-list');
const userChip = document.getElementById('user-chip');
const logoutBtn = document.getElementById('logout-btn');
// User-facing label for every status value used by setStatus().
const STATUS_LABELS = {
  idle: 'Bereit',
  recording: 'Aufnahme läuft\u2026',
  processing: 'Wird verarbeitet\u2026',
  error: 'Fehler',
};
// Auth token is stored in sessionStorage so it's gone when the tab closes.
// Without a valid token the initial /status request answers 401 and the
// start-up code at the end of this file navigates to /login.
const token = sessionStorage.getItem('token');
// Build the Authorization header for the stored token; empty object without a token.
function authHeaders() {
  if (!token) {
    return {};
  }
  return { Authorization: `Bearer ${token}` };
}
// fetch() wrapper that adds the JSON content type and the auth header.
// Headers passed in options.headers take precedence over both.
function apiFetch(url, options = {}) {
  const headers = Object.assign(
    { 'Content-Type': 'application/json' },
    authHeaders(),
    options.headers || {},
  );
  return fetch(url, Object.assign({}, options, { headers }));
}
// Log out: tell the server, then drop the local token and go to the login
// page regardless of whether the request succeeded.
logoutBtn.addEventListener('click', () => {
  const leave = () => {
    sessionStorage.removeItem('token');
    location.href = '/login';
  };
  apiFetch('/logout', { method: 'POST' }).finally(leave);
});
// Send the instructions to the server. Typing is debounced so that not every
// keystroke causes a request (which could also arrive out of order).
let instructionsTimer = null;

function sendInstructions() {
  clearTimeout(instructionsTimer);
  instructionsTimer = null;
  return apiFetch('/instructions', {
    method: 'POST',
    body: JSON.stringify({ instructions: instructionsEl.value }),
  });
}

instructionsEl.addEventListener('input', () => {
  clearTimeout(instructionsTimer);
  instructionsTimer = setTimeout(sendInstructions, 300);
});

// 'change' fires when the textarea loses focus, e.g. when the record button
// is clicked: flush a pending update right away instead of waiting.
instructionsEl.addEventListener('change', () => {
  if (instructionsTimer !== null) {
    sendInstructions();
  }
});
// Reflect a status value in the record button, the header badge and the
// status line. Unknown values are displayed as they are.
function setStatus(status) {
  const label = STATUS_LABELS[status] || status;

  statusText.textContent = label;

  headerStatus.className = `status-badge ${status}`;
  headerStatus.textContent = label;

  btn.className = status;
  // The button is locked while a recording is being processed.
  btn.disabled = status === 'processing';
}
// Toggle the recording and show the resulting state. The server sends no
// WebSocket event when a recording starts, so the toggle response is the
// only way for the UI to learn that it is now recording.
btn.addEventListener('click', async () => {
  let r;
  try {
    r = await apiFetch('/toggle', { method: 'POST' });
  } catch (err) {
    console.error(err);
    return;
  }
  if (r.status === 401) {
    location.href = '/login';
    return;
  }
  if (!r.ok) return;

  const data = await r.json();
  if (data.action === 'started') {
    setStatus('recording');
  } else if (data.action === 'stopped') {
    setStatus('processing');
  } else if (data.status) {
    // "busy": show what the server is currently doing.
    setStatus(data.status);
  }
});
// Open the event WebSocket and update the page for every pipeline event.
// The connection is re-established two seconds after it closes.
function connectWs() {
  const scheme = location.protocol === 'https:' ? 'wss:' : 'ws:';
  const query = `token=${encodeURIComponent(token || '')}`;
  const ws = new WebSocket(`${scheme}//${location.host}/ws?${query}`);

  ws.onmessage = (e) => {
    const msg = JSON.parse(e.data);
    switch (msg.event) {
      case 'processing':
        setStatus('processing');
        break;
      case 'transcribed':
      case 'refined':
        // Raw text first, later replaced by the refined Markdown.
        preview.textContent = msg.raw || msg.markdown || '';
        preview.classList.add('has-content');
        break;
      case 'saved':
        setStatus('idle');
        loadTranscripts();
        break;
      case 'error':
        setStatus('idle');
        preview.textContent = `Fehler: ${msg.message}`;
        break;
      default:
        break;
    }
  };

  ws.onclose = () => {
    setTimeout(connectWs, 2000);
  };
}
// Fetch the caller's transcripts and rebuild the list. A click on an entry
// asks the server to open the file on the desktop.
async function loadTranscripts() {
  const r = await apiFetch('/transcripts');
  if (!r.ok) return;
  const items = await r.json();
  transcriptList.replaceChildren(
    ...items.map((t) => {
      const div = document.createElement('div');
      div.className = 'transcript-item';

      const name = document.createElement('span');
      // Strip only the file extension; a plain replace('.md', '') would
      // remove the first ".md" anywhere in the name.
      name.textContent = t.filename.replace(/\.md$/, '');

      const meta = document.createElement('span');
      meta.className = 'meta';
      // Size in KB with one decimal place.
      meta.textContent = `${Math.round(t.size / 1024 * 10) / 10} KB`;

      div.append(name, meta);
      div.addEventListener('click', () => {
        apiFetch('/open', {
          method: 'POST',
          body: JSON.stringify({ path: t.path }),
        });
      });
      return div;
    })
  );
}
// Start-up: check the session, show the current status and user, then
// connect the event socket and load the transcript list.
(async () => {
  const r = await apiFetch('/status');
  if (r.status === 401) {
    location.href = '/login';
    return;
  }
  if (r.ok) {
    const data = await r.json();
    setStatus(data.status);
    if (data.username) {
      userChip.textContent = data.username;
    }
  } else {
    // Any other failure carries no status payload; do not feed the error
    // body into setStatus().
    setStatus('error');
  }
  connectWs();
  loadTranscripts();
})();
+170
View File
@@ -0,0 +1,170 @@
<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>tüit Transkriptor</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link href="https://fonts.googleapis.com/css2?family=Overpass:wght@300;400;600;700&display=swap" rel="stylesheet">
<style>
:root {
--red: #DA251C;
--yellow: #FFD802;
--bg: #111;
--surface: #1a1a1a;
--surface2: #232323;
--text: #e8e8e8;
--muted: #888;
--border: #2e2e2e;
}
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: 'Overpass', system-ui, sans-serif;
background: var(--bg);
color: var(--text);
min-height: 100vh;
display: flex;
flex-direction: column;
}
header {
display: flex;
align-items: center;
gap: 12px;
padding: 16px 24px;
border-bottom: 1px solid var(--border);
}
.logo-dot { width: 12px; height: 12px; background: var(--red); border-radius: 50%; }
header h1 { font-size: 1.1rem; font-weight: 600; letter-spacing: 0.04em; }
header h1 span { color: var(--red); }
.header-right { margin-left: auto; display: flex; align-items: center; gap: 12px; }
.status-badge {
font-size: 0.75rem;
padding: 4px 10px;
border-radius: 20px;
background: var(--surface2);
color: var(--muted);
text-transform: uppercase;
letter-spacing: 0.08em;
}
.status-badge.recording { background: var(--red); color: #fff; }
.status-badge.processing { background: var(--yellow); color: #111; }
.user-chip {
font-size: 0.75rem;
padding: 4px 10px;
border-radius: 20px;
background: var(--surface2);
color: var(--muted);
letter-spacing: 0.04em;
}
.logout-btn {
font-size: 0.75rem;
padding: 4px 10px;
border-radius: 20px;
background: none;
border: 1px solid var(--border);
color: var(--muted);
cursor: pointer;
font-family: inherit;
transition: border-color 0.15s, color 0.15s;
}
.logout-btn:hover { border-color: var(--red); color: var(--red); }
main {
flex: 1;
display: flex;
flex-direction: column;
gap: 20px;
padding: 24px;
max-width: 800px;
width: 100%;
margin: 0 auto;
}
.record-section { display: flex; flex-direction: column; align-items: center; gap: 16px; }
#record-btn {
width: 96px; height: 96px; border-radius: 50%;
background: var(--surface2); border: 3px solid var(--border);
cursor: pointer; transition: all 0.15s ease;
display: flex; align-items: center; justify-content: center;
outline: none;
}
#record-btn:hover { border-color: var(--red); }
#record-btn.recording { background: var(--red); border-color: var(--red); animation: pulse 1.4s infinite; }
#record-btn.processing { background: var(--yellow); border-color: var(--yellow); cursor: default; }
@keyframes pulse {
0%,100% { box-shadow: 0 0 0 0 rgba(218,37,28,0.4); }
50% { box-shadow: 0 0 0 16px rgba(218,37,28,0); }
}
.mic-icon { width: 36px; height: 36px; fill: var(--text); }
#record-btn.recording .mic-icon { fill: #fff; }
#record-btn.processing .mic-icon { fill: #111; }
#status-text { font-size: 0.85rem; color: var(--muted); }
.instructions-section { display: flex; flex-direction: column; gap: 8px; }
label { font-size: 0.8rem; color: var(--muted); text-transform: uppercase; letter-spacing: 0.06em; }
textarea {
background: var(--surface); border: 1px solid var(--border);
color: var(--text); border-radius: 8px; padding: 12px;
font-family: inherit; font-size: 0.9rem; resize: vertical;
min-height: 80px; outline: none; transition: border-color 0.15s;
}
textarea:focus { border-color: var(--yellow); }
textarea::placeholder { color: var(--muted); }
.preview-section { display: flex; flex-direction: column; gap: 8px; }
#preview {
background: var(--surface); border: 1px solid var(--border);
border-radius: 8px; padding: 16px;
font-size: 0.85rem; line-height: 1.6; color: var(--muted);
min-height: 60px; white-space: pre-wrap; word-break: break-word;
}
#preview.has-content { color: var(--text); }
.transcripts-section { display: flex; flex-direction: column; gap: 8px; }
#transcript-list { display: flex; flex-direction: column; gap: 6px; }
.transcript-item {
background: var(--surface); border: 1px solid var(--border);
border-radius: 6px; padding: 10px 14px;
display: flex; align-items: center; justify-content: space-between;
font-size: 0.82rem; cursor: pointer; transition: border-color 0.1s;
}
.transcript-item:hover { border-color: var(--red); }
.transcript-item .meta { color: var(--muted); font-size: 0.75rem; }
</style>
</head>
<body>
<header>
<div class="logo-dot"></div>
<h1>tüit <span>Transkriptor</span></h1>
<div class="header-right">
<span class="status-badge" id="header-status">Bereit</span>
<span class="user-chip" id="user-chip"></span>
<button class="logout-btn" id="logout-btn">Abmelden</button>
</div>
</header>
<main>
<section class="record-section">
<button id="record-btn" title="Aufnahme starten / stoppen">
<svg class="mic-icon" viewBox="0 0 24 24" xmlns="http://www.w3.org/2000/svg">
<path d="M12 1a4 4 0 0 1 4 4v6a4 4 0 0 1-8 0V5a4 4 0 0 1 4-4zm0 2a2 2 0 0 0-2 2v6a2 2 0 0 0 4 0V5a2 2 0 0 0-2-2zM6.5 10.5A5.5 5.5 0 0 0 12 16a5.5 5.5 0 0 0 5.5-5.5h2A7.5 7.5 0 0 1 13 17.93V21h2v2H9v-2h2v-3.07A7.5 7.5 0 0 1 4.5 10.5h2z"/>
</svg>
</button>
<span id="status-text">Klicken zum Starten</span>
</section>
<section class="instructions-section">
<label for="instructions">Instruktionen für den Sekretär</label>
<textarea
id="instructions"
placeholder="z.B. &quot;Heb die wichtigsten Punkte hervor&quot; · &quot;Erstelle ein Ticket&quot; · &quot;Mach ein Angebot daraus&quot;"
></textarea>
</section>
<section class="preview-section">
<label>Vorschau</label>
<div id="preview">Noch keine Aufnahme verarbeitet.</div>
</section>
<section class="transcripts-section">
<label>Meine Transkripte</label>
<div id="transcript-list"></div>
</section>
</main>
<script src="/app.js"></script>
</body>
</html>
+151
View File
@@ -0,0 +1,151 @@
<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>tüit Transkriptor — Anmelden</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link href="https://fonts.googleapis.com/css2?family=Overpass:wght@300;400;600;700&display=swap" rel="stylesheet">
<style>
:root {
--red: #DA251C;
--yellow: #FFD802;
--bg: #111;
--surface: #1a1a1a;
--surface2: #232323;
--text: #e8e8e8;
--muted: #888;
--border: #2e2e2e;
}
* { box-sizing: border-box; margin: 0; padding: 0; }
body {
font-family: 'Overpass', system-ui, sans-serif;
background: var(--bg);
color: var(--text);
min-height: 100vh;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
}
.card {
background: var(--surface);
border: 1px solid var(--border);
border-radius: 12px;
padding: 40px;
width: 100%;
max-width: 360px;
}
.logo {
display: flex;
align-items: center;
gap: 10px;
margin-bottom: 32px;
}
.logo-dot { width: 12px; height: 12px; background: var(--red); border-radius: 50%; flex-shrink: 0; }
.logo h1 { font-size: 1.1rem; font-weight: 600; letter-spacing: 0.04em; }
.logo h1 span { color: var(--red); }
.field { display: flex; flex-direction: column; gap: 6px; margin-bottom: 16px; }
label { font-size: 0.78rem; color: var(--muted); text-transform: uppercase; letter-spacing: 0.06em; }
input {
background: var(--surface2);
border: 1px solid var(--border);
color: var(--text);
border-radius: 8px;
padding: 10px 12px;
font-family: inherit;
font-size: 0.95rem;
outline: none;
transition: border-color 0.15s;
width: 100%;
}
input:focus { border-color: var(--yellow); }
input::placeholder { color: var(--muted); }
button[type="submit"] {
width: 100%;
margin-top: 8px;
padding: 12px;
background: var(--red);
color: #fff;
border: none;
border-radius: 8px;
font-family: inherit;
font-size: 1rem;
font-weight: 600;
cursor: pointer;
transition: opacity 0.15s;
}
button[type="submit"]:hover { opacity: 0.88; }
button[type="submit"]:disabled { opacity: 0.5; cursor: default; }
#error {
display: none;
margin-top: 14px;
padding: 10px 12px;
background: rgba(218, 37, 28, 0.12);
border: 1px solid rgba(218, 37, 28, 0.4);
border-radius: 6px;
font-size: 0.85rem;
color: #ff6b6b;
}
</style>
</head>
<body>
<div class="card">
<div class="logo">
<div class="logo-dot"></div>
<h1>tüit <span>Transkriptor</span></h1>
</div>
<form id="login-form">
<div class="field">
<label for="username">Benutzername</label>
<input type="text" id="username" name="username" autocomplete="username" autofocus placeholder="Benutzername">
</div>
<div class="field">
<label for="password">Passwort</label>
<input type="password" id="password" name="password" autocomplete="current-password" placeholder="Passwort">
</div>
<button type="submit" id="submit-btn">Anmelden</button>
<div id="error"></div>
</form>
</div>
<script>
const form = document.getElementById('login-form');
const errorEl = document.getElementById('error');
const submitBtn = document.getElementById('submit-btn');

// Show an error message below the form.
function showError(text) {
  errorEl.textContent = text;
  errorEl.style.display = 'block';
}

// Submit the credentials as JSON; on success store the token for this tab
// and continue to the main page.
form.addEventListener('submit', async (event) => {
  event.preventDefault();
  errorEl.style.display = 'none';
  submitBtn.disabled = true;
  submitBtn.textContent = 'Anmelden…';

  // Values are read from the inputs and only ever written via textContent.
  const credentials = {
    username: document.getElementById('username').value,
    password: document.getElementById('password').value,
  };

  try {
    const response = await fetch('/login', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(credentials),
    });
    if (!response.ok) {
      const failure = await response.json().catch(() => ({}));
      showError(failure.detail || 'Anmeldung fehlgeschlagen.');
    } else {
      const session = await response.json();
      sessionStorage.setItem('token', session.token);
      location.href = '/';
    }
  } catch {
    showError('Server nicht erreichbar.');
  } finally {
    submitBtn.disabled = false;
    submitBtn.textContent = 'Anmelden';
  }
});
</script>
</body>
</html>
Executable
+88
View File
@@ -0,0 +1,88 @@
#!/usr/bin/env bash
# Installer: checks prerequisites, installs the Python dependencies, stores the
# network mode in the config file and sets up a systemd user service.
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SERVICE_NAME="tueit-transcriber"
SERVICE_FILE="$HOME/.config/systemd/user/${SERVICE_NAME}.service"

echo "=== tüit Transkriptor Installer ==="

# python3 is required; ollama and ROCm are optional and only reported.
command -v python3 >/dev/null 2>&1 || { echo "ERROR: python3 not found"; exit 1; }

if ! command -v ollama >/dev/null 2>&1; then
  echo "WARNING: ollama not found. Install from https://ollama.com"
  echo " Nach der Installation: ollama pull gemma3:12b"
fi

if command -v rocminfo >/dev/null 2>&1; then
  echo "ROCm erkannt — GPU-Beschleunigung verfügbar"
else
  echo "INFO: ROCm nicht gefunden — Whisper läuft auf der CPU (langsamer)"
  echo " Für GPU: sudo pacman -S rocm-hip-sdk"
fi

echo "Python-Abhängigkeiten werden installiert..."
# Use the pip of the very interpreter the service is started with; a bare
# "pip" may be missing or belong to a different Python installation.
python3 -m pip install --user -r "$SCRIPT_DIR/requirements.txt"
# ── Ask for the network mode ───────────────────────────────────────────────────
echo ""
echo "Soll die App auch von anderen Geräten im Heimnetz erreichbar sein?"
echo " [1] Nur lokal (Standard, sicherer)"
echo " [2] Im Heimnetz (Windows, Android, andere Linux-Geräte)"
# read returns non-zero at end of input (non-interactive run), which would
# abort the script under "set -e"; fall back to the default mode instead.
read -r -p "Auswahl [1/2]: " NET_MODE || NET_MODE=""
if [[ "$NET_MODE" == "2" ]]; then
  HOST="0.0.0.0"
  echo "INFO: App wird auf allen Netzwerk-Interfaces gestartet."
  echo " Firewall: sudo ufw allow 8765/tcp"
else
  HOST="127.0.0.1"
fi

# Write the network host to the config unless a [network] section exists already
CFG_FILE="$HOME/.config/tueit-transcriber/config.toml"
mkdir -p "$(dirname "$CFG_FILE")"
if ! grep -q "\[network\]" "$CFG_FILE" 2>/dev/null; then
  printf '\n[network]\nhost = "%s"\n' "$HOST" >> "$CFG_FILE"
  echo "Config aktualisiert: $CFG_FILE"
else
  # Make it visible that the selection above did not change anything.
  echo "INFO: [network] ist bereits in $CFG_FILE konfiguriert — bestehende Einstellung bleibt erhalten."
fi
# ── Systemd User Service ───────────────────────────────────────────────────────
mkdir -p "$HOME/.config/systemd/user"
# The script path is quoted in ExecStart so that an installation directory
# containing spaces is passed to python3 as a single argument.
cat > "$SERVICE_FILE" <<EOF
[Unit]
Description=tüit Transkriptor
After=graphical-session.target

[Service]
ExecStart=$(command -v python3) "${SCRIPT_DIR}/main.py"
Restart=on-failure
RestartSec=5
Environment=DISPLAY=:0
Environment=DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/$(id -u)/bus

[Install]
WantedBy=default.target
EOF

systemctl --user daemon-reload
systemctl --user enable "$SERVICE_NAME"
systemctl --user start "$SERVICE_NAME"
# Final summary: service commands and the hotkey hint.
cat <<EOF

=== Fertig ===
Status: systemctl --user status $SERVICE_NAME
Logs: journalctl --user -u $SERVICE_NAME -f

KDE-Hotkey einrichten:
 Systemeinstellungen → Kurzbefehle → Eigene Kurzbefehle
 Befehl: pkill -USR1 -f main.py

EOF
# The LAN address is only shown when network access was selected.
if [[ "$NET_MODE" == "2" ]]; then
  echo "Netzwerk-Zugriff: http://$(hostname -I | awk '{print $1}'):8765"
  echo "Tipp: Seite als Lesezeichen auf Handy/PC speichern."
  echo ""
fi
echo "Erster Start: ollama pull gemma3:12b"
+44
View File
@@ -0,0 +1,44 @@
import httpx
SYSTEM_PROMPT = """Du bist ein präziser Schreibassistent.
Du bekommst einen rohen Sprachtranskript und optionale Instruktionen des Nutzers.
Deine Aufgabe:
1. Bereinige den Text (Füllwörter, Wiederholungen, Tippfehler)
2. Strukturiere ihn mit Markdown-Überschriften wenn sinnvoll
3. Erzeuge einen passenden deutschen Titel als H1
4. Beachte Instruktionen des Nutzers wenn vorhanden
5. Antworte NUR mit dem fertigen Markdown — kein Kommentar, keine Erklärung
Format:
# Titel
Inhalt...
"""
class OllamaClient:
    """Minimal async client for the Ollama HTTP API."""

    def __init__(self, base_url: str = "http://localhost:11434"):
        # Drop trailing slashes so that "http://host:11434/" does not produce
        # request URLs such as "http://host:11434//api/generate".
        self.base_url = base_url.rstrip("/")

    async def list_models(self) -> list[str]:
        """Return the names of the models reported by ``/api/tags``.

        Raises:
            httpx.HTTPStatusError: when the server answers with an error status.
        """
        async with httpx.AsyncClient() as client:
            r = await client.get(f"{self.base_url}/api/tags")
            r.raise_for_status()
            return [m["name"] for m in r.json().get("models", [])]

    async def refine(
        self,
        raw_text: str,
        instructions: str = "",
        model: str = "gemma3:12b",
    ) -> str:
        """Let the model turn a raw transcript into Markdown.

        Args:
            raw_text: The raw transcript.
            instructions: Optional user instructions; appended to the prompt
                when not blank.
            model: Name of the Ollama model to use.

        Returns:
            The ``response`` field of the non-streaming ``/api/generate`` reply.

        Raises:
            httpx.HTTPStatusError: when the server answers with an error status.
        """
        prompt = f"Transkript:\n{raw_text}"
        if instructions.strip():
            prompt += f"\n\nInstruktionen:\n{instructions.strip()}"

        # Generation can take a while, hence the 120 s timeout.
        async with httpx.AsyncClient(timeout=120) as client:
            r = await client.post(
                f"{self.base_url}/api/generate",
                json={"model": model, "prompt": prompt, "system": SYSTEM_PROMPT, "stream": False},
            )
            r.raise_for_status()
            return r.json()["response"]
+160
View File
@@ -0,0 +1,160 @@
import asyncio
import os
import signal
import threading
import webbrowser
from pathlib import Path
import uvicorn
from fastapi import FastAPI
from fastapi.responses import FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
import pystray
from PIL import Image, ImageDraw
from api.router import router
from api.state import state, Status
from config import load as load_config
# ── FastAPI ────────────────────────────────────────────────────────────────────
# The ASGI application; all API endpoints come from api.router.
app = FastAPI(title="tüit Transkriptor")
app.include_router(router)
# Directory with the static frontend files, located next to this module.
FRONTEND_DIR = Path(__file__).parent / "frontend"
@app.get("/")
async def index():
    """Serve the main page."""
    page = FRONTEND_DIR / "index.html"
    return FileResponse(str(page))
@app.get("/login")
async def login_page():
    """Serve the login page."""
    page = FRONTEND_DIR / "login.html"
    return FileResponse(str(page))
@app.get("/app.js")
async def appjs():
    """Serve the frontend script."""
    script = FRONTEND_DIR / "app.js"
    return FileResponse(str(script))
# ── PID file ───────────────────────────────────────────────────────────────────
def write_pid(pid_path: str):
    """Write the current process ID to ``pid_path``.

    Missing parent directories are created. A bare filename without a
    directory part is written into the current working directory.
    """
    pid_file = Path(pid_path)
    # Path.parent is "." for a bare filename, whereas os.path.dirname() gives
    # "" and os.makedirs("") raises FileNotFoundError.
    pid_file.parent.mkdir(parents=True, exist_ok=True)
    pid_file.write_text(str(os.getpid()))
def remove_pid(pid_path: str):
    """Delete the PID file; a file that is already gone is not an error."""
    Path(pid_path).unlink(missing_ok=True)
# ── SIGUSR1 → toggle ──────────────────────────────────────────────────────────
# Event loop of the running uvicorn server. It stays None until
# _LoopCapture.startup() stores the running loop here; the SIGUSR1 handler and
# the tray menu then use it to schedule the toggle coroutine on that same loop
# instead of a separate one.
_uvicorn_loop: asyncio.AbstractEventLoop | None = None
def _sigusr1_handler(signum, frame):
    """SIGUSR1 handler: schedule a recording toggle on uvicorn's event loop.

    Does nothing as long as the loop has not been captured yet.
    """
    if not _uvicorn_loop:
        return

    def _schedule():
        # Executed by the loop itself, so the task is created on that loop.
        asyncio.ensure_future(_async_toggle(), loop=_uvicorn_loop)

    _uvicorn_loop.call_soon_threadsafe(_schedule)
async def _async_toggle():
    """Toggle recording on behalf of the router's guest user.

    Signal- and tray-triggered toggles have no authenticated request behind
    them, so the guest context stands in for a real user dependency.
    """
    from api.router import _guest_user, toggle_recording

    guest = _guest_user()
    await toggle_recording(user=guest)
# ── Tray ───────────────────────────────────────────────────────────────────────
def _make_icon(recording: bool = False) -> Image.Image:
    """Draw the 64x64 tray icon: a filled circle on a transparent canvas.

    The circle is red while ``recording`` is true and grey otherwise.
    """
    fill = (80, 80, 80, 255)
    if recording:
        fill = (218, 37, 28, 255)
    canvas = Image.new("RGBA", (64, 64), (0, 0, 0, 0))
    ImageDraw.Draw(canvas).ellipse([8, 8, 56, 56], fill=fill)
    return canvas
def run_tray(port: int):
    """Build the tray icon with its menu and run it via ``icon.run()``.

    The menu offers toggling the recording (default action), opening the web
    UI on ``port`` and quitting. The icon image follows the recording status
    through a ``state`` subscription.
    """

    def _toggle(_icon, _item):
        # Same scheduling as the SIGUSR1 handler: hand the toggle coroutine
        # to uvicorn's loop, if it has been captured already.
        if _uvicorn_loop:
            _uvicorn_loop.call_soon_threadsafe(
                lambda: asyncio.ensure_future(_async_toggle(), loop=_uvicorn_loop)
            )

    def _open_ui(_icon, _item):
        webbrowser.open(f"http://localhost:{port}")

    def _quit(_icon, _item):
        icon.stop()
        # NOTE(review): os._exit() ends the process immediately, so the
        # entrypoint's `finally: remove_pid(...)` presumably never runs on
        # this path — confirm whether the stale PID file is acceptable.
        os._exit(0)

    icon = pystray.Icon(
        "tueit-transcriber",
        _make_icon(False),
        "tüit Transkriptor",
        menu=pystray.Menu(
            pystray.MenuItem("Aufnahme starten/stoppen", _toggle, default=True),
            pystray.MenuItem("Öffnen", _open_ui),
            pystray.MenuItem("Beenden", _quit),
        ),
    )

    def update_icon(s):
        # Swap the image whenever the application state changes.
        icon.icon = _make_icon(s.status == Status.RECORDING)

    state.subscribe(update_icon)
    icon.run()
# ── Server ─────────────────────────────────────────────────────────────────────
class _LoopCapture(uvicorn.Server):
    """uvicorn server that publishes its event loop in ``_uvicorn_loop``."""

    def install_signal_handlers(self):
        """Install nothing.

        uvicorn's own signal handlers are disabled so that this module's
        SIGUSR1 handler works.
        """
        return None

    async def startup(self, sockets=None):
        """Record the running loop in the module global, then start as usual."""
        global _uvicorn_loop
        running = asyncio.get_running_loop()
        _uvicorn_loop = running
        await super().startup(sockets=sockets)
def run_server(config: uvicorn.Config):
    """Build a ``_LoopCapture`` server from ``config`` and run it.

    Used as the target of the server thread started by the entrypoint.
    """
    _LoopCapture(config).run()
# ── Entrypoint ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import time

    from auth import has_users, setup_wizard

    # No users yet: run the setup wizard before anything else.
    if not has_users():
        setup_wizard()

    cfg = load_config()
    port = cfg["server"]["port"]
    host = cfg.get("network", {}).get("host", "127.0.0.1")
    default_pid_path = os.path.expanduser("~/.local/run/tueit-transcriber.pid")
    pid_path = cfg.get("pid_file", default_pid_path)

    write_pid(pid_path)
    signal.signal(signal.SIGUSR1, _sigusr1_handler)

    # The server runs in a daemon thread; the tray icon occupies the main thread.
    uvicorn_cfg = uvicorn.Config(app, host=host, port=port, log_level="warning")
    server_thread = threading.Thread(
        target=run_server,
        args=(uvicorn_cfg,),
        daemon=True,
    )
    server_thread.start()

    # Wait until uvicorn has captured its loop: at most 50 polls of 0.1 s each.
    polls = 0
    while _uvicorn_loop is None and polls < 50:
        time.sleep(0.1)
        polls += 1

    webbrowser.open(f"http://localhost:{port}")

    try:
        run_tray(port)
    finally:
        remove_pid(pid_path)
+50
View File
@@ -0,0 +1,50 @@
import os
import re
import unicodedata
from datetime import datetime
def slugify(text: str) -> str:
    """Turn ``text`` into a lowercase ASCII slug of ``a-z0-9`` and hyphens.

    German umlauts and ß are mapped explicitly, remaining accents are removed
    via NFKD decomposition, and every run of other characters becomes a single
    hyphen. Leading and trailing hyphens are stripped.
    """
    german = {"ä": "a", "ö": "o", "ü": "u", "Ä": "a", "Ö": "o", "Ü": "u", "ß": "ss"}
    for special, plain in german.items():
        text = text.replace(special, plain)

    decomposed = unicodedata.normalize("NFKD", text)
    # Category "Mn" covers the combining marks left behind by the decomposition.
    base_chars = [ch for ch in decomposed if unicodedata.category(ch) != "Mn"]
    lowered = "".join(base_chars).lower()
    return re.sub(r"[^a-z0-9]+", "-", lowered).strip("-")
def save_transcript(
    title: str,
    content: str,
    output_dir: str,
    dt: datetime | None = None,
) -> str:
    """Write a transcript as a Markdown file with YAML front matter.

    The file is named ``<YYYY-MM-DD-HHMM>-<slug>.md``, where the slug is
    derived from ``title`` and capped at 60 characters. An existing file is
    never overwritten: a numeric suffix (``-2``, ``-3``, ...) is appended
    instead.

    Args:
        title: Title of the transcript; written as the H1 heading unless
            ``content`` already opens with that same heading.
        content: Markdown body.
        output_dir: Target directory; created if missing.
        dt: Timestamp for the filename and front matter; defaults to now.

    Returns:
        The path of the file that was written.
    """
    if dt is None:
        dt = datetime.now()

    # Truncation can leave a dangling hyphen, and a title without any ASCII
    # letter or digit slugifies to "" — fall back to a generic name then.
    slug = slugify(title)[:60].strip("-") or "transkript"
    stem = f"{dt.strftime('%Y-%m-%d-%H%M')}-{slug}"

    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, f"{stem}.md")
    # Two transcripts with the same title saved within one minute would
    # otherwise silently replace each other.
    counter = 2
    while os.path.exists(path):
        path = os.path.join(output_dir, f"{stem}-{counter}.md")
        counter += 1

    # Avoid a duplicated heading when the content already starts with the
    # same H1 that would be written here.
    first_line = next((ln.strip() for ln in content.splitlines() if ln.strip()), "")
    has_heading = first_line.startswith("# ") and first_line[2:].strip() == title.strip()

    with open(path, "w", encoding="utf-8") as f:
        f.write(f"---\ndate: {dt.isoformat(timespec='seconds')}\ntags: [transkript]\n---\n\n")
        if not has_heading:
            f.write(f"# {title}\n\n")
        f.write(content)
        if not content.endswith("\n"):
            f.write("\n")
    return path
def list_transcripts(output_dir: str, limit: int = 20) -> list[dict]:
    """Describe the ``.md`` files found in ``output_dir``.

    Entries are ordered by filename in descending order and capped at
    ``limit``. Each entry carries ``filename``, ``path``, ``size`` and
    ``mtime``. A missing directory yields an empty list.
    """
    if not os.path.exists(output_dir):
        return []

    names = [name for name in os.listdir(output_dir) if name.endswith(".md")]
    names.sort(reverse=True)

    def describe(name: str) -> dict:
        full = os.path.join(output_dir, name)
        info = os.stat(full)
        return {"filename": name, "path": full, "size": info.st_size, "mtime": info.st_mtime}

    return [describe(name) for name in names[:limit]]
+2
View File
@@ -0,0 +1,2 @@
[pytest]
asyncio_mode = auto
+11
View File
@@ -0,0 +1,11 @@
fastapi>=0.111
uvicorn[standard]>=0.29
pystray>=0.19
Pillow>=10.0
sounddevice>=0.4.6
faster-whisper>=1.0.3
httpx>=0.27
numpy>=1.26
tomli_w>=1.0
pytest>=8.0
pytest-asyncio>=0.23
View File
+60
View File
@@ -0,0 +1,60 @@
from fastapi.testclient import TestClient
# User record returned by the overridden current_user dependency in make_app().
_TEST_USER = {"username": "testuser", "output_dir": "/tmp", "is_admin": False}
def make_app():
    """Build a FastAPI app around the router with authentication stubbed out."""
    from fastapi import FastAPI
    from api.router import current_user, router

    test_app = FastAPI()
    test_app.include_router(router)
    # Override auth for tests — no real credentials needed
    test_app.dependency_overrides[current_user] = lambda: _TEST_USER
    return test_app
def test_status_returns_idle():
    """GET /status reports the idle status and the stubbed user's name."""
    response = TestClient(make_app()).get("/status")
    assert response.status_code == 200
    assert response.json()["status"] == "idle"
    assert response.json()["username"] == "testuser"
def test_config_get_returns_dict():
    """GET /config answers 200 with a body that has an ``ollama`` key."""
    response = TestClient(make_app()).get("/config")
    assert response.status_code == 200
    assert "ollama" in response.json()
def test_transcripts_returns_list():
    """GET /transcripts answers 200 with a JSON list."""
    response = TestClient(make_app()).get("/transcripts")
    assert response.status_code == 200
    assert isinstance(response.json(), list)
def test_status_requires_auth():
    """Without the auth override, GET /status is rejected with 401."""
    from fastapi import FastAPI
    from api.router import router

    bare_app = FastAPI()
    bare_app.include_router(router)
    response = TestClient(bare_app, raise_server_exceptions=False).get("/status")
    assert response.status_code == 401
def test_login_rejects_wrong_credentials():
    """POST /login with unknown credentials is answered with 401."""
    import os
    import tempfile
    from unittest.mock import patch
    from fastapi import FastAPI
    from api.router import router

    bare_app = FastAPI()
    bare_app.include_router(router)
    client = TestClient(bare_app, raise_server_exceptions=False)
    credentials = {"username": "nobody", "password": "wrong"}
    # Point auth at a users file inside a fresh temp dir for the request.
    with tempfile.TemporaryDirectory() as tmpdir, patch(
        "auth.USERS_PATH", os.path.join(tmpdir, "users.toml")
    ):
        response = client.post("/login", json=credentials)
        assert response.status_code == 401
+29
View File
@@ -0,0 +1,29 @@
import numpy as np
from unittest.mock import patch, MagicMock
def test_recorder_starts_and_stops():
    """stop() on a recorder with an active (mocked) stream clears is_recording."""
    from audio import AudioRecorder

    with patch("sounddevice.InputStream") as MockStream:
        # The stream mock that is actually injected into the recorder below;
        # previously a separate, never-used MagicMock was created here.
        mock_stream = MockStream.return_value
        mock_stream.start = MagicMock()
        mock_stream.stop = MagicMock()
        mock_stream.close = MagicMock()

        recorder = AudioRecorder(sample_rate=16000)
        assert not recorder.is_recording

        # Simulate a running recording without touching real audio hardware.
        recorder._stream = mock_stream
        recorder.is_recording = True
        recorder.stop()
        assert not recorder.is_recording
def test_recorder_save_wav(tmp_path):
    """save_wav() writes a mono WAV file with the recorder's sample rate."""
    import wave
    from audio import AudioRecorder

    target = str(tmp_path / "test.wav")
    recorder = AudioRecorder(sample_rate=16000)
    recorder._buffer = [np.zeros(1600, dtype=np.int16)]
    recorder.save_wav(target)

    with wave.open(target) as wav_file:
        assert wav_file.getframerate() == 16000
        assert wav_file.getnchannels() == 1
+91
View File
@@ -0,0 +1,91 @@
import importlib
import os
import tempfile
from unittest.mock import patch
def _fresh_auth(tmpdir):
    """Reload ``auth``, empty its session store and return a users-file path.

    The returned path points to ``users.toml`` inside ``tmpdir``; it is only
    computed here, patching it into ``auth`` is left to the caller.
    """
    import auth

    importlib.reload(auth)
    auth._sessions.clear()
    users_file = os.path.join(tmpdir, "users.toml")
    return users_file
def test_has_users_false_when_empty():
    """has_users() is false when the users file does not exist yet."""
    import auth

    importlib.reload(auth)
    with tempfile.TemporaryDirectory() as tmpdir, patch(
        "auth.USERS_PATH", os.path.join(tmpdir, "users.toml")
    ):
        assert not auth.has_users()
def test_create_and_authenticate():
    """A freshly created user can authenticate and receives a token."""
    import auth

    importlib.reload(auth)
    auth._sessions.clear()
    with tempfile.TemporaryDirectory() as tmpdir, patch(
        "auth.USERS_PATH", os.path.join(tmpdir, "users.toml")
    ):
        auth.create_user("thomas", "geheim123", "/tmp/transkripte", is_admin=True)
        token = auth.authenticate("thomas", "geheim123")
        assert token is not None
        assert len(token) > 10
def test_authenticate_wrong_password():
    """authenticate() returns None for an existing user with a wrong password."""
    import auth

    importlib.reload(auth)
    auth._sessions.clear()
    with tempfile.TemporaryDirectory() as tmpdir, patch(
        "auth.USERS_PATH", os.path.join(tmpdir, "users.toml")
    ):
        auth.create_user("thomas", "geheim123", "/tmp/transkripte")
        result = auth.authenticate("thomas", "falsch")
        assert result is None
def test_authenticate_unknown_user():
    """authenticate() returns None for a user that was never created."""
    import auth

    importlib.reload(auth)
    with tempfile.TemporaryDirectory() as tmpdir, patch(
        "auth.USERS_PATH", os.path.join(tmpdir, "users.toml")
    ):
        result = auth.authenticate("niemand", "irgendwas")
        assert result is None
def test_get_user_for_token():
    """A token from authenticate() resolves back to the user's record."""
    import auth

    importlib.reload(auth)
    auth._sessions.clear()
    with tempfile.TemporaryDirectory() as tmpdir, patch(
        "auth.USERS_PATH", os.path.join(tmpdir, "users.toml")
    ):
        auth.create_user("anna", "secret456", "/tmp/anna")
        token = auth.authenticate("anna", "secret456")
        record = auth.get_user_for_token(token)
        assert record["username"] == "anna"
        assert record["output_dir"] == "/tmp/anna"
def test_invalidate_token():
    """After invalidate_token() the token no longer resolves to a user."""
    import auth

    importlib.reload(auth)
    auth._sessions.clear()
    with tempfile.TemporaryDirectory() as tmpdir, patch(
        "auth.USERS_PATH", os.path.join(tmpdir, "users.toml")
    ):
        auth.create_user("bob", "pass789!", "/tmp/bob")
        token = auth.authenticate("bob", "pass789!")
        auth.invalidate_token(token)
        assert auth.get_user_for_token(token) is None
def test_has_users_true_after_create():
    """has_users() turns true once a user has been created."""
    import auth

    importlib.reload(auth)
    with tempfile.TemporaryDirectory() as tmpdir, patch(
        "auth.USERS_PATH", os.path.join(tmpdir, "users.toml")
    ):
        auth.create_user("lisa", "abc123!", "/tmp/lisa")
        assert auth.has_users()
+25
View File
@@ -0,0 +1,25 @@
import os
import tempfile
from unittest.mock import patch
def test_config_loads_defaults():
    """A config loaded from a fresh path carries the expected default values."""
    import importlib
    import config

    # Reload BEFORE patching. importlib.reload() re-executes the module, which
    # presumably re-assigns CONFIG_PATH at module level; reloading inside the
    # patch would undo it and let load() read or create the real config file.
    importlib.reload(config)
    with tempfile.TemporaryDirectory() as tmpdir:
        cfg_path = os.path.join(tmpdir, "config.toml")
        with patch("config.CONFIG_PATH", cfg_path):
            cfg = config.load()
        assert cfg["ollama"]["model"] == "gemma3:12b"
        assert cfg["whisper"]["model"] == "large-v3"
        assert cfg["server"]["port"] == 8765
def test_config_creates_file_on_first_run():
    """load() creates the config file when it does not exist yet."""
    import importlib
    import config

    importlib.reload(config)
    with tempfile.TemporaryDirectory() as tmpdir:
        target = os.path.join(tmpdir, "config.toml")
        with patch("config.CONFIG_PATH", target):
            config.load()
        assert os.path.exists(target)
+37
View File
@@ -0,0 +1,37 @@
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
@pytest.mark.asyncio
async def test_refine_calls_ollama():
    """refine() posts exactly once and returns the mocked response text."""
    from llm import OllamaClient

    fake_response = MagicMock()
    fake_response.raise_for_status = MagicMock()
    fake_response.json.return_value = {"response": "# Titel\n\nInhalt."}

    with patch("httpx.AsyncClient") as MockClient:
        # The object bound by `async with httpx.AsyncClient(...) as client`.
        http = MockClient.return_value.__aenter__.return_value
        http.post = AsyncMock(return_value=fake_response)

        ollama = OllamaClient(base_url="http://localhost:11434")
        result = await ollama.refine(
            raw_text="Das ist ein test.",
            instructions="Mach eine Zusammenfassung.",
            model="gemma3:12b",
        )
        assert "Inhalt" in result
        http.post.assert_called_once()
@pytest.mark.asyncio
async def test_list_models_returns_list():
    """list_models() returns the model names from the mocked response."""
    from llm import OllamaClient

    fake_response = MagicMock()
    fake_response.raise_for_status = MagicMock()
    fake_response.json.return_value = {"models": [{"name": "gemma3:12b"}, {"name": "mistral:7b"}]}

    with patch("httpx.AsyncClient") as MockClient:
        http = MockClient.return_value.__aenter__.return_value
        http.get = AsyncMock(return_value=fake_response)

        ollama = OllamaClient(base_url="http://localhost:11434")
        names = await ollama.list_models()
        assert "gemma3:12b" in names
+60
View File
@@ -0,0 +1,60 @@
import os
import tempfile
from datetime import datetime
def test_save_transcript_creates_file():
    """save_transcript() returns the path of a file that exists afterwards."""
    from output import save_transcript

    when = datetime(2026, 4, 1, 14, 32, 0)
    with tempfile.TemporaryDirectory() as tmpdir:
        saved = save_transcript(
            title="Test Aufnahme",
            content="Dies ist ein Test.",
            output_dir=tmpdir,
            dt=when,
        )
        assert os.path.exists(saved)
def test_save_transcript_filename_format():
    """The filename is ``<date>-<HHMM>-<slug of the title>.md``."""
    from output import save_transcript

    when = datetime(2026, 4, 1, 14, 32, 0)
    with tempfile.TemporaryDirectory() as tmpdir:
        saved = save_transcript(
            title="Mein erstes Diktat",
            content="Inhalt.",
            output_dir=tmpdir,
            dt=when,
        )
        filename = os.path.basename(saved)
        assert filename == "2026-04-01-1432-mein-erstes-diktat.md"
def test_save_transcript_contains_frontmatter():
    """The saved file contains the front-matter fence, a date and the tag."""
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript

        path = save_transcript(
            title="Test",
            content="Inhalt.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        # Close the handle deterministically and read with the encoding the
        # file was written in, rather than the locale's default.
        with open(path, encoding="utf-8") as f:
            text = f.read()
        assert "---" in text
        assert "date:" in text
        assert "transkript" in text
def test_save_transcript_contains_content():
    """The saved file contains the content that was passed in."""
    with tempfile.TemporaryDirectory() as tmpdir:
        from output import save_transcript

        path = save_transcript(
            title="Test",
            content="Das ist der Inhalt.",
            output_dir=tmpdir,
            dt=datetime(2026, 4, 1, 14, 32, 0),
        )
        # Close the handle deterministically and read with the encoding the
        # file was written in, rather than the locale's default.
        with open(path, encoding="utf-8") as f:
            text = f.read()
        assert "Das ist der Inhalt." in text
def test_slugify():
    """slugify() lowercases and collapses separator runs into single hyphens."""
    from output import slugify

    expectations = {
        "Mein erstes Diktat": "mein-erstes-diktat",
        "test -- foo": "test-foo",
    }
    for raw, expected in expectations.items():
        assert slugify(raw) == expected
+25
View File
@@ -0,0 +1,25 @@
import asyncio
from unittest.mock import MagicMock
def test_transcription_engine_is_singleton():
    """The module exposes a ready-made TranscriptionEngine instance."""
    import transcription

    assert isinstance(transcription.engine, transcription.TranscriptionEngine)
def test_transcribe_file_calls_whisper(tmp_path):
    """transcribe_file() calls the model once and returns the stripped text."""
    from transcription import TranscriptionEngine

    audio = tmp_path / "test.wav"
    audio.write_bytes(b"\x00" * 100)

    segment = MagicMock()
    segment.text = " Hallo Welt"
    fake_model = MagicMock()
    fake_model.transcribe.return_value = ([segment], MagicMock())

    # Inject the fake so no real Whisper model is loaded.
    eng = TranscriptionEngine()
    eng._model = fake_model

    text = asyncio.run(eng.transcribe_file(str(audio), language="de"))
    assert text == "Hallo Welt"
    fake_model.transcribe.assert_called_once_with(str(audio), language="de")
+36
View File
@@ -0,0 +1,36 @@
import asyncio
class TranscriptionEngine:
    """Lazy wrapper around a faster-whisper model.

    The model is loaded on first use and then reused for every later call.
    """

    # Loaded WhisperModel, or None until the first transcription.
    _model = None

    def _get_model(self, model_name: str = "large-v3", device: str = "auto"):
        """Return the cached model, loading it on first use.

        With ``device="auto"`` CUDA/float16 is tried first and any failure
        falls back to CPU/int8. Once a model is loaded, later calls return it
        regardless of the arguments passed.
        """
        if self._model is None:
            from faster_whisper import WhisperModel

            if device == "auto":
                try:
                    self._model = WhisperModel(model_name, device="cuda", compute_type="float16")
                except Exception:
                    # Deliberate best effort: whatever prevents the CUDA
                    # model from loading, fall back to the CPU.
                    self._model = WhisperModel(model_name, device="cpu", compute_type="int8")
            else:
                compute = "float16" if device in ("cuda", "rocm") else "int8"
                self._model = WhisperModel(model_name, device=device, compute_type=compute)
        return self._model

    async def transcribe_file(
        self,
        audio_path: str,
        language: str = "de",
        model_name: str = "large-v3",
        device: str = "auto",
    ) -> str:
        """Transcribe ``audio_path`` without blocking the event loop.

        Args:
            audio_path: Path of the audio file to transcribe.
            language: Language code passed to the model.
            model_name: Model to load if none is loaded yet.
            device: Device to load the model on (``"auto"`` tries CUDA first).

        Returns:
            The concatenated segment texts, stripped of surrounding whitespace.
        """

        def _run() -> str:
            # Model loading is slow and blocking, so it happens here as well.
            # Concurrent first calls could load the model twice; the last
            # assignment wins.
            model = self._get_model(model_name, device)
            segments, _ = model.transcribe(audio_path, language=language)
            # faster-whisper returns a lazy generator: the decoding happens
            # while iterating, so the iteration must stay in the worker thread.
            return "".join(seg.text for seg in segments).strip()

        return await asyncio.get_running_loop().run_in_executor(None, _run)
# Shared module-level instance; the model it loads on first use is cached on it.
engine = TranscriptionEngine()