Complete project scaffolding and core implementation of an AI-powered telephony system that calls companies, navigates IVR menus, waits on hold, and transfers to the user when a human answers.

Key components:
- FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces
- SIP/VoIP call management via PJSUA2 with RTP audio streaming
- LLM-powered IVR navigation using OpenAI/Anthropic with tool calling
- Hold detection service combining audio analysis and silence detection
- Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines
- Call recording with per-channel and mixed audio capture
- Event bus (asyncio pub/sub) for real-time client updates
- Web dashboard with live call monitoring
- SQLite persistence via SQLAlchemy with call history and analytics
- Notification support (email, SMS, webhook, desktop)
- Docker Compose deployment with Opal VoIP and Opal Media containers
- Comprehensive test suite with unit, integration, and E2E tests
- Simplified .gitignore and full project documentation in README
200 lines
6.9 KiB
Python
200 lines
6.9 KiB
Python
"""
|
|
Call Manager — Active call state tracking and event bus.
|
|
|
|
Central nervous system of the gateway. Tracks all active calls,
|
|
publishes events, and coordinates between SIP engine and services.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import uuid
|
|
from collections.abc import AsyncIterator
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from core.event_bus import EventBus, EventSubscription
|
|
from models.call import ActiveCall, AudioClassification, CallMode, CallStatus, ClassificationResult
|
|
from models.events import EventType, GatewayEvent
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CallManager:
    """
    Registry of active calls and publisher of their lifecycle events.

    Holds an in-memory map of call ID -> ``ActiveCall`` plus a lookup table
    from SIP leg ID -> call ID, and publishes a ``GatewayEvent`` on the
    injected event bus whenever a call is created, changes status, receives
    an audio classification or transcript chunk, or ends.

    The single source of truth for what's happening on the gateway.
    """

    # Event type published by ``update_status`` for each status. Defined once
    # at class level so the table is not rebuilt on every status change.
    _STATUS_EVENTS = {
        CallStatus.RINGING: EventType.CALL_RINGING,
        CallStatus.CONNECTED: EventType.CALL_CONNECTED,
        CallStatus.NAVIGATING_IVR: EventType.IVR_STEP,
        CallStatus.ON_HOLD: EventType.HOLD_DETECTED,
        CallStatus.HUMAN_DETECTED: EventType.HUMAN_DETECTED,
        CallStatus.TRANSFERRING: EventType.TRANSFER_STARTED,
        CallStatus.BRIDGED: EventType.TRANSFER_COMPLETE,
        CallStatus.COMPLETED: EventType.CALL_ENDED,
        CallStatus.FAILED: EventType.CALL_FAILED,
    }

    def __init__(self, event_bus: EventBus):
        """
        Create an empty manager.

        Args:
            event_bus: Bus on which every call event is published.
        """
        self.event_bus = event_bus
        self._active_calls: dict[str, ActiveCall] = {}
        self._call_legs: dict[str, str] = {}  # SIP leg ID -> call ID mapping

    # ================================================================
    # Call Lifecycle
    # ================================================================

    async def create_call(
        self,
        remote_number: str,
        mode: CallMode = CallMode.DIRECT,
        intent: Optional[str] = None,
        call_flow_id: Optional[str] = None,
        device: Optional[str] = None,
        services: Optional[list[str]] = None,
    ) -> ActiveCall:
        """
        Create a new call, start tracking it, and announce it.

        The call ID is ``"call_"`` followed by the first 12 hex characters
        of a random UUID4. A ``CALL_INITIATED`` event is published once the
        call has been registered.

        Args:
            remote_number: Number being called.
            mode: Call mode stored on the call; its ``value`` is included
                in the published event.
            intent: Optional intent, stored on the call and included in the
                published event.
            call_flow_id: Optional call-flow identifier stored on the call.
            device: Optional device stored on the call.
            services: Optional list of services stored on the call; ``None``
                (or an empty list) is stored as a fresh empty list.

        Returns:
            The newly created ``ActiveCall``.
        """
        call_id = f"call_{uuid.uuid4().hex[:12]}"

        call = ActiveCall(
            id=call_id,
            remote_number=remote_number,
            mode=mode,
            intent=intent,
            call_flow_id=call_flow_id,
            device=device,
            services=services or [],
        )

        # Register before publishing so the call is already tracked when
        # the event goes out.
        self._active_calls[call_id] = call

        await self.event_bus.publish(GatewayEvent(
            type=EventType.CALL_INITIATED,
            call_id=call_id,
            data={"number": remote_number, "mode": mode.value, "intent": intent},
            message=f"📞 Calling {remote_number} ({mode.value})",
        ))

        return call

    async def update_status(self, call_id: str, status: CallStatus) -> None:
        """
        Set a tracked call's status and publish the matching event.

        Logs a warning and does nothing if ``call_id`` is not tracked.

        Args:
            call_id: ID of the call to update.
            status: New status to store on the call.
        """
        call = self._active_calls.get(call_id)
        if call is None:
            logger.warning("Cannot update status: call %s not found", call_id)
            return

        old_status = call.status
        call.status = status

        # Track timing milestones
        if status == CallStatus.CONNECTED and not call.connected_at:
            # Only the first CONNECTED transition stamps the connect time.
            call.connected_at = datetime.now()
        elif status == CallStatus.ON_HOLD:
            # NOTE(review): unlike connected_at, this is overwritten on every
            # ON_HOLD update, so a repeated ON_HOLD restarts the hold clock —
            # confirm that is intended.
            call.hold_started_at = datetime.now()
        elif status == CallStatus.HUMAN_DETECTED:
            call.hold_started_at = None  # Stop counting hold time

        # Statuses missing from the table are published as CALL_CONNECTED.
        # NOTE(review): confirm this fallback is intended for every unmapped
        # status.
        event_type = self._STATUS_EVENTS.get(status, EventType.CALL_CONNECTED)

        await self.event_bus.publish(GatewayEvent(
            type=event_type,
            call_id=call_id,
            data={
                "old_status": old_status.value,
                "new_status": status.value,
                "duration": call.duration,
                "hold_time": call.hold_time,
            },
            message=f"Call {call_id}: {old_status.value} → {status.value}",
        ))

    async def add_classification(
        self, call_id: str, result: ClassificationResult
    ) -> None:
        """
        Record an audio classification on a call and publish it.

        The result becomes the call's current classification and is appended
        to its classification history. Untracked call IDs are ignored (logged
        at debug level).

        Args:
            call_id: ID of the call the classification belongs to.
            result: Classification result; its ``audio_type`` and
                ``confidence`` are included in the published event.
        """
        call = self._active_calls.get(call_id)
        if call is None:
            logger.debug("Dropping classification: call %s not found", call_id)
            return

        call.current_classification = result.audio_type
        call.classification_history.append(result)

        await self.event_bus.publish(GatewayEvent(
            type=EventType.AUDIO_CLASSIFIED,
            call_id=call_id,
            data={
                "audio_type": result.audio_type.value,
                "confidence": result.confidence,
            },
            message=f"🎵 Audio: {result.audio_type.value} ({result.confidence:.0%})",
        ))

    async def add_transcript(self, call_id: str, text: str) -> None:
        """
        Append a transcript chunk to a call and publish it.

        The event carries the full text in ``data``; the human-readable
        message is cut to the first 80 characters for long chunks. Untracked
        call IDs are ignored (logged at debug level).

        Args:
            call_id: ID of the call the transcript belongs to.
            text: Transcribed text chunk.
        """
        call = self._active_calls.get(call_id)
        if call is None:
            logger.debug("Dropping transcript chunk: call %s not found", call_id)
            return

        call.transcript_chunks.append(text)

        # Only the display message is truncated; data["text"] stays complete.
        if len(text) > 80:
            preview = f"📝 '{text[:80]}...' "
        else:
            preview = f"📝 '{text}'"

        await self.event_bus.publish(GatewayEvent(
            type=EventType.TRANSCRIPT_CHUNK,
            call_id=call_id,
            data={"text": text},
            message=preview,
        ))

    async def end_call(self, call_id: str, status: CallStatus = CallStatus.COMPLETED) -> Optional[ActiveCall]:
        """
        Stop tracking a call, release its SIP leg mappings, and announce it.

        Args:
            call_id: ID of the call to end.
            status: Final status stored on the call and reported in the
                event's ``final_status`` field.

        Returns:
            The removed ``ActiveCall``, or ``None`` if the ID was not tracked.
        """
        call = self._active_calls.pop(call_id, None)

        # Drop every leg that pointed at this call; otherwise the leg table
        # grows for the lifetime of the process. Snapshot the keys first
        # because a dict must not change size while it is being iterated.
        stale_legs = [
            leg_id
            for leg_id, mapped_call_id in self._call_legs.items()
            if mapped_call_id == call_id
        ]
        for leg_id in stale_legs:
            del self._call_legs[leg_id]

        if call is None:
            return None

        call.status = status

        # Always published as CALL_ENDED, whatever the final status; the
        # final status itself travels in data["final_status"].
        await self.event_bus.publish(GatewayEvent(
            type=EventType.CALL_ENDED,
            call_id=call_id,
            data={
                "duration": call.duration,
                "hold_time": call.hold_time,
                "final_status": status.value,
            },
            message=f"📵 Call ended: {call.remote_number} ({call.duration}s, hold: {call.hold_time}s)",
        ))
        return call

    # ================================================================
    # Leg Mapping
    # ================================================================

    def map_leg(self, sip_leg_id: str, call_id: str) -> None:
        """
        Associate a SIP leg ID with a call ID.

        An existing mapping for the same leg is overwritten. The call ID is
        not checked against the tracked calls.
        """
        self._call_legs[sip_leg_id] = call_id

    def get_call_for_leg(self, sip_leg_id: str) -> Optional[ActiveCall]:
        """
        Look up which call a SIP leg belongs to.

        Returns:
            The tracked ``ActiveCall``, or ``None`` when the leg is unmapped
            or its call is no longer tracked.
        """
        call_id = self._call_legs.get(sip_leg_id)
        if not call_id:
            return None
        return self._active_calls.get(call_id)

    # ================================================================
    # Queries
    # ================================================================

    def get_call(self, call_id: str) -> Optional[ActiveCall]:
        """Return the tracked call with this ID, or ``None`` if not tracked."""
        return self._active_calls.get(call_id)

    @property
    def active_calls(self) -> dict[str, ActiveCall]:
        """
        All active calls, keyed by call ID.

        Returns a shallow copy: adding or removing keys on the result does
        not affect the manager, but the ``ActiveCall`` objects are shared.
        """
        return dict(self._active_calls)

    @property
    def active_call_count(self) -> int:
        """Number of calls currently tracked."""
        return len(self._active_calls)
|