"""Transcription Service — Speaches STT integration.

Sends audio to your Speaches instances for real-time speech-to-text.
Used by the Hold Slayer to understand IVR prompts and detect menu options.
"""

import io
import logging
import wave
from typing import AsyncIterator, Optional

import httpx

from config import SpeachesSettings

logger = logging.getLogger(__name__)


class TranscriptionService:
    """Client for Speaches STT service.

    Speaches exposes an OpenAI-compatible API:
    POST /v1/audio/transcriptions
    """

    def __init__(self, settings: SpeachesSettings):
        self.settings = settings
        # Lazily created in _get_client() so construction never does I/O.
        self._client: Optional[httpx.AsyncClient] = None

    async def _get_client(self) -> httpx.AsyncClient:
        """Get or create the HTTP client, recreating it if it was closed."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                base_url=self.settings.url,
                # Generous total timeout for long audio; fail fast on connect.
                timeout=httpx.Timeout(30.0, connect=5.0),
            )
        return self._client

    async def transcribe(
        self,
        audio_data: bytes,
        language: str = "en",
        prompt: Optional[str] = None,
    ) -> str:
        """Transcribe audio data to text.

        Args:
            audio_data: Raw PCM audio (16-bit signed, 16kHz, mono)
            language: Language code (default: "en")
            prompt: Optional context hint for better accuracy
                (e.g., "IVR menu options, phone banking")

        Returns:
            Transcribed text, or "" on any transport/API failure
            (best-effort: callers treat empty text as "nothing heard").
        """
        client = await self._get_client()

        # Convert raw PCM to WAV format for the API
        wav_data = self._pcm_to_wav(audio_data)

        try:
            response = await client.post(
                "/v1/audio/transcriptions",
                files={"file": ("audio.wav", wav_data, "audio/wav")},
                data={
                    "model": self.settings.model,
                    "language": language,
                    # "text" makes the API return the bare transcript body,
                    # so response.text is the result — no JSON parsing needed.
                    "response_format": "text",
                    **({"prompt": prompt} if prompt else {}),
                },
            )
            response.raise_for_status()
            text = response.text.strip()
            logger.debug("Transcription: '%s'", text)
            return text
        except httpx.HTTPStatusError as e:
            logger.error(
                "Speaches API error: %s %s", e.response.status_code, e.response.text
            )
            return ""
        except httpx.ConnectError:
            logger.error("Cannot connect to Speaches at %s", self.settings.url)
            return ""
        except Exception as e:
            # Deliberate best-effort boundary: transcription failures must
            # never crash the call flow. Log and return empty.
            logger.error("Transcription failed: %s", e)
            return ""

    async def transcribe_stream(
        self,
        audio_data: bytes,
        language: str = "en",
        prompt: Optional[str] = None,
    ) -> AsyncIterator[str]:
        """Stream transcription — for real-time results.

        Uses Speaches streaming endpoint if available, falls back to
        chunked transcription.

        Args:
            audio_data: Raw PCM audio (16-bit signed, 16kHz, mono)
            language: Language code (default: "en")
            prompt: Optional context hint, forwarded to each chunk's
                transcription request.

        Yields:
            str: Partial transcription chunks
        """
        # For now, do chunked transcription
        # TODO: Implement WebSocket streaming when Speaches supports it
        # 3 seconds of 16kHz 16-bit mono; even byte count keeps chunks
        # aligned on 16-bit sample boundaries.
        chunk_size = 16000 * 2 * 3
        for i in range(0, len(audio_data), chunk_size):
            chunk = audio_data[i:i + chunk_size]
            if len(chunk) > 0:
                text = await self.transcribe(chunk, language, prompt)
                if text:
                    yield text

    async def close(self) -> None:
        """Close the HTTP client."""
        if self._client and not self._client.is_closed:
            await self._client.aclose()
        self._client = None

    @staticmethod
    def _pcm_to_wav(
        pcm_data: bytes,
        sample_rate: int = 16000,
        channels: int = 1,
        sample_width: int = 2,
    ) -> bytes:
        """Convert raw PCM data to WAV format.

        Uses the stdlib ``wave`` module, which writes the standard 44-byte
        RIFF/WAVE header for uncompressed PCM (equivalent to hand-packing
        "RIFF" + chunk sizes + "WAVE" + fmt + data via struct).

        Args:
            pcm_data: Raw PCM audio bytes
            sample_rate: Sample rate in Hz (default: 16000)
            channels: Number of channels (default: 1 = mono)
            sample_width: Bytes per sample (default: 2 = 16-bit)

        Returns:
            WAV file as bytes
        """
        buf = io.BytesIO()
        with wave.open(buf, "wb") as wf:
            wf.setnchannels(channels)
            wf.setsampwidth(sample_width)
            wf.setframerate(sample_rate)
            wf.writeframes(pcm_data)
        return buf.getvalue()