Complete project scaffolding and core implementation of an AI-powered telephony system that calls companies, navigates IVR menus, waits on hold, and transfers the call to the user when a human answers.

Key components:
- FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces
- SIP/VoIP call management via PJSUA2 with RTP audio streaming
- LLM-powered IVR navigation using OpenAI/Anthropic with tool calling
- Hold detection service combining audio analysis and silence detection
- Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines
- Call recording with per-channel and mixed audio capture
- Event bus (asyncio pub/sub) for real-time client updates
- Web dashboard with live call monitoring
- SQLite persistence via SQLAlchemy with call history and analytics
- Notification support (email, SMS, webhook, desktop)
- Docker Compose deployment with Opal VoIP and Opal Media containers
- Comprehensive test suite with unit, integration, and E2E tests
- Simplified .gitignore and full project documentation in README
402 lines
14 KiB
Python
402 lines
14 KiB
Python
"""
|
|
AI PSTN Gateway — The main orchestrator.
|
|
|
|
Ties together SIP engine, call manager, event bus, and all services.
|
|
This is the top-level object that FastAPI and MCP talk to.
|
|
"""
|
|
|
|
import asyncio
import logging
import uuid
from datetime import datetime
from typing import Optional

from config import Settings, get_settings
from core.call_manager import CallManager
from core.dial_plan import next_extension
from core.event_bus import EventBus
from core.media_pipeline import MediaPipeline
from core.sip_engine import MockSIPEngine, SIPEngine
from core.sippy_engine import SippyEngine
from models.call import ActiveCall, CallMode, CallStatus
from models.call_flow import CallFlow
from models.device import Device, DeviceType
from models.events import EventType, GatewayEvent
|
|
|
|
# Module-level logger used by the SIP engine factory and the gateway class.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _build_sip_engine(settings: Settings, gateway: "AIPSTNGateway") -> SIPEngine:
    """
    Choose and construct the SIP engine for *settings*.

    A real trunk is assumed when ``settings.sip_trunk.host`` is non-empty and
    differs from the ``"sip.provider.com"`` placeholder. In that case a
    SippyEngine (Sippy B2BUA) is built, with
    ``gateway._on_sip_device_registered`` as its device-registration callback.
    Otherwise, or if constructing the SippyEngine raises, a MockSIPEngine is
    returned.
    """
    trunk = settings.sip_trunk
    local = settings.gateway_sip

    real_trunk_configured = trunk.host and trunk.host != "sip.provider.com"
    if not real_trunk_configured:
        return MockSIPEngine()

    try:
        engine = SippyEngine(
            sip_address=local.host,
            sip_port=local.port,
            trunk_host=trunk.host,
            trunk_port=trunk.port,
            trunk_username=trunk.username,
            trunk_password=trunk.password,
            trunk_transport=trunk.transport,
            domain=local.domain,
            did=trunk.did,
            on_device_registered=gateway._on_sip_device_registered,
        )
    except Exception as exc:
        # Best-effort: a broken real-engine setup degrades to the mock engine.
        logger.warning(f"Could not create SippyEngine: {exc} — using mock")
        return MockSIPEngine()
    return engine
