Complete project scaffolding and core implementation of an AI-powered telephony system that calls companies, navigates IVR menus, waits on hold, and transfers to the user when a human answers. Key components: - FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces - SIP/VoIP call management via PJSUA2 with RTP audio streaming - LLM-powered IVR navigation using OpenAI/Anthropic with tool calling - Hold detection service combining audio analysis and silence detection - Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines - Call recording with per-channel and mixed audio capture - Event bus (asyncio pub/sub) for real-time client updates - Web dashboard with live call monitoring - SQLite persistence via SQLAlchemy with call history and analytics - Notification support (email, SMS, webhook, desktop) - Docker Compose deployment with Opal VoIP and Opal Media containers - Comprehensive test suite with unit, integration, and E2E tests - Simplified .gitignore and full project documentation in README
392 lines · 13 KiB · Python
"""
|
|
LLM Client — Unified interface for LLM-powered decision making.
|
|
|
|
Used by Hold Slayer (IVR navigation fallback), Call Flow Learner,
|
|
Receptionist, and Smart Routing services.
|
|
|
|
Supports OpenAI-compatible APIs (OpenAI, Ollama, LM Studio, etc.)
|
|
via httpx async client. No SDK dependency — just HTTP.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import time
|
|
from typing import Any, Optional
|
|
|
|
import httpx
|
|
|
|
from config import get_settings
|
|
|
|
logger = logging.getLogger(__name__)
class LLMClient:
    """
    Async LLM client for OpenAI-compatible chat completion APIs.

    Works with:
    - OpenAI API (api.openai.com)
    - Ollama (localhost:11434)
    - LM Studio (localhost:1234)
    - Any OpenAI-compatible endpoint

    Usage:
        client = LLMClient(base_url="http://localhost:11434/v1", model="llama3")
        response = await client.chat("What is 2+2?")
        # or structured:
        result = await client.chat_json(
            "Extract the menu options from this IVR transcript...",
            system="You are a phone menu parser.",
        )
    """

    def __init__(
        self,
        base_url: str = "http://localhost:11434/v1",
        model: str = "llama3",
        api_key: str = "not-needed",
        timeout: float = 30.0,
        max_tokens: int = 1024,
        temperature: float = 0.3,
    ):
        """
        Args:
            base_url: API root of an OpenAI-compatible server (trailing "/" stripped).
            model: Model name sent with every completion request.
            api_key: Bearer token; local servers (Ollama, LM Studio) ignore it.
            timeout: Per-request HTTP timeout in seconds.
            max_tokens: Default completion-token cap (overridable per call).
            temperature: Default sampling temperature (overridable per call).
        """
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.api_key = api_key
        self.timeout = timeout
        self.max_tokens = max_tokens
        self.temperature = temperature

        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            timeout=httpx.Timeout(timeout),
        )

        # Stats — surfaced read-only via the `stats` property.
        self._total_requests = 0
        self._total_tokens = 0
        self._total_errors = 0
        self._avg_latency_ms = 0.0  # exponential moving average; 0.0 means "no samples yet"

    async def close(self):
        """Close the underlying HTTP client and release its connections."""
        await self._client.aclose()

    # ================================================================
    # Core Chat Methods
    # ================================================================

    async def chat(
        self,
        user_message: str,
        system: Optional[str] = None,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
    ) -> str:
        """
        Send a chat completion request and return the text response.

        Args:
            user_message: The user's message/prompt.
            system: Optional system prompt.
            temperature: Override default temperature.
            max_tokens: Override default max tokens.

        Returns:
            The assistant's response text ("" on transport/API error).
        """
        messages: list[dict[str, str]] = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": user_message})

        # BUGFIX: explicit None checks — `temperature or self.temperature`
        # silently discarded a legitimate 0.0 (greedy) override, and
        # `max_tokens or ...` did the same for 0.
        return await self._complete(
            messages,
            temperature=self.temperature if temperature is None else temperature,
            max_tokens=self.max_tokens if max_tokens is None else max_tokens,
        )

    async def chat_json(
        self,
        user_message: str,
        system: Optional[str] = None,
        temperature: Optional[float] = None,
    ) -> dict[str, Any]:
        """
        Chat completion that parses the response as JSON.

        The system prompt is augmented to request JSON output.
        Falls back to extracting JSON from markdown code blocks.

        Returns:
            Parsed JSON dict, or {"error": "..."} on parse failure.
        """
        json_system = (system or "") + (
            "\n\nIMPORTANT: Respond with valid JSON only. "
            "No markdown, no explanation, just the JSON object."
        )

        response_text = await self.chat(
            user_message,
            system=json_system.strip(),
            # Lower temp for structured output; None check keeps a caller's
            # explicit 0.0 from being clobbered (was `temperature or 0.1`).
            temperature=0.1 if temperature is None else temperature,
        )

        return self._parse_json_response(response_text)

    async def chat_with_history(
        self,
        messages: list[dict[str, str]],
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
    ) -> str:
        """
        Chat with full message history (multi-turn conversation).

        Args:
            messages: List of {"role": "system|user|assistant", "content": "..."}

        Returns:
            The assistant's response text ("" on transport/API error).
        """
        # Same None-aware override handling as chat() — 0/0.0 are valid values.
        return await self._complete(
            messages,
            temperature=self.temperature if temperature is None else temperature,
            max_tokens=self.max_tokens if max_tokens is None else max_tokens,
        )

    # ================================================================
    # Hold Slayer Specific Methods
    # ================================================================

    async def analyze_ivr_menu(
        self,
        transcript: str,
        intent: str,
        previous_selections: Optional[list[str]] = None,
    ) -> dict[str, Any]:
        """
        Analyze an IVR menu transcript and decide which option to press.

        This is the LLM fallback when regex-based menu parsing fails.

        Args:
            transcript: The IVR audio transcript.
            intent: What the user wants to accomplish.
            previous_selections: DTMF digits already pressed in this call.

        Returns:
            {"digit": "3", "reason": "Option 3 is for card cancellation",
             "confidence": 0.85}
        """
        system = (
            "You are an expert at navigating phone menus (IVR systems). "
            "Given an IVR transcript and the caller's intent, determine "
            "which menu option (DTMF digit) to press.\n\n"
            "Rules:\n"
            "- If there's a direct match for the intent, choose it.\n"
            "- If no direct match, choose 'speak to representative' or 'agent' option.\n"
            "- If menu says 'press 0 for operator', that's always a safe fallback.\n"
            "- Return the single digit to press.\n"
            "- If you truly can't determine the right option, return digit: null.\n"
        )

        context = f"IVR Transcript:\n{transcript}\n\n"
        context += f"Caller's Intent: {intent}\n"
        if previous_selections:
            context += f"Already pressed: {', '.join(previous_selections)}\n"
        context += "\nWhich digit should be pressed? Return JSON."

        result = await self.chat_json(context, system=system)

        # Normalize response: some models answer with "option"/"press"/etc.
        # instead of the requested "digit" key.
        if "digit" not in result:
            for key in ("option", "press", "choice", "dtmf"):
                if key in result:
                    result["digit"] = str(result[key])
                    break

        return result

    async def detect_human_speech(
        self,
        transcript: str,
        context: str = "",
    ) -> dict[str, Any]:
        """
        Analyze a transcript to determine if a human agent is speaking.

        Used as a secondary check when audio classifier detects speech
        but we need to distinguish between IVR prompts and a live human.

        Returns:
            {"is_human": true, "confidence": 0.9, "reason": "Agent greeting detected"}
        """
        system = (
            "You are analyzing a phone call transcript to determine if "
            "a live human agent is speaking (vs an automated IVR system).\n\n"
            "Human indicators:\n"
            "- Personal greeting ('Hi, my name is...')\n"
            "- Asking for account details\n"
            "- Conversational tone, filler words\n"
            "- Acknowledging hold time ('Thanks for waiting')\n"
            "\nIVR indicators:\n"
            "- 'Press N for...', 'Say...'\n"
            "- Robotic phrasing\n"
            "- Menu options\n"
            "- 'Your call is important to us'\n"
        )

        prompt = f"Transcript:\n{transcript}\n"
        if context:
            prompt += f"\nContext: {context}\n"
        prompt += "\nIs this a live human agent? Return JSON."

        return await self.chat_json(prompt, system=system)

    async def summarize_call(
        self,
        transcript_chunks: list[str],
        intent: str,
        duration_seconds: int,
    ) -> dict[str, Any]:
        """
        Generate a call summary from transcript chunks.

        Used for call history and analytics.

        Returns:
            {"summary": "...", "outcome": "resolved|unresolved|transferred",
             "key_info": [...], "sentiment": "positive|neutral|negative"}
        """
        system = (
            "Summarize this phone call concisely. Include:\n"
            "- What the caller wanted\n"
            "- What happened (IVR navigation, hold time, agent interaction)\n"
            "- The outcome\n"
            "Return as JSON with: summary, outcome, key_info (list), sentiment."
        )

        full_transcript = "\n".join(transcript_chunks)
        prompt = (
            f"Caller's intent: {intent}\n"
            f"Call duration: {duration_seconds} seconds\n\n"
            f"Full transcript:\n{full_transcript}\n\n"
            "Summarize this call."
        )

        return await self.chat_json(prompt, system=system)

    # ================================================================
    # Internal
    # ================================================================

    async def _complete(
        self,
        messages: list[dict[str, str]],
        temperature: float = 0.3,
        max_tokens: int = 1024,
    ) -> str:
        """
        Execute a chat completion request.

        Returns the assistant text, or "" on any transport/API failure
        (callers treat an empty string as "no answer" — deliberate
        best-effort behavior, so errors are logged rather than raised).
        """
        self._total_requests += 1
        start = time.monotonic()

        try:
            payload = {
                "model": self.model,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens,
            }

            response = await self._client.post("/chat/completions", json=payload)
            response.raise_for_status()

            data = response.json()

            # Track token usage (OpenAI-style "usage" block, if present)
            if "usage" in data:
                self._total_tokens += data["usage"].get("total_tokens", 0)

            # Track latency: seed the EMA with the first sample instead of
            # averaging against the initial 0.0, which skewed early readings.
            elapsed_ms = (time.monotonic() - start) * 1000
            if self._avg_latency_ms:
                self._avg_latency_ms = self._avg_latency_ms * 0.9 + elapsed_ms * 0.1
            else:
                self._avg_latency_ms = elapsed_ms

            # Extract response text
            choices = data.get("choices", [])
            if choices:
                return choices[0].get("message", {}).get("content", "")
            return ""

        except httpx.HTTPStatusError as e:
            self._total_errors += 1
            logger.error(
                "LLM API error: %s %s", e.response.status_code, e.response.text[:200]
            )
            return ""
        except httpx.TimeoutException:
            self._total_errors += 1
            logger.error("LLM API timeout after %ss", self.timeout)
            return ""
        except Exception as e:  # boundary: any other failure degrades to ""
            self._total_errors += 1
            logger.error("LLM client error: %s", e)
            return ""

    @staticmethod
    def _parse_json_response(text: str) -> dict[str, Any]:
        """
        Parse JSON from an LLM response, handling common formatting issues.

        Tries, in order: direct parse, fenced markdown code blocks, then the
        outermost {...} span. Only dict results are accepted — a top-level
        list/str/number would violate the declared return contract.
        """
        text = text.strip()

        # Try direct parse
        try:
            parsed = json.loads(text)
            if isinstance(parsed, dict):
                return parsed
        except json.JSONDecodeError:
            pass

        # Try extracting from markdown code block
        if "```" in text:
            # Find content between ```json and ``` or ``` and ```
            parts = text.split("```")
            for i, part in enumerate(parts):
                if i % 2 == 1:  # Odd indices are inside code blocks
                    # Remove optional language tag
                    content = part.strip()
                    if content.startswith("json"):
                        content = content[4:].strip()
                    try:
                        parsed = json.loads(content)
                        if isinstance(parsed, dict):
                            return parsed
                    except json.JSONDecodeError:
                        continue

        # Try finding a JSON object in the text; require "}" after "{" so a
        # stray closing brace earlier in the text can't produce a bogus slice.
        brace_start = text.find("{")
        brace_end = text.rfind("}")
        if brace_start != -1 and brace_end > brace_start:
            try:
                parsed = json.loads(text[brace_start : brace_end + 1])
                if isinstance(parsed, dict):
                    return parsed
            except json.JSONDecodeError:
                pass

        logger.warning("Failed to parse JSON from LLM response: %s", text[:200])
        return {"error": "Failed to parse JSON response", "raw": text[:500]}

    # ================================================================
    # Stats
    # ================================================================

    @property
    def stats(self) -> dict:
        """Snapshot of request/token/error counters and the latency EMA."""
        return {
            "total_requests": self._total_requests,
            "total_tokens": self._total_tokens,
            "total_errors": self._total_errors,
            "avg_latency_ms": round(self._avg_latency_ms, 1),
            "model": self.model,
            "base_url": self.base_url,
        }