""" Recording Service — Call recording management. Records calls to WAV files via the PJSUA2 media pipeline, manages storage, and provides playback/download access. """ import asyncio import logging import os from datetime import datetime from pathlib import Path from typing import Optional from config import get_settings logger = logging.getLogger(__name__) class RecordingService: """ Manages call recordings. Features: - Start/stop recording for any active call leg - Dual-channel recording (separate caller/agent streams) - Mixed recording (both parties in one file) - WAV storage with organized directory structure - Recording metadata tracking """ def __init__( self, storage_dir: str = "recordings", max_recording_seconds: int = 7200, # 2 hours sample_rate: int = 16000, ): self._storage_dir = Path(storage_dir) self._max_recording_seconds = max_recording_seconds self._sample_rate = sample_rate self._active_recordings: dict[str, RecordingSession] = {} self._metadata: list[dict] = [] async def start(self) -> None: """Initialize the recording service.""" self._storage_dir.mkdir(parents=True, exist_ok=True) logger.info(f"🎙️ Recording service ready (storage: {self._storage_dir})") # ================================================================ # Recording Lifecycle # ================================================================ async def start_recording( self, call_id: str, media_pipeline=None, leg_ids: Optional[list[str]] = None, dual_channel: bool = False, ) -> "RecordingSession": """ Start recording a call. Args: call_id: The call to record. media_pipeline: MediaPipeline instance for PJSUA2 recording. leg_ids: Specific SIP leg IDs to record. If None, records all legs. dual_channel: If True, record each party to a separate channel. Returns: RecordingSession with file paths and metadata. """ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") date_dir = datetime.now().strftime("%Y-%m-%d") recording_dir = self._storage_dir / date_dir recording_dir.mkdir(parents=True, exist_ok=True) if dual_channel: filepath_caller = str(recording_dir / f"{call_id}_{timestamp}_caller.wav") filepath_agent = str(recording_dir / f"{call_id}_{timestamp}_agent.wav") filepath_mixed = str(recording_dir / f"{call_id}_{timestamp}_mixed.wav") else: filepath_caller = None filepath_agent = None filepath_mixed = str(recording_dir / f"{call_id}_{timestamp}.wav") session = RecordingSession( call_id=call_id, filepath_mixed=filepath_mixed, filepath_caller=filepath_caller, filepath_agent=filepath_agent, started_at=datetime.now(), sample_rate=self._sample_rate, ) # Start PJSUA2 recording if media pipeline is available if media_pipeline and leg_ids: for leg_id in leg_ids: if filepath_mixed: media_pipeline.start_recording(leg_id, filepath_mixed) self._active_recordings[call_id] = session logger.info(f"🔴 Recording started: {call_id} → {filepath_mixed}") # Safety timeout asyncio.create_task( self._recording_timeout(call_id), name=f"rec_timeout_{call_id}", ) return session async def stop_recording( self, call_id: str, media_pipeline=None, ) -> Optional["RecordingSession"]: """Stop recording a call and finalize the WAV file.""" session = self._active_recordings.pop(call_id, None) if not session: logger.warning(f" No active recording for {call_id}") return None session.stopped_at = datetime.now() session.duration_seconds = int( (session.stopped_at - session.started_at).total_seconds() ) # Stop PJSUA2 recording if media_pipeline: # The pipeline handles flushing and closing the WAV file for leg_id in (session._leg_ids or []): media_pipeline.stop_recording(leg_id) # Calculate file size if session.filepath_mixed and os.path.exists(session.filepath_mixed): session.file_size_bytes = os.path.getsize(session.filepath_mixed) # Store metadata self._metadata.append(session.to_dict()) logger.info( f"⏹ Recording stopped: {call_id} " f"({session.duration_seconds}s, " f"{session.file_size_bytes or 0} bytes)" ) return session async def _recording_timeout(self, call_id: str) -> None: """Auto-stop recording after max duration.""" await asyncio.sleep(self._max_recording_seconds) if call_id in self._active_recordings: logger.warning(f" Recording timeout for {call_id}, auto-stopping") await self.stop_recording(call_id) # ================================================================ # Queries # ================================================================ def get_recording(self, call_id: str) -> Optional[dict]: """Get recording metadata for a call.""" for meta in reversed(self._metadata): if meta["call_id"] == call_id: return meta return None def list_recordings( self, limit: int = 50, offset: int = 0, ) -> list[dict]: """List recording metadata, newest first.""" sorted_meta = sorted( self._metadata, key=lambda m: m.get("started_at", ""), reverse=True, ) return sorted_meta[offset : offset + limit] @property def active_recording_count(self) -> int: return len(self._active_recordings) @property def total_recordings(self) -> int: return len(self._metadata) def storage_usage_bytes(self) -> int: """Calculate total storage used by recordings.""" total = 0 for root, _dirs, files in os.walk(self._storage_dir): for f in files: total += os.path.getsize(os.path.join(root, f)) return total class RecordingSession: """Tracks a single active recording session.""" def __init__( self, call_id: str, filepath_mixed: Optional[str] = None, filepath_caller: Optional[str] = None, filepath_agent: Optional[str] = None, started_at: Optional[datetime] = None, sample_rate: int = 16000, ): self.call_id = call_id self.filepath_mixed = filepath_mixed self.filepath_caller = filepath_caller self.filepath_agent = filepath_agent self.started_at = started_at or datetime.now() self.stopped_at: Optional[datetime] = None self.duration_seconds: Optional[int] = None self.file_size_bytes: Optional[int] = None self.sample_rate = sample_rate self._leg_ids: list[str] = [] def to_dict(self) -> dict: return { "call_id": self.call_id, "filepath_mixed": self.filepath_mixed, "filepath_caller": self.filepath_caller, "filepath_agent": self.filepath_agent, "started_at": self.started_at.isoformat() if self.started_at else None, "stopped_at": self.stopped_at.isoformat() if self.stopped_at else None, "duration_seconds": self.duration_seconds, "file_size_bytes": self.file_size_bytes, "sample_rate": self.sample_rate, }