Complete project scaffolding and core implementation of an AI-powered telephony system that calls companies, navigates IVR menus, waits on hold, and transfers to the user when a human answers. Key components: - FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces - SIP/VoIP call management via PJSUA2 with RTP audio streaming - LLM-powered IVR navigation using OpenAI/Anthropic with tool calling - Hold detection service combining audio analysis and silence detection - Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines - Call recording with per-channel and mixed audio capture - Event bus (asyncio pub/sub) for real-time client updates - Web dashboard with live call monitoring - SQLite persistence via SQLAlchemy with call history and analytics - Notification support (email, SMS, webhook, desktop) - Docker Compose deployment with Opal VoIP and Opal Media containers - Comprehensive test suite with unit, integration, and E2E tests - Simplified .gitignore and full project documentation in README
781 lines
29 KiB
Python
781 lines
29 KiB
Python
"""
|
|
Sippy Engine — SIP signaling via Sippy B2BUA.
|
|
|
|
Implements the SIPEngine interface using Sippy B2BUA for SIP signaling
|
|
(INVITE, BYE, REGISTER, DTMF) and delegates media handling to PJSUA2
|
|
via the MediaPipeline.
|
|
|
|
Architecture:
|
|
Sippy B2BUA → SIP signaling (call control, registration, DTMF)
|
|
PJSUA2 → Media anchor (conference bridge, audio tapping, recording)
|
|
|
|
Sippy B2BUA runs in its own thread (it has its own event loop).
|
|
We bridge async/sync via run_in_executor.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import threading
|
|
import uuid
|
|
from typing import Any, Callable, Optional
|
|
|
|
from core.sip_engine import SIPEngine
|
|
from models.device import Device, DeviceType
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ================================================================
|
|
# Sippy B2BUA Wrapper Types
|
|
# ================================================================
|
|
|
|
class SipCallLeg:
|
|
"""Tracks a single SIP call leg managed by Sippy."""
|
|
|
|
def __init__(self, leg_id: str, direction: str, remote_uri: str):
|
|
self.leg_id = leg_id
|
|
self.direction = direction # "outbound" or "inbound"
|
|
self.remote_uri = remote_uri
|
|
self.state = "init" # init, trying, ringing, connected, terminated
|
|
self.sippy_ua = None # Sippy UA object reference
|
|
self.media_port: Optional[int] = None # PJSUA2 conf bridge port
|
|
self.dtmf_buffer: list[str] = []
|
|
|
|
def __repr__(self):
|
|
return f"<SipCallLeg {self.leg_id} {self.direction} {self.state} → {self.remote_uri}>"
|
|
|
|
|
|
class SipBridge:
|
|
"""Two call legs bridged together."""
|
|
|
|
def __init__(self, bridge_id: str, leg_a: str, leg_b: str):
|
|
self.bridge_id = bridge_id
|
|
self.leg_a = leg_a
|
|
self.leg_b = leg_b
|
|
|
|
def __repr__(self):
|
|
return f"<SipBridge {self.bridge_id}: {self.leg_a} ↔ {self.leg_b}>"
|
|
|
|
|
|
# ================================================================
|
|
# Sippy B2BUA Event Handlers
|
|
# ================================================================
|
|
|
|
class SippyCallController:
|
|
"""
|
|
Handles Sippy B2BUA callbacks for a single call leg.
|
|
|
|
Sippy B2BUA uses a callback model — when SIP events happen
|
|
(180 Ringing, 200 OK, BYE, etc.), the corresponding method
|
|
is called on this controller.
|
|
"""
|
|
|
|
def __init__(self, leg: SipCallLeg, engine: "SippyEngine"):
|
|
self.leg = leg
|
|
self.engine = engine
|
|
|
|
def on_trying(self):
|
|
"""100 Trying received."""
|
|
self.leg.state = "trying"
|
|
logger.debug(f" {self.leg.leg_id}: 100 Trying")
|
|
|
|
def on_ringing(self, ringing_code: int = 180):
|
|
"""180 Ringing / 183 Session Progress received."""
|
|
self.leg.state = "ringing"
|
|
logger.info(f" {self.leg.leg_id}: {ringing_code} Ringing")
|
|
if self.engine._on_leg_state_change:
|
|
self.engine._loop.call_soon_threadsafe(
|
|
self.engine._on_leg_state_change, self.leg.leg_id, "ringing"
|
|
)
|
|
|
|
def on_connected(self, sdp_body: Optional[str] = None):
|
|
"""200 OK — call connected, media negotiated."""
|
|
self.leg.state = "connected"
|
|
logger.info(f" {self.leg.leg_id}: Connected")
|
|
|
|
# Extract remote RTP endpoint from SDP for PJSUA2 media bridge
|
|
if sdp_body and self.engine.media_pipeline:
|
|
try:
|
|
remote_rtp = self.engine._parse_sdp_rtp_endpoint(sdp_body)
|
|
if remote_rtp:
|
|
port = self.engine.media_pipeline.add_remote_stream(
|
|
self.leg.leg_id,
|
|
remote_rtp["host"],
|
|
remote_rtp["port"],
|
|
remote_rtp["codec"],
|
|
)
|
|
self.leg.media_port = port
|
|
except Exception as e:
|
|
logger.error(f" Failed to set up media for {self.leg.leg_id}: {e}")
|
|
|
|
if self.engine._on_leg_state_change:
|
|
self.engine._loop.call_soon_threadsafe(
|
|
self.engine._on_leg_state_change, self.leg.leg_id, "connected"
|
|
)
|
|
|
|
def on_disconnected(self, reason: str = ""):
|
|
"""BYE received or call terminated."""
|
|
self.leg.state = "terminated"
|
|
logger.info(f" {self.leg.leg_id}: Disconnected ({reason})")
|
|
|
|
# Clean up media
|
|
if self.engine.media_pipeline and self.leg.media_port is not None:
|
|
try:
|
|
self.engine.media_pipeline.remove_stream(self.leg.leg_id)
|
|
except Exception as e:
|
|
logger.error(f" Failed to clean up media for {self.leg.leg_id}: {e}")
|
|
|
|
if self.engine._on_leg_state_change:
|
|
self.engine._loop.call_soon_threadsafe(
|
|
self.engine._on_leg_state_change, self.leg.leg_id, "terminated"
|
|
)
|
|
|
|
def on_dtmf(self, digit: str):
|
|
"""DTMF digit received (RFC 2833 or SIP INFO)."""
|
|
self.leg.dtmf_buffer.append(digit)
|
|
logger.debug(f" {self.leg.leg_id}: DTMF '{digit}'")
|
|
|
|
|
|
# ================================================================
|
|
# Main Engine
|
|
# ================================================================
|
|
|
|
class SippyEngine(SIPEngine):
|
|
"""
|
|
SIP engine using Sippy B2BUA for signaling.
|
|
|
|
Sippy B2BUA handles:
|
|
- SIP REGISTER (trunk registration + device registration)
|
|
- SIP INVITE / ACK / BYE (call setup/teardown)
|
|
- SIP INFO / RFC 2833 (DTMF)
|
|
- SDP negotiation (we extract RTP endpoints for PJSUA2)
|
|
|
|
Media is handled by PJSUA2's conference bridge (see MediaPipeline).
|
|
Sippy only needs to know about SDP — PJSUA2 handles the actual RTP.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
sip_address: str = "0.0.0.0",
|
|
sip_port: int = 5060,
|
|
trunk_host: str = "",
|
|
trunk_port: int = 5060,
|
|
trunk_username: str = "",
|
|
trunk_password: str = "",
|
|
trunk_transport: str = "udp",
|
|
domain: str = "gateway.local",
|
|
did: str = "",
|
|
media_pipeline=None, # MediaPipeline instance
|
|
on_leg_state_change: Optional[Callable] = None,
|
|
on_device_registered: Optional[Callable] = None,
|
|
):
|
|
# SIP config
|
|
self._sip_address = sip_address
|
|
self._sip_port = sip_port
|
|
self._trunk_host = trunk_host
|
|
self._trunk_port = trunk_port
|
|
self._trunk_username = trunk_username
|
|
self._trunk_password = trunk_password
|
|
self._trunk_transport = trunk_transport
|
|
self._domain = domain
|
|
self._did = did
|
|
|
|
# Media pipeline (PJSUA2)
|
|
self.media_pipeline = media_pipeline
|
|
|
|
# Callbacks for async state changes
|
|
self._on_leg_state_change = on_leg_state_change
|
|
self._on_device_registered = on_device_registered
|
|
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
|
|
|
# State
|
|
self._ready = False
|
|
self._trunk_registered = False
|
|
self._legs: dict[str, SipCallLeg] = {}
|
|
self._bridges: dict[str, SipBridge] = {}
|
|
self._registered_devices: list[dict] = []
|
|
|
|
# Sippy B2BUA internals (set during start)
|
|
self._sippy_global_config: dict[str, Any] = {}
|
|
self._sippy_thread: Optional[threading.Thread] = None
|
|
|
|
# ================================================================
|
|
# Lifecycle
|
|
# ================================================================
|
|
|
|
async def start(self) -> None:
|
|
"""Start the Sippy B2BUA SIP stack."""
|
|
self._loop = asyncio.get_running_loop()
|
|
logger.info("🔌 Starting Sippy B2BUA SIP engine...")
|
|
|
|
try:
|
|
from sippy.SipConf import SipConf
|
|
from sippy.SipTransactionManager import SipTransactionManager
|
|
|
|
# Configure Sippy
|
|
SipConf.my_address = self._sip_address
|
|
SipConf.my_port = self._sip_port
|
|
SipConf.my_uaname = "Hold Slayer Gateway"
|
|
|
|
self._sippy_global_config = {
|
|
"_sip_address": self._sip_address,
|
|
"_sip_port": self._sip_port,
|
|
"_sip_tm": None, # Transaction manager set after start
|
|
}
|
|
|
|
# Start Sippy's SIP transaction manager in a background thread
|
|
# Sippy uses its own event loop (Twisted reactor or custom loop)
|
|
self._sippy_thread = threading.Thread(
|
|
target=self._run_sippy_loop,
|
|
name="sippy-b2bua",
|
|
daemon=True,
|
|
)
|
|
self._sippy_thread.start()
|
|
|
|
# Register with trunk
|
|
if self._trunk_host:
|
|
await self._register_trunk()
|
|
|
|
self._ready = True
|
|
logger.info(
|
|
f"🔌 Sippy B2BUA ready on {self._sip_address}:{self._sip_port}"
|
|
)
|
|
|
|
except ImportError:
|
|
logger.warning(
|
|
"⚠️ Sippy B2BUA not installed — falling back to mock mode. "
|
|
"Install with: pip install sippy"
|
|
)
|
|
self._ready = True
|
|
self._trunk_registered = False
|
|
|
|
def _run_sippy_loop(self):
|
|
"""Run Sippy B2BUA's event loop in a dedicated thread."""
|
|
try:
|
|
from sippy.SipTransactionManager import SipTransactionManager
|
|
from sippy.Timeout import Timeout
|
|
|
|
# Initialize Sippy's transaction manager
|
|
stm = SipTransactionManager(self._sippy_global_config, self._handle_sippy_request)
|
|
self._sippy_global_config["_sip_tm"] = stm
|
|
|
|
logger.info(" Sippy transaction manager started")
|
|
|
|
# Sippy will block here in its event loop
|
|
# For the Twisted-based version, this runs the reactor
|
|
# For the asyncore version, this runs asyncore.loop()
|
|
from sippy.Core.EventDispatcher import ED
|
|
ED.loop()
|
|
|
|
except Exception as e:
|
|
logger.error(f" Sippy event loop crashed: {e}")
|
|
|
|
def _handle_sippy_request(self, req, sip_t):
|
|
"""
|
|
Handle incoming SIP requests from Sippy's transaction manager.
|
|
|
|
This is called in Sippy's thread for incoming INVITEs, etc.
|
|
"""
|
|
method = req.getMethod()
|
|
logger.info(f" Incoming SIP {method}")
|
|
|
|
if method == "INVITE":
|
|
self._handle_incoming_invite(req, sip_t)
|
|
elif method == "REGISTER":
|
|
self._handle_incoming_register(req, sip_t)
|
|
elif method == "BYE":
|
|
self._handle_incoming_bye(req, sip_t)
|
|
elif method == "INFO":
|
|
self._handle_incoming_info(req, sip_t)
|
|
|
|
def _handle_incoming_register(self, req, sip_t):
|
|
"""
|
|
Handle an incoming SIP REGISTER from a phone or softphone.
|
|
|
|
Extracts the AOR (address of record) from the To header, records
|
|
the contact and expiry, and sends a 200 OK. The gateway's
|
|
register_device() is called asynchronously via the event loop so
|
|
the phone gets an extension and SIP URI assigned automatically.
|
|
"""
|
|
try:
|
|
to_uri = str(req.getHFBody("to").getUri())
|
|
contact_hf = req.getHFBody("contact")
|
|
contact_uri = str(contact_hf.getUri()) if contact_hf else to_uri
|
|
expires_hf = req.getHFBody("expires")
|
|
expires = int(str(expires_hf)) if expires_hf else 3600
|
|
|
|
logger.info(f" SIP REGISTER: {to_uri} contact={contact_uri} expires={expires}")
|
|
|
|
if expires == 0:
|
|
# De-registration
|
|
self._registered_devices = [
|
|
d for d in self._registered_devices
|
|
if d.get("aor") != to_uri
|
|
]
|
|
logger.info(f" De-registered: {to_uri}")
|
|
else:
|
|
# Update or add registration record
|
|
existing = next(
|
|
(d for d in self._registered_devices if d.get("aor") == to_uri),
|
|
None,
|
|
)
|
|
if existing:
|
|
existing["contact"] = contact_uri
|
|
existing["expires"] = expires
|
|
else:
|
|
self._registered_devices.append({
|
|
"aor": to_uri,
|
|
"contact": contact_uri,
|
|
"expires": expires,
|
|
})
|
|
|
|
# Notify the gateway (async) so it can assign an extension
|
|
if self._loop:
|
|
self._loop.call_soon_threadsafe(
|
|
self._loop.create_task,
|
|
self._notify_registration(to_uri, contact_uri, expires),
|
|
)
|
|
|
|
# Reply 200 OK
|
|
req.sendResponse(200, "OK")
|
|
|
|
except Exception as e:
|
|
logger.error(f" REGISTER handling failed: {e}")
|
|
try:
|
|
req.sendResponse(500, "Server Error")
|
|
except Exception:
|
|
pass
|
|
|
|
async def _notify_registration(self, aor: str, contact: str, expires: int):
|
|
"""
|
|
Async callback: tell the gateway about the newly registered device
|
|
so it can assign an extension if needed.
|
|
"""
|
|
if self._on_device_registered:
|
|
await self._on_device_registered(aor, contact, expires)
|
|
|
|
def _handle_incoming_invite(self, req, sip_t):
|
|
"""Handle an incoming INVITE — create inbound call leg."""
|
|
from_uri = str(req.getHFBody("from").getUri())
|
|
to_uri = str(req.getHFBody("to").getUri())
|
|
|
|
leg_id = f"leg_{uuid.uuid4().hex[:12]}"
|
|
leg = SipCallLeg(leg_id, "inbound", from_uri)
|
|
leg.sippy_ua = sip_t.ua if hasattr(sip_t, "ua") else None
|
|
self._legs[leg_id] = leg
|
|
|
|
logger.info(f" Incoming call: {from_uri} → {to_uri} (leg: {leg_id})")
|
|
|
|
# Auto-answer for now (gateway always answers)
|
|
# In production, this would check routing rules
|
|
controller = SippyCallController(leg, self)
|
|
controller.on_connected(str(req.getBody()) if req.getBody() else None)
|
|
|
|
def _handle_incoming_bye(self, req, sip_t):
|
|
"""Handle incoming BYE — tear down call leg."""
|
|
# Find the leg by Sippy's UA object
|
|
for leg in self._legs.values():
|
|
if leg.sippy_ua and hasattr(sip_t, "ua") and leg.sippy_ua == sip_t.ua:
|
|
controller = SippyCallController(leg, self)
|
|
controller.on_disconnected("BYE received")
|
|
break
|
|
|
|
def _handle_incoming_info(self, req, sip_t):
|
|
"""Handle SIP INFO (DTMF via SIP INFO method)."""
|
|
body = str(req.getBody()) if req.getBody() else ""
|
|
if "dtmf" in body.lower() or "Signal=" in body:
|
|
# Extract DTMF digit from SIP INFO body
|
|
for line in body.split("\n"):
|
|
if line.startswith("Signal="):
|
|
digit = line.split("=")[1].strip()
|
|
for leg in self._legs.values():
|
|
if leg.sippy_ua and hasattr(sip_t, "ua") and leg.sippy_ua == sip_t.ua:
|
|
controller = SippyCallController(leg, self)
|
|
controller.on_dtmf(digit)
|
|
break
|
|
|
|
async def _register_trunk(self) -> None:
|
|
"""Register with the SIP trunk provider."""
|
|
try:
|
|
from sippy.UA import UA
|
|
from sippy.SipRegistrationAgent import SipRegistrationAgent
|
|
|
|
logger.info(f" Registering with trunk: {self._trunk_host}:{self._trunk_port}")
|
|
|
|
# Run registration in Sippy's thread
|
|
def do_register():
|
|
try:
|
|
reg_agent = SipRegistrationAgent(
|
|
self._sippy_global_config,
|
|
f"sip:{self._trunk_username}@{self._trunk_host}",
|
|
f"sip:{self._trunk_host}:{self._trunk_port}",
|
|
auth_name=self._trunk_username,
|
|
auth_password=self._trunk_password,
|
|
)
|
|
reg_agent.register()
|
|
self._trunk_registered = True
|
|
logger.info(" ✅ Trunk registration sent")
|
|
except Exception as e:
|
|
logger.error(f" ❌ Trunk registration failed: {e}")
|
|
self._trunk_registered = False
|
|
|
|
await asyncio.get_event_loop().run_in_executor(None, do_register)
|
|
|
|
except ImportError:
|
|
logger.warning(" Sippy registration agent not available")
|
|
self._trunk_registered = False
|
|
|
|
async def stop(self) -> None:
|
|
"""Gracefully shut down the SIP engine."""
|
|
logger.info("🔌 Stopping Sippy B2BUA...")
|
|
|
|
# Hang up all active legs
|
|
for leg_id in list(self._legs.keys()):
|
|
try:
|
|
await self.hangup(leg_id)
|
|
except Exception as e:
|
|
logger.error(f" Error hanging up {leg_id}: {e}")
|
|
|
|
# Stop Sippy's event loop
|
|
try:
|
|
from sippy.Core.EventDispatcher import ED
|
|
ED.breakLoop()
|
|
except Exception:
|
|
pass
|
|
|
|
if self._sippy_thread and self._sippy_thread.is_alive():
|
|
self._sippy_thread.join(timeout=5.0)
|
|
|
|
self._ready = False
|
|
self._trunk_registered = False
|
|
logger.info("🔌 Sippy B2BUA stopped")
|
|
|
|
async def is_ready(self) -> bool:
|
|
return self._ready
|
|
|
|
# ================================================================
|
|
# Outbound Calls
|
|
# ================================================================
|
|
|
|
async def make_call(self, number: str, caller_id: Optional[str] = None) -> str:
|
|
"""Place an outbound call via the SIP trunk."""
|
|
if not self._ready:
|
|
raise RuntimeError("SIP engine not ready")
|
|
|
|
leg_id = f"leg_{uuid.uuid4().hex[:12]}"
|
|
|
|
# Build SIP URI for the remote party via trunk
|
|
if self._trunk_host:
|
|
remote_uri = f"sip:{number}@{self._trunk_host}:{self._trunk_port}"
|
|
else:
|
|
remote_uri = f"sip:{number}@{self._domain}"
|
|
|
|
from_uri = f"sip:{caller_id or self._did}@{self._domain}"
|
|
|
|
leg = SipCallLeg(leg_id, "outbound", remote_uri)
|
|
self._legs[leg_id] = leg
|
|
|
|
logger.info(f"📞 Placing call: {from_uri} → {remote_uri} (leg: {leg_id})")
|
|
|
|
# Place the call via Sippy
|
|
def do_invite():
|
|
try:
|
|
from sippy.UA import UA
|
|
from sippy.SipCallId import SipCallId
|
|
from sippy.CCEvents import CCEventTry
|
|
|
|
controller = SippyCallController(leg, self)
|
|
|
|
# Create Sippy UA for this call
|
|
ua = UA(
|
|
self._sippy_global_config,
|
|
event_cb=controller,
|
|
nh_address=(self._trunk_host, self._trunk_port),
|
|
)
|
|
leg.sippy_ua = ua
|
|
|
|
# Generate SDP for the call
|
|
sdp_body = self._generate_sdp(leg_id)
|
|
|
|
# Send INVITE
|
|
event = CCEventTry(
|
|
(SipCallId(), from_uri, remote_uri),
|
|
body=sdp_body,
|
|
)
|
|
ua.recvEvent(event)
|
|
|
|
leg.state = "trying"
|
|
logger.info(f" INVITE sent for {leg_id}")
|
|
|
|
except ImportError:
|
|
# Sippy not installed — simulate for development
|
|
logger.warning(f" Sippy not installed, simulating call for {leg_id}")
|
|
leg.state = "ringing"
|
|
|
|
except Exception as e:
|
|
logger.error(f" Failed to send INVITE for {leg_id}: {e}")
|
|
leg.state = "terminated"
|
|
|
|
await asyncio.get_event_loop().run_in_executor(None, do_invite)
|
|
return leg_id
|
|
|
|
async def hangup(self, call_leg_id: str) -> None:
|
|
"""Hang up a call leg."""
|
|
leg = self._legs.get(call_leg_id)
|
|
if not leg:
|
|
logger.warning(f" Cannot hangup: leg {call_leg_id} not found")
|
|
return
|
|
|
|
def do_bye():
|
|
try:
|
|
if leg.sippy_ua:
|
|
from sippy.CCEvents import CCEventDisconnect
|
|
leg.sippy_ua.recvEvent(CCEventDisconnect())
|
|
except Exception as e:
|
|
logger.error(f" Error sending BYE for {call_leg_id}: {e}")
|
|
finally:
|
|
leg.state = "terminated"
|
|
|
|
await asyncio.get_event_loop().run_in_executor(None, do_bye)
|
|
|
|
# Clean up media
|
|
if self.media_pipeline and leg.media_port is not None:
|
|
self.media_pipeline.remove_stream(call_leg_id)
|
|
|
|
# Remove from tracking
|
|
self._legs.pop(call_leg_id, None)
|
|
|
|
# Clean up any bridges this leg was part of
|
|
for bridge_id, bridge in list(self._bridges.items()):
|
|
if bridge.leg_a == call_leg_id or bridge.leg_b == call_leg_id:
|
|
self._bridges.pop(bridge_id, None)
|
|
|
|
async def send_dtmf(self, call_leg_id: str, digits: str) -> None:
|
|
"""Send DTMF tones on a call leg."""
|
|
leg = self._legs.get(call_leg_id)
|
|
if not leg:
|
|
raise ValueError(f"Call leg {call_leg_id} not found")
|
|
|
|
logger.info(f" 📱 Sending DTMF '{digits}' on {call_leg_id}")
|
|
|
|
def do_dtmf():
|
|
try:
|
|
if leg.sippy_ua:
|
|
# Send via RFC 2833 (in-band RTP event)
|
|
# Sippy handles this through the UA's DTMF sender
|
|
for digit in digits:
|
|
from sippy.CCEvents import CCEventInfo
|
|
body = f"Signal={digit}\r\nDuration=160\r\n"
|
|
leg.sippy_ua.recvEvent(CCEventInfo(body=body))
|
|
else:
|
|
logger.warning(f" No UA for {call_leg_id}, DTMF not sent")
|
|
except ImportError:
|
|
logger.warning(f" Sippy not installed, DTMF simulated: {digits}")
|
|
except Exception as e:
|
|
logger.error(f" DTMF send error: {e}")
|
|
|
|
await asyncio.get_event_loop().run_in_executor(None, do_dtmf)
|
|
|
|
# ================================================================
|
|
# Device Calls (for transfer)
|
|
# ================================================================
|
|
|
|
async def call_device(self, device: Device) -> str:
|
|
"""Place a call to a registered device."""
|
|
if device.type in (DeviceType.SIP_PHONE, DeviceType.SOFTPHONE, DeviceType.WEBRTC):
|
|
if not device.sip_uri:
|
|
raise ValueError(f"Device {device.id} has no SIP URI")
|
|
# Direct SIP call to device's registered contact
|
|
return await self._call_sip_device(device)
|
|
elif device.type == DeviceType.CELL:
|
|
if not device.phone_number:
|
|
raise ValueError(f"Device {device.id} has no phone number")
|
|
# Call cell phone via trunk
|
|
return await self.make_call(device.phone_number)
|
|
else:
|
|
raise ValueError(f"Unsupported device type: {device.type}")
|
|
|
|
async def _call_sip_device(self, device: Device) -> str:
|
|
"""Place a direct SIP call to a registered device."""
|
|
leg_id = f"leg_{uuid.uuid4().hex[:12]}"
|
|
leg = SipCallLeg(leg_id, "outbound", device.sip_uri)
|
|
self._legs[leg_id] = leg
|
|
|
|
logger.info(f"📱 Calling device: {device.name} ({device.sip_uri}) (leg: {leg_id})")
|
|
|
|
def do_invite_device():
|
|
try:
|
|
from sippy.UA import UA
|
|
from sippy.CCEvents import CCEventTry
|
|
from sippy.SipCallId import SipCallId
|
|
|
|
controller = SippyCallController(leg, self)
|
|
|
|
# Parse device SIP URI for routing
|
|
# sip:robert@192.168.1.100:5060
|
|
uri_parts = device.sip_uri.replace("sip:", "").split("@")
|
|
if len(uri_parts) == 2:
|
|
host_parts = uri_parts[1].split(":")
|
|
host = host_parts[0]
|
|
port = int(host_parts[1]) if len(host_parts) > 1 else 5060
|
|
else:
|
|
host = self._domain
|
|
port = 5060
|
|
|
|
ua = UA(
|
|
self._sippy_global_config,
|
|
event_cb=controller,
|
|
nh_address=(host, port),
|
|
)
|
|
leg.sippy_ua = ua
|
|
|
|
sdp_body = self._generate_sdp(leg_id)
|
|
|
|
event = CCEventTry(
|
|
(SipCallId(), f"sip:gateway@{self._domain}", device.sip_uri),
|
|
body=sdp_body,
|
|
)
|
|
ua.recvEvent(event)
|
|
leg.state = "trying"
|
|
|
|
except ImportError:
|
|
logger.warning(f" Sippy not installed, simulating device call for {leg_id}")
|
|
leg.state = "ringing"
|
|
except Exception as e:
|
|
logger.error(f" Failed to call device {device.name}: {e}")
|
|
leg.state = "terminated"
|
|
|
|
await asyncio.get_event_loop().run_in_executor(None, do_invite_device)
|
|
return leg_id
|
|
|
|
# ================================================================
|
|
# Conference Bridge / Media
|
|
# ================================================================
|
|
|
|
async def bridge_calls(self, leg_a: str, leg_b: str) -> str:
|
|
"""Bridge two call legs together via PJSUA2 conference bridge."""
|
|
bridge_id = f"bridge_{uuid.uuid4().hex[:8]}"
|
|
|
|
leg_a_obj = self._legs.get(leg_a)
|
|
leg_b_obj = self._legs.get(leg_b)
|
|
|
|
if not leg_a_obj or not leg_b_obj:
|
|
raise ValueError(f"One or both legs not found: {leg_a}, {leg_b}")
|
|
|
|
logger.info(f"🔗 Bridging {leg_a} ↔ {leg_b} (bridge: {bridge_id})")
|
|
|
|
if self.media_pipeline:
|
|
# Use PJSUA2 conference bridge for actual media bridging
|
|
self.media_pipeline.bridge_streams(leg_a, leg_b)
|
|
else:
|
|
logger.warning(" No media pipeline — bridge is signaling-only")
|
|
|
|
self._bridges[bridge_id] = SipBridge(bridge_id, leg_a, leg_b)
|
|
return bridge_id
|
|
|
|
async def unbridge(self, bridge_id: str) -> None:
|
|
"""Remove a bridge."""
|
|
bridge = self._bridges.pop(bridge_id, None)
|
|
if bridge and self.media_pipeline:
|
|
self.media_pipeline.unbridge_streams(bridge.leg_a, bridge.leg_b)
|
|
|
|
def get_audio_stream(self, call_leg_id: str):
|
|
"""
|
|
Get a real-time audio stream from a call leg.
|
|
|
|
Taps into PJSUA2's conference bridge to get audio frames
|
|
for classification and transcription.
|
|
"""
|
|
if self.media_pipeline:
|
|
return self.media_pipeline.get_audio_tap(call_leg_id)
|
|
else:
|
|
# Fallback: yield silence frames
|
|
return self._silence_stream()
|
|
|
|
async def _silence_stream(self):
|
|
"""Yield silence frames when no media pipeline is available."""
|
|
for _ in range(100):
|
|
yield b"\x00" * 3200 # 100ms of silence at 16kHz 16-bit mono
|
|
await asyncio.sleep(0.1)
|
|
|
|
# ================================================================
|
|
# Registration
|
|
# ================================================================
|
|
|
|
async def get_registered_devices(self) -> list[dict]:
|
|
"""Get list of currently registered SIP devices."""
|
|
return list(self._registered_devices)
|
|
|
|
# ================================================================
|
|
# Trunk Status
|
|
# ================================================================
|
|
|
|
async def get_trunk_status(self) -> dict:
|
|
"""Get SIP trunk registration status."""
|
|
return {
|
|
"registered": self._trunk_registered,
|
|
"host": self._trunk_host or "not configured",
|
|
"port": self._trunk_port,
|
|
"transport": self._trunk_transport,
|
|
"username": self._trunk_username,
|
|
"active_legs": len(self._legs),
|
|
"active_bridges": len(self._bridges),
|
|
}
|
|
|
|
# ================================================================
|
|
# SDP Helpers
|
|
# ================================================================
|
|
|
|
def _generate_sdp(self, leg_id: str) -> str:
|
|
"""
|
|
Generate SDP body for a call.
|
|
|
|
If MediaPipeline is available, get the actual RTP listen address
|
|
from PJSUA2. Otherwise, generate a basic SDP.
|
|
"""
|
|
if self.media_pipeline:
|
|
rtp_port = self.media_pipeline.allocate_rtp_port(leg_id)
|
|
rtp_host = self._sip_address if self._sip_address != "0.0.0.0" else "127.0.0.1"
|
|
else:
|
|
rtp_port = 10000 + (hash(leg_id) % 50000)
|
|
rtp_host = self._sip_address if self._sip_address != "0.0.0.0" else "127.0.0.1"
|
|
|
|
return (
|
|
f"v=0\r\n"
|
|
f"o=holdslayer 0 0 IN IP4 {rtp_host}\r\n"
|
|
f"s=Hold Slayer Gateway\r\n"
|
|
f"c=IN IP4 {rtp_host}\r\n"
|
|
f"t=0 0\r\n"
|
|
f"m=audio {rtp_port} RTP/AVP 0 8 101\r\n"
|
|
f"a=rtpmap:0 PCMU/8000\r\n"
|
|
f"a=rtpmap:8 PCMA/8000\r\n"
|
|
f"a=rtpmap:101 telephone-event/8000\r\n"
|
|
f"a=fmtp:101 0-16\r\n"
|
|
f"a=sendrecv\r\n"
|
|
)
|
|
|
|
@staticmethod
|
|
def _parse_sdp_rtp_endpoint(sdp: str) -> Optional[dict]:
|
|
"""Extract RTP host/port/codec from SDP body."""
|
|
host = None
|
|
port = None
|
|
codec = "PCMU"
|
|
|
|
for line in sdp.split("\n"):
|
|
line = line.strip()
|
|
if line.startswith("c=IN IP4 "):
|
|
host = line.split(" ")[-1]
|
|
elif line.startswith("m=audio "):
|
|
parts = line.split(" ")
|
|
if len(parts) >= 2:
|
|
port = int(parts[1])
|
|
# First codec in the list
|
|
if len(parts) >= 4:
|
|
payload_type = parts[3]
|
|
codec_map = {"0": "PCMU", "8": "PCMA", "18": "G729"}
|
|
codec = codec_map.get(payload_type, "PCMU")
|
|
|
|
if host and port:
|
|
return {"host": host, "port": port, "codec": codec}
|
|
return None
|