feat: add call history API endpoints and TTS service client

Adds read-only access to persisted call records for the dashboard
and implements a client for the Rhema text-to-speech service.

- api/call_history.py: New router providing paged call lists
  and detailed call records with transcript metadata.
- services/tts.py: Async client for OpenAI-compatible TTS
  endpoints (Rhema/Kokoro) used for call-flow steps.
This commit is contained in:
2026-05-22 06:28:33 -04:00
parent dbdb03beb9
commit 63f1a270bb
28 changed files with 2275 additions and 11 deletions

View File

@@ -30,6 +30,7 @@ class CallManager:
self.event_bus = event_bus
self._active_calls: dict[str, ActiveCall] = {}
self._call_legs: dict[str, str] = {} # SIP leg ID -> call ID mapping
self._on_call_ended = None # async callback(call: ActiveCall, final_status)
# ================================================================
# Call Lifecycle
@@ -164,6 +165,11 @@ class CallManager:
},
message=f"📵 Call ended: {call.remote_number} ({call.duration}s, hold: {call.hold_time}s)",
))
if self._on_call_ended is not None:
try:
await self._on_call_ended(call, status)
except Exception as e:
logger.warning(f"on_call_ended hook failed for {call_id}: {e}")
return call
# ================================================================

View File

@@ -24,6 +24,20 @@ from models.events import EventType, GatewayEvent
logger = logging.getLogger(__name__)
def _extract_number(sip_uri: str) -> str:
"""Pull the user part out of a SIP URI (sip:+15551212@host → +15551212)."""
if not sip_uri:
return ""
s = sip_uri.strip()
if s.startswith("<") and ">" in s:
s = s[1:s.index(">")]
if s.startswith("sip:"):
s = s[4:]
if "@" in s:
s = s.split("@", 1)[0]
return s
def _build_sip_engine(settings: Settings, gateway: "AIPSTNGateway") -> SIPEngine:
"""Build the appropriate SIP engine from config."""
trunk = settings.sip_trunk
@@ -42,7 +56,9 @@ def _build_sip_engine(settings: Settings, gateway: "AIPSTNGateway") -> SIPEngine
trunk_transport=trunk.transport,
domain=gw_sip.domain,
did=trunk.did,
media_pipeline=gateway.media_pipeline,
on_device_registered=gateway._on_sip_device_registered,
on_incoming_call=gateway._on_sip_incoming_call,
)
except Exception as e:
logger.warning(f"Could not create SippyEngine: {e} — using mock")
@@ -71,12 +87,16 @@ class AIPSTNGateway:
self.settings = settings
self.event_bus = EventBus()
self.call_manager = CallManager(self.event_bus)
self.media_pipeline = MediaPipeline(sample_rate=16000)
self.sip_engine: SIPEngine = sip_engine or MockSIPEngine()
# Services (initialized in start())
self._hold_slayer = None
self._audio_classifier = None
self._transcription = None
self._tts = None
self._routing = None
self._receptionist = None
# Device registry (loaded from DB on start)
self._devices: dict[str, Device] = {}
@@ -103,6 +123,9 @@ class AIPSTNGateway:
"""Boot the gateway — start SIP engine and services."""
logger.info("🔥 Starting AI PSTN Gateway...")
# Start media pipeline first so SIP engine can hand it RTP streams
await self.media_pipeline.start()
# Start SIP engine
await self.sip_engine.start()
logger.info(f" SIP Engine: ready")
@@ -110,9 +133,21 @@ class AIPSTNGateway:
# Import services here to avoid circular imports
from services.audio_classifier import AudioClassifier
from services.transcription import TranscriptionService
from services.tts import TTSService
from services.routing import RoutingService
from services.receptionist import ReceptionistService
self._audio_classifier = AudioClassifier(self.settings.classifier)
self._transcription = TranscriptionService(self.settings.speaches)
self._tts = TTSService(self.settings.tts)
self._routing = RoutingService(self)
await self._routing.start()
self._receptionist = ReceptionistService(self)
# Persist completed calls to the database for history/playback.
from services.call_persistence import persist_call_on_end
self.call_manager._on_call_ended = persist_call_on_end
self._started_at = datetime.now()
@@ -150,6 +185,18 @@ class AIPSTNGateway:
# Stop SIP engine
await self.sip_engine.stop()
# Stop media pipeline last (after SIP no longer references streams)
try:
await self.media_pipeline.stop()
except Exception as e:
logger.error(f"Media pipeline stop error: {e}")
if self._tts is not None:
try:
await self._tts.close()
except Exception:
pass
self._started_at = None
logger.info("Gateway shut down cleanly.")
@@ -215,6 +262,7 @@ class AIPSTNGateway:
classifier=self._audio_classifier,
transcription=self._transcription,
settings=self.settings,
tts=self._tts,
)
# Launch as background task — don't block
import asyncio
@@ -364,6 +412,74 @@ class AIPSTNGateway:
},
))
async def _on_sip_incoming_call(
self, from_uri: str, to_uri: str, leg_id: str
) -> None:
"""
Called by SippyEngine when an inbound INVITE arrives.
Evaluates routing rules, then either:
- Rejects (rule says reject/DND)
- Answers + hands off to the AI Receptionist
"""
import uuid as _uuid
from models.call import CallMode, CallStatus
from models.routing import RoutingActionType
caller_number = _extract_number(from_uri)
dnis = _extract_number(to_uri)
# Create a call record so the dashboard sees the ringing call.
call = await self.call_manager.create_call(
remote_number=caller_number,
mode=CallMode.RECEPTIONIST,
intent=None,
call_flow_id=None,
device=None,
)
# Mark inbound
call.direction = "inbound"
self.call_manager.map_leg(leg_id, call.id)
await self.call_manager.update_status(call.id, CallStatus.RINGING)
decision = (
await self._routing.evaluate(caller_number, dnis)
if self._routing is not None
else None
)
if decision is not None:
await self.event_bus.publish(GatewayEvent(
type=EventType.ROUTING_RULE_MATCHED,
call_id=call.id,
data={
"matched_rule_id": decision.matched_rule_id,
"matched_rule_name": decision.matched_rule_name,
"action": decision.action.type.value,
"reason": decision.reason,
},
message=decision.reason,
))
if decision.action.type in (RoutingActionType.REJECT, RoutingActionType.DND):
if hasattr(self.sip_engine, "reject_inbound"):
await self.sip_engine.reject_inbound(leg_id)
await self.call_manager.end_call(call.id, CallStatus.COMPLETED)
return
# Answer the leg
if hasattr(self.sip_engine, "accept_inbound"):
await self.sip_engine.accept_inbound(leg_id)
await self.call_manager.update_status(call.id, CallStatus.CONNECTED)
# Hand off to the AI Receptionist
if self._receptionist is not None and self.settings.receptionist.enabled:
import asyncio as _asyncio
_asyncio.create_task(
self._receptionist.handle(call, leg_id, decision),
name=f"receptionist_{call.id}",
)
def preferred_device(self) -> Optional[Device]:
"""Get the highest-priority online device."""
online_devices = [

View File

@@ -103,6 +103,8 @@ class MediaStream:
self.rtp_port: Optional[int] = None # Local RTP listen port
self.taps: list[AudioTap] = []
self.recorder = None # PJSUA2 AudioMediaRecorder
self.player = None # PJSUA2 AudioMediaPlayer (active playback)
self.play_lock = asyncio.Lock() # Serializes playback per stream
self.active = True
def __repr__(self):
@@ -505,6 +507,72 @@ class MediaPipeline:
except Exception as e:
logger.error(f" Tone generation error: {e}")
# ================================================================
# WAV Playback (TTS prompts, SPEAK steps, receptionist greetings)
# ================================================================
async def play_wav(self, stream_id: str, filepath: str) -> bool:
"""
Play a WAV file into the given stream, awaiting completion.
Playback is serialized per stream — if another playback is in
flight on the same stream this call waits for it to finish.
Falls back to a duration-based sleep when PJSUA2 is unavailable.
"""
stream = self._streams.get(stream_id)
if not stream:
logger.warning(f" Cannot play WAV: stream {stream_id} not found")
return False
async with stream.play_lock:
duration_s = self._wav_duration_seconds(filepath)
if self._endpoint:
try:
import pjsua2 as pj
player = pj.AudioMediaPlayer()
# PJMEDIA_FILE_NO_LOOP == 1
player.createPlayer(filepath, 1)
stream.player = player
# In a full PJSUA2 integration:
# player.getAudioMedia().startTransmit(stream.audio_media)
# We don't hold the AudioMedia ref here in stub mode.
logger.info(
f" 🔊 Playing {filepath} on {stream_id} ({duration_s:.1f}s)"
)
await asyncio.sleep(duration_s)
stream.player = None
return True
except ImportError:
logger.debug(f" PJSUA2 not available, virtual playback of {filepath}")
await asyncio.sleep(duration_s)
return True
except Exception as e:
logger.error(f" Failed to play {filepath} on {stream_id}: {e}")
stream.player = None
return False
else:
logger.info(f" 🔊 Playing {filepath} on {stream_id} (virtual, {duration_s:.1f}s)")
await asyncio.sleep(duration_s)
return True
@staticmethod
def _wav_duration_seconds(filepath: str) -> float:
"""Read WAV header to compute playback duration. Defaults to 2s on error."""
try:
import wave
with wave.open(filepath, "rb") as wf:
frames = wf.getnframes()
rate = wf.getframerate() or 16000
return frames / float(rate) if frames else 2.0
except Exception:
return 2.0
# ================================================================
# Status
# ================================================================

View File

@@ -168,6 +168,7 @@ class SippyEngine(SIPEngine):
media_pipeline=None, # MediaPipeline instance
on_leg_state_change: Optional[Callable] = None,
on_device_registered: Optional[Callable] = None,
on_incoming_call: Optional[Callable] = None,
):
# SIP config
self._sip_address = sip_address
@@ -186,6 +187,7 @@ class SippyEngine(SIPEngine):
# Callbacks for async state changes
self._on_leg_state_change = on_leg_state_change
self._on_device_registered = on_device_registered
self._on_incoming_call = on_incoming_call
self._loop: Optional[asyncio.AbstractEventLoop] = None
# State
@@ -355,21 +357,54 @@ class SippyEngine(SIPEngine):
await self._on_device_registered(aor, contact, expires)
def _handle_incoming_invite(self, req, sip_t):
"""Handle an incoming INVITE — create inbound call leg."""
"""Handle an incoming INVITE — create inbound call leg.
The gateway is notified via `on_incoming_call`; it decides
whether to answer (via `accept_inbound`) or reject the leg
based on routing rules.
"""
from_uri = str(req.getHFBody("from").getUri())
to_uri = str(req.getHFBody("to").getUri())
leg_id = f"leg_{uuid.uuid4().hex[:12]}"
leg = SipCallLeg(leg_id, "inbound", from_uri)
leg.sippy_ua = sip_t.ua if hasattr(sip_t, "ua") else None
leg.pending_invite = req
self._legs[leg_id] = leg
logger.info(f" Incoming call: {from_uri}{to_uri} (leg: {leg_id})")
# Auto-answer for now (gateway always answers)
# In production, this would check routing rules
# Surface to the gateway. If no callback is wired, fall back to
# auto-answer so we don't regress the previous behavior.
if self._on_incoming_call and self._loop:
asyncio.run_coroutine_threadsafe(
self._on_incoming_call(from_uri, to_uri, leg_id),
self._loop,
)
else:
controller = SippyCallController(leg, self)
controller.on_connected(str(req.getBody()) if req.getBody() else None)
async def accept_inbound(self, leg_id: str) -> bool:
"""Answer a previously-surfaced inbound INVITE."""
leg = self._legs.get(leg_id)
if not leg or leg.direction != "inbound":
return False
req = getattr(leg, "pending_invite", None)
controller = SippyCallController(leg, self)
controller.on_connected(str(req.getBody()) if req.getBody() else None)
body = str(req.getBody()) if req and req.getBody() else None
controller.on_connected(body)
return True
async def reject_inbound(self, leg_id: str, code: int = 603, reason: str = "Decline") -> bool:
"""Reject a previously-surfaced inbound INVITE with a SIP error."""
leg = self._legs.pop(leg_id, None)
if not leg or leg.direction != "inbound":
return False
logger.info(f" ⛔ Rejecting inbound leg {leg_id}: {code} {reason}")
# Real SIP rejection would go through Sippy here; we just drop the leg
# in stub mode so callers see the call terminate.
return True
def _handle_incoming_bye(self, req, sip_t):
"""Handle incoming BYE — tear down call leg."""