Complete project scaffolding and core implementation of an AI-powered telephony system that calls companies, navigates IVR menus, waits on hold, and transfers to the user when a human answers. Key components: - FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces - SIP/VoIP call management via PJSUA2 with RTP audio streaming - LLM-powered IVR navigation using OpenAI/Anthropic with tool calling - Hold detection service combining audio analysis and silence detection - Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines - Call recording with per-channel and mixed audio capture - Event bus (asyncio pub/sub) for real-time client updates - Web dashboard with live call monitoring - SQLite persistence via SQLAlchemy with call history and analytics - Notification support (email, SMS, webhook, desktop) - Docker Compose deployment with Opal VoIP and Opal Media containers - Comprehensive test suite with unit, integration, and E2E tests - Simplified .gitignore and full project documentation in README
257 lines
8.5 KiB
Python
257 lines
8.5 KiB
Python
"""
|
|
Notification Service — Tell the user what's happening.

Sends notifications when:
- A human picks up (TRANSFER NOW!)
- Hold time estimates change
- A call fails or times out
- IVR navigation reaches a milestone

Supports multiple channels: WebSocket (always), SMS (optional),
push notifications (future).
|
|
"""
|
|
|
|
import asyncio
import logging
from datetime import datetime
from enum import Enum
from typing import Any, Optional

from pydantic import BaseModel, Field

from config import Settings
from core.event_bus import EventBus
from models.events import EventType, GatewayEvent
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class NotificationChannel(str, Enum):
    """Delivery channel a notification is addressed to.

    Members are also ``str`` instances, so each one compares equal to
    its plain string value.
    """

    # Real-time delivery to connected clients.
    WEBSOCKET = "websocket"

    # Text message to the user's phone.
    SMS = "sms"

    # Push notification.
    PUSH = "push"
|
|
|
|
|
|
class NotificationPriority(str, Enum):
    """Urgency level attached to a notification.

    Members are also ``str`` instances, so each one compares equal to
    its plain string value. The per-member notes describe how the
    notification service in this module assigns each level.
    """

    # IVR steps and DTMF button presses.
    LOW = "low"

    # Hold detected, call ended.
    NORMAL = "normal"

    # Call failed.
    HIGH = "high"

    # Human detected, transfer started.
    CRITICAL = "critical"
|
|
|
|
|
|
class Notification(BaseModel):
    """A notification to send to the user.

    Fields:
        channel: Channel the notification is addressed to.
        priority: How urgently it should be delivered.
        title: Short headline.
        message: Human-readable body text.
        call_id: Identifier of the related call, if any.
        data: Extra payload carried along with the notification.
        timestamp: Creation time of this notification (naive local time).
    """

    channel: NotificationChannel
    priority: NotificationPriority
    title: str
    message: str
    call_id: Optional[str] = None
    # A factory gives every instance its own dict.
    data: dict[str, Any] = Field(default_factory=dict)
    # A factory is required here: a plain ``datetime.now()`` default is
    # evaluated once when the class is defined, which would stamp every
    # notification with the module import time.
    timestamp: datetime = Field(default_factory=datetime.now)
