Files
hold-slayer/core/call_manager.py
Robert Helewka ecf37658ce feat: add initial Hold Slayer AI telephony gateway implementation
Complete project scaffolding and core implementation of an AI-powered
telephony system that calls companies, navigates IVR menus, waits on
hold, and transfers to the user when a human answers.

Key components:
- FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces
- SIP/VoIP call management via PJSUA2 with RTP audio streaming
- LLM-powered IVR navigation using OpenAI/Anthropic with tool calling
- Hold detection service combining audio analysis and silence detection
- Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines
- Call recording with per-channel and mixed audio capture
- Event bus (asyncio pub/sub) for real-time client updates
- Web dashboard with live call monitoring
- SQLite persistence via SQLAlchemy with call history and analytics
- Notification support (email, SMS, webhook, desktop)
- Docker Compose deployment with Opal VoIP and Opal Media containers
- Comprehensive test suite with unit, integration, and E2E tests
- Simplified .gitignore and full project documentation in README
2026-03-21 19:23:26 +00:00

200 lines
6.9 KiB
Python

"""
Call Manager — Active call state tracking and event bus.
Central nervous system of the gateway. Tracks all active calls,
publishes events, and coordinates between SIP engine and services.
"""
import asyncio
import logging
import uuid
from collections.abc import AsyncIterator
from datetime import datetime
from typing import Optional
from core.event_bus import EventBus, EventSubscription
from models.call import ActiveCall, AudioClassification, CallMode, CallStatus, ClassificationResult
from models.events import EventType, GatewayEvent
# Module-level logger; CallManager uses it to warn about status updates for unknown call IDs.
logger = logging.getLogger(__name__)
class CallManager:
    """
    Manages all active calls and their state.

    The single source of truth for what's happening on the gateway: it owns
    the ``call_id -> ActiveCall`` registry and the ``SIP leg ID -> call_id``
    mapping, and publishes a ``GatewayEvent`` on the event bus for each
    state change it records.
    """

    # Status -> event type published by update_status(). Built once here
    # rather than re-created on every status change.
    _STATUS_EVENT_TYPES = {
        CallStatus.RINGING: EventType.CALL_RINGING,
        CallStatus.CONNECTED: EventType.CALL_CONNECTED,
        CallStatus.NAVIGATING_IVR: EventType.IVR_STEP,
        CallStatus.ON_HOLD: EventType.HOLD_DETECTED,
        CallStatus.HUMAN_DETECTED: EventType.HUMAN_DETECTED,
        CallStatus.TRANSFERRING: EventType.TRANSFER_STARTED,
        CallStatus.BRIDGED: EventType.TRANSFER_COMPLETE,
        CallStatus.COMPLETED: EventType.CALL_ENDED,
        CallStatus.FAILED: EventType.CALL_FAILED,
    }

    # Transcript chunks longer than this are truncated in the event message
    # (the full text is still stored and sent in the event data).
    _TRANSCRIPT_PREVIEW_CHARS = 80

    def __init__(self, event_bus: EventBus):
        """
        Args:
            event_bus: Bus on which every call event is published.
        """
        self.event_bus = event_bus
        self._active_calls: dict[str, ActiveCall] = {}
        self._call_legs: dict[str, str] = {}  # SIP leg ID -> call ID mapping

    # ================================================================
    # Call Lifecycle
    # ================================================================

    async def create_call(
        self,
        remote_number: str,
        mode: CallMode = CallMode.DIRECT,
        intent: Optional[str] = None,
        call_flow_id: Optional[str] = None,
        device: Optional[str] = None,
        services: Optional[list[str]] = None,
    ) -> ActiveCall:
        """
        Create a new call, start tracking it, and publish CALL_INITIATED.

        Args:
            remote_number: Number being called.
            mode: Call mode; defaults to ``CallMode.DIRECT``.
            intent: Optional free-text intent, included in the event data.
            call_flow_id: Optional call-flow identifier stored on the call.
            device: Optional device identifier stored on the call.
            services: Optional list of service names; ``None`` becomes ``[]``.

        Returns:
            The newly created ``ActiveCall`` (ID of the form ``call_<12 hex>``).
        """
        call_id = f"call_{uuid.uuid4().hex[:12]}"
        call = ActiveCall(
            id=call_id,
            remote_number=remote_number,
            mode=mode,
            intent=intent,
            call_flow_id=call_flow_id,
            device=device,
            services=services or [],
        )
        self._active_calls[call_id] = call

        await self.event_bus.publish(GatewayEvent(
            type=EventType.CALL_INITIATED,
            call_id=call_id,
            data={"number": remote_number, "mode": mode.value, "intent": intent},
            message=f"📞 Calling {remote_number} ({mode.value})",
        ))
        return call

    async def update_status(self, call_id: str, status: CallStatus) -> None:
        """
        Update a call's status, record timing milestones, and publish an event.

        Args:
            call_id: ID of an active call.
            status: The new status.

        Unknown call IDs are logged as a warning and otherwise ignored.
        """
        call = self._active_calls.get(call_id)
        if not call:
            logger.warning("Cannot update status: call %s not found", call_id)
            return

        old_status = call.status
        call.status = status

        # Track timing milestones
        if status == CallStatus.CONNECTED and not call.connected_at:
            call.connected_at = datetime.now()
        elif status == CallStatus.ON_HOLD:
            # Only entering hold starts the clock; a repeated ON_HOLD update
            # must not reset an in-progress hold timer (mirrors the set-once
            # guard on connected_at above).
            if old_status != CallStatus.ON_HOLD or call.hold_started_at is None:
                call.hold_started_at = datetime.now()
        elif status == CallStatus.HUMAN_DETECTED:
            call.hold_started_at = None  # Stop counting hold time

        # NOTE(review): statuses missing from the map fall back to
        # CALL_CONNECTED; confirm that is intended for any unlisted CallStatus.
        event_type = self._STATUS_EVENT_TYPES.get(status, EventType.CALL_CONNECTED)

        await self.event_bus.publish(GatewayEvent(
            type=event_type,
            call_id=call_id,
            data={
                "old_status": old_status.value,
                "new_status": status.value,
                "duration": call.duration,
                "hold_time": call.hold_time,
            },
            message=f"Call {call_id}: {old_status.value} → {status.value}",
        ))

    async def add_classification(
        self, call_id: str, result: ClassificationResult
    ) -> None:
        """
        Record an audio classification result and publish AUDIO_CLASSIFIED.

        Sets the call's ``current_classification`` and appends the result to
        its ``classification_history``. Unknown call IDs are ignored.
        """
        call = self._active_calls.get(call_id)
        if not call:
            return

        call.current_classification = result.audio_type
        call.classification_history.append(result)

        await self.event_bus.publish(GatewayEvent(
            type=EventType.AUDIO_CLASSIFIED,
            call_id=call_id,
            data={
                "audio_type": result.audio_type.value,
                "confidence": result.confidence,
            },
            message=f"🎵 Audio: {result.audio_type.value} ({result.confidence:.0%})",
        ))

    async def add_transcript(self, call_id: str, text: str) -> None:
        """
        Append a transcript chunk to a call and publish TRANSCRIPT_CHUNK.

        The full text goes into the event data; the human-readable message
        shows at most ``_TRANSCRIPT_PREVIEW_CHARS`` characters followed by
        ``...`` when truncated. Unknown call IDs are ignored.
        """
        call = self._active_calls.get(call_id)
        if not call:
            return

        call.transcript_chunks.append(text)

        limit = self._TRANSCRIPT_PREVIEW_CHARS
        preview = f"{text[:limit]}..." if len(text) > limit else text

        await self.event_bus.publish(GatewayEvent(
            type=EventType.TRANSCRIPT_CHUNK,
            call_id=call_id,
            data={"text": text},
            message=f"📝 '{preview}'",
        ))

    async def end_call(
        self, call_id: str, status: CallStatus = CallStatus.COMPLETED
    ) -> Optional[ActiveCall]:
        """
        End a call and remove it from active tracking.

        Any SIP leg mappings pointing at the call are dropped as well, so the
        leg map does not grow for the lifetime of the gateway.

        Args:
            call_id: ID of the call to end.
            status: Final status stamped on the call (default COMPLETED).

        Returns:
            The removed ``ActiveCall``, or ``None`` if ``call_id`` was not
            active (in which case no event is published).
        """
        call = self._active_calls.pop(call_id, None)
        # Clean up before publishing so a failing subscriber can't leave
        # stale leg entries behind.
        self._forget_legs(call_id)

        if call:
            call.status = status
            await self.event_bus.publish(GatewayEvent(
                type=EventType.CALL_ENDED,
                call_id=call_id,
                data={
                    "duration": call.duration,
                    "hold_time": call.hold_time,
                    "final_status": status.value,
                },
                message=f"📵 Call ended: {call.remote_number} ({call.duration}s, hold: {call.hold_time}s)",
            ))
        return call

    # ================================================================
    # Leg Mapping
    # ================================================================

    def map_leg(self, sip_leg_id: str, call_id: str) -> None:
        """Map a SIP leg ID to a call ID (overwrites any previous mapping)."""
        self._call_legs[sip_leg_id] = call_id

    def get_call_for_leg(self, sip_leg_id: str) -> Optional[ActiveCall]:
        """
        Look up which active call a SIP leg belongs to.

        Returns ``None`` if the leg is unmapped or its call is no longer active.
        """
        call_id = self._call_legs.get(sip_leg_id)
        if call_id:
            return self._active_calls.get(call_id)
        return None

    def _forget_legs(self, call_id: str) -> None:
        """Drop every SIP leg mapping that points at ``call_id``."""
        # Collect first: deleting while iterating a dict raises RuntimeError.
        stale = [leg for leg, owner in self._call_legs.items() if owner == call_id]
        for leg in stale:
            del self._call_legs[leg]

    # ================================================================
    # Queries
    # ================================================================

    def get_call(self, call_id: str) -> Optional[ActiveCall]:
        """Get an active call by ID, or ``None`` if it is not active."""
        return self._active_calls.get(call_id)

    @property
    def active_calls(self) -> dict[str, ActiveCall]:
        """
        Shallow copy of the active-call mapping.

        Adding/removing keys on the returned dict does not affect tracking;
        the ``ActiveCall`` objects themselves are shared, not copied.
        """
        return dict(self._active_calls)

    @property
    def active_call_count(self) -> int:
        """Number of calls currently being tracked."""
        return len(self._active_calls)