""" Notification Service — Tell the user what's happening. Sends notifications when: - A human picks up (TRANSFER NOW!) - Hold time estimates change - Call fails or times out - IVR navigation milestones Supports multiple channels: WebSocket (always), SMS (optional), push notifications (future). """ import asyncio import logging from datetime import datetime from enum import Enum from typing import Any, Optional from pydantic import BaseModel from config import Settings from core.event_bus import EventBus from models.events import EventType, GatewayEvent logger = logging.getLogger(__name__) class NotificationChannel(str, Enum): """Where to send notifications.""" WEBSOCKET = "websocket" SMS = "sms" PUSH = "push" class NotificationPriority(str, Enum): """How urgently to deliver.""" LOW = "low" # Status updates, hold time estimates NORMAL = "normal" # IVR navigation milestones HIGH = "high" # Human detected, call failed CRITICAL = "critical" # Transfer happening NOW class Notification(BaseModel): """A notification to send to the user.""" channel: NotificationChannel priority: NotificationPriority title: str message: str call_id: Optional[str] = None data: dict[str, Any] = {} timestamp: datetime = datetime.now() class NotificationService: """ Sends notifications to users about call events. Listens to the EventBus and routes events to the appropriate notification channels. """ def __init__(self, event_bus: EventBus, settings: Settings): self._event_bus = event_bus self._settings = settings self._task: Optional[asyncio.Task] = None self._sms_sender: Optional[Any] = None # Track what we've already notified (avoid spam) self._notified: dict[str, set[str]] = {} # call_id -> set of event types async def start(self) -> None: """Start listening for events to notify on.""" self._task = asyncio.create_task(self._listen_loop()) logger.info("📢 Notification service started") async def stop(self) -> None: """Stop the notification listener.""" if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass logger.info("📢 Notification service stopped") async def _listen_loop(self) -> None: """Main event listener loop.""" subscription = self._event_bus.subscribe() try: async for event in subscription: try: await self._handle_event(event) except Exception as e: logger.error(f"Notification handler error: {e}", exc_info=True) except asyncio.CancelledError: pass finally: subscription.close() async def _handle_event(self, event: GatewayEvent) -> None: """Route an event to the appropriate notification(s).""" call_id = event.call_id or "" # Initialize tracking for this call if call_id and call_id not in self._notified: self._notified[call_id] = set() # Skip duplicate notifications dedup_key = f"{event.type.value}:{event.data.get('step_id', '')}" if call_id and dedup_key in self._notified.get(call_id, set()): return notification = self._event_to_notification(event) if not notification: return # Mark as notified if call_id: self._notified[call_id].add(dedup_key) # Send via all appropriate channels await self._send(notification) def _event_to_notification(self, event: GatewayEvent) -> Optional[Notification]: """Convert a gateway event to a notification (or None to skip).""" if event.type == EventType.HUMAN_DETECTED: return Notification( channel=NotificationChannel.WEBSOCKET, priority=NotificationPriority.CRITICAL, title="🚨 Human Detected!", message="A live person picked up — transferring you now!", call_id=event.call_id, data=event.data, ) elif event.type == EventType.TRANSFER_STARTED: return Notification( channel=NotificationChannel.WEBSOCKET, priority=NotificationPriority.CRITICAL, title="📞 Call Transferred", message="Your call has been connected to the agent. Pick up your phone!", call_id=event.call_id, data=event.data, ) elif event.type == EventType.CALL_FAILED: return Notification( channel=NotificationChannel.WEBSOCKET, priority=NotificationPriority.HIGH, title="❌ Call Failed", message=event.message or "The call couldn't be completed.", call_id=event.call_id, data=event.data, ) elif event.type == EventType.HOLD_DETECTED: return Notification( channel=NotificationChannel.WEBSOCKET, priority=NotificationPriority.NORMAL, title="⏳ On Hold", message="You're on hold. We'll notify you when someone picks up.", call_id=event.call_id, data=event.data, ) elif event.type == EventType.IVR_STEP: return Notification( channel=NotificationChannel.WEBSOCKET, priority=NotificationPriority.LOW, title="📍 IVR Navigation", message=event.message or "Navigating phone menu...", call_id=event.call_id, data=event.data, ) elif event.type == EventType.IVR_DTMF_SENT: return Notification( channel=NotificationChannel.WEBSOCKET, priority=NotificationPriority.LOW, title="📱 Button Pressed", message=event.message or f"Pressed {event.data.get('digits', '?')}", call_id=event.call_id, data=event.data, ) elif event.type == EventType.CALL_ENDED: # Clean up tracking if event.call_id and event.call_id in self._notified: del self._notified[event.call_id] return Notification( channel=NotificationChannel.WEBSOCKET, priority=NotificationPriority.NORMAL, title="📴 Call Ended", message=event.message or "The call has ended.", call_id=event.call_id, data=event.data, ) # Skip other event types (transcription, classification, etc.) return None async def _send(self, notification: Notification) -> None: """Send a notification via the appropriate channel.""" logger.info( f"📢 [{notification.priority.value}] {notification.title}: " f"{notification.message}" ) # WebSocket notifications go through the event bus # (the WebSocket handler in the API reads from EventBus directly) # SMS for critical notifications if ( notification.priority == NotificationPriority.CRITICAL and self._settings.notify_sms_number ): await self._send_sms(notification) async def _send_sms(self, notification: Notification) -> None: """ Send an SMS notification. Uses a simple HTTP-based SMS gateway. In production, this would use Twilio, AWS SNS, or similar. """ phone = self._settings.notify_sms_number if not phone: return try: import httpx # Generic webhook-based SMS (configure your provider) # This is a placeholder — wire up your preferred SMS provider logger.info(f"📱 SMS → {phone}: {notification.title}") # Example: Twilio-style API # async with httpx.AsyncClient() as client: # await client.post( # "https://api.twilio.com/2010-04-01/Accounts/.../Messages.json", # data={ # "To": phone, # "From": self._settings.sip_trunk.did, # "Body": f"{notification.title}\n{notification.message}", # }, # auth=(account_sid, auth_token), # ) except Exception as e: logger.error(f"SMS send failed: {e}")