|
|
|
|
|
|
class AIPSTNGateway:
    """
    The AI PSTN Gateway.

    Central coordination point for:
    - SIP engine (signaling + media)
    - Call manager (state + events)
    - Hold Slayer service
    - Audio classifier
    - Transcription service
    - Device management
    """

    def __init__(
        self,
        settings: Settings,
        sip_engine: Optional[SIPEngine] = None,
    ):
        """
        Create the gateway. Engines and services are started by :meth:`start`.

        Args:
            settings: Gateway configuration.
            sip_engine: SIP engine to use. A ``MockSIPEngine`` is created when
                None is passed.
        """
        self.settings = settings
        self.event_bus = EventBus()
        self.call_manager = CallManager(self.event_bus)
        self.sip_engine: SIPEngine = (
            sip_engine if sip_engine is not None else MockSIPEngine()
        )

        # Services. The audio classifier and transcription service are created
        # in start(). _hold_slayer is not assigned anywhere in this class;
        # Hold Slayer instances are created per call in make_call().
        self._hold_slayer = None
        self._audio_classifier = None
        self._transcription = None

        # Device registry keyed by device id. It is populated through
        # register_device(), including devices auto-created on SIP REGISTER.
        self._devices: dict[str, Device] = {}

        # Strong references to fire-and-forget tasks, such as Hold Slayer runs.
        # The event loop only keeps weak references to tasks, so an otherwise
        # unreferenced task can be garbage-collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

        # Set by start() and cleared by stop(); drives the `uptime` property.
        self._started_at: Optional[datetime] = None

    @classmethod
    def from_config(cls, sip_engine: Optional[SIPEngine] = None) -> "AIPSTNGateway":
        """
        Create a gateway from environment config (``get_settings()``).

        Args:
            sip_engine: Explicit engine to use. When None, the engine is chosen
                by ``_build_sip_engine``. That function needs the gateway
                instance to wire the SIP REGISTER callback, so it runs after
                construction.
        """
        settings = get_settings()
        gw = cls(settings=settings, sip_engine=sip_engine)
        if sip_engine is None:
            gw.sip_engine = _build_sip_engine(settings, gw)
        return gw

    # ================================================================
    # Lifecycle
    # ================================================================

    async def start(self) -> None:
        """
        Boot the gateway: start the SIP engine and services, then report trunk status.

        Publishes ``SIP_TRUNK_REGISTERED`` when
        ``get_trunk_status()["registered"]`` is truthy, and
        ``SIP_TRUNK_REGISTRATION_FAILED`` otherwise. This lets dashboards and
        WebSocket clients learn the trunk state immediately.
        """
        logger.info("🔥 Starting AI PSTN Gateway...")

        await self.sip_engine.start()
        logger.info("  SIP Engine: ready")

        # Imported here to avoid circular imports
        from services.audio_classifier import AudioClassifier
        from services.transcription import TranscriptionService

        self._audio_classifier = AudioClassifier(self.settings.classifier)
        self._transcription = TranscriptionService(self.settings.speaches)

        self._started_at = datetime.now()

        trunk_status = await self.sip_engine.get_trunk_status()
        trunk_registered = trunk_status.get("registered", False)
        logger.info(f"  SIP Trunk: {'registered' if trunk_registered else 'not registered'}")
        logger.info(f"  Devices: {len(self._devices)} registered")
        logger.info("\U0001f525 AI PSTN Gateway is LIVE")

        if trunk_registered:
            await self.event_bus.publish(GatewayEvent(
                type=EventType.SIP_TRUNK_REGISTERED,
                message=f"SIP trunk registered with {trunk_status.get('host')}",
                data=trunk_status,
            ))
        else:
            reason = trunk_status.get("reason", "Trunk registration failed or not configured")
            await self.event_bus.publish(GatewayEvent(
                type=EventType.SIP_TRUNK_REGISTRATION_FAILED,
                message=f"SIP trunk not registered — {reason}",
                data=trunk_status,
            ))

    async def stop(self) -> None:
        """
        Gracefully shut down.

        The steps are:

        1. Cancel the gateway's background tasks (for example, Hold Slayer runs).
        2. End every active call with status CANCELLED.
        3. Stop the SIP engine.

        A failure while ending one call is logged and does not prevent the
        remaining calls from being ended or the SIP engine from being stopped.
        """
        logger.info("Shutting down AI PSTN Gateway...")

        # Cancel background tasks first so they stop driving calls that are
        # about to be torn down. Skip the current task in case stop() itself is
        # running inside one, since awaiting ourselves would never complete.
        current = asyncio.current_task()
        pending = [t for t in self._background_tasks if t is not current]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

        # End all active calls (iterate over a snapshot because ending mutates it)
        for call_id in list(self.call_manager.active_calls.keys()):
            if not self.call_manager.get_call(call_id):
                continue
            try:
                await self.call_manager.end_call(call_id, CallStatus.CANCELLED)
            except Exception:
                logger.exception(f"Failed to end call {call_id} during shutdown")

        await self.sip_engine.stop()

        self._started_at = None
        logger.info("Gateway shut down cleanly.")

    @property
    def uptime(self) -> Optional[int]:
        """Return the gateway uptime in whole seconds, or None if not started."""
        if self._started_at:
            return int((datetime.now() - self._started_at).total_seconds())
        return None

    def _spawn_background(self, coro, *, name: str) -> asyncio.Task:
        """
        Schedule *coro* as a task that the gateway keeps a strong reference to.

        The reference is dropped when the task finishes, and failures are
        logged by :meth:`_on_background_task_done`.
        """
        task = asyncio.create_task(coro, name=name)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_task_done)
        return task

    def _on_background_task_done(self, task: asyncio.Task) -> None:
        """Forget a finished background task and log its exception, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        # Retrieving the exception also suppresses asyncio's
        # "Task exception was never retrieved" warning; we log it instead.
        exc = task.exception()
        if exc is not None:
            logger.error(
                f"Background task {task.get_name()} failed: {exc}",
                exc_info=exc,
            )

    # ================================================================
    # Call Operations
    # ================================================================

    async def make_call(
        self,
        number: str,
        mode: CallMode = CallMode.DIRECT,
        intent: Optional[str] = None,
        call_flow_id: Optional[str] = None,
        device: Optional[str] = None,
        services: Optional[list[str]] = None,
    ) -> ActiveCall:
        """
        Place an outbound call.

        This is the main entry point for all call types:
        - direct: call and connect to a device immediately.
        - hold_slayer: navigate the IVR, wait on hold, and transfer when a
          human is detected.
        - ai_assisted: connect with transcription, recording and noise
          cancellation.

        Args:
            number: Remote number to dial through the SIP engine.
            mode: Call mode. HOLD_SLAYER additionally launches
                ``HoldSlayerService.run`` as a background task.
            intent: Optional intent, passed through to ``create_call``.
            call_flow_id: Optional call flow id, passed to ``create_call`` and
                to the Hold Slayer.
            device: Device for the call. Defaults to
                ``settings.hold_slayer.default_transfer_device``.
            services: Optional service names, passed through to ``create_call``.

        Returns:
            The ActiveCall created by the call manager. Its status is updated
            to RINGING once the SIP engine has placed the call.

        Raises:
            Any exception from the SIP engine while placing the call. The call
            is marked FAILED before the exception is re-raised.
        """
        call = await self.call_manager.create_call(
            remote_number=number,
            mode=mode,
            intent=intent,
            call_flow_id=call_flow_id,
            device=device or self.settings.hold_slayer.default_transfer_device,
            services=services,
        )

        try:
            sip_leg_id = await self.sip_engine.make_call(
                number=number,
                caller_id=self.settings.sip_trunk.did,
            )
            self.call_manager.map_leg(sip_leg_id, call.id)
            await self.call_manager.update_status(call.id, CallStatus.RINGING)
        except Exception as e:
            logger.error(f"Failed to place call: {e}")
            await self.call_manager.update_status(call.id, CallStatus.FAILED)
            raise

        if mode == CallMode.HOLD_SLAYER:
            # Imported lazily, like the services in start()
            from services.hold_slayer import HoldSlayerService

            hold_slayer = HoldSlayerService(
                gateway=self,
                call_manager=self.call_manager,
                sip_engine=self.sip_engine,
                classifier=self._audio_classifier,
                transcription=self._transcription,
                settings=self.settings,
            )
            # Run in the background so the caller gets the call back at once.
            # _spawn_background keeps the task referenced until it completes.
            self._spawn_background(
                hold_slayer.run(call, sip_leg_id, call_flow_id),
                name=f"holdslayer_{call.id}",
            )

        return call

    async def transfer_call(self, call_id: str, device_id: str) -> None:
        """
        Transfer an active call to a registered device by bridging it with the PSTN leg.

        The PSTN leg is located before the device is dialed, so the user's
        device is never rung for a call that cannot be bridged.

        Raises:
            ValueError: If the call or the device is unknown.
            Any exception from ``call_device`` or ``bridge_calls``. The call is
            marked FAILED before the exception is re-raised.
        """
        call = self.call_manager.get_call(call_id)
        if not call:
            raise ValueError(f"Call {call_id} not found")

        device = self._devices.get(device_id)
        if not device:
            raise ValueError(f"Device {device_id} not found")

        await self.call_manager.update_status(call_id, CallStatus.TRANSFERRING)

        # The original PSTN leg is the first leg mapped to this call (make_call
        # maps it before any device leg). This reads CallManager's private
        # leg map; NOTE(review): a public accessor would be preferable.
        pstn_leg_id = next(
            (leg_id for leg_id, cid in self.call_manager._call_legs.items() if cid == call_id),
            None,
        )
        if pstn_leg_id is None:
            logger.error(f"Could not find PSTN leg for call {call_id}")
            await self.call_manager.update_status(call_id, CallStatus.FAILED)
            return

        try:
            device_leg_id = await self.sip_engine.call_device(device)
            self.call_manager.map_leg(device_leg_id, call_id)
            await self.sip_engine.bridge_calls(pstn_leg_id, device_leg_id)
        except Exception as e:
            # Do not leave the call stuck in TRANSFERRING.
            logger.error(f"Failed to transfer call {call_id} to {device_id}: {e}")
            await self.call_manager.update_status(call_id, CallStatus.FAILED)
            raise

        await self.call_manager.update_status(call_id, CallStatus.BRIDGED)

    async def hangup_call(self, call_id: str) -> None:
        """
        Hang up every SIP leg mapped to the call, then end it in the call manager.

        Raises:
            ValueError: If the call is unknown.
        """
        call = self.call_manager.get_call(call_id)
        if not call:
            raise ValueError(f"Call {call_id} not found")

        # Snapshot the leg map; hanging up may mutate it.
        for leg_id, cid in list(self.call_manager._call_legs.items()):
            if cid == call_id:
                await self.sip_engine.hangup(leg_id)

        await self.call_manager.end_call(call_id)

    def get_call(self, call_id: str) -> Optional[ActiveCall]:
        """Return the active call with *call_id*, or None."""
        return self.call_manager.get_call(call_id)

    # ================================================================
    # Device Management
    # ================================================================

    def register_device(self, device: Device) -> None:
        """
        Add or replace *device* in the registry, filling in missing addressing.

        If ``device.extension`` is None, the next free extension from
        ``next_extension`` is assigned. If ``device.sip_uri`` is None, it is
        set to ``sip:<extension>@<gateway_sip.domain>``.
        """
        if device.extension is None:
            used = {
                d.extension
                for d in self._devices.values()
                if d.extension is not None
            }
            device.extension = next_extension(used)

        if device.sip_uri is None and device.extension is not None:
            domain = self.settings.gateway_sip.domain
            device.sip_uri = f"sip:{device.extension}@{domain}"

        self._devices[device.id] = device
        logger.info(
            f"📱 Device registered: {device.name} "
            f"ext={device.extension} uri={device.sip_uri}"
        )

    def unregister_device(self, device_id: str) -> None:
        """Remove a device from the registry. Unknown ids are ignored."""
        device = self._devices.pop(device_id, None)
        if device:
            logger.info(f"📱 Device unregistered: {device.name}")

    @staticmethod
    def _aor_username(aor: str) -> str:
        """
        Derive a friendly device name from a SIP address-of-record.

        ``sip:alice@host`` and ``sip:alice@host:5060`` both give ``alice``.
        Only a leading ``sip:``/``sips:``/``tel:`` scheme is stripped, so a
        ``:port`` suffix is never mistaken for the user part. Returns the full
        AOR when no user part remains.
        """
        scheme, sep, rest = aor.partition(":")
        address = rest if sep and scheme.lower() in ("sip", "sips", "tel") else aor
        return address.split("@", 1)[0] or aor

    async def _on_sip_device_registered(
        self, aor: str, contact: str, expires: int
    ) -> None:
        """
        Handle a SIP REGISTER forwarded by SippyEngine.

        For a known AOR, updates ``is_online`` (``expires > 0``) and
        ``last_seen``, and publishes DEVICE_OFFLINE when the registration is
        removed (``expires <= 0``). For an unknown AOR with a positive expiry,
        creates and registers a new device and publishes DEVICE_REGISTERED.
        An un-REGISTER for an unknown AOR is ignored.

        Args:
            aor: Address-of-record, matched against ``Device.sip_uri``.
            contact: Contact URI from the REGISTER; included in the event data.
            expires: Registration lifetime; a value of 0 or less means unregister.
        """
        existing = next(
            (d for d in self._devices.values() if d.sip_uri == aor),
            None,
        )
        if existing:
            existing.is_online = expires > 0
            existing.last_seen = datetime.now()
            logger.info(
                f"📱 Device refreshed: {existing.name} "
                f"ext={existing.extension} expires={expires}"
            )
            # Same condition as is_online above, so state and event agree.
            if expires <= 0:
                await self.event_bus.publish(GatewayEvent(
                    type=EventType.DEVICE_OFFLINE,
                    message=f"{existing.name} (ext {existing.extension}) unregistered",
                    data={"device_id": existing.id, "aor": aor},
                ))
            return

        if expires <= 0:
            # Nothing to remove, and we must not create an "online" device
            # from an un-REGISTER.
            logger.info(f"📱 Ignoring unregister for unknown AOR {aor}")
            return

        # New device: auto-register it.
        dev = Device(
            id=f"dev_{uuid.uuid4().hex[:8]}",
            name=self._aor_username(aor),
            type="sip_phone",
            sip_uri=aor,
            is_online=True,
            last_seen=datetime.now(),
        )
        self.register_device(dev)  # assigns an extension; sip_uri is already the AOR
        await self.event_bus.publish(GatewayEvent(
            type=EventType.DEVICE_REGISTERED,
            message=(
                f"{dev.name} registered as ext {dev.extension} "
                f"({dev.sip_uri})"
            ),
            data={
                "device_id": dev.id,
                "name": dev.name,
                "extension": dev.extension,
                "sip_uri": dev.sip_uri,
                "contact": contact,
            },
        ))

    def preferred_device(self) -> Optional[Device]:
        """
        Return the device to ring, or None if no device qualifies.

        The first choice is the device with the lowest ``priority`` value among
        those whose ``can_receive_call`` is truthy. If there is none, the
        fallback is the lowest-``priority`` CELL device that has a
        ``phone_number``. Ties keep registry insertion order.
        """
        reachable = [d for d in self._devices.values() if d.can_receive_call]
        if reachable:
            return min(reachable, key=lambda d: d.priority)

        # Fallback: a cell phone can still be reached via its number.
        fallback = [
            d for d in self._devices.values()
            if d.type == DeviceType.CELL and d.phone_number
        ]
        return min(fallback, key=lambda d: d.priority, default=None)

    @property
    def devices(self) -> dict[str, Device]:
        """Return a shallow copy of the device registry, keyed by device id."""
        return dict(self._devices)

    # ================================================================
    # Status
    # ================================================================

    async def status(self) -> dict:
        """
        Return a snapshot of the gateway status.

        The snapshot contains uptime, trunk status, per-device online flags,
        the active call count and the number of event-bus subscribers.
        """
        trunk = await self.sip_engine.get_trunk_status()
        return {
            "uptime": self.uptime,
            "trunk": trunk,
            "devices": {d.id: {"name": d.name, "online": d.is_online} for d in self._devices.values()},
            "active_calls": self.call_manager.active_call_count,
            "event_subscribers": self.event_bus.subscriber_count,
        }
|