Complete project scaffolding and core implementation of an AI-powered telephony system that calls companies, navigates IVR menus, waits on hold, and transfers to the user when a human answers. Key components: - FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces - SIP/VoIP call management via PJSUA2 with RTP audio streaming - LLM-powered IVR navigation using OpenAI/Anthropic with tool calling - Hold detection service combining audio analysis and silence detection - Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines - Call recording with per-channel and mixed audio capture - Event bus (asyncio pub/sub) for real-time client updates - Web dashboard with live call monitoring - SQLite persistence via SQLAlchemy with call history and analytics - Notification support (email, SMS, webhook, desktop) - Docker Compose deployment with Opal VoIP and Opal Media containers - Comprehensive test suite with unit, integration, and E2E tests - Simplified .gitignore and full project documentation in README
231 lines
7.7 KiB
Python
231 lines
7.7 KiB
Python
"""
|
|
Recording Service — Call recording management.
|
|
|
|
Records calls to WAV files via the PJSUA2 media pipeline,
|
|
manages storage, and provides playback/download access.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from config import get_settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RecordingService:
|
|
"""
|
|
Manages call recordings.
|
|
|
|
Features:
|
|
- Start/stop recording for any active call leg
|
|
- Dual-channel recording (separate caller/agent streams)
|
|
- Mixed recording (both parties in one file)
|
|
- WAV storage with organized directory structure
|
|
- Recording metadata tracking
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
storage_dir: str = "recordings",
|
|
max_recording_seconds: int = 7200, # 2 hours
|
|
sample_rate: int = 16000,
|
|
):
|
|
self._storage_dir = Path(storage_dir)
|
|
self._max_recording_seconds = max_recording_seconds
|
|
self._sample_rate = sample_rate
|
|
self._active_recordings: dict[str, RecordingSession] = {}
|
|
self._metadata: list[dict] = []
|
|
|
|
async def start(self) -> None:
|
|
"""Initialize the recording service."""
|
|
self._storage_dir.mkdir(parents=True, exist_ok=True)
|
|
logger.info(f"🎙️ Recording service ready (storage: {self._storage_dir})")
|
|
|
|
# ================================================================
|
|
# Recording Lifecycle
|
|
# ================================================================
|
|
|
|
async def start_recording(
|
|
self,
|
|
call_id: str,
|
|
media_pipeline=None,
|
|
leg_ids: Optional[list[str]] = None,
|
|
dual_channel: bool = False,
|
|
) -> "RecordingSession":
|
|
"""
|
|
Start recording a call.
|
|
|
|
Args:
|
|
call_id: The call to record.
|
|
media_pipeline: MediaPipeline instance for PJSUA2 recording.
|
|
leg_ids: Specific SIP leg IDs to record. If None, records all legs.
|
|
dual_channel: If True, record each party to a separate channel.
|
|
|
|
Returns:
|
|
RecordingSession with file paths and metadata.
|
|
"""
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
date_dir = datetime.now().strftime("%Y-%m-%d")
|
|
recording_dir = self._storage_dir / date_dir
|
|
recording_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
if dual_channel:
|
|
filepath_caller = str(recording_dir / f"{call_id}_{timestamp}_caller.wav")
|
|
filepath_agent = str(recording_dir / f"{call_id}_{timestamp}_agent.wav")
|
|
filepath_mixed = str(recording_dir / f"{call_id}_{timestamp}_mixed.wav")
|
|
else:
|
|
filepath_caller = None
|
|
filepath_agent = None
|
|
filepath_mixed = str(recording_dir / f"{call_id}_{timestamp}.wav")
|
|
|
|
session = RecordingSession(
|
|
call_id=call_id,
|
|
filepath_mixed=filepath_mixed,
|
|
filepath_caller=filepath_caller,
|
|
filepath_agent=filepath_agent,
|
|
started_at=datetime.now(),
|
|
sample_rate=self._sample_rate,
|
|
)
|
|
|
|
# Start PJSUA2 recording if media pipeline is available
|
|
if media_pipeline and leg_ids:
|
|
for leg_id in leg_ids:
|
|
if filepath_mixed:
|
|
media_pipeline.start_recording(leg_id, filepath_mixed)
|
|
|
|
self._active_recordings[call_id] = session
|
|
logger.info(f"🔴 Recording started: {call_id} → {filepath_mixed}")
|
|
|
|
# Safety timeout
|
|
asyncio.create_task(
|
|
self._recording_timeout(call_id),
|
|
name=f"rec_timeout_{call_id}",
|
|
)
|
|
|
|
return session
|
|
|
|
async def stop_recording(
|
|
self,
|
|
call_id: str,
|
|
media_pipeline=None,
|
|
) -> Optional["RecordingSession"]:
|
|
"""Stop recording a call and finalize the WAV file."""
|
|
session = self._active_recordings.pop(call_id, None)
|
|
if not session:
|
|
logger.warning(f" No active recording for {call_id}")
|
|
return None
|
|
|
|
session.stopped_at = datetime.now()
|
|
session.duration_seconds = int(
|
|
(session.stopped_at - session.started_at).total_seconds()
|
|
)
|
|
|
|
# Stop PJSUA2 recording
|
|
if media_pipeline:
|
|
# The pipeline handles flushing and closing the WAV file
|
|
for leg_id in (session._leg_ids or []):
|
|
media_pipeline.stop_recording(leg_id)
|
|
|
|
# Calculate file size
|
|
if session.filepath_mixed and os.path.exists(session.filepath_mixed):
|
|
session.file_size_bytes = os.path.getsize(session.filepath_mixed)
|
|
|
|
# Store metadata
|
|
self._metadata.append(session.to_dict())
|
|
|
|
logger.info(
|
|
f"⏹ Recording stopped: {call_id} "
|
|
f"({session.duration_seconds}s, "
|
|
f"{session.file_size_bytes or 0} bytes)"
|
|
)
|
|
return session
|
|
|
|
async def _recording_timeout(self, call_id: str) -> None:
|
|
"""Auto-stop recording after max duration."""
|
|
await asyncio.sleep(self._max_recording_seconds)
|
|
if call_id in self._active_recordings:
|
|
logger.warning(f" Recording timeout for {call_id}, auto-stopping")
|
|
await self.stop_recording(call_id)
|
|
|
|
# ================================================================
|
|
# Queries
|
|
# ================================================================
|
|
|
|
def get_recording(self, call_id: str) -> Optional[dict]:
|
|
"""Get recording metadata for a call."""
|
|
for meta in reversed(self._metadata):
|
|
if meta["call_id"] == call_id:
|
|
return meta
|
|
return None
|
|
|
|
def list_recordings(
|
|
self,
|
|
limit: int = 50,
|
|
offset: int = 0,
|
|
) -> list[dict]:
|
|
"""List recording metadata, newest first."""
|
|
sorted_meta = sorted(
|
|
self._metadata,
|
|
key=lambda m: m.get("started_at", ""),
|
|
reverse=True,
|
|
)
|
|
return sorted_meta[offset : offset + limit]
|
|
|
|
@property
|
|
def active_recording_count(self) -> int:
|
|
return len(self._active_recordings)
|
|
|
|
@property
|
|
def total_recordings(self) -> int:
|
|
return len(self._metadata)
|
|
|
|
def storage_usage_bytes(self) -> int:
|
|
"""Calculate total storage used by recordings."""
|
|
total = 0
|
|
for root, _dirs, files in os.walk(self._storage_dir):
|
|
for f in files:
|
|
total += os.path.getsize(os.path.join(root, f))
|
|
return total
|
|
|
|
|
|
class RecordingSession:
|
|
"""Tracks a single active recording session."""
|
|
|
|
def __init__(
|
|
self,
|
|
call_id: str,
|
|
filepath_mixed: Optional[str] = None,
|
|
filepath_caller: Optional[str] = None,
|
|
filepath_agent: Optional[str] = None,
|
|
started_at: Optional[datetime] = None,
|
|
sample_rate: int = 16000,
|
|
):
|
|
self.call_id = call_id
|
|
self.filepath_mixed = filepath_mixed
|
|
self.filepath_caller = filepath_caller
|
|
self.filepath_agent = filepath_agent
|
|
self.started_at = started_at or datetime.now()
|
|
self.stopped_at: Optional[datetime] = None
|
|
self.duration_seconds: Optional[int] = None
|
|
self.file_size_bytes: Optional[int] = None
|
|
self.sample_rate = sample_rate
|
|
self._leg_ids: list[str] = []
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"call_id": self.call_id,
|
|
"filepath_mixed": self.filepath_mixed,
|
|
"filepath_caller": self.filepath_caller,
|
|
"filepath_agent": self.filepath_agent,
|
|
"started_at": self.started_at.isoformat() if self.started_at else None,
|
|
"stopped_at": self.stopped_at.isoformat() if self.stopped_at else None,
|
|
"duration_seconds": self.duration_seconds,
|
|
"file_size_bytes": self.file_size_bytes,
|
|
"sample_rate": self.sample_rate,
|
|
}
|