import asyncio
import logging
import threading
from typing import Union

import httpx

logger = logging.getLogger(__name__)


class TranscriptionEngine:
    """Transcribe audio either locally (faster-whisper) or via a remote HTTP API.

    When ``base_url`` is given, the audio is POSTed to
    ``{base_url}/v1/audio/transcriptions``; otherwise a local
    ``faster_whisper.WhisperModel`` is loaded on demand and cached.
    """

    # Single-slot model cache: the loaded model and the (model_name, device)
    # pair it was loaded for. Only one model is kept alive at a time.
    _model = None
    _model_key = None
    # Model loading happens in executor threads, so guard the cache.
    _model_lock = threading.Lock()

    # End timestamp used for the catch-all segment when the remote response
    # carries no per-segment timing information.
    _FALLBACK_SEGMENT_END = 9999.0

    @staticmethod
    def _load_model(model_name: str, device: str):
        """Instantiate a ``WhisperModel`` for the requested device.

        With ``device="auto"`` CUDA/float16 is tried first and CPU/int8 is used
        as a fallback. For an explicit device, float16 is used on
        ``"cuda"``/``"rocm"`` and int8 otherwise.
        """
        # Imported lazily so the module can be used in remote-only mode
        # without faster-whisper being importable.
        from faster_whisper import WhisperModel

        if device != "auto":
            compute = "float16" if device in ("cuda", "rocm") else "int8"
            return WhisperModel(model_name, device=device, compute_type=compute)

        try:
            return WhisperModel(model_name, device="cuda", compute_type="float16")
        except Exception:
            # Deliberate best-effort fallback, but make the reason visible.
            logger.warning(
                "Loading %s on CUDA failed; falling back to CPU/int8",
                model_name,
                exc_info=True,
            )
            return WhisperModel(model_name, device="cpu", compute_type="int8")

    def _get_model(self, model_name: str = "large-v3", device: str = "auto"):
        """Return the cached model, (re)loading it if the request changed.

        The cache is keyed on ``(model_name, device)`` so asking for a
        different model or device no longer returns a stale instance.

        Args:
            model_name: Model identifier passed to ``WhisperModel``.
            device: ``"auto"`` or an explicit device name.

        Returns:
            The loaded ``WhisperModel`` instance.
        """
        key = (model_name, device)
        with self._model_lock:
            if self._model is None or self._model_key != key:
                # Drop the previous model first so both are not held at once.
                self._model = None
                self._model_key = None
                self._model = self._load_model(model_name, device)
                self._model_key = key
            return self._model

    async def transcribe_file(
        self,
        audio_path: str,
        language: str = "de",
        model_name: str = "large-v3",
        device: str = "auto",
        base_url: str = "",
        with_segments: bool = False,
    ) -> Union[str, list[dict]]:
        """Transcribe ``audio_path`` remotely if ``base_url`` is set, else locally.

        Args:
            audio_path: Path of the audio file to transcribe.
            language: Language code forwarded to the backend.
            model_name: Model identifier forwarded to the backend.
            device: Device selection for local transcription (ignored remotely).
            base_url: Base URL of the remote service; empty selects local mode.
            with_segments: If true, return a list of
                ``{"start", "end", "text"}`` dicts instead of plain text.

        Returns:
            The transcript as a string, or a list of segment dicts when
            ``with_segments`` is true.
        """
        if base_url:
            return await self._transcribe_remote(
                audio_path, language, model_name, base_url, with_segments
            )
        return await self._transcribe_local(
            audio_path, language, model_name, device, with_segments
        )

    async def _transcribe_remote(
        self,
        audio_path: str,
        language: str,
        model_name: str,
        base_url: str,
        with_segments: bool,
    ) -> Union[str, list[dict]]:
        """POST the audio file to the remote transcription endpoint.

        Raises:
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        # Tolerate a trailing slash in base_url so the path has no "//".
        url = f"{base_url.rstrip('/')}/v1/audio/transcriptions"

        data = {"model": model_name, "language": language}
        if with_segments:
            data["timestamp_granularities[]"] = "segment"
            data["response_format"] = "verbose_json"

        async with httpx.AsyncClient(timeout=300) as client:
            with open(audio_path, "rb") as f:
                r = await client.post(
                    url,
                    files={"file": ("audio.wav", f, "audio/wav")},
                    data=data,
                )
        r.raise_for_status()
        body = r.json()

        if not with_segments:
            return body["text"]

        raw_segs = body.get("segments") or []
        if raw_segs:
            return [
                {"start": s["start"], "end": s["end"], "text": s["text"].strip()}
                for s in raw_segs
            ]
        # No segment timing in the response: wrap the full text in one segment.
        return [
            {
                "start": 0.0,
                "end": self._FALLBACK_SEGMENT_END,
                "text": body["text"].strip(),
            }
        ]

    async def _transcribe_local(
        self,
        audio_path: str,
        language: str,
        model_name: str,
        device: str,
        with_segments: bool,
    ) -> Union[str, list[dict]]:
        """Transcribe with the local model without blocking the event loop."""
        loop = asyncio.get_running_loop()

        def _run() -> list:
            # Model loading can be slow, so it runs in the worker thread too.
            model = self._get_model(model_name, device)
            segments, _info = model.transcribe(audio_path, language=language)
            # faster-whisper yields segments lazily; the decoding work happens
            # while iterating, so the generator must be consumed here rather
            # than on the event loop thread.
            return list(segments)

        segments = await loop.run_in_executor(None, _run)

        if not with_segments:
            return "".join(seg.text for seg in segments).strip()
        return [
            {"start": seg.start, "end": seg.end, "text": seg.text.strip()}
            for seg in segments
            if seg.text.strip()
        ]


engine = TranscriptionEngine()