Files
thomas.kopp 04b655e664 fix: use sounddevice names for audio device list and combined source
- /audio/devices now returns sounddevice device names (not pactl source names)
  so the stored device name works directly with sd.InputStream
- /audio/combined maps sounddevice names back to pactl source names via
  description matching for the loopback commands
- Combined sink description set to 'transkriptor-combined' (no spaces) so
  sounddevice name matches the value stored in config
- Add _pactl_source_for_sd_name() helper for the mapping
2026-04-02 07:51:42 +02:00

209 lines
7.5 KiB
Python

import asyncio
import os
import signal
import threading
import webbrowser
from pathlib import Path
import uvicorn
from fastapi import FastAPI
from fastapi.responses import FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
import pystray
from PIL import Image, ImageDraw
from api.router import router
from api.state import state, Status
from config import load as load_config
# ── FastAPI ────────────────────────────────────────────────────────────────────
# ASGI application object; the routes defined in api.router are mounted on it.
app = FastAPI(title="tüit Transkriptor")
app.include_router(router)
# Directory next to this file that holds the static frontend assets served by
# the route handlers below (index.html, login.html, app.js, settings.js, logo.svg).
FRONTEND_DIR = Path(__file__).parent / "frontend"
@app.get("/")
async def index():
    """Serve ``index.html`` from the frontend directory."""
    page = FRONTEND_DIR / "index.html"
    return FileResponse(str(page))
@app.get("/login")
async def login_page():
    """Serve ``login.html`` from the frontend directory."""
    page = FRONTEND_DIR / "login.html"
    return FileResponse(str(page))
@app.get("/app.js")
async def appjs():
    """Serve the frontend script ``app.js``."""
    script = FRONTEND_DIR / "app.js"
    return FileResponse(str(script))
@app.get("/logo.svg")
async def logo():
    """Serve ``logo.svg`` with an explicit SVG media type."""
    svg_path = str(FRONTEND_DIR / "logo.svg")
    return FileResponse(svg_path, media_type="image/svg+xml")
@app.get("/settings.js")
async def settingsjs():
    """Serve the frontend script ``settings.js``."""
    script = FRONTEND_DIR / "settings.js"
    return FileResponse(str(script))
# ── PipeWire combined source restore ──────────────────────────────────────────
def _restore_pipewire_combined():
    """Recreate transkriptor-combined.monitor on startup if it was previously configured.

    Reads ``~/.config/tueit-transcriber/pipewire-modules.json``. When the file
    names both a ``mic`` and a ``monitor`` source and ``pactl`` does not already
    list ``transkriptor-combined.monitor``, a null sink plus one loopback per
    source is loaded and the state file is rewritten with the new module ids.

    Best-effort: every failure is logged as a warning and never raised. If
    loading fails part-way through, the modules this call already loaded are
    unloaded again so that no half-built combined sink is left behind (a
    leftover sink would make the next startup believe the source is intact).
    """
    import json
    import logging
    import subprocess

    log = logging.getLogger(__name__)
    state_path = Path(os.path.expanduser("~/.config/tueit-transcriber/pipewire-modules.json"))
    if not state_path.exists():
        return

    # Ids of modules loaded by this call, in load order; used for rollback.
    loaded_ids = []

    def load_module(*module_args):
        """Load one pactl module and remember its id for a possible rollback."""
        output = subprocess.check_output(
            ["pactl", "load-module", *module_args], timeout=5
        )
        module_id = int(output.decode().strip())
        loaded_ids.append(module_id)
        return module_id

    try:
        data = json.loads(state_path.read_text())
        mic = data.get("mic")
        monitor = data.get("monitor")
        if not mic or not monitor:
            return
        sources = subprocess.check_output(
            ["pactl", "list", "sources", "short"], stderr=subprocess.DEVNULL, timeout=5
        ).decode()
        if "transkriptor-combined.monitor" in sources:
            return  # already loaded
        try:
            load_module(
                "module-null-sink",
                "sink_name=transkriptor-combined",
                "sink_properties=device.description=transkriptor-combined",
            )
            load_module("module-loopback", f"source={mic}", "sink=transkriptor-combined")
            load_module("module-loopback", f"source={monitor}", "sink=transkriptor-combined")
        except Exception:
            # Roll back in reverse order; a cleanup failure must not hide the
            # original error, so it is only logged.
            for module_id in reversed(loaded_ids):
                try:
                    subprocess.run(
                        ["pactl", "unload-module", str(module_id)],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                        timeout=5,
                        check=False,
                    )
                except Exception as cleanup_error:
                    log.warning(
                        "Could not unload PipeWire module %s: %s", module_id, cleanup_error
                    )
            raise
        state_path.write_text(json.dumps({"ids": loaded_ids, "mic": mic, "monitor": monitor}))
        log.info("Restored PipeWire combined source (ids: %s)", loaded_ids)
    except Exception as e:
        log.warning("Could not restore PipeWire combined source: %s", e)
