"""WebSocket API — Real-time call events and audio classification stream.""" import asyncio import logging from fastapi import APIRouter, WebSocket, WebSocketDisconnect from api.deps import get_gateway from models.events import EventType, GatewayEvent logger = logging.getLogger(__name__) router = APIRouter() async def _send_trunk_status(websocket: WebSocket, gateway) -> None: """Send current SIP trunk status as a synthetic event to a newly connected client.""" try: trunk_status = await gateway.sip_engine.get_trunk_status() registered = trunk_status.get("registered", False) event_type = ( EventType.SIP_TRUNK_REGISTERED if registered else EventType.SIP_TRUNK_REGISTRATION_FAILED ) reason = trunk_status.get("reason", "Trunk registration failed or not configured") event = GatewayEvent( type=event_type, message=( f"SIP trunk registered with {trunk_status.get('host')}" if registered else f"SIP trunk not registered — {reason}" ), data=trunk_status, ) await websocket.send_json(event.to_ws_message()) except Exception as exc: logger.warning(f"Could not send trunk status on connect: {exc}") @router.websocket("/events") async def event_stream(websocket: WebSocket): """ Real-time event stream. Sends all gateway events as JSON: - Call lifecycle (initiated, ringing, connected, ended) - Hold Slayer events (IVR steps, DTMF, hold detected, human detected) - Audio classifications - Transcript chunks - Device status changes Example message: { "type": "holdslayer.human_detected", "call_id": "call_abc123", "timestamp": "2025-01-15T14:30:00", "data": {"audio_type": "live_human", "confidence": 0.92}, "message": "🚨 Human detected!" } """ await websocket.accept() logger.info("WebSocket client connected") gateway = getattr(websocket.app.state, "gateway", None) if not gateway: await websocket.send_json({"error": "Gateway not initialized"}) await websocket.close() return # Immediately push current trunk status so the dashboard doesn't start blank await _send_trunk_status(websocket, gateway) subscription = gateway.event_bus.subscribe() try: async for event in subscription: await websocket.send_json(event.to_ws_message()) except WebSocketDisconnect: logger.info("WebSocket client disconnected") except Exception as e: logger.error(f"WebSocket error: {e}") finally: subscription.close() @router.websocket("/calls/{call_id}/events") async def call_event_stream(websocket: WebSocket, call_id: str): """ Event stream filtered to a specific call. Same format as /events but only sends events for the specified call. """ await websocket.accept() logger.info(f"WebSocket client connected for call {call_id}") gateway = getattr(websocket.app.state, "gateway", None) if not gateway: await websocket.send_json({"error": "Gateway not initialized"}) await websocket.close() return subscription = gateway.event_bus.subscribe() try: async for event in subscription: if event.call_id == call_id: await websocket.send_json(event.to_ws_message()) except WebSocketDisconnect: logger.info(f"WebSocket client disconnected for call {call_id}") except Exception as e: logger.error(f"WebSocket error: {e}") finally: subscription.close()