Files
hold-slayer/services/receptionist.py
Robert Helewka 63f1a270bb feat: add call history API endpoints and TTS service client
Adds read-only access to persisted call records for the dashboard
and implements a client for the Rhema text-to-speech service.

- api/call_history.py: New router providing paged call lists
  and detailed call records with transcript metadata.
- services/tts.py: Async client for OpenAI-compatible TTS
  endpoints (Rhema/Kokoro) used for call-flow steps.
2026-05-22 06:28:33 -04:00

346 lines
13 KiB
Python

"""
AI Receptionist — Screens inbound calls, then routes or takes a message.
State machine:
GREET → TTS greeting plays into the call leg
LISTEN → buffer audio from the leg's tap until end-of-utterance
CLASSIFY → LLM extracts intent, urgency, recommended action
DECIDE → combine LLM recommendation with the routing decision
(rules win on conflict)
RING → ring_chain devices; bridge on pickup
RECORD → TTS prompt + WAV record up to message_max_seconds; transcribe
and notify
"""
import asyncio
import logging
import time as _time
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
from models.call import ActiveCall, CallStatus
from models.events import EventType, GatewayEvent
from models.routing import RoutingAction, RoutingActionType, RoutingDecision
# Module-scoped logger, named after this module's import path.
logger = logging.getLogger(name=__name__)
class ReceptionistService:
    """Drive the receptionist state machine for a single inbound call.

    The service is bound to a gateway and reads its configuration from
    ``gateway.settings.receptionist``. Other collaborators (event bus, media
    pipeline, TTS client, transcription, routing, recording service, SIP
    engine, call manager) are looked up on the gateway when needed. When the
    media pipeline, TTS client or recording service is ``None``, the steps
    that need them are skipped (message-taking ends the call instead).
    """

    def __init__(self, gateway):
        self.gateway = gateway
        self.settings = gateway.settings.receptionist

    async def handle(
        self,
        call: ActiveCall,
        sip_leg_id: str,
        routing_decision: Optional[RoutingDecision] = None,
    ) -> None:
        """Run the full receptionist flow for an inbound call.

        Greets the caller, listens for one utterance, classifies it, then
        rejects the call, rings devices (taking a message if nobody answers
        or there is nothing to ring), or takes a message.

        Args:
            call: The inbound call being screened.
            sip_leg_id: The caller's SIP leg; used for playback, the audio
                tap, recording and hangup.
            routing_decision: Result of rule-based routing, if any. A
                decision whose action is not TAKE_MESSAGE overrides the LLM's
                recommendation (see ``_decide``).

        Any ``Exception`` is logged and the call is hung up; nothing is
        re-raised. ``asyncio.CancelledError`` is not an ``Exception`` subclass
        and still propagates.
        """
        try:
            await self._greet(call, sip_leg_id)

            transcript = await self._listen(call, sip_leg_id)
            if transcript:
                call.transcript_chunks.append(f"caller: {transcript}")
                await self.gateway.event_bus.publish(GatewayEvent(
                    type=EventType.TRANSCRIPT_CHUNK,
                    call_id=call.id,
                    data={"text": transcript, "speaker": "caller"},
                    message=f"📝 caller: {transcript[:80]}",
                ))

            classification = await self._classify(call, transcript, routing_decision)
            call.intent = classification.get("intent")
            await self.gateway.event_bus.publish(GatewayEvent(
                type=EventType.RECEPTIONIST_CAPTURED_INTENT,
                call_id=call.id,
                data=classification,
                message=f"Intent: {classification.get('intent', '?')}",
            ))

            action = self._decide(routing_decision, classification)
            await self.gateway.event_bus.publish(GatewayEvent(
                type=EventType.RECEPTIONIST_ROUTING,
                call_id=call.id,
                data={"action": action.type.value},
                message=f"Routing decision: {action.type.value}",
            ))

            if action.type in (RoutingActionType.REJECT, RoutingActionType.DND):
                if action.message:
                    await self._speak(call, sip_leg_id, action.message)
                await self._hangup(call, sip_leg_id)
                return

            if action.type in (RoutingActionType.RING_DEVICE, RoutingActionType.RING_CHAIN):
                devices = self._resolve_device_list(action, classification)
                if not devices:
                    logger.info("Receptionist: no devices to ring, falling back to message")
                    await self._take_message(call, sip_leg_id)
                    return
                await self._speak(
                    call, sip_leg_id, "One moment, I'll connect you now."
                )
                answered = await self.gateway._routing.ring_chain(
                    call.id, devices, action.ring_timeout
                )
                if answered:
                    return  # Bridged to a device — receptionist done
                # Nobody picked up — take a message instead.
                await self._take_message(call, sip_leg_id)
                return

            # Every other action type (including TAKE_MESSAGE): take a message.
            await self._take_message(call, sip_leg_id)
        except Exception as e:
            logger.exception(f"Receptionist failed for {call.id}: {e}")
            try:
                await self._hangup(call, sip_leg_id)
            except Exception as cleanup_err:
                # Best-effort cleanup; the leg/call may already be torn down.
                logger.debug(
                    f"Receptionist cleanup hangup failed for {call.id}: {cleanup_err}"
                )

    # ----------------------------------------------------------------
    # State machine steps
    # ----------------------------------------------------------------

    async def _greet(self, call: ActiveCall, sip_leg_id: str) -> None:
        """Publish a RECEPTIONIST_GREETING event and play the greeting template."""
        await self.gateway.event_bus.publish(GatewayEvent(
            type=EventType.RECEPTIONIST_GREETING,
            call_id=call.id,
            data={"text": self.settings.greeting_template},
            message="Playing greeting",
        ))
        await self._speak(call, sip_leg_id, self.settings.greeting_template)

    async def _listen(self, call: ActiveCall, sip_leg_id: str) -> str:
        """Buffer caller audio from a tap on the leg, then transcribe it.

        Listening stops once the caller has spoken and then been silent for
        ``end_of_utterance_silence_s``, or when ``listen_timeout_s`` elapses.
        Silence *before* the caller starts talking does not end the utterance.

        Returns:
            The transcription, or ``""`` when there is no media pipeline, no
            audio was captured, or the transcriber returned nothing.
        """
        await self.gateway.event_bus.publish(GatewayEvent(
            type=EventType.RECEPTIONIST_LISTENING,
            call_id=call.id,
            message="Listening for caller",
        ))

        media = self.gateway.media_pipeline
        if media is None:
            return ""

        tap = media.create_tap(sip_leg_id)
        audio = bytearray()
        deadline = _time.monotonic() + self.settings.listen_timeout_s
        eou_silence_s = self.settings.end_of_utterance_silence_s
        silent_for = 0.0
        heard_speech = False
        # Assumed duration of one tap frame (20 ms).
        # NOTE(review): confirm against the media pipeline's frame size.
        frame_s = 0.020
        try:
            while _time.monotonic() < deadline:
                remaining = max(0.05, deadline - _time.monotonic())
                wait = min(0.5, remaining)
                frame = await tap.read_frame(timeout=wait)
                if frame is None:
                    # No frame arrived (presumably the read timed out):
                    # count the time we waited as silence.
                    silent_for += wait
                else:
                    audio.extend(frame)
                    if self._frame_is_silent(frame):
                        silent_for += frame_s
                    else:
                        heard_speech = True
                        silent_for = 0.0
                # Only a pause *after* speech ends the utterance; leading
                # silence (caller hasn't started talking) keeps us listening.
                if heard_speech and silent_for >= eou_silence_s:
                    break
        finally:
            tap.close()

        if not audio:
            return ""
        text = await self.gateway._transcription.transcribe(bytes(audio))
        return text or ""

    @staticmethod
    def _default_classification(transcript: str) -> dict:
        """Return the classification used when the LLM can't be consulted.

        The raw transcript (or ``"unknown"`` if empty) stands in for the
        intent, and the recommended action is to ring.
        """
        return {
            "intent": transcript or "unknown",
            "urgency": "normal",
            "recommended_action": "ring",
            "device_hint": None,
        }

    async def _classify(
        self,
        call: ActiveCall,
        transcript: str,
        routing_decision: Optional[RoutingDecision],
    ) -> dict:
        """Ask the LLM to interpret the caller's utterance.

        Returns:
            A dict that always has the keys ``intent``, ``urgency``,
            ``recommended_action`` and ``device_hint``. LLM-provided values
            win; missing keys are filled from ``_default_classification``.
            The default classification is returned outright when no LLM is
            configured, the transcript is blank, the LLM call raises, or the
            LLM returns something other than a JSON object.
        """
        from services.hold_slayer import _get_llm

        llm = _get_llm()
        if llm is None or not transcript.strip():
            return self._default_classification(transcript)

        rules_summary = ""
        if routing_decision and routing_decision.matched_rule_name:
            rules_summary = (
                f"A routing rule already matched: '{routing_decision.matched_rule_name}' "
                f"(action: {routing_decision.action.type.value})."
            )

        try:
            result = await llm.chat_json(
                user_message=(
                    f"Caller: {call.remote_number}\n"
                    f"Transcript: {transcript}\n"
                    f"{rules_summary}\n\n"
                    "Return JSON with keys: intent (short string), "
                    "urgency (low|normal|high), "
                    "recommended_action (ring|message|reject), "
                    "device_hint (string or null)."
                ),
                system=self.settings.llm_persona,
            )
        except Exception as e:
            logger.warning(f"Receptionist LLM classify failed: {e}")
            return self._default_classification(transcript)

        if not isinstance(result, dict):
            # handle() calls .get() on the result; a list/str/None would crash it.
            logger.warning(f"Receptionist LLM returned non-object JSON: {result!r}")
            return self._default_classification(transcript)
        return {**self._default_classification(transcript), **result}

    def _decide(
        self,
        routing_decision: Optional[RoutingDecision],
        classification: dict,
    ) -> RoutingAction:
        """Choose the action: the routing decision first, then the LLM.

        A routing decision whose action is anything other than TAKE_MESSAGE
        is returned unchanged. Otherwise the LLM's ``recommended_action``
        selects reject, take-message, or ring-chain (the default for missing
        or unrecognised values; matching is case-insensitive).
        """
        # NOTE(review): this honours a decision even when no rule matched
        # (``matched_rule_name`` falsy), unlike _classify's prompt — confirm.
        if (
            routing_decision
            and routing_decision.action.type != RoutingActionType.TAKE_MESSAGE
        ):
            return routing_decision.action

        # str() guards against the LLM returning a non-string value here.
        recommended = str(
            classification.get("recommended_action") or "ring"
        ).strip().lower()
        if recommended == "reject":
            return RoutingAction(type=RoutingActionType.REJECT,
                                 message="Sorry, I can't connect that call right now.")
        if recommended == "message":
            return RoutingAction(type=RoutingActionType.TAKE_MESSAGE)
        return RoutingAction(type=RoutingActionType.RING_CHAIN)

    def _resolve_device_list(
        self, action: RoutingAction, classification: dict
    ) -> list[str]:
        """Return the ids of the devices to ring, in ring order.

        An explicit ``device_id`` on a RING_DEVICE action wins, then the
        action's ``device_ids``; otherwise every device that can currently
        receive a call, sorted by ascending ``priority``.

        ``classification`` (e.g. its ``device_hint``) is accepted but not
        used yet.
        """
        if action.type == RoutingActionType.RING_DEVICE and action.device_id:
            return [action.device_id]
        if action.device_ids:
            # Return a copy so callers can't mutate the action's own list.
            return list(action.device_ids)
        devices = sorted(
            (d for d in self.gateway.devices.values() if d.can_receive_call),
            key=lambda d: d.priority,
        )
        return [d.id for d in devices]

    async def _take_message(self, call: ActiveCall, sip_leg_id: str) -> None:
        """Prompt for a message, record it, transcribe it, announce it, hang up.

        Recording runs for ``message_max_seconds`` through the gateway's
        recording service. If the recording service or media pipeline is
        missing, the call is hung up without recording.
        """
        await self._speak(call, sip_leg_id, self.settings.message_prompt)

        media = self.gateway.media_pipeline
        recording_svc = getattr(self.gateway, "_recording_service", None)
        if recording_svc is None or media is None:
            logger.warning("Receptionist: recording unavailable, ending call")
            await self._hangup(call, sip_leg_id)
            return

        # RecordingService writes a WAV file and the recordings row.
        await recording_svc.start_recording(
            call.id, media_pipeline=media, leg_ids=[sip_leg_id]
        )
        try:
            # NOTE(review): fixed-length wait; it does not end early if the
            # caller stops talking or hangs up.
            await asyncio.sleep(self.settings.message_max_seconds)
        finally:
            session = await recording_svc.stop_recording(
                call.id, media_pipeline=media
            )

        message_text = ""
        rec_path = session.filepath_mixed if session else None
        if rec_path and Path(rec_path).exists():
            try:
                # Read in a worker thread so file I/O doesn't block the loop.
                audio_bytes = await asyncio.to_thread(Path(rec_path).read_bytes)
                # NOTE(review): _listen passes raw tap frames to transcribe();
                # here it receives a whole WAV file — confirm both are accepted.
                message_text = (
                    await self.gateway._transcription.transcribe(audio_bytes)
                ) or ""
            except Exception as e:
                logger.warning(f"Receptionist transcribe failed: {e}")

        if message_text:
            call.transcript_chunks.append(f"caller_message: {message_text}")

        await self.gateway.event_bus.publish(GatewayEvent(
            type=EventType.RECEPTIONIST_MESSAGE_SAVED,
            call_id=call.id,
            data={
                "path": rec_path,
                "transcript": message_text,
                "caller": call.remote_number,
            },
            message=f"📥 Message saved from {call.remote_number}",
        ))
        await self._hangup(call, sip_leg_id)

    # ----------------------------------------------------------------
    # Helpers
    # ----------------------------------------------------------------

    async def _speak(self, call: ActiveCall, sip_leg_id: str, text: str) -> None:
        """Synthesize ``text`` with TTS and play it into the call leg.

        No-op when TTS or the media pipeline is missing or ``text`` is empty
        or blank. If synthesis reports failure, nothing is played. On
        successful playback a SPEAK_PLAYED event is published. The temporary
        WAV file is always deleted.
        """
        tts = self.gateway._tts
        media = self.gateway.media_pipeline
        if tts is None or media is None or not text or not text.strip():
            return

        import os
        import tempfile

        fd, tmp_path = tempfile.mkstemp(suffix=".wav", prefix=f"recept_{call.id}_")
        os.close(fd)  # Only the path is needed; TTS writes the file itself.
        try:
            ok = await tts.synthesize_to_file(text, tmp_path)
            if not ok:
                return
            await media.play_wav(sip_leg_id, tmp_path)
            await self.gateway.event_bus.publish(GatewayEvent(
                type=EventType.SPEAK_PLAYED,
                call_id=call.id,
                data={"text": text, "speaker": "receptionist"},
                message=f"🗣️ {text[:80]}",
            ))
        finally:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass  # Already gone; nothing left to clean up.

    async def _hangup(self, call: ActiveCall, sip_leg_id: str) -> None:
        """Hang up the SIP leg, then mark the call COMPLETED.

        A failing SIP hangup is logged and ignored so the call record still
        gets closed; errors from ``call_manager.end_call`` propagate.
        """
        try:
            await self.gateway.sip_engine.hangup(sip_leg_id)
        except Exception as e:
            logger.warning(f"Receptionist hangup failed: {e}")
        await self.gateway.call_manager.end_call(call.id, CallStatus.COMPLETED)

    @staticmethod
    def _frame_is_silent(frame: bytes, threshold: int = 500) -> bool:
        """Return True if a 16-bit signed little-endian PCM frame is quiet.

        The RMS of all complete samples (a trailing odd byte is ignored) is
        compared against ``threshold``. Empty/``None`` frames and frames
        shorter than one sample count as silent. Mono audio is assumed.
        """
        # Inline RMS — `audioop` was removed in Python 3.13.
        import struct

        n_samples = len(frame) // 2 if frame else 0
        if n_samples == 0:
            return True
        samples = struct.unpack_from(f"<{n_samples}h", frame)
        rms = (sum(s * s for s in samples) / n_samples) ** 0.5
        return rms < threshold