|
|
|
|
|
|
class NotificationService:
    """
    Sends notifications to users about call events.

    Listens to the EventBus and routes events to the
    appropriate notification channels.

    Each (event type, step_id) pair is notified at most once per call.
    The per-call bookkeeping is discarded when the call's CALL_ENDED
    event has been processed.
    """

    def __init__(self, event_bus: EventBus, settings: Settings):
        """Store collaborators; nothing is consumed until ``start()``.

        Args:
            event_bus: Bus whose ``subscribe()`` stream is consumed.
            settings: Application settings; a truthy ``notify_sms_number``
                enables SMS for critical notifications.
        """
        self._event_bus = event_bus
        self._settings = settings
        self._task: Optional[asyncio.Task] = None
        self._sms_sender: Optional[Any] = None

        # Track what we've already notified (avoid spam).
        # call_id -> set of "<event type>:<step_id>" dedup keys
        self._notified: dict[str, set[str]] = {}

    async def start(self) -> None:
        """Start listening for events to notify on.

        Calling this while a listener is already running is a no-op, so a
        second call cannot orphan the first listener task.
        """
        if self._task is not None and not self._task.done():
            return
        self._task = asyncio.create_task(self._listen_loop())
        logger.info("📢 Notification service started")

    async def stop(self) -> None:
        """Stop the notification listener and wait for it to finish."""
        # Detach the handle first so the service can be started again.
        task, self._task = self._task, None
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        logger.info("📢 Notification service stopped")

    async def _listen_loop(self) -> None:
        """Main event listener loop.

        A failure while handling one event is logged and does not stop
        the loop. The subscription is always closed on exit.
        """
        subscription = self._event_bus.subscribe()
        try:
            async for event in subscription:
                try:
                    await self._handle_event(event)
                except Exception as e:
                    logger.error(f"Notification handler error: {e}", exc_info=True)
        except asyncio.CancelledError:
            # Cancellation is the normal shutdown path (see ``stop``).
            pass
        finally:
            subscription.close()

    async def _handle_event(self, event: GatewayEvent) -> None:
        """Route an event to the appropriate notification(s).

        Events that map to no notification, or whose dedup key was
        already notified for the same call, are dropped silently.
        """
        call_id = event.call_id or ""

        # NOTE(review): events without a ``step_id`` share one key per
        # event type, so e.g. repeated DTMF events on a call collapse to
        # a single notification — confirm that this is intended.
        dedup_key = f"{event.type.value}:{event.data.get('step_id', '')}"

        # Skip duplicate notifications
        if call_id and dedup_key in self._notified.get(call_id, ()):
            return

        notification = self._event_to_notification(event)
        if notification is None:
            return

        if call_id:
            if event.type == EventType.CALL_ENDED:
                # The call is over: drop its bookkeeping instead of
                # recording the key. The tracking entry must not be
                # touched after it is removed (doing so raised KeyError
                # and suppressed the "Call Ended" notification).
                self._notified.pop(call_id, None)
            else:
                # Mark as notified. The set is created lazily so events
                # that never notify do not leave empty entries behind.
                self._notified.setdefault(call_id, set()).add(dedup_key)

        # Send via all appropriate channels
        await self._send(notification)

    def _event_to_notification(self, event: GatewayEvent) -> Optional[Notification]:
        """Convert a gateway event to a notification (or None to skip).

        This is a pure mapping with no side effects. All notifications
        are addressed to the WebSocket channel and carry the event's
        ``call_id`` and ``data``.

        Returns:
            The notification for a handled event type, otherwise None.
        """

        def build(
            priority: NotificationPriority, title: str, message: str
        ) -> Notification:
            # Everything except priority/title/message is the same for
            # every event type.
            return Notification(
                channel=NotificationChannel.WEBSOCKET,
                priority=priority,
                title=title,
                message=message,
                call_id=event.call_id,
                data=event.data,
            )

        if event.type == EventType.HUMAN_DETECTED:
            return build(
                NotificationPriority.CRITICAL,
                "🚨 Human Detected!",
                "A live person picked up — transferring you now!",
            )

        if event.type == EventType.TRANSFER_STARTED:
            return build(
                NotificationPriority.CRITICAL,
                "📞 Call Transferred",
                "Your call has been connected to the agent. Pick up your phone!",
            )

        if event.type == EventType.CALL_FAILED:
            return build(
                NotificationPriority.HIGH,
                "❌ Call Failed",
                event.message or "The call couldn't be completed.",
            )

        if event.type == EventType.HOLD_DETECTED:
            return build(
                NotificationPriority.NORMAL,
                "⏳ On Hold",
                "You're on hold. We'll notify you when someone picks up.",
            )

        if event.type == EventType.IVR_STEP:
            return build(
                NotificationPriority.LOW,
                "📍 IVR Navigation",
                event.message or "Navigating phone menu...",
            )

        if event.type == EventType.IVR_DTMF_SENT:
            return build(
                NotificationPriority.LOW,
                "📱 Button Pressed",
                event.message or f"Pressed {event.data.get('digits', '?')}",
            )

        if event.type == EventType.CALL_ENDED:
            return build(
                NotificationPriority.NORMAL,
                "📴 Call Ended",
                event.message or "The call has ended.",
            )

        # Skip other event types (transcription, classification, etc.)
        return None

    async def _send(self, notification: Notification) -> None:
        """Send a notification via the appropriate channel.

        Every notification is logged. Critical ones are additionally
        sent by SMS when ``notify_sms_number`` is configured.
        """
        logger.info(
            f"📢 [{notification.priority.value}] {notification.title}: "
            f"{notification.message}"
        )

        # WebSocket notifications go through the event bus
        # (the WebSocket handler in the API reads from EventBus directly)

        # SMS for critical notifications
        if (
            notification.priority == NotificationPriority.CRITICAL
            and self._settings.notify_sms_number
        ):
            await self._send_sms(notification)

    async def _send_sms(self, notification: Notification) -> None:
        """
        Send an SMS notification.

        Uses a simple HTTP-based SMS gateway. In production,
        this would use Twilio, AWS SNS, or similar.

        Currently a placeholder: it only logs the intended message.
        Failures are logged and never propagated to the caller.
        """
        phone = self._settings.notify_sms_number
        if not phone:
            return

        try:
            # Only needed by the example below; kept so a missing
            # dependency is reported here.
            import httpx  # noqa: F401

            # Generic webhook-based SMS (configure your provider)
            # This is a placeholder — wire up your preferred SMS provider
            logger.info(f"📱 SMS → {phone}: {notification.title}")

            # Example: Twilio-style API
            # async with httpx.AsyncClient() as client:
            #     await client.post(
            #         "https://api.twilio.com/2010-04-01/Accounts/.../Messages.json",
            #         data={
            #             "To": phone,
            #             "From": self._settings.sip_trunk.did,
            #             "Body": f"{notification.title}\n{notification.message}",
            #         },
            #         auth=(account_sid, auth_token),
            #     )

        except Exception as e:
            logger.error(f"SMS send failed: {e}")
|