c7cad4bb2a
- transcription.py: new _transcribe_remote_whispercpp() using /inference endpoint - transcription.py: backend param routes to openai or whispercpp remote path - config.py: whisper.backend default 'openai', alt 'whispercpp' - pipeline.py: passes backend from config to transcribe_file - settings: backend dropdown (OpenAI-compat / whisper.cpp) - SETUP.md: whisper.cpp ROCm build and systemd setup instructions whisper-cpp-server running on beastix :8080 (ROCm0, gfx1030, RX 6800 XT)
129 lines
4.5 KiB
Python
129 lines
4.5 KiB
Python
import asyncio
|
|
import httpx
|
|
from typing import Union
|
|
|
|
|
|
class TranscriptionEngine:
    """Transcribe audio files locally or through a remote HTTP server.

    Three code paths are available and selected by ``transcribe_file``:

    * a local faster-whisper model (no ``base_url`` given),
    * a remote server reached at ``{base_url}/v1/audio/transcriptions``,
    * a remote server reached at ``{base_url}/inference``
      (``backend="whispercpp"``).

    Every path returns either the plain transcript (``str``) or, when
    ``with_segments`` is true, a list of ``{"start", "end", "text"}`` dicts.
    """

    # Lazily created faster-whisper model, populated by ``_get_model``.
    _model = None

    # Placeholder "end" value used for the single fallback segment that is
    # produced when a server returns text but no per-segment timings.
    _UNKNOWN_END = 9999.0

    # Timeout handed to ``httpx.AsyncClient`` for remote requests.
    _REMOTE_TIMEOUT = 300

    def _get_model(self, model_name: str = "large-v3", device: str = "auto"):
        """Return the cached local model, loading it on first use.

        With ``device="auto"`` a CUDA/float16 model is tried first and a
        CPU/int8 model is used if that raises. Any other ``device`` value is
        passed through as-is, using float16 for ``"cuda"``/``"rocm"`` and
        int8 otherwise.

        NOTE: the cache is not keyed by ``model_name`` or ``device``; once a
        model has been loaded it is returned for every later call regardless
        of the arguments.
        """
        if self._model is not None:
            return self._model

        # Imported lazily so the module can be used for remote transcription
        # without importing faster_whisper.
        from faster_whisper import WhisperModel

        if device == "auto":
            try:
                self._model = WhisperModel(model_name, device="cuda", compute_type="float16")
            except Exception:
                # Deliberate best-effort fallback: whatever made the CUDA
                # load fail, retry on CPU.
                self._model = WhisperModel(model_name, device="cpu", compute_type="int8")
        else:
            compute = "float16" if device in ("cuda", "rocm") else "int8"
            self._model = WhisperModel(model_name, device=device, compute_type=compute)
        return self._model

    async def transcribe_file(
        self,
        audio_path: str,
        language: str = "de",
        model_name: str = "large-v3",
        device: str = "auto",
        base_url: str = "",
        with_segments: bool = False,
        backend: str = "openai",
    ) -> Union[str, list[dict]]:
        """Transcribe ``audio_path`` and return text or timed segments.

        Args:
            audio_path: Path of the audio file to transcribe.
            language: Language code forwarded to the backend.
            model_name: Model name for the local and ``/v1`` remote paths.
            device: Device selector for the local model (see ``_get_model``).
            base_url: Remote server base URL. Empty means "run locally".
            with_segments: Return a list of segment dicts instead of a string.
            backend: Remote flavour. ``"whispercpp"`` selects the
                ``/inference`` endpoint; any other value selects the
                ``/v1/audio/transcriptions`` endpoint. Ignored when
                ``base_url`` is empty.

        Returns:
            The transcript as ``str``, or a list of
            ``{"start", "end", "text"}`` dicts when ``with_segments`` is true.
        """
        if not base_url:
            return await self._transcribe_local(
                audio_path, language, model_name, device, with_segments
            )
        if backend == "whispercpp":
            return await self._transcribe_remote_whispercpp(
                audio_path, language, base_url, with_segments
            )
        return await self._transcribe_remote(
            audio_path, language, model_name, base_url, with_segments
        )

    async def _post_audio(self, url: str, audio_path: str, data: dict) -> dict:
        """Upload ``audio_path`` as multipart form data and return the JSON body.

        The file is always sent under the form field ``file`` with the name
        ``audio.wav`` and content type ``audio/wav``. Raises
        ``httpx.HTTPStatusError`` for non-success responses.
        """
        async with httpx.AsyncClient(timeout=self._REMOTE_TIMEOUT) as client:
            with open(audio_path, "rb") as f:
                r = await client.post(
                    url,
                    files={"file": ("audio.wav", f, "audio/wav")},
                    data=data,
                )
            r.raise_for_status()
            return r.json()

    @staticmethod
    def _segments_from_body(body: dict) -> list[dict]:
        """Convert a response's ``segments`` array into segment dicts.

        Returns an empty list when the key is missing, ``None`` or empty.
        """
        return [
            {"start": s["start"], "end": s["end"], "text": s["text"].strip()}
            for s in body.get("segments") or []
        ]

    async def _transcribe_remote(
        self,
        audio_path: str,
        language: str,
        model_name: str,
        base_url: str,
        with_segments: bool,
    ) -> Union[str, list[dict]]:
        """Transcribe via ``POST {base_url}/v1/audio/transcriptions``.

        When ``with_segments`` is true the request asks for ``verbose_json``
        with segment granularity. If the response carries no segments, a
        single segment spanning ``0.0``..``_UNKNOWN_END`` with the full text
        is returned instead.
        """
        data = {"model": model_name, "language": language}
        if with_segments:
            data["timestamp_granularities[]"] = "segment"
            data["response_format"] = "verbose_json"

        # Tolerate a configured base URL that ends with "/".
        url = f"{base_url.rstrip('/')}/v1/audio/transcriptions"
        body = await self._post_audio(url, audio_path, data)

        if not with_segments:
            return body["text"]
        return self._segments_from_body(body) or [
            {"start": 0.0, "end": self._UNKNOWN_END, "text": body["text"].strip()}
        ]

    async def _transcribe_remote_whispercpp(
        self,
        audio_path: str,
        language: str,
        base_url: str,
        with_segments: bool,
    ) -> Union[str, list[dict]]:
        """Transcribe via ``POST {base_url}/inference``.

        Unlike the ``/v1`` path no model name is sent, and a missing ``text``
        field in the response is treated as an empty string. The segment
        fallback is the same single ``0.0``..``_UNKNOWN_END`` segment.
        """
        data = {"language": language}
        if with_segments:
            data["response_format"] = "verbose_json"

        # Tolerate a configured base URL that ends with "/".
        url = f"{base_url.rstrip('/')}/inference"
        body = await self._post_audio(url, audio_path, data)

        if not with_segments:
            return body.get("text", "").strip()
        return self._segments_from_body(body) or [
            {"start": 0.0, "end": self._UNKNOWN_END, "text": body.get("text", "").strip()}
        ]

    async def _transcribe_local(
        self,
        audio_path: str,
        language: str,
        model_name: str,
        device: str,
        with_segments: bool,
    ) -> Union[str, list[dict]]:
        """Transcribe with the local model in the default executor.

        Without ``with_segments`` the segment texts are concatenated and the
        result is stripped. With ``with_segments`` segments whose text is
        blank after stripping are dropped.
        """
        loop = asyncio.get_running_loop()
        # NOTE: the first call loads the model synchronously on the calling
        # (event-loop) thread; later calls hit the cache.
        model = self._get_model(model_name, device)

        def _decode() -> list:
            # faster-whisper hands back a lazy generator: decoding only
            # happens while it is iterated. Materialise it here, inside the
            # worker thread, so the event loop is not blocked by the actual
            # transcription work.
            lazy_segments, _info = model.transcribe(audio_path, language=language)
            return list(lazy_segments)

        segments = await loop.run_in_executor(None, _decode)

        if not with_segments:
            return "".join(seg.text for seg in segments).strip()
        return [
            {"start": seg.start, "end": seg.end, "text": seg.text.strip()}
            for seg in segments
            if seg.text.strip()
        ]
|
|
|
|
|
|
# Module-level TranscriptionEngine instance, created at import time.
engine = TranscriptionEngine()
|