# ── PID file ───────────────────────────────────────────────────────────────────
def write_pid(pid_path: str) -> None:
    """Write the current process id to *pid_path*, creating parent directories.

    Args:
        pid_path: Destination file. May be a bare file name; in that case the
            file is written to the current working directory. (Passing the
            empty result of ``os.path.dirname`` to ``os.makedirs`` would raise
            ``FileNotFoundError``, so the parent is resolved via ``pathlib``.)

    Raises:
        OSError: If the directory cannot be created or the file cannot be written.
    """
    target = Path(pid_path)
    # For a bare file name the parent is ".", which already exists.
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(str(os.getpid()))
def remove_pid(pid_path: str):
    """Delete the PID file at *pid_path*; a file that is already gone is not an error."""
    Path(pid_path).unlink(missing_ok=True)
# ── SIGUSR1 → toggle ──────────────────────────────────────────────────────────
# We capture uvicorn's event loop after it starts, so the signal handler can
# schedule the toggle coroutine in the correct loop — not a separate one.
# The value stays None until _LoopCapture.startup() assigns the running loop;
# both the SIGUSR1 handler and the tray menu check it before scheduling.
_uvicorn_loop: asyncio.AbstractEventLoop | None = None
def _sigusr1_handler(signum, frame):
    """Signal handler: schedule a recording toggle on the captured uvicorn loop.

    Does nothing while no loop has been captured yet. ``signum`` and ``frame``
    are required by the ``signal`` handler signature and are not used.
    """
    if not _uvicorn_loop:
        return

    def _spawn_toggle():
        # Runs inside the server loop, so the task is created in that loop.
        asyncio.ensure_future(_async_toggle(), loop=_uvicorn_loop)

    # The handler runs outside the server loop's thread, hence the
    # thread-safe scheduling call.
    _uvicorn_loop.call_soon_threadsafe(_spawn_toggle)
async def _async_toggle():
    """Toggle recording for a trigger that has no HTTP request (signal or tray menu)."""
    # Resolved at call time rather than at module import.
    from api.router import _guest_user, toggle_recording

    # Toggle without a real user dependency — use guest context for signal-triggered recordings.
    guest = _guest_user()
    await toggle_recording(user=guest)
# ── Tray ───────────────────────────────────────────────────────────────────────
def _make_icon(recording: bool = False) -> Image.Image:
    """Render the 64x64 tray icon: a filled circle on a transparent background.

    Args:
        recording: When true the circle is drawn in red (218, 37, 28);
            otherwise in gray (80, 80, 80).

    Returns:
        A new RGBA image.
    """
    if recording:
        fill_color = (218, 37, 28, 255)
    else:
        fill_color = (80, 80, 80, 255)

    canvas = Image.new("RGBA", (64, 64), (0, 0, 0, 0))
    ImageDraw.Draw(canvas).ellipse([8, 8, 56, 56], fill=fill_color)
    return canvas
