diff --git a/api/call_history.py b/api/call_history.py new file mode 100644 index 0000000..7105945 --- /dev/null +++ b/api/call_history.py @@ -0,0 +1,126 @@ +""" +Call History API β€” Read-only access to persisted call records, +transcript chunks, and recording files for the dashboard. +""" + +from datetime import datetime +from typing import Optional + +from fastapi import APIRouter, Depends, HTTPException, Query +from fastapi.responses import FileResponse +from sqlalchemy import desc, select +from sqlalchemy.ext.asyncio import AsyncSession + +from db.database import ( + CallRecord, + RecordingRecord, + TranscriptChunk, + get_db, +) + +router = APIRouter() + + +@router.get("/history") +async def list_history( + limit: int = Query(50, ge=1, le=500), + offset: int = Query(0, ge=0), + number: Optional[str] = None, + status: Optional[str] = None, + since: Optional[datetime] = None, + until: Optional[datetime] = None, + db: AsyncSession = Depends(get_db), +): + """Paged list of past calls, newest first.""" + stmt = select(CallRecord).order_by(desc(CallRecord.started_at)) + if number: + stmt = stmt.where(CallRecord.remote_number == number) + if status: + stmt = stmt.where(CallRecord.status == status) + if since: + stmt = stmt.where(CallRecord.started_at >= since) + if until: + stmt = stmt.where(CallRecord.started_at <= until) + + rows = (await db.execute(stmt.offset(offset).limit(limit))).scalars().all() + return [ + { + "id": r.id, + "direction": r.direction, + "remote_number": r.remote_number, + "status": r.status, + "mode": r.mode, + "intent": r.intent, + "started_at": r.started_at.isoformat() if r.started_at else None, + "ended_at": r.ended_at.isoformat() if r.ended_at else None, + "duration": r.duration, + "hold_time": r.hold_time, + "device_used": r.device_used, + "summary": r.summary, + } + for r in rows + ] + + +@router.get("/{call_id}/record") +async def get_record(call_id: str, db: AsyncSession = Depends(get_db)): + """Full CallRecord with classification_timeline.""" + row = (await db.execute( + select(CallRecord).where(CallRecord.id == call_id) + )).scalar_one_or_none() + if not row: + raise HTTPException(status_code=404, detail=f"Call {call_id} not found") + return { + "id": row.id, + "direction": row.direction, + "remote_number": row.remote_number, + "status": row.status, + "mode": row.mode, + "intent": row.intent, + "started_at": row.started_at.isoformat() if row.started_at else None, + "ended_at": row.ended_at.isoformat() if row.ended_at else None, + "duration": row.duration, + "hold_time": row.hold_time, + "device_used": row.device_used, + "summary": row.summary, + "action_items": row.action_items, + "sentiment": row.sentiment, + "call_flow_id": row.call_flow_id, + "classification_timeline": row.classification_timeline, + } + + +@router.get("/{call_id}/transcript") +async def get_transcript(call_id: str, db: AsyncSession = Depends(get_db)): + """Ordered transcript chunks for a call.""" + rows = (await db.execute( + select(TranscriptChunk) + .where(TranscriptChunk.call_id == call_id) + .order_by(TranscriptChunk.seq) + )).scalars().all() + return [ + { + "seq": c.seq, + "t_offset_ms": c.t_offset_ms, + "speaker": c.speaker, + "text": c.text, + "confidence": c.confidence, + } + for c in rows + ] + + +@router.get("/{call_id}/recording") +async def get_recording(call_id: str, db: AsyncSession = Depends(get_db)): + """Stream the WAV recording for a call.""" + row = (await db.execute( + select(RecordingRecord) + .where(RecordingRecord.call_id == call_id) + .order_by(desc(RecordingRecord.started_at)) + )).scalar_one_or_none() + if not row or not row.path: + raise HTTPException(status_code=404, detail="Recording not found") + import os + if not os.path.exists(row.path): + raise HTTPException(status_code=404, detail="Recording file missing on disk") + return FileResponse(row.path, media_type="audio/wav", filename=os.path.basename(row.path)) diff --git a/api/routing.py b/api/routing.py new file mode 100644 index 0000000..9f7330e --- /dev/null +++ b/api/routing.py @@ -0,0 +1,85 @@ +""" +Routing Rules API β€” CRUD for inbound routing rules and per-device DND. +""" + +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from api.deps import get_gateway +from core.gateway import AIPSTNGateway +from db.database import Device as DeviceDB +from db.database import get_db +from models.routing import ( + RoutingRule, + RoutingRuleCreate, + RoutingRuleUpdate, +) + +router = APIRouter() + + +@router.get("/rules", response_model=list[RoutingRule]) +async def list_rules(gateway: AIPSTNGateway = Depends(get_gateway)): + if gateway._routing is None: + raise HTTPException(status_code=503, detail="Routing service not ready") + return sorted(gateway._routing.rules, key=lambda r: (r.priority, r.id)) + + +@router.post("/rules", response_model=RoutingRule, status_code=201) +async def create_rule( + payload: RoutingRuleCreate, + gateway: AIPSTNGateway = Depends(get_gateway), +): + if gateway._routing is None: + raise HTTPException(status_code=503, detail="Routing service not ready") + return await gateway._routing.create_rule(payload) + + +@router.put("/rules/{rule_id}", response_model=RoutingRule) +async def update_rule( + rule_id: str, + payload: RoutingRuleUpdate, + gateway: AIPSTNGateway = Depends(get_gateway), +): + if gateway._routing is None: + raise HTTPException(status_code=503, detail="Routing service not ready") + rule = await gateway._routing.update_rule(rule_id, payload) + if rule is None: + raise HTTPException(status_code=404, detail=f"Rule {rule_id} not found") + return rule + + +@router.delete("/rules/{rule_id}") +async def delete_rule( + rule_id: str, + gateway: AIPSTNGateway = Depends(get_gateway), +): + if gateway._routing is None: + raise HTTPException(status_code=503, detail="Routing service not ready") + ok = await gateway._routing.delete_rule(rule_id) + if not ok: + raise HTTPException(status_code=404, detail=f"Rule {rule_id} not found") + return {"status": "deleted", "rule_id": rule_id} + + +@router.patch("/devices/{device_id}/dnd") +async def set_device_dnd( + device_id: str, + payload: dict, + gateway: AIPSTNGateway = Depends(get_gateway), + db: AsyncSession = Depends(get_db), +): + """Toggle Do-Not-Disturb on a device.""" + enabled = bool(payload.get("enabled", True)) + device = gateway.devices.get(device_id) + if not device: + raise HTTPException(status_code=404, detail=f"Device {device_id} not found") + device.dnd = enabled + + result = await db.execute(select(DeviceDB).where(DeviceDB.id == device_id)) + row = result.scalar_one_or_none() + if row is not None: + row.dnd = enabled + + return {"device_id": device_id, "dnd": enabled} diff --git a/config.py b/config.py index 362b83a..52a3dd6 100644 --- a/config.py +++ b/config.py @@ -76,6 +76,38 @@ class HoldSlayerSettings(BaseSettings): hold_check_interval: float = Field(default=2.0, validation_alias="HOLD_CHECK_INTERVAL") +class TTSSettings(BaseSettings): + """Rhema TTS service configuration (OpenAI-compatible /v1/audio/speech).""" + + model_config = SettingsConfigDict(env_prefix="TTS_") + + base_url: str = "http://localhost:8000" + model: str = "speaches-ai/Kokoro-82M-v1.0-ONNX" + voice: str = "af_heart" + api_key: str = "" + timeout: float = 30.0 + sample_rate: int = 16000 + + +class ReceptionistSettings(BaseSettings): + """AI Receptionist behavior settings.""" + + model_config = SettingsConfigDict(env_prefix="RECEPTIONIST_") + + enabled: bool = True + greeting_template: str = ( + "Hi, you've reached Robert's line. Who's calling, and what's this about?" + ) + message_prompt: str = "Please leave your message after the tone." + listen_timeout_s: float = 15.0 + end_of_utterance_silence_s: float = 1.2 + message_max_seconds: int = 90 + llm_persona: str = ( + "You are a helpful, concise phone receptionist. Decide whether to ring " + "the owner, take a message, or politely decline." + ) + + class Settings(BaseSettings): """Root application settings.""" @@ -104,6 +136,8 @@ class Settings(BaseSettings): classifier: ClassifierSettings = Field(default_factory=ClassifierSettings) llm: LLMSettings = Field(default_factory=LLMSettings) hold_slayer: HoldSlayerSettings = Field(default_factory=HoldSlayerSettings) + tts: TTSSettings = Field(default_factory=TTSSettings) + receptionist: ReceptionistSettings = Field(default_factory=ReceptionistSettings) # Singleton diff --git a/core/call_manager.py b/core/call_manager.py index 8c33794..1eb229e 100644 --- a/core/call_manager.py +++ b/core/call_manager.py @@ -30,6 +30,7 @@ class CallManager: self.event_bus = event_bus self._active_calls: dict[str, ActiveCall] = {} self._call_legs: dict[str, str] = {} # SIP leg ID -> call ID mapping + self._on_call_ended = None # async callback(call: ActiveCall, final_status) # ================================================================ # Call Lifecycle @@ -164,6 +165,11 @@ class CallManager: }, message=f"πŸ“΅ Call ended: {call.remote_number} ({call.duration}s, hold: {call.hold_time}s)", )) + if self._on_call_ended is not None: + try: + await self._on_call_ended(call, status) + except Exception as e: + logger.warning(f"on_call_ended hook failed for {call_id}: {e}") return call # ================================================================ diff --git a/core/gateway.py b/core/gateway.py index c278c1d..2eb8445 100644 --- a/core/gateway.py +++ b/core/gateway.py @@ -24,6 +24,20 @@ from models.events import EventType, GatewayEvent logger = logging.getLogger(__name__) +def _extract_number(sip_uri: str) -> str: + """Pull the user part out of a SIP URI (sip:+15551212@host β†’ +15551212).""" + if not sip_uri: + return "" + s = sip_uri.strip() + if s.startswith("<") and ">" in s: + s = s[1:s.index(">")] + if s.startswith("sip:"): + s = s[4:] + if "@" in s: + s = s.split("@", 1)[0] + return s + + def _build_sip_engine(settings: Settings, gateway: "AIPSTNGateway") -> SIPEngine: """Build the appropriate SIP engine from config.""" trunk = settings.sip_trunk @@ -42,7 +56,9 @@ def _build_sip_engine(settings: Settings, gateway: "AIPSTNGateway") -> SIPEngine trunk_transport=trunk.transport, domain=gw_sip.domain, did=trunk.did, + media_pipeline=gateway.media_pipeline, on_device_registered=gateway._on_sip_device_registered, + on_incoming_call=gateway._on_sip_incoming_call, ) except Exception as e: logger.warning(f"Could not create SippyEngine: {e} β€” using mock") @@ -71,12 +87,16 @@ class AIPSTNGateway: self.settings = settings self.event_bus = EventBus() self.call_manager = CallManager(self.event_bus) + self.media_pipeline = MediaPipeline(sample_rate=16000) self.sip_engine: SIPEngine = sip_engine or MockSIPEngine() # Services (initialized in start()) self._hold_slayer = None self._audio_classifier = None self._transcription = None + self._tts = None + self._routing = None + self._receptionist = None # Device registry (loaded from DB on start) self._devices: dict[str, Device] = {} @@ -103,6 +123,9 @@ class AIPSTNGateway: """Boot the gateway β€” start SIP engine and services.""" logger.info("πŸ”₯ Starting AI PSTN Gateway...") + # Start media pipeline first so SIP engine can hand it RTP streams + await self.media_pipeline.start() + # Start SIP engine await self.sip_engine.start() logger.info(f" SIP Engine: ready") @@ -110,9 +133,21 @@ class AIPSTNGateway: # Import services here to avoid circular imports from services.audio_classifier import AudioClassifier from services.transcription import TranscriptionService + from services.tts import TTSService + from services.routing import RoutingService + from services.receptionist import ReceptionistService self._audio_classifier = AudioClassifier(self.settings.classifier) self._transcription = TranscriptionService(self.settings.speaches) + self._tts = TTSService(self.settings.tts) + self._routing = RoutingService(self) + await self._routing.start() + self._receptionist = ReceptionistService(self) + + # Persist completed calls to the database for history/playback. + from services.call_persistence import persist_call_on_end + + self.call_manager._on_call_ended = persist_call_on_end self._started_at = datetime.now() @@ -150,6 +185,18 @@ class AIPSTNGateway: # Stop SIP engine await self.sip_engine.stop() + # Stop media pipeline last (after SIP no longer references streams) + try: + await self.media_pipeline.stop() + except Exception as e: + logger.error(f"Media pipeline stop error: {e}") + + if self._tts is not None: + try: + await self._tts.close() + except Exception: + pass + self._started_at = None logger.info("Gateway shut down cleanly.") @@ -215,6 +262,7 @@ class AIPSTNGateway: classifier=self._audio_classifier, transcription=self._transcription, settings=self.settings, + tts=self._tts, ) # Launch as background task β€” don't block import asyncio @@ -364,6 +412,74 @@ class AIPSTNGateway: }, )) + async def _on_sip_incoming_call( + self, from_uri: str, to_uri: str, leg_id: str + ) -> None: + """ + Called by SippyEngine when an inbound INVITE arrives. + + Evaluates routing rules, then either: + - Rejects (rule says reject/DND) + - Answers + hands off to the AI Receptionist + """ + import uuid as _uuid + from models.call import CallMode, CallStatus + from models.routing import RoutingActionType + + caller_number = _extract_number(from_uri) + dnis = _extract_number(to_uri) + + # Create a call record so the dashboard sees the ringing call. + call = await self.call_manager.create_call( + remote_number=caller_number, + mode=CallMode.RECEPTIONIST, + intent=None, + call_flow_id=None, + device=None, + ) + # Mark inbound + call.direction = "inbound" + self.call_manager.map_leg(leg_id, call.id) + await self.call_manager.update_status(call.id, CallStatus.RINGING) + + decision = ( + await self._routing.evaluate(caller_number, dnis) + if self._routing is not None + else None + ) + + if decision is not None: + await self.event_bus.publish(GatewayEvent( + type=EventType.ROUTING_RULE_MATCHED, + call_id=call.id, + data={ + "matched_rule_id": decision.matched_rule_id, + "matched_rule_name": decision.matched_rule_name, + "action": decision.action.type.value, + "reason": decision.reason, + }, + message=decision.reason, + )) + + if decision.action.type in (RoutingActionType.REJECT, RoutingActionType.DND): + if hasattr(self.sip_engine, "reject_inbound"): + await self.sip_engine.reject_inbound(leg_id) + await self.call_manager.end_call(call.id, CallStatus.COMPLETED) + return + + # Answer the leg + if hasattr(self.sip_engine, "accept_inbound"): + await self.sip_engine.accept_inbound(leg_id) + await self.call_manager.update_status(call.id, CallStatus.CONNECTED) + + # Hand off to the AI Receptionist + if self._receptionist is not None and self.settings.receptionist.enabled: + import asyncio as _asyncio + _asyncio.create_task( + self._receptionist.handle(call, leg_id, decision), + name=f"receptionist_{call.id}", + ) + def preferred_device(self) -> Optional[Device]: """Get the highest-priority online device.""" online_devices = [ diff --git a/core/media_pipeline.py b/core/media_pipeline.py index ef370a3..2edf05b 100644 --- a/core/media_pipeline.py +++ b/core/media_pipeline.py @@ -103,6 +103,8 @@ class MediaStream: self.rtp_port: Optional[int] = None # Local RTP listen port self.taps: list[AudioTap] = [] self.recorder = None # PJSUA2 AudioMediaRecorder + self.player = None # PJSUA2 AudioMediaPlayer (active playback) + self.play_lock = asyncio.Lock() # Serializes playback per stream self.active = True def __repr__(self): @@ -505,6 +507,72 @@ class MediaPipeline: except Exception as e: logger.error(f" Tone generation error: {e}") + # ================================================================ + # WAV Playback (TTS prompts, SPEAK steps, receptionist greetings) + # ================================================================ + + async def play_wav(self, stream_id: str, filepath: str) -> bool: + """ + Play a WAV file into the given stream, awaiting completion. + + Playback is serialized per stream β€” if another playback is in + flight on the same stream this call waits for it to finish. + Falls back to a duration-based sleep when PJSUA2 is unavailable. + """ + stream = self._streams.get(stream_id) + if not stream: + logger.warning(f" Cannot play WAV: stream {stream_id} not found") + return False + + async with stream.play_lock: + duration_s = self._wav_duration_seconds(filepath) + + if self._endpoint: + try: + import pjsua2 as pj + + player = pj.AudioMediaPlayer() + # PJMEDIA_FILE_NO_LOOP == 1 + player.createPlayer(filepath, 1) + stream.player = player + + # In a full PJSUA2 integration: + # player.getAudioMedia().startTransmit(stream.audio_media) + # We don't hold the AudioMedia ref here in stub mode. + logger.info( + f" πŸ”Š Playing {filepath} on {stream_id} ({duration_s:.1f}s)" + ) + + await asyncio.sleep(duration_s) + stream.player = None + return True + + except ImportError: + logger.debug(f" PJSUA2 not available, virtual playback of {filepath}") + await asyncio.sleep(duration_s) + return True + except Exception as e: + logger.error(f" Failed to play {filepath} on {stream_id}: {e}") + stream.player = None + return False + else: + logger.info(f" πŸ”Š Playing {filepath} on {stream_id} (virtual, {duration_s:.1f}s)") + await asyncio.sleep(duration_s) + return True + + @staticmethod + def _wav_duration_seconds(filepath: str) -> float: + """Read WAV header to compute playback duration. Defaults to 2s on error.""" + try: + import wave + + with wave.open(filepath, "rb") as wf: + frames = wf.getnframes() + rate = wf.getframerate() or 16000 + return frames / float(rate) if frames else 2.0 + except Exception: + return 2.0 + # ================================================================ # Status # ================================================================ diff --git a/core/sippy_engine.py b/core/sippy_engine.py index 55f9b11..7e6a67b 100644 --- a/core/sippy_engine.py +++ b/core/sippy_engine.py @@ -168,6 +168,7 @@ class SippyEngine(SIPEngine): media_pipeline=None, # MediaPipeline instance on_leg_state_change: Optional[Callable] = None, on_device_registered: Optional[Callable] = None, + on_incoming_call: Optional[Callable] = None, ): # SIP config self._sip_address = sip_address @@ -186,6 +187,7 @@ class SippyEngine(SIPEngine): # Callbacks for async state changes self._on_leg_state_change = on_leg_state_change self._on_device_registered = on_device_registered + self._on_incoming_call = on_incoming_call self._loop: Optional[asyncio.AbstractEventLoop] = None # State @@ -355,21 +357,54 @@ class SippyEngine(SIPEngine): await self._on_device_registered(aor, contact, expires) def _handle_incoming_invite(self, req, sip_t): - """Handle an incoming INVITE β€” create inbound call leg.""" + """Handle an incoming INVITE β€” create inbound call leg. + + The gateway is notified via `on_incoming_call`; it decides + whether to answer (via `accept_inbound`) or reject the leg + based on routing rules. + """ from_uri = str(req.getHFBody("from").getUri()) to_uri = str(req.getHFBody("to").getUri()) leg_id = f"leg_{uuid.uuid4().hex[:12]}" leg = SipCallLeg(leg_id, "inbound", from_uri) leg.sippy_ua = sip_t.ua if hasattr(sip_t, "ua") else None + leg.pending_invite = req self._legs[leg_id] = leg logger.info(f" Incoming call: {from_uri} β†’ {to_uri} (leg: {leg_id})") - # Auto-answer for now (gateway always answers) - # In production, this would check routing rules + # Surface to the gateway. If no callback is wired, fall back to + # auto-answer so we don't regress the previous behavior. + if self._on_incoming_call and self._loop: + asyncio.run_coroutine_threadsafe( + self._on_incoming_call(from_uri, to_uri, leg_id), + self._loop, + ) + else: + controller = SippyCallController(leg, self) + controller.on_connected(str(req.getBody()) if req.getBody() else None) + + async def accept_inbound(self, leg_id: str) -> bool: + """Answer a previously-surfaced inbound INVITE.""" + leg = self._legs.get(leg_id) + if not leg or leg.direction != "inbound": + return False + req = getattr(leg, "pending_invite", None) controller = SippyCallController(leg, self) - controller.on_connected(str(req.getBody()) if req.getBody() else None) + body = str(req.getBody()) if req and req.getBody() else None + controller.on_connected(body) + return True + + async def reject_inbound(self, leg_id: str, code: int = 603, reason: str = "Decline") -> bool: + """Reject a previously-surfaced inbound INVITE with a SIP error.""" + leg = self._legs.pop(leg_id, None) + if not leg or leg.direction != "inbound": + return False + logger.info(f" β›” Rejecting inbound leg {leg_id}: {code} {reason}") + # Real SIP rejection would go through Sippy here; we just drop the leg + # in stub mode so callers see the call terminate. + return True def _handle_incoming_bye(self, req, sip_t): """Handle incoming BYE β€” tear down call leg.""" diff --git a/dashboard/src/lib/api.ts b/dashboard/src/lib/api.ts index bfd71fe..0cb2e41 100644 --- a/dashboard/src/lib/api.ts +++ b/dashboard/src/lib/api.ts @@ -1,4 +1,13 @@ -import type { CallSummary, DeviceStatus, GatewayEvent, GatewayStatus, HealthStatus } from './types'; +import type { + CallHistoryRow, + CallSummary, + DeviceStatus, + GatewayEvent, + GatewayStatus, + HealthStatus, + RoutingRule, + TranscriptRow, +} from './types'; async function get(path: string): Promise { const res = await fetch(path); @@ -27,6 +36,67 @@ export async function hangupCall(callId: string): Promise { if (!res.ok) throw new Error(`${res.status} ${res.statusText}`); } +export async function fetchCallHistory( + limit = 50, + offset = 0, +): Promise { + const params = new URLSearchParams({ limit: String(limit), offset: String(offset) }); + return get(`/api/calls/history?${params}`); +} + +export async function fetchCallRecord(callId: string): Promise { + return get(`/api/calls/${callId}/record`); +} + +export async function fetchTranscript(callId: string): Promise { + return get(`/api/calls/${callId}/transcript`); +} + +export function recordingUrl(callId: string): string { + return `/api/calls/${callId}/recording`; +} + +export async function fetchRoutingRules(): Promise { + return get('/api/routing/rules'); +} + +export async function createRoutingRule(rule: Partial): Promise { + const res = await fetch('/api/routing/rules', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(rule), + }); + if (!res.ok) throw new Error(`${res.status} ${res.statusText}`); + return res.json() as Promise; +} + +export async function updateRoutingRule( + ruleId: string, + patch: Partial, +): Promise { + const res = await fetch(`/api/routing/rules/${ruleId}`, { + method: 'PUT', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(patch), + }); + if (!res.ok) throw new Error(`${res.status} ${res.statusText}`); + return res.json() as Promise; +} + +export async function deleteRoutingRule(ruleId: string): Promise { + const res = await fetch(`/api/routing/rules/${ruleId}`, { method: 'DELETE' }); + if (!res.ok) throw new Error(`${res.status} ${res.statusText}`); +} + +export async function setDeviceDnd(deviceId: string, enabled: boolean): Promise { + const res = await fetch(`/api/routing/devices/${deviceId}/dnd`, { + method: 'PATCH', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ enabled }), + }); + if (!res.ok) throw new Error(`${res.status} ${res.statusText}`); +} + export function connectEventStream( onEvent: (e: GatewayEvent) => void, onClose: () => void, diff --git a/dashboard/src/lib/types.ts b/dashboard/src/lib/types.ts index f51ff1b..4a51835 100644 --- a/dashboard/src/lib/types.ts +++ b/dashboard/src/lib/types.ts @@ -72,3 +72,65 @@ export interface GatewayEvent { data: Record; message: string; } + +export interface CallHistoryRow { + id: string; + direction: string; + remote_number: string; + status: string; + mode: string; + intent?: string; + started_at?: string; + ended_at?: string; + duration: number; + hold_time: number; + device_used?: string; + summary?: string; +} + +export interface TranscriptRow { + seq: number; + t_offset_ms: number; + speaker: string; + text: string; + confidence?: number; +} + +export type RoutingActionType = + | 'ring_device' + | 'ring_chain' + | 'take_message' + | 'reject' + | 'dnd'; + +export interface TimeRange { + start: string; + end: string; + tz: string; + days: number[]; +} + +export interface RoutingMatch { + caller_pattern?: string | null; + dnis?: string | null; + time_range?: TimeRange | null; +} + +export interface RoutingAction { + type: RoutingActionType; + device_id?: string | null; + device_ids: string[]; + ring_timeout: number; + message?: string | null; +} + +export interface RoutingRule { + id: string; + name: string; + priority: number; + enabled: boolean; + match: RoutingMatch; + action: RoutingAction; + created_at?: string; + updated_at?: string; +} diff --git a/dashboard/src/routes/+layout.svelte b/dashboard/src/routes/+layout.svelte index c1d4040..3760e07 100644 --- a/dashboard/src/routes/+layout.svelte +++ b/dashboard/src/routes/+layout.svelte @@ -42,7 +42,11 @@ return () => mq.removeEventListener('change', onSystemChange); }); - const nav = [{ href: '/', label: 'Dashboard' }]; + const nav = [ + { href: '/', label: 'Dashboard' }, + { href: '/history', label: 'History' }, + { href: '/routing', label: 'Routing' }, + ];
diff --git a/dashboard/src/routes/calls/[call_id]/+page.svelte b/dashboard/src/routes/calls/[call_id]/+page.svelte new file mode 100644 index 0000000..84267e3 --- /dev/null +++ b/dashboard/src/routes/calls/[call_id]/+page.svelte @@ -0,0 +1,80 @@ + + +← Back to history + +{#if loading} +
Loading…
+{:else if error} +
Failed: {error}
+{:else if record} +
+

{record.remote_number}

+
+ {record.direction} Β· {record.mode} Β· {record.status} Β· + {record.duration}s + {#if record.intent} Β· {record.intent}{/if} +
+
+ +
+ +
+ +
+

Transcript

+ {#if transcript.length === 0} +
No transcript stored.
+ {:else} +
    + {#each transcript as chunk} +
  • + +
  • + {/each} +
+ {/if} +
+{/if} diff --git a/dashboard/src/routes/history/+page.svelte b/dashboard/src/routes/history/+page.svelte new file mode 100644 index 0000000..3bb82ce --- /dev/null +++ b/dashboard/src/routes/history/+page.svelte @@ -0,0 +1,76 @@ + + +

Call History

+ +{#if loading} +
Loading…
+{:else if error} +
Failed to load history: {error}
+{:else if rows.length === 0} +
No calls yet.
+{:else} +
+ + + + + + + + + + + + + + + {#each rows as row} + + + + + + + + + + + {/each} + +
WhenNumberDirModeStatusDurationHoldIntent
+ + {fmtDate(row.started_at)} + + {row.remote_number}{row.direction}{row.mode}{row.status}{fmtDuration(row.duration)}{fmtDuration(row.hold_time)}{row.intent ?? ''}
+
+{/if} diff --git a/dashboard/src/routes/routing/+page.svelte b/dashboard/src/routes/routing/+page.svelte new file mode 100644 index 0000000..e58fd37 --- /dev/null +++ b/dashboard/src/routes/routing/+page.svelte @@ -0,0 +1,219 @@ + + +

Routing Rules

+ +{#if error} +
{error}
+{/if} + +
+

Devices Β· DND

+ {#if devices.length === 0} +
No devices registered.
+ {:else} +
    + {#each devices as d} +
  • + {d.name} + ({d.type}) + +
  • + {/each} +
+ {/if} +
+ +
+

Add Rule

+
+ + + + + + {#if draft.action_type === 'ring_device'} + + {:else} + + {/if} + +
+ +
+ +
+

Rules

+ {#if loading} +
Loading…
+ {:else if rules.length === 0} +
No rules yet.
+ {:else} + + + + + + + + + + + + + + {#each rules as rule} + + + + + + + + + + {/each} + +
PriNameCallerDNISActionEnabled
{rule.priority}{rule.name}{rule.match.caller_pattern ?? ''}{rule.match.dnis ?? ''}{rule.action.type} + + + +
+ {/if} +
diff --git a/db/database.py b/db/database.py index f829049..e4d7377 100644 --- a/db/database.py +++ b/db/database.py @@ -8,6 +8,7 @@ from datetime import datetime from sqlalchemy import ( JSON, + Boolean, Column, DateTime, Float, @@ -111,6 +112,7 @@ class Device(Base): priority = Column(Integer, default=10) # Routing priority (lower = higher priority) is_online = Column(String, default="false") capabilities = Column(JSON, default=list) # ["voice", "video", "sms"] + dnd = Column(Boolean, default=False, nullable=False) last_seen = Column(DateTime, nullable=True) created_at = Column(DateTime, default=func.now()) updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) @@ -119,6 +121,55 @@ class Device(Base): return f"" +class RoutingRuleRecord(Base): + __tablename__ = "routing_rules" + + id = Column(String, primary_key=True) + name = Column(String, nullable=False) + priority = Column(Integer, default=100, nullable=False) # lower runs first + enabled = Column(Boolean, default=True, nullable=False) + match = Column(JSON, nullable=False) # caller_pattern, dnis, time_range, days + action = Column(JSON, nullable=False) # {type, ...} + created_at = Column(DateTime, default=func.now()) + updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) + + def __repr__(self) -> str: + return f"" + + +class TranscriptChunk(Base): + __tablename__ = "transcript_chunks" + + id = Column(String, primary_key=True) + call_id = Column(String, index=True, nullable=False) + seq = Column(Integer, nullable=False) + t_offset_ms = Column(Integer, default=0) # offset from call start + speaker = Column(String, default="unknown") # caller / agent / receptionist / unknown + text = Column(Text, nullable=False) + confidence = Column(Float, nullable=True) + created_at = Column(DateTime, default=func.now()) + + def __repr__(self) -> str: + return f"" + + +class RecordingRecord(Base): + __tablename__ = "recordings" + + id = Column(String, primary_key=True) + call_id = Column(String, index=True, nullable=False) + path = Column(String, nullable=False) + format = Column(String, default="wav") + duration_s = Column(Float, default=0.0) + size_bytes = Column(Integer, default=0) + channels = Column(Integer, default=1) + started_at = Column(DateTime, default=func.now()) + ended_at = Column(DateTime, nullable=True) + + def __repr__(self) -> str: + return f"" + + # ============================================================ # Engine & Session # ============================================================ diff --git a/main.py b/main.py index 2b9d089..53fd3ef 100644 --- a/main.py +++ b/main.py @@ -18,7 +18,7 @@ from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.staticfiles import StaticFiles -from api import calls, call_flows, devices, websocket +from api import call_flows, call_history, calls, devices, routing, websocket from config import get_settings from core.gateway import AIPSTNGateway from db.database import close_db, init_db @@ -114,6 +114,7 @@ async def lifespan(app: FastAPI): recording_svc = RecordingService() await recording_svc.start() app.state.recording_service = recording_svc + gateway._recording_service = recording_svc analytics_svc = CallAnalytics() app.state.analytics_service = analytics_svc @@ -171,10 +172,22 @@ app = FastAPI( # === API Routes === app.include_router(calls.router, prefix="/api/calls", tags=["Calls"]) +app.include_router(call_history.router, prefix="/api/calls", tags=["Call History"]) app.include_router(call_flows.router, prefix="/api/call-flows", tags=["Call Flows"]) app.include_router(devices.router, prefix="/api/devices", tags=["Devices"]) +app.include_router(routing.router, prefix="/api/routing", tags=["Routing"]) app.include_router(websocket.router, prefix="/ws", tags=["WebSocket"]) +# === Dashboard (built SvelteKit static) === +import os as _os +_dashboard_build = _os.path.join(_os.path.dirname(__file__), "dashboard", "build") +if _os.path.isdir(_dashboard_build): + app.mount( + "/dashboard", + StaticFiles(directory=_dashboard_build, html=True), + name="dashboard", + ) + # === Root Endpoint === @app.get("/", tags=["System"]) diff --git a/models/call.py b/models/call.py index 3c03688..785e795 100644 --- a/models/call.py +++ b/models/call.py @@ -31,6 +31,7 @@ class CallMode(str, Enum): DIRECT = "direct" # Call and connect immediately HOLD_SLAYER = "hold_slayer" # Navigate IVR, wait on hold, transfer when human AI_ASSISTED = "ai_assisted" # Connect with transcription, recording, noise cancel + RECEPTIONIST = "receptionist" # AI screens inbound caller, then routes or takes a message class AudioClassification(str, Enum): diff --git a/models/device.py b/models/device.py index 3de5a19..641b538 100644 --- a/models/device.py +++ b/models/device.py @@ -38,6 +38,7 @@ class Device(DeviceBase): id: str is_online: bool = False + dnd: bool = False last_seen: Optional[datetime] = None created_at: Optional[datetime] = None updated_at: Optional[datetime] = None @@ -45,6 +46,8 @@ class Device(DeviceBase): @property def can_receive_call(self) -> bool: """Can this device receive a call right now?""" + if self.dnd: + return False if self.type in (DeviceType.SIP_PHONE, DeviceType.SOFTPHONE, DeviceType.WEBRTC): return self.is_online and self.sip_uri is not None if self.type == DeviceType.CELL: diff --git a/models/events.py b/models/events.py index d825539..3df66e8 100644 --- a/models/events.py +++ b/models/events.py @@ -32,6 +32,19 @@ class EventType(str, Enum): # Audio AUDIO_CLASSIFIED = "audio.classified" TRANSCRIPT_CHUNK = "audio.transcript_chunk" + SPEAK_PLAYED = "audio.speak_played" + + # AI Receptionist + RECEPTIONIST_GREETING = "receptionist.greeting" + RECEPTIONIST_LISTENING = "receptionist.listening" + RECEPTIONIST_CAPTURED_INTENT = "receptionist.captured_intent" + RECEPTIONIST_ROUTING = "receptionist.routing" + RECEPTIONIST_MESSAGE_SAVED = "receptionist.message_saved" + RECEPTIONIST_REJECTED = "receptionist.rejected" + + # Routing + ROUTING_RULE_MATCHED = "routing.rule_matched" + ROUTING_DEVICE_DND = "routing.device_dnd" # Device DEVICE_REGISTERED = "device.registered" diff --git a/models/routing.py b/models/routing.py new file mode 100644 index 0000000..39f20f4 --- /dev/null +++ b/models/routing.py @@ -0,0 +1,84 @@ +""" +Routing rule models β€” Smart routing for inbound calls. + +Rules are evaluated in priority order (lower first). The first enabled +rule whose match clause is satisfied wins, and its action is returned +to the receptionist / gateway as a routing decision. +""" + +from datetime import datetime +from enum import Enum +from typing import Optional + +from pydantic import BaseModel, Field + + +class RoutingActionType(str, Enum): + RING_DEVICE = "ring_device" # Ring a single device + RING_CHAIN = "ring_chain" # Ring a chain of devices (priority order) + TAKE_MESSAGE = "take_message" # Skip the ring, go straight to voicemail + REJECT = "reject" # Decline the call (busy / 603) + DND = "dnd" # Do Not Disturb β€” also reject, but tagged for analytics + + +class TimeRange(BaseModel): + """A recurring time-of-day window.""" + + start: str # "HH:MM" + end: str # "HH:MM" (may wrap past midnight if end < start) + tz: str = "UTC" + days: list[int] = Field(default_factory=lambda: [0, 1, 2, 3, 4, 5, 6]) # 0=Mon..6=Sun + + +class RoutingMatch(BaseModel): + """Conditions a call must satisfy to match a rule.""" + + caller_pattern: Optional[str] = None # glob, e.g. "+1800*" + dnis: Optional[str] = None # DID dialed (exact match) + time_range: Optional[TimeRange] = None # only fires inside this window + + +class RoutingAction(BaseModel): + """What to do when a rule matches.""" + + type: RoutingActionType + device_id: Optional[str] = None # for RING_DEVICE + device_ids: list[str] = Field(default_factory=list) # for RING_CHAIN + ring_timeout: int = 25 # seconds per device + message: Optional[str] = None # optional TTS string for REJECT/DND + + +class RoutingRule(BaseModel): + id: str + name: str + priority: int = 100 + enabled: bool = True + match: RoutingMatch = Field(default_factory=RoutingMatch) + action: RoutingAction + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + +class RoutingRuleCreate(BaseModel): + name: str + priority: int = 100 + enabled: bool = True + match: RoutingMatch = Field(default_factory=RoutingMatch) + action: RoutingAction + + +class RoutingRuleUpdate(BaseModel): + name: Optional[str] = None + priority: Optional[int] = None + enabled: Optional[bool] = None + match: Optional[RoutingMatch] = None + action: Optional[RoutingAction] = None + + +class RoutingDecision(BaseModel): + """Result of evaluating the routing rules for an inbound call.""" + + matched_rule_id: Optional[str] = None + matched_rule_name: Optional[str] = None + action: RoutingAction + reason: str = "" diff --git a/services/call_persistence.py b/services/call_persistence.py new file mode 100644 index 0000000..393954a --- /dev/null +++ b/services/call_persistence.py @@ -0,0 +1,70 @@ +""" +Call Persistence β€” Writes completed calls and their transcript chunks +to the database when CallManager.end_call() fires. +""" + +import logging +import uuid +from datetime import datetime + +from db.database import CallRecord, TranscriptChunk, get_session_factory +from models.call import ActiveCall, CallStatus + +logger = logging.getLogger(__name__) + + +async def persist_call_on_end(call: ActiveCall, final_status: CallStatus) -> None: + """Insert a CallRecord and any transcript chunks for `call`. + + Wired into CallManager via _on_call_ended in gateway.start(). + """ + try: + async with get_session_factory()() as session: + record = CallRecord( + id=call.id, + direction=call.direction, + remote_number=call.remote_number, + status=final_status.value, + mode=call.mode.value, + intent=call.intent, + started_at=call.started_at, + ended_at=datetime.now(), + duration=int(call.duration), + hold_time=int(call.hold_time), + device_used=call.device, + call_flow_id=call.call_flow_id, + classification_timeline=[ + { + "timestamp": c.timestamp, + "audio_type": c.audio_type.value, + "confidence": c.confidence, + } + for c in call.classification_history + ], + metadata_={"services": list(call.services)}, + ) + session.add(record) + + # Each transcript chunk gets its own row with a sequence number + # so the dashboard can render them in order with click-to-seek. + for seq, text in enumerate(call.transcript_chunks): + speaker = "unknown" + payload = text + if ":" in text: + head, rest = text.split(":", 1) + head = head.strip().lower() + if head in {"caller", "agent", "receptionist", "caller_message"}: + speaker = head if head != "caller_message" else "caller" + payload = rest.strip() + session.add(TranscriptChunk( + id=f"tc_{uuid.uuid4().hex[:10]}", + call_id=call.id, + seq=seq, + t_offset_ms=0, + speaker=speaker, + text=payload, + )) + + await session.commit() + except Exception as e: + logger.warning(f"Could not persist call {call.id}: {e}") diff --git a/services/hold_slayer.py b/services/hold_slayer.py index e6fa2cc..1eee29b 100644 --- a/services/hold_slayer.py +++ b/services/hold_slayer.py @@ -24,6 +24,7 @@ from models.call_flow import ActionType, CallFlow, CallFlowStep from models.events import EventType, GatewayEvent from services.audio_classifier import AudioClassifier from services.transcription import TranscriptionService +from services.tts import TTSService logger = logging.getLogger(__name__) @@ -68,6 +69,7 @@ class HoldSlayerService: classifier: AudioClassifier, transcription: TranscriptionService, settings: Settings, + tts: Optional[TTSService] = None, ): self.gateway = gateway self.call_manager = call_manager @@ -75,6 +77,7 @@ class HoldSlayerService: self.classifier = classifier self.transcription = transcription self.settings = settings + self.tts = tts async def run( self, @@ -257,10 +260,7 @@ class HoldSlayerService: current_step_id = step.next_step elif step.action == ActionType.SPEAK: - # Say something into the call (TTS) - # TODO: Implement TTS integration - logger.info(f"πŸ—£οΈ Would say: '{step.action_value}' (TTS not yet implemented)") - await asyncio.sleep(3.0) + await self._speak(call, sip_leg_id, step.action_value or "") current_step_id = step.next_step elif step.action == ActionType.TRANSFER: @@ -715,3 +715,53 @@ class HoldSlayerService: logger.error(f"Failed to load call flow '{flow_id}': {e}") return None + + async def _speak(self, call: ActiveCall, sip_leg_id: str, text: str) -> bool: + """ + Synthesize `text` via TTS and play it into the call leg. + + Falls back to a brief sleep if TTS is unavailable so a SPEAK step + doesn't block the flow indefinitely. + """ + if not text.strip(): + return False + + if not self.tts or not getattr(self.gateway, "media_pipeline", None): + logger.warning(f"πŸ—£οΈ TTS unavailable, skipping SPEAK: '{text[:60]}'") + await asyncio.sleep(2.0) + return False + + import os + import tempfile + + fd, tmp_path = tempfile.mkstemp(suffix=".wav", prefix=f"speak_{call.id}_") + os.close(fd) + + try: + ok = await self.tts.synthesize_to_file(text, tmp_path) + if not ok: + logger.warning(f"πŸ—£οΈ TTS synthesis returned no audio for: '{text[:60]}'") + return False + + logger.info(f"πŸ—£οΈ Speaking: '{text[:80]}'") + await self.gateway.media_pipeline.play_wav(sip_leg_id, tmp_path) + + # Publish event so the dashboard/transcript shows what we said. + try: + await self.gateway.event_bus.publish( + GatewayEvent( + type=EventType.SPEAK_PLAYED, + call_id=call.id, + data={"text": text}, + message=f"Played TTS: {text[:80]}", + ) + ) + except Exception: + pass + + return True + finally: + try: + os.unlink(tmp_path) + except OSError: + pass diff --git a/services/receptionist.py b/services/receptionist.py new file mode 100644 index 0000000..a27e827 --- /dev/null +++ b/services/receptionist.py @@ -0,0 +1,345 @@ +""" +AI Receptionist β€” Screens inbound calls, then routes or takes a message. + +State machine: + GREET β†’ TTS greeting plays into the call leg + LISTEN β†’ buffer audio from the leg's tap until end-of-utterance + CLASSIFY β†’ LLM extracts intent, urgency, recommended action + DECIDE β†’ combine LLM recommendation with the routing decision + (rules win on conflict) + RING β†’ ring_chain devices; bridge on pickup + RECORD β†’ TTS prompt + WAV record up to message_max_seconds; transcribe + and notify +""" + +import asyncio +import logging +import time as _time +import uuid +from datetime import datetime +from pathlib import Path +from typing import Optional + +from models.call import ActiveCall, CallStatus +from models.events import EventType, GatewayEvent +from models.routing import RoutingAction, RoutingActionType, RoutingDecision + +logger = logging.getLogger(__name__) + + +class ReceptionistService: + """Drives the receptionist state machine for a single inbound call.""" + + def __init__(self, gateway): + self.gateway = gateway + self.settings = gateway.settings.receptionist + + async def handle( + self, + call: ActiveCall, + sip_leg_id: str, + routing_decision: Optional[RoutingDecision] = None, + ) -> None: + """Run the full receptionist flow for an inbound call.""" + try: + await self._greet(call, sip_leg_id) + + transcript = await self._listen(call, sip_leg_id) + if transcript: + call.transcript_chunks.append(f"caller: {transcript}") + await self.gateway.event_bus.publish(GatewayEvent( + type=EventType.TRANSCRIPT_CHUNK, + call_id=call.id, + data={"text": transcript, "speaker": "caller"}, + message=f"πŸ“ caller: {transcript[:80]}", + )) + + classification = await self._classify(call, transcript, routing_decision) + call.intent = classification.get("intent") + + await self.gateway.event_bus.publish(GatewayEvent( + type=EventType.RECEPTIONIST_CAPTURED_INTENT, + call_id=call.id, + data=classification, + message=f"Intent: {classification.get('intent', '?')}", + )) + + action = self._decide(routing_decision, classification) + + await self.gateway.event_bus.publish(GatewayEvent( + type=EventType.RECEPTIONIST_ROUTING, + call_id=call.id, + data={"action": action.type.value}, + message=f"Routing decision: {action.type.value}", + )) + + if action.type in (RoutingActionType.REJECT, RoutingActionType.DND): + if action.message: + await self._speak(call, sip_leg_id, action.message) + await self._hangup(call, sip_leg_id) + return + + if action.type in (RoutingActionType.RING_DEVICE, RoutingActionType.RING_CHAIN): + devices = self._resolve_device_list(action, classification) + if not devices: + logger.info("Receptionist: no devices to ring, falling back to message") + await self._take_message(call, sip_leg_id) + return + + await self._speak( + call, sip_leg_id, "One moment, I'll connect you now." + ) + answered = await self.gateway._routing.ring_chain( + call.id, devices, action.ring_timeout + ) + if answered: + return # Bridged to a device β€” receptionist done + # Nobody home β€” take a message + await self._take_message(call, sip_leg_id) + return + + # Default: take a message + await self._take_message(call, sip_leg_id) + + except Exception as e: + logger.error(f"Receptionist failed for {call.id}: {e}", exc_info=True) + try: + await self._hangup(call, sip_leg_id) + except Exception: + pass + + # ---------------------------------------------------------------- + # State machine steps + # ---------------------------------------------------------------- + + async def _greet(self, call: ActiveCall, sip_leg_id: str) -> None: + await self.gateway.event_bus.publish(GatewayEvent( + type=EventType.RECEPTIONIST_GREETING, + call_id=call.id, + data={"text": self.settings.greeting_template}, + message="Playing greeting", + )) + await self._speak(call, sip_leg_id, self.settings.greeting_template) + + async def _listen(self, call: ActiveCall, sip_leg_id: str) -> str: + """Buffer audio from the call's tap until silence or timeout.""" + await self.gateway.event_bus.publish(GatewayEvent( + type=EventType.RECEPTIONIST_LISTENING, + call_id=call.id, + message="Listening for caller", + )) + + media = self.gateway.media_pipeline + if media is None: + return "" + + tap = media.create_tap(sip_leg_id) + audio = bytearray() + deadline = _time.monotonic() + self.settings.listen_timeout_s + silent_for = 0.0 + frame_ms = 20 + + try: + while _time.monotonic() < deadline: + remaining = max(0.05, deadline - _time.monotonic()) + frame = await tap.read_frame(timeout=min(0.5, remaining)) + if frame is None: + silent_for += 0.5 + else: + audio.extend(frame) + if self._frame_is_silent(frame): + silent_for += frame_ms / 1000.0 + else: + silent_for = 0.0 + if silent_for >= self.settings.end_of_utterance_silence_s and audio: + break + finally: + tap.close() + + if not audio: + return "" + + return await self.gateway._transcription.transcribe(bytes(audio)) + + async def _classify( + self, + call: ActiveCall, + transcript: str, + routing_decision: Optional[RoutingDecision], + ) -> dict: + """Ask the LLM to interpret the caller's utterance.""" + from services.hold_slayer import _get_llm + + llm = _get_llm() + if llm is None or not transcript.strip(): + return { + "intent": transcript or "unknown", + "urgency": "normal", + "recommended_action": "ring", + "device_hint": None, + } + + rules_summary = "" + if routing_decision and routing_decision.matched_rule_name: + rules_summary = ( + f"A routing rule already matched: '{routing_decision.matched_rule_name}' " + f"(action: {routing_decision.action.type.value})." + ) + + try: + return await llm.chat_json( + user_message=( + f"Caller: {call.remote_number}\n" + f"Transcript: {transcript}\n" + f"{rules_summary}\n\n" + "Return JSON with keys: intent (short string), " + "urgency (low|normal|high), " + "recommended_action (ring|message|reject), " + "device_hint (string or null)." + ), + system=self.settings.llm_persona, + ) + except Exception as e: + logger.warning(f"Receptionist LLM classify failed: {e}") + return { + "intent": transcript, + "urgency": "normal", + "recommended_action": "ring", + "device_hint": None, + } + + def _decide( + self, + routing_decision: Optional[RoutingDecision], + classification: dict, + ) -> RoutingAction: + """Rules win on conflict; otherwise use the LLM's recommendation.""" + if routing_decision and routing_decision.action.type not in ( + RoutingActionType.TAKE_MESSAGE, + ): + return routing_decision.action + + recommended = (classification.get("recommended_action") or "ring").lower() + if recommended == "reject": + return RoutingAction(type=RoutingActionType.REJECT, + message="Sorry, I can't connect that call right now.") + if recommended == "message": + return RoutingAction(type=RoutingActionType.TAKE_MESSAGE) + return RoutingAction(type=RoutingActionType.RING_CHAIN) + + def _resolve_device_list( + self, action: RoutingAction, classification: dict + ) -> list[str]: + if action.type == RoutingActionType.RING_DEVICE and action.device_id: + return [action.device_id] + if action.device_ids: + return action.device_ids + # Default chain: every device that can take a call, in priority order + devices = sorted( + (d for d in self.gateway.devices.values() if d.can_receive_call), + key=lambda d: d.priority, + ) + return [d.id for d in devices] + + async def _take_message(self, call: ActiveCall, sip_leg_id: str) -> None: + await self._speak(call, sip_leg_id, self.settings.message_prompt) + + media = self.gateway.media_pipeline + recording_svc = getattr(self.gateway, "_recording_service", None) + if recording_svc is None or media is None: + logger.warning("Receptionist: recording unavailable, ending call") + await self._hangup(call, sip_leg_id) + return + + # RecordingService writes a WAV file and the recordings row. + session = await recording_svc.start_recording( + call.id, media_pipeline=media, leg_ids=[sip_leg_id] + ) + try: + await asyncio.sleep(self.settings.message_max_seconds) + finally: + session = await recording_svc.stop_recording( + call.id, media_pipeline=media + ) + + message_text = "" + rec_path = session.filepath_mixed if session else None + if rec_path and Path(rec_path).exists(): + try: + audio_bytes = Path(rec_path).read_bytes() + message_text = await self.gateway._transcription.transcribe(audio_bytes) + except Exception as e: + logger.warning(f"Receptionist transcribe failed: {e}") + + if message_text: + call.transcript_chunks.append(f"caller_message: {message_text}") + + await self.gateway.event_bus.publish(GatewayEvent( + type=EventType.RECEPTIONIST_MESSAGE_SAVED, + call_id=call.id, + data={ + "path": rec_path, + "transcript": message_text, + "caller": call.remote_number, + }, + message=f"πŸ“₯ Message saved from {call.remote_number}", + )) + + await self._hangup(call, sip_leg_id) + + # ---------------------------------------------------------------- + # Helpers + # ---------------------------------------------------------------- + + async def _speak(self, call: ActiveCall, sip_leg_id: str, text: str) -> None: + tts = self.gateway._tts + media = self.gateway.media_pipeline + if tts is None or media is None or not text.strip(): + return + + import os + import tempfile + + fd, tmp_path = tempfile.mkstemp(suffix=".wav", prefix=f"recept_{call.id}_") + os.close(fd) + try: + ok = await tts.synthesize_to_file(text, tmp_path) + if not ok: + return + await media.play_wav(sip_leg_id, tmp_path) + await self.gateway.event_bus.publish(GatewayEvent( + type=EventType.SPEAK_PLAYED, + call_id=call.id, + data={"text": text, "speaker": "receptionist"}, + message=f"πŸ—£οΈ {text[:80]}", + )) + finally: + try: + os.unlink(tmp_path) + except OSError: + pass + + async def _hangup(self, call: ActiveCall, sip_leg_id: str) -> None: + try: + await self.gateway.sip_engine.hangup(sip_leg_id) + except Exception as e: + logger.warning(f"Receptionist hangup failed: {e}") + await self.gateway.call_manager.end_call(call.id, CallStatus.COMPLETED) + + @staticmethod + def _frame_is_silent(frame: bytes, threshold: int = 500) -> bool: + """Crude RMS-style check on a 16-bit PCM frame (mono, signed LE).""" + if not frame or len(frame) < 2: + return True + # Inline RMS β€” `audioop` was removed in Python 3.13. + import struct + + n = len(frame) // 2 + if n == 0: + return True + samples = struct.unpack_from(f"<{n}h", frame) + sq = 0 + for s in samples: + sq += s * s + rms = (sq / n) ** 0.5 + return rms < threshold diff --git a/services/recording.py b/services/recording.py index 84479be..e5d949b 100644 --- a/services/recording.py +++ b/services/recording.py @@ -138,6 +138,9 @@ class RecordingService: # Store metadata self._metadata.append(session.to_dict()) + # Persist a recording row so the dashboard can find it later + await self._persist_recording(session) + logger.info( f"⏹ Recording stopped: {call_id} " f"({session.duration_seconds}s, " @@ -145,6 +148,29 @@ class RecordingService: ) return session + @staticmethod + async def _persist_recording(session: "RecordingSession") -> None: + """Write a recordings row for this session. Failures are non-fatal.""" + try: + import uuid as _uuid + from db.database import RecordingRecord, get_session_factory + + async with get_session_factory()() as db: + db.add(RecordingRecord( + id=f"rec_{_uuid.uuid4().hex[:10]}", + call_id=session.call_id, + path=session.filepath_mixed or "", + format="wav", + duration_s=float(session.duration_seconds or 0), + size_bytes=int(session.file_size_bytes or 0), + channels=1, + started_at=session.started_at, + ended_at=session.stopped_at, + )) + await db.commit() + except Exception as e: + logger.warning(f"Recording persistence failed: {e}") + async def _recording_timeout(self, call_id: str) -> None: """Auto-stop recording after max duration.""" await asyncio.sleep(self._max_recording_seconds) diff --git a/services/routing.py b/services/routing.py new file mode 100644 index 0000000..bcc41c5 --- /dev/null +++ b/services/routing.py @@ -0,0 +1,258 @@ +""" +Routing Service β€” Smart routing for inbound calls. + +Evaluates `RoutingRule` records against an incoming call's caller ID, +DNIS, and the current time, returning a `RoutingDecision`. Also exposes +a ring-chain helper that tries devices in priority order until one +answers (or all timeouts elapse). +""" + +import asyncio +import fnmatch +import logging +import uuid +from datetime import datetime, time +from typing import Optional +from zoneinfo import ZoneInfo + +from sqlalchemy import select + +from db.database import RoutingRuleRecord, get_session_factory +from models.routing import ( + RoutingAction, + RoutingActionType, + RoutingDecision, + RoutingMatch, + RoutingRule, + RoutingRuleCreate, + RoutingRuleUpdate, + TimeRange, +) + +logger = logging.getLogger(__name__) + + +def _parse_hhmm(s: str) -> time: + h, m = s.split(":") + return time(hour=int(h), minute=int(m)) + + +def _time_in_range(now: datetime, tr: TimeRange) -> bool: + """True if `now` (interpreted in tr.tz) falls inside the window.""" + try: + tz = ZoneInfo(tr.tz) + except Exception: + tz = ZoneInfo("UTC") + + local = now.astimezone(tz) + if local.weekday() not in tr.days: + return False + + start = _parse_hhmm(tr.start) + end = _parse_hhmm(tr.end) + cur = local.time() + + if start <= end: + return start <= cur < end + # Wraps past midnight (e.g. 22:00 β†’ 06:00) + return cur >= start or cur < end + + +def _caller_matches(pattern: Optional[str], caller_number: str) -> bool: + if not pattern: + return True + return fnmatch.fnmatch(caller_number or "", pattern) + + +def _dnis_matches(rule_dnis: Optional[str], dnis: str) -> bool: + if not rule_dnis: + return True + return rule_dnis == dnis + + +class RoutingService: + """Caches enabled rules and evaluates them per inbound call.""" + + DEFAULT_ACTION = RoutingAction(type=RoutingActionType.TAKE_MESSAGE) + + def __init__(self, gateway): + self.gateway = gateway + self._rules: list[RoutingRule] = [] + self._lock = asyncio.Lock() + + async def start(self) -> None: + await self.reload() + + async def reload(self) -> None: + async with self._lock: + self._rules = await self._load_rules_from_db() + logger.info(f"πŸ“‹ Loaded {len(self._rules)} routing rules") + + @property + def rules(self) -> list[RoutingRule]: + return list(self._rules) + + async def evaluate( + self, + caller_number: str, + dnis: str, + now: Optional[datetime] = None, + ) -> RoutingDecision: + """Walk rules in priority order, return first match or default.""" + now = now or datetime.now().astimezone() + for rule in sorted(self._rules, key=lambda r: (r.priority, r.id)): + if not rule.enabled: + continue + m = rule.match + if not _caller_matches(m.caller_pattern, caller_number): + continue + if not _dnis_matches(m.dnis, dnis): + continue + if m.time_range and not _time_in_range(now, m.time_range): + continue + return RoutingDecision( + matched_rule_id=rule.id, + matched_rule_name=rule.name, + action=rule.action, + reason=f"matched rule '{rule.name}'", + ) + return RoutingDecision( + action=self.DEFAULT_ACTION, + reason="no rule matched β€” default take_message", + ) + + # ---------------------------------------------------------------- + # Ring chain + # ---------------------------------------------------------------- + + async def ring_chain( + self, + call_id: str, + device_ids: list[str], + ring_timeout: int = 25, + ) -> Optional[str]: + """ + Try each device sequentially. Returns the device_id that answered, + or None if all timed out. + """ + for device_id in device_ids: + device = self.gateway.devices.get(device_id) + if not device: + continue + if getattr(device, "dnd", False): + logger.info(f"πŸ“ž Skipping {device_id} (DND)") + continue + try: + await self.gateway.transfer_call(call_id, device_id) + # If transfer_call returned normally, treat as picked up. + return device_id + except asyncio.TimeoutError: + logger.info(f"πŸ“ž Ring timeout for device {device_id}") + continue + except Exception as e: + logger.warning(f"πŸ“ž Ring failed for device {device_id}: {e}") + continue + return None + + # ---------------------------------------------------------------- + # CRUD + # ---------------------------------------------------------------- + + async def create_rule(self, payload: RoutingRuleCreate) -> RoutingRule: + rule_id = f"rule_{uuid.uuid4().hex[:8]}" + record = RoutingRuleRecord( + id=rule_id, + name=payload.name, + priority=payload.priority, + enabled=payload.enabled, + match=payload.match.model_dump(), + action=payload.action.model_dump(), + ) + async with get_session_factory()() as session: + session.add(record) + await session.commit() + + rule = RoutingRule( + id=rule_id, + name=payload.name, + priority=payload.priority, + enabled=payload.enabled, + match=payload.match, + action=payload.action, + ) + async with self._lock: + self._rules.append(rule) + return rule + + async def update_rule( + self, rule_id: str, payload: RoutingRuleUpdate + ) -> Optional[RoutingRule]: + async with get_session_factory()() as session: + result = await session.execute( + select(RoutingRuleRecord).where(RoutingRuleRecord.id == rule_id) + ) + record = result.scalar_one_or_none() + if not record: + return None + if payload.name is not None: + record.name = payload.name + if payload.priority is not None: + record.priority = payload.priority + if payload.enabled is not None: + record.enabled = payload.enabled + if payload.match is not None: + record.match = payload.match.model_dump() + if payload.action is not None: + record.action = payload.action.model_dump() + await session.commit() + + await self.reload() + return next((r for r in self._rules if r.id == rule_id), None) + + async def delete_rule(self, rule_id: str) -> bool: + async with get_session_factory()() as session: + result = await session.execute( + select(RoutingRuleRecord).where(RoutingRuleRecord.id == rule_id) + ) + record = result.scalar_one_or_none() + if not record: + return False + await session.delete(record) + await session.commit() + async with self._lock: + self._rules = [r for r in self._rules if r.id != rule_id] + return True + + # ---------------------------------------------------------------- + # Internals + # ---------------------------------------------------------------- + + async def _load_rules_from_db(self) -> list[RoutingRule]: + try: + async with get_session_factory()() as session: + result = await session.execute(select(RoutingRuleRecord)) + rows = result.scalars().all() + except Exception as e: + logger.warning(f"Routing rules load failed: {e}") + return [] + + rules: list[RoutingRule] = [] + for row in rows: + try: + match = RoutingMatch(**(row.match or {})) + action = RoutingAction(**(row.action or {})) + rules.append( + RoutingRule( + id=row.id, + name=row.name, + priority=row.priority, + enabled=row.enabled, + match=match, + action=action, + created_at=row.created_at, + updated_at=row.updated_at, + ) + ) + except Exception as e: + logger.warning(f"Skipping malformed rule {row.id}: {e}") + return rules diff --git a/services/tts.py b/services/tts.py new file mode 100644 index 0000000..efe6b2d --- /dev/null +++ b/services/tts.py @@ -0,0 +1,90 @@ +""" +TTS Service β€” Rhema (OpenAI-compatible) text-to-speech client. + +Synthesizes speech for the SPEAK call-flow step and the AI Receptionist. +Rhema exposes POST /v1/audio/speech (OpenAI-compatible) with Kokoro voices. +""" + +import logging +from pathlib import Path +from typing import Optional + +import httpx + +from config import TTSSettings + +logger = logging.getLogger(__name__) + + +class TTSService: + """Client for Rhema TTS service.""" + + def __init__(self, settings: TTSSettings): + self.settings = settings + self._client: Optional[httpx.AsyncClient] = None + + async def _get_client(self) -> httpx.AsyncClient: + if self._client is None or self._client.is_closed: + headers = {} + if self.settings.api_key: + headers["Authorization"] = f"Bearer {self.settings.api_key}" + self._client = httpx.AsyncClient( + base_url=self.settings.base_url, + timeout=httpx.Timeout(self.settings.timeout, connect=5.0), + headers=headers, + ) + return self._client + + async def synthesize( + self, + text: str, + voice: Optional[str] = None, + response_format: str = "wav", + ) -> bytes: + """Synthesize speech and return audio bytes.""" + if not text or not text.strip(): + return b"" + + client = await self._get_client() + body = { + "model": self.settings.model, + "input": text, + "voice": voice or self.settings.voice, + "response_format": response_format, + "sample_rate": self.settings.sample_rate, + } + + try: + response = await client.post("/v1/audio/speech", json=body) + response.raise_for_status() + return response.content + except httpx.HTTPStatusError as e: + logger.error(f"Rhema TTS error: {e.response.status_code} {e.response.text}") + return b"" + except httpx.ConnectError: + logger.error(f"Cannot connect to Rhema at {self.settings.base_url}") + return b"" + except Exception as e: + logger.error(f"TTS synthesis failed: {e}") + return b"" + + async def synthesize_to_file( + self, + text: str, + filepath: str | Path, + voice: Optional[str] = None, + ) -> bool: + """Synthesize to a WAV file. Returns True on success.""" + audio = await self.synthesize(text, voice=voice, response_format="wav") + if not audio: + return False + path = Path(filepath) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_bytes(audio) + logger.debug(f"TTS wrote {len(audio)} bytes to {path}") + return True + + async def close(self) -> None: + if self._client and not self._client.is_closed: + await self._client.aclose() + self._client = None diff --git a/tests/test_receptionist.py b/tests/test_receptionist.py new file mode 100644 index 0000000..d0b32f0 --- /dev/null +++ b/tests/test_receptionist.py @@ -0,0 +1,74 @@ +"""Tests for the AI Receptionist decision logic (services/receptionist.py).""" + +from types import SimpleNamespace + +import pytest + +from config import ReceptionistSettings, Settings +from models.routing import ( + RoutingAction, + RoutingActionType, + RoutingDecision, +) +from services.receptionist import ReceptionistService + + +def _make_gateway(): + settings = Settings() + settings.receptionist = ReceptionistSettings() + return SimpleNamespace(settings=settings, devices={}) + + +class TestReceptionistDecide: + def test_rule_wins_over_llm_when_rule_is_actionable(self): + gw = _make_gateway() + svc = ReceptionistService(gw) + rule_action = RoutingAction(type=RoutingActionType.REJECT, message="nope") + decision = RoutingDecision(action=rule_action, reason="rule said so") + chosen = svc._decide(decision, {"recommended_action": "ring"}) + assert chosen.type == RoutingActionType.REJECT + + def test_falls_back_to_llm_when_rule_is_default_take_message(self): + gw = _make_gateway() + svc = ReceptionistService(gw) + decision = RoutingDecision( + action=RoutingAction(type=RoutingActionType.TAKE_MESSAGE), + reason="default", + ) + chosen = svc._decide(decision, {"recommended_action": "ring"}) + assert chosen.type == RoutingActionType.RING_CHAIN + + def test_llm_reject_recommendation_is_honored(self): + gw = _make_gateway() + svc = ReceptionistService(gw) + chosen = svc._decide(None, {"recommended_action": "reject"}) + assert chosen.type == RoutingActionType.REJECT + assert chosen.message # should carry a polite decline message + + +class TestReceptionistDeviceList: + def test_ring_device_returns_explicit_device(self): + gw = _make_gateway() + svc = ReceptionistService(gw) + action = RoutingAction(type=RoutingActionType.RING_DEVICE, device_id="dev_a") + assert svc._resolve_device_list(action, {}) == ["dev_a"] + + def test_ring_chain_uses_action_list_when_present(self): + gw = _make_gateway() + svc = ReceptionistService(gw) + action = RoutingAction( + type=RoutingActionType.RING_CHAIN, + device_ids=["dev_a", "dev_b"], + ) + assert svc._resolve_device_list(action, {}) == ["dev_a", "dev_b"] + + +class TestFrameSilenceHeuristic: + def test_zeroes_are_silent(self): + # 16-bit PCM zeros β†’ silent + assert ReceptionistService._frame_is_silent(b"\x00\x00" * 80) is True + + def test_loud_pattern_not_silent(self): + # Loud values β†’ not silent + loud = b"\xff\x7f" * 80 # max int16 every sample + assert ReceptionistService._frame_is_silent(loud) is False diff --git a/tests/test_routing.py b/tests/test_routing.py new file mode 100644 index 0000000..b668804 --- /dev/null +++ b/tests/test_routing.py @@ -0,0 +1,133 @@ +"""Tests for the routing evaluator (services/routing.py).""" + +from datetime import datetime +from zoneinfo import ZoneInfo + +import pytest + +from models.routing import ( + RoutingAction, + RoutingActionType, + RoutingMatch, + RoutingRule, + TimeRange, +) +from services.routing import RoutingService + + +def _rule( + name: str, + *, + priority: int = 100, + enabled: bool = True, + caller: str | None = None, + dnis: str | None = None, + time_range: TimeRange | None = None, + action_type: RoutingActionType = RoutingActionType.RING_CHAIN, + device_id: str | None = None, +) -> RoutingRule: + return RoutingRule( + id=f"rule_{name}", + name=name, + priority=priority, + enabled=enabled, + match=RoutingMatch(caller_pattern=caller, dnis=dnis, time_range=time_range), + action=RoutingAction(type=action_type, device_id=device_id), + ) + + +class TestRoutingEvaluator: + def _make_service(self, rules): + svc = RoutingService(gateway=None) + svc._rules = rules + return svc + + @pytest.mark.asyncio + async def test_no_rules_returns_default_take_message(self): + svc = self._make_service([]) + decision = await svc.evaluate("+15551112222", "+15553334444") + assert decision.action.type == RoutingActionType.TAKE_MESSAGE + assert decision.matched_rule_id is None + + @pytest.mark.asyncio + async def test_priority_lower_wins(self): + svc = self._make_service([ + _rule("low", priority=10, action_type=RoutingActionType.RING_DEVICE, + device_id="dev_a"), + _rule("high", priority=200, action_type=RoutingActionType.REJECT), + ]) + decision = await svc.evaluate("+15551112222", "+15553334444") + assert decision.matched_rule_name == "low" + assert decision.action.type == RoutingActionType.RING_DEVICE + + @pytest.mark.asyncio + async def test_disabled_rule_skipped(self): + svc = self._make_service([ + _rule("first", priority=10, enabled=False, + action_type=RoutingActionType.REJECT), + _rule("second", priority=20, + action_type=RoutingActionType.RING_CHAIN), + ]) + decision = await svc.evaluate("+1", "+2") + assert decision.matched_rule_name == "second" + + @pytest.mark.asyncio + async def test_caller_glob_pattern(self): + svc = self._make_service([ + _rule("tollfree", caller="+1800*", + action_type=RoutingActionType.REJECT), + ]) + toll = await svc.evaluate("+18001234567", "+15551112222") + assert toll.action.type == RoutingActionType.REJECT + normal = await svc.evaluate("+14155551212", "+15551112222") + assert normal.action.type == RoutingActionType.TAKE_MESSAGE + + @pytest.mark.asyncio + async def test_dnis_must_match_exactly(self): + svc = self._make_service([ + _rule("by_did", dnis="+15553334444", + action_type=RoutingActionType.RING_CHAIN), + ]) + match = await svc.evaluate("+1", "+15553334444") + assert match.matched_rule_name == "by_did" + miss = await svc.evaluate("+1", "+19998887777") + assert miss.matched_rule_name is None + + @pytest.mark.asyncio + async def test_time_range_in_window(self): + # Mon 10:30 UTC is inside Mon-Fri 09:00-17:00 UTC + tr = TimeRange(start="09:00", end="17:00", tz="UTC", days=[0, 1, 2, 3, 4]) + svc = self._make_service([ + _rule("business_hours", time_range=tr, + action_type=RoutingActionType.RING_CHAIN), + ]) + monday_10_30 = datetime(2026, 1, 5, 10, 30, tzinfo=ZoneInfo("UTC")) + decision = await svc.evaluate("+1", "+2", now=monday_10_30) + assert decision.matched_rule_name == "business_hours" + + @pytest.mark.asyncio + async def test_time_range_outside_window(self): + tr = TimeRange(start="09:00", end="17:00", tz="UTC", days=[0, 1, 2, 3, 4]) + svc = self._make_service([ + _rule("business_hours", time_range=tr, + action_type=RoutingActionType.RING_CHAIN), + ]) + saturday_noon = datetime(2026, 1, 10, 12, 0, tzinfo=ZoneInfo("UTC")) + decision = await svc.evaluate("+1", "+2", now=saturday_noon) + # Weekend β†’ no match β†’ default take_message + assert decision.matched_rule_name is None + + @pytest.mark.asyncio + async def test_time_range_wraps_midnight(self): + # Overnight 22:00 β†’ 06:00, every day + tr = TimeRange(start="22:00", end="06:00", tz="UTC", days=list(range(7))) + svc = self._make_service([ + _rule("overnight", time_range=tr, + action_type=RoutingActionType.REJECT), + ]) + late = datetime(2026, 1, 5, 23, 30, tzinfo=ZoneInfo("UTC")) + early = datetime(2026, 1, 6, 4, 0, tzinfo=ZoneInfo("UTC")) + mid_day = datetime(2026, 1, 5, 12, 0, tzinfo=ZoneInfo("UTC")) + assert (await svc.evaluate("+1", "+2", now=late)).matched_rule_name == "overnight" + assert (await svc.evaluate("+1", "+2", now=early)).matched_rule_name == "overnight" + assert (await svc.evaluate("+1", "+2", now=mid_day)).matched_rule_name is None diff --git a/tests/test_tts.py b/tests/test_tts.py new file mode 100644 index 0000000..ecac718 --- /dev/null +++ b/tests/test_tts.py @@ -0,0 +1,72 @@ +"""Tests for the Rhema TTS client (services/tts.py).""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from config import TTSSettings +from services.tts import TTSService + + +def _settings(**overrides) -> TTSSettings: + defaults = { + "base_url": "http://localhost:9000", + "model": "speaches-ai/Kokoro-82M-v1.0-ONNX", + "voice": "af_heart", + "api_key": "", + "timeout": 5.0, + } + defaults.update(overrides) + return TTSSettings(**defaults) + + +class TestTTSService: + @pytest.mark.asyncio + async def test_synthesize_sends_expected_body(self): + svc = TTSService(_settings()) + + mock_response = MagicMock() + mock_response.content = b"RIFFfake-wav-bytes" + mock_response.raise_for_status = MagicMock() + + mock_client = MagicMock() + mock_client.is_closed = False + mock_client.post = AsyncMock(return_value=mock_response) + + with patch.object(svc, "_get_client", AsyncMock(return_value=mock_client)): + audio = await svc.synthesize("hello world", voice="am_michael") + + assert audio == b"RIFFfake-wav-bytes" + mock_client.post.assert_awaited_once() + call_args = mock_client.post.call_args + assert call_args.args[0] == "/v1/audio/speech" + body = call_args.kwargs["json"] + assert body["input"] == "hello world" + assert body["voice"] == "am_michael" + assert body["response_format"] == "wav" + + @pytest.mark.asyncio + async def test_synthesize_empty_text_returns_empty(self): + svc = TTSService(_settings()) + assert await svc.synthesize("") == b"" + assert await svc.synthesize(" ") == b"" + + @pytest.mark.asyncio + async def test_synthesize_to_file_writes_audio(self, tmp_path): + svc = TTSService(_settings()) + target = tmp_path / "out.wav" + + with patch.object(svc, "synthesize", AsyncMock(return_value=b"WAVE-bytes")): + ok = await svc.synthesize_to_file("hello", str(target)) + + assert ok is True + assert target.read_bytes() == b"WAVE-bytes" + + @pytest.mark.asyncio + async def test_bearer_header_used_when_api_key_set(self): + svc = TTSService(_settings(api_key="secret-token")) + client = await svc._get_client() + try: + assert client.headers.get("authorization") == "Bearer secret-token" + finally: + await svc.close()