""" SIP Engine — Abstract interface for SIP signaling and media control. This defines the contract that any SIP backend (Sippy B2BUA, PJSUA2, etc.) must implement. The rest of the gateway talks to this interface, never to the underlying SIP library directly. """ import abc from collections.abc import AsyncIterator from typing import Optional from models.call import ActiveCall from models.device import Device class SIPEngine(abc.ABC): """ Abstract SIP engine interface. Implementations: - SippyEngine: Sippy B2BUA for signaling + PJSUA2 for media - MockEngine: For testing without a real SIP stack """ # ================================================================ # Lifecycle # ================================================================ @abc.abstractmethod async def start(self) -> None: """ Start the SIP engine. - Initialize the SIP stack - Register with the SIP trunk - Start listening for device registrations """ ... @abc.abstractmethod async def stop(self) -> None: """ Gracefully shut down. - Hang up all active calls - Unregister from trunk - Close all sockets """ ... @abc.abstractmethod async def is_ready(self) -> bool: """Is the engine ready to make/receive calls?""" ... # ================================================================ # Outbound Calls # ================================================================ @abc.abstractmethod async def make_call(self, number: str, caller_id: Optional[str] = None) -> str: """ Place an outbound call via the SIP trunk. Args: number: Phone number to call (E.164) caller_id: Optional caller ID override Returns: SIP call leg ID (used to reference this call in the engine) """ ... @abc.abstractmethod async def hangup(self, call_leg_id: str) -> None: """Hang up a call leg.""" ... @abc.abstractmethod async def send_dtmf(self, call_leg_id: str, digits: str) -> None: """ Send DTMF tones on a call leg. Args: call_leg_id: The call leg to send on digits: DTMF digits to send (0-9, *, #) """ ... # ================================================================ # Device Calls (for transfer) # ================================================================ @abc.abstractmethod async def call_device(self, device: Device) -> str: """ Place a call to a registered device. For SIP devices: sends INVITE to their registered contact. For cell phones: places outbound call via trunk. Args: device: The device to call Returns: SIP call leg ID for the device leg """ ... # ================================================================ # Conference Bridge / Media # ================================================================ @abc.abstractmethod async def bridge_calls(self, leg_a: str, leg_b: str) -> str: """ Bridge two call legs together in a conference. Audio from leg_a flows to leg_b and vice versa. Args: leg_a: First call leg ID leg_b: Second call leg ID Returns: Bridge/conference ID """ ... @abc.abstractmethod async def unbridge(self, bridge_id: str) -> None: """Remove a bridge, disconnecting the audio paths.""" ... @abc.abstractmethod def get_audio_stream(self, call_leg_id: str): """ Get a real-time audio stream from a call leg. Returns an async generator yielding audio chunks (PCM/WAV frames). Used by the audio classifier and transcription services. Yields: bytes: Audio frames (16-bit PCM, 16kHz mono) """ ... # ================================================================ # Registration # ================================================================ @abc.abstractmethod async def get_registered_devices(self) -> list[dict]: """ Get list of currently registered SIP devices. Returns: List of dicts with registration info: [{"uri": "sip:robert@...", "contact": "...", "expires": 3600}, ...] """ ... # ================================================================ # Trunk Status # ================================================================ @abc.abstractmethod async def get_trunk_status(self) -> dict: """ Get SIP trunk registration status. Returns: {"registered": True/False, "host": "...", "transport": "..."} """ ... class MockSIPEngine(SIPEngine): """ Mock SIP engine for testing. Simulates call lifecycle without any real SIP stack. """ def __init__(self): self._ready = False self._call_counter = 0 self._active_legs: dict[str, dict] = {} self._bridges: dict[str, tuple[str, str]] = {} self._registered_devices: list[dict] = [] async def start(self) -> None: self._ready = True async def stop(self) -> None: self._active_legs.clear() self._bridges.clear() self._ready = False async def is_ready(self) -> bool: return self._ready async def make_call(self, number: str, caller_id: Optional[str] = None) -> str: self._call_counter += 1 leg_id = f"mock_leg_{self._call_counter}" self._active_legs[leg_id] = { "number": number, "caller_id": caller_id, "state": "ringing", } return leg_id async def hangup(self, call_leg_id: str) -> None: self._active_legs.pop(call_leg_id, None) async def send_dtmf(self, call_leg_id: str, digits: str) -> None: if call_leg_id in self._active_legs: self._active_legs[call_leg_id].setdefault("dtmf_sent", []).append(digits) async def call_device(self, device: Device) -> str: self._call_counter += 1 leg_id = f"mock_device_leg_{self._call_counter}" self._active_legs[leg_id] = { "device_id": device.id, "device_name": device.name, "state": "ringing", } return leg_id async def bridge_calls(self, leg_a: str, leg_b: str) -> str: bridge_id = f"bridge_{leg_a}_{leg_b}" self._bridges[bridge_id] = (leg_a, leg_b) return bridge_id async def unbridge(self, bridge_id: str) -> None: self._bridges.pop(bridge_id, None) async def get_audio_stream(self, call_leg_id: str): """Yield empty audio frames for testing.""" import asyncio for _ in range(10): yield b"\x00" * 3200 # 100ms of silence at 16kHz 16-bit mono await asyncio.sleep(0.1) async def get_registered_devices(self) -> list[dict]: return self._registered_devices async def get_trunk_status(self) -> dict: return { "registered": False, "host": None, "transport": None, "mock": True, "reason": "No SIP trunk configured (mock mode)", }