def run_tray(port: int):
    """Create the tray icon with its menu and run it via ``icon.run()``.

    The menu offers: toggle recording (default action), open the web UI at
    ``http://localhost:<port>``, and quit. The icon image is refreshed through
    a callback registered with ``state.subscribe``.

    Args:
        port: Port of the local web UI opened by the "Öffnen" entry.
    """

    def on_toggle(_icon, _item):
        # Nothing to schedule on until the server loop has been captured.
        if _uvicorn_loop:
            _uvicorn_loop.call_soon_threadsafe(
                lambda: asyncio.ensure_future(_async_toggle(), loop=_uvicorn_loop)
            )

    def on_open(_icon, _item):
        webbrowser.open(f"http://localhost:{port}")

    def on_quit(_icon, _item):
        icon.stop()
        # NOTE(review): os._exit() ends the process immediately, so the
        # entrypoint's `finally: remove_pid(...)` does not run after a tray
        # quit and the PID file is left behind — confirm whether intended.
        os._exit(0)

    tray_menu = pystray.Menu(
        pystray.MenuItem("Aufnahme starten/stoppen", on_toggle, default=True),
        pystray.MenuItem("Öffnen", on_open),
        pystray.MenuItem("Beenden", on_quit),
    )
    icon = pystray.Icon(
        "tueit-transcriber",
        _make_icon(False),
        "tüit Transkriptor",
        menu=tray_menu,
    )

    def update_icon(s):
        # presumably called with the shared state object on each change — verify in api.state
        is_recording = s.status == Status.RECORDING
        icon.icon = _make_icon(is_recording)

    state.subscribe(update_icon)
    icon.run()
# ── Server ─────────────────────────────────────────────────────────────────────
class _LoopCapture(uvicorn.Server):
    """uvicorn server that publishes its running event loop in ``_uvicorn_loop``.

    The SIGUSR1 handler and the tray menu use that loop to schedule the
    recording toggle.
    """

    async def startup(self, sockets=None):
        """Record the running event loop, then perform uvicorn's normal startup."""
        global _uvicorn_loop
        running_loop = asyncio.get_running_loop()
        _uvicorn_loop = running_loop
        await super().startup(sockets=sockets)

    def install_signal_handlers(self):
        """Intentionally do nothing.

        Disables uvicorn's own signal handlers so our SIGUSR1 handler works.
        """
        return None
def run_server(config: uvicorn.Config):
    """Build a loop-capturing server for *config* and run it."""
    _LoopCapture(config).run()
# ── Entrypoint ─────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import logging
    import time

    cfg = load_config()
    port = cfg["server"]["port"]
    host = cfg.get("network", {}).get("host", "127.0.0.1")
    pid_path = cfg.get("pid_file", os.path.expanduser("~/.local/run/tueit-transcriber.pid"))

    write_pid(pid_path)
    # Everything after the PID file exists runs under try/finally, so a failure
    # during startup (not only inside the tray loop) still removes the PID file.
    try:
        _restore_pipewire_combined()
        signal.signal(signal.SIGUSR1, _sigusr1_handler)

        # The server runs in a daemon thread; the tray icon is run from this thread.
        uvicorn_cfg = uvicorn.Config(app, host=host, port=port, log_level="debug")
        server_thread = threading.Thread(target=run_server, args=(uvicorn_cfg,), daemon=True)
        server_thread.start()

        # Wait (up to ~5 s) until uvicorn has captured its loop
        for _ in range(50):
            if _uvicorn_loop is not None:
                break
            time.sleep(0.1)
        if _uvicorn_loop is None:
            # Without the loop, SIGUSR1 and the tray toggle silently do nothing.
            logging.getLogger(__name__).warning(
                "uvicorn event loop was not captured within 5 s; "
                "recording toggle via signal/tray is unavailable until it starts"
            )

        from auth import has_users

        start_path = "/setup" if not has_users() else "/"
        webbrowser.open(f"http://localhost:{port}{start_path}")

        run_tray(port)
    finally:
        remove_pid(pid_path)