"""
Call Manager — Active call state tracking and event bus.

Central nervous system of the gateway. Tracks all active calls,
publishes events, and coordinates between SIP engine and services.
"""

import asyncio
import logging
import uuid
from collections.abc import AsyncIterator
from datetime import datetime
from typing import Optional

from core.event_bus import EventBus, EventSubscription
from models.call import ActiveCall, AudioClassification, CallMode, CallStatus, ClassificationResult
from models.events import EventType, GatewayEvent

logger = logging.getLogger(__name__)

# Event type published by CallManager.update_status() for each new status.
# Built once at import time instead of on every status update. Statuses that
# are not listed here fall back to EventType.CALL_CONNECTED (see update_status).
_STATUS_EVENT_TYPES: dict[CallStatus, EventType] = {
    CallStatus.RINGING: EventType.CALL_RINGING,
    CallStatus.CONNECTED: EventType.CALL_CONNECTED,
    CallStatus.NAVIGATING_IVR: EventType.IVR_STEP,
    CallStatus.ON_HOLD: EventType.HOLD_DETECTED,
    CallStatus.HUMAN_DETECTED: EventType.HUMAN_DETECTED,
    CallStatus.TRANSFERRING: EventType.TRANSFER_STARTED,
    CallStatus.BRIDGED: EventType.TRANSFER_COMPLETE,
    CallStatus.COMPLETED: EventType.CALL_ENDED,
    CallStatus.FAILED: EventType.CALL_FAILED,
}


class CallManager:
    """
    Manages all active calls and their state.

    The single source of truth for what's happening on the gateway.
    Every state change made through this class is also published as a
    GatewayEvent on the event bus handed to the constructor.
    """

    def __init__(self, event_bus: EventBus):
        """Store the event bus and start with no tracked calls or legs."""
        self.event_bus = event_bus
        self._active_calls: dict[str, ActiveCall] = {}
        self._call_legs: dict[str, str] = {}  # SIP leg ID -> call ID mapping

    # ================================================================
    # Call Lifecycle
    # ================================================================

    async def create_call(
        self,
        remote_number: str,
        mode: CallMode = CallMode.DIRECT,
        intent: Optional[str] = None,
        call_flow_id: Optional[str] = None,
        device: Optional[str] = None,
        services: Optional[list[str]] = None,
    ) -> ActiveCall:
        """
        Create a new call, track it, and publish CALL_INITIATED.

        The call ID is "call_" followed by the first 12 hex characters of a
        random UUID4. ``services`` defaults to an empty list when omitted.

        Returns:
            The newly created ActiveCall, already registered as active.
        """
        call_id = f"call_{uuid.uuid4().hex[:12]}"
        call = ActiveCall(
            id=call_id,
            remote_number=remote_number,
            mode=mode,
            intent=intent,
            call_flow_id=call_flow_id,
            device=device,
            services=services or [],
        )
        self._active_calls[call_id] = call

        await self.event_bus.publish(GatewayEvent(
            type=EventType.CALL_INITIATED,
            call_id=call_id,
            data={"number": remote_number, "mode": mode.value, "intent": intent},
            message=f"📞 Calling {remote_number} ({mode.value})",
        ))
        return call

    async def update_status(self, call_id: str, status: CallStatus) -> None:
        """
        Update a call's status and publish the matching event.

        Logs a warning and does nothing when ``call_id`` is not an active
        call. The call stays in the active set regardless of the new status;
        use end_call() to stop tracking it.
        """
        call = self._active_calls.get(call_id)
        if not call:
            logger.warning("Cannot update status: call %s not found", call_id)
            return

        old_status = call.status
        call.status = status

        # Track timing milestones
        if status == CallStatus.CONNECTED and not call.connected_at:
            # Only the first CONNECTED transition stamps the connect time.
            call.connected_at = datetime.now()
        elif status == CallStatus.ON_HOLD:
            call.hold_started_at = datetime.now()
        elif status == CallStatus.HUMAN_DETECTED:
            call.hold_started_at = None  # Stop counting hold time

        # Unmapped statuses are reported as CALL_CONNECTED.
        event_type = _STATUS_EVENT_TYPES.get(status, EventType.CALL_CONNECTED)

        await self.event_bus.publish(GatewayEvent(
            type=event_type,
            call_id=call_id,
            data={
                "old_status": old_status.value,
                "new_status": status.value,
                "duration": call.duration,
                "hold_time": call.hold_time,
            },
            message=f"Call {call_id}: {old_status.value} → {status.value}",
        ))

    async def add_classification(
        self, call_id: str, result: ClassificationResult
    ) -> None:
        """
        Record an audio classification result and publish AUDIO_CLASSIFIED.

        The result becomes the call's current classification and is appended
        to its history. Results for unknown calls are dropped.
        """
        call = self._active_calls.get(call_id)
        if not call:
            # Debug rather than warning: presumably results can still arrive
            # after a call has ended — confirm against the audio pipeline.
            logger.debug("Dropping classification: call %s not found", call_id)
            return

        call.current_classification = result.audio_type
        call.classification_history.append(result)

        await self.event_bus.publish(GatewayEvent(
            type=EventType.AUDIO_CLASSIFIED,
            call_id=call_id,
            data={
                "audio_type": result.audio_type.value,
                "confidence": result.confidence,
            },
            message=f"🎵 Audio: {result.audio_type.value} ({result.confidence:.0%})",
        ))

    async def add_transcript(self, call_id: str, text: str) -> None:
        """
        Append a transcript chunk to a call and publish TRANSCRIPT_CHUNK.

        The event data carries the full text; only the human-readable
        message is truncated to the first 80 characters. Chunks for unknown
        calls are dropped.
        """
        call = self._active_calls.get(call_id)
        if not call:
            logger.debug("Dropping transcript chunk: call %s not found", call_id)
            return

        call.transcript_chunks.append(text)

        await self.event_bus.publish(GatewayEvent(
            type=EventType.TRANSCRIPT_CHUNK,
            call_id=call_id,
            data={"text": text},
            message=f"📝 '{text[:80]}...' " if len(text) > 80 else f"📝 '{text}'",
        ))

    async def end_call(
        self, call_id: str, status: CallStatus = CallStatus.COMPLETED
    ) -> Optional[ActiveCall]:
        """
        End a call and remove it from active tracking.

        Sets the final status, drops every SIP leg mapping that pointed at
        the call, and publishes CALL_ENDED (whatever ``status`` is).

        Returns:
            The removed ActiveCall, or None when ``call_id`` was not active
            (in which case no event is published).
        """
        call = self._active_calls.pop(call_id, None)

        # Purge leg mappings even if the call was already gone; otherwise
        # _call_legs grows without bound over the lifetime of the gateway.
        self._forget_legs(call_id)

        if call:
            call.status = status
            await self.event_bus.publish(GatewayEvent(
                type=EventType.CALL_ENDED,
                call_id=call_id,
                data={
                    "duration": call.duration,
                    "hold_time": call.hold_time,
                    "final_status": status.value,
                },
                message=f"📵 Call ended: {call.remote_number} ({call.duration}s, hold: {call.hold_time}s)",
            ))
        return call

    def _forget_legs(self, call_id: str) -> None:
        """Remove every SIP leg mapping that refers to ``call_id``."""
        # Collect first: a dict must not change size while being iterated.
        stale_legs = [
            leg_id for leg_id, mapped_id in self._call_legs.items()
            if mapped_id == call_id
        ]
        for leg_id in stale_legs:
            del self._call_legs[leg_id]

    # ================================================================
    # Leg Mapping
    # ================================================================

    def map_leg(self, sip_leg_id: str, call_id: str) -> None:
        """
        Map a SIP leg ID to a call ID.

        An existing mapping for the same leg is overwritten. ``call_id`` is
        not checked against the active calls.
        """
        self._call_legs[sip_leg_id] = call_id

    def get_call_for_leg(self, sip_leg_id: str) -> Optional[ActiveCall]:
        """
        Look up which call a SIP leg belongs to.

        Returns None when the leg is unmapped or its call is no longer active.
        """
        call_id = self._call_legs.get(sip_leg_id)
        if call_id:
            return self._active_calls.get(call_id)
        return None

    # ================================================================
    # Queries
    # ================================================================

    def get_call(self, call_id: str) -> Optional[ActiveCall]:
        """Get an active call by ID, or None when it is not tracked."""
        return self._active_calls.get(call_id)

    @property
    def active_calls(self) -> dict[str, ActiveCall]:
        """
        All active calls, keyed by call ID.

        Returns a shallow copy: adding or removing keys does not affect the
        manager, but the ActiveCall objects themselves are shared.
        """
        return dict(self._active_calls)

    @property
    def active_call_count(self) -> int:
        """Number of calls currently tracked as active."""
        return len(self._active_calls)