"""
LLM Client — Unified interface for LLM-powered decision making.

Used by Hold Slayer (IVR navigation fallback), Call Flow Learner,
Receptionist, and Smart Routing services.

Supports OpenAI-compatible APIs (OpenAI, Ollama, LM Studio, etc.)
via httpx async client. No SDK dependency — just HTTP.
"""

import json
import logging
import time
from typing import Any, Optional

import httpx

from config import get_settings

logger = logging.getLogger(__name__)


class LLMClient:
    """
    Async LLM client for OpenAI-compatible chat completion APIs.

    Works with:
    - OpenAI API (api.openai.com)
    - Ollama (localhost:11434)
    - LM Studio (localhost:1234)
    - Any OpenAI-compatible endpoint

    Usage:
        client = LLMClient(base_url="http://localhost:11434/v1", model="llama3")
        response = await client.chat("What is 2+2?")

        # or structured:
        result = await client.chat_json(
            "Extract the menu options from this IVR transcript...",
            system="You are a phone menu parser.",
        )

        # or as an async context manager (closes the HTTP client on exit):
        async with LLMClient() as client:
            response = await client.chat("What is 2+2?")

    Error handling: request failures (HTTP status errors, timeouts, anything
    else raised while sending or decoding) are logged and reported to the
    caller as an empty string rather than raised.
    """

    def __init__(
        self,
        base_url: str = "http://localhost:11434/v1",
        model: str = "llama3",
        api_key: str = "not-needed",
        timeout: float = 30.0,
        max_tokens: int = 1024,
        temperature: float = 0.3,
    ):
        """
        Create the client and its underlying httpx.AsyncClient.

        Args:
            base_url: Root URL of the API; a trailing slash is stripped.
            model: Model name sent with every request.
            api_key: Sent as a Bearer token in the Authorization header.
            timeout: Timeout in seconds handed to httpx.Timeout.
            max_tokens: Default completion token limit.
            temperature: Default sampling temperature.
        """
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.api_key = api_key
        self.timeout = timeout
        self.max_tokens = max_tokens
        self.temperature = temperature

        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            timeout=httpx.Timeout(timeout),
        )

        # Stats
        self._total_requests = 0
        self._total_tokens = 0
        self._total_errors = 0
        self._avg_latency_ms = 0.0

    async def __aenter__(self) -> "LLMClient":
        """Enter an ``async with`` block, returning this client."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client when the ``async with`` block exits."""
        await self.close()

    async def close(self):
        """Close the HTTP client."""
        await self._client.aclose()

    # ================================================================
    # Core Chat Methods
    # ================================================================

    async def chat(
        self,
        user_message: str,
        system: Optional[str] = None,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
    ) -> str:
        """
        Send a chat completion request and return the text response.

        Args:
            user_message: The user's message/prompt.
            system: Optional system prompt.
            temperature: Override default temperature (0.0 is honoured).
            max_tokens: Override default max tokens.

        Returns:
            The assistant's response text, or "" if the request failed.
        """
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": user_message})

        return await self._complete(
            messages,
            # `is None`, not `or`: 0.0 is a valid (deterministic) temperature.
            temperature=self.temperature if temperature is None else temperature,
            # 0 is not a usable token limit, so falsy values use the default.
            max_tokens=max_tokens or self.max_tokens,
        )

    async def chat_json(
        self,
        user_message: str,
        system: Optional[str] = None,
        temperature: Optional[float] = None,
    ) -> dict[str, Any]:
        """
        Chat completion that parses the response as JSON.

        The system prompt is augmented to request JSON output.
        Falls back to extracting JSON from markdown code blocks.

        Args:
            user_message: The user's message/prompt.
            system: Optional system prompt; the JSON instruction is appended.
            temperature: Sampling temperature; defaults to 0.1 when omitted.

        Returns:
            Parsed JSON dict, or {"error": "..."} on parse failure.
        """
        json_system = (system or "") + (
            "\n\nIMPORTANT: Respond with valid JSON only. "
            "No markdown, no explanation, just the JSON object."
        )

        response_text = await self.chat(
            user_message,
            system=json_system.strip(),
            # Lower temp for structured output; an explicit 0.0 is kept as-is.
            temperature=0.1 if temperature is None else temperature,
        )

        return self._parse_json_response(response_text)

    async def chat_with_history(
        self,
        messages: list[dict[str, str]],
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
    ) -> str:
        """
        Chat with full message history (multi-turn conversation).

        Args:
            messages: List of {"role": "system|user|assistant", "content": "..."}
            temperature: Override default temperature (0.0 is honoured).
            max_tokens: Override default max tokens.

        Returns:
            The assistant's response text, or "" if the request failed.
        """
        return await self._complete(
            messages,
            temperature=self.temperature if temperature is None else temperature,
            max_tokens=max_tokens or self.max_tokens,
        )

    # ================================================================
    # Hold Slayer Specific Methods
    # ================================================================

    async def analyze_ivr_menu(
        self,
        transcript: str,
        intent: str,
        previous_selections: Optional[list[str]] = None,
    ) -> dict[str, Any]:
        """
        Analyze an IVR menu transcript and decide which option to press.

        This is the LLM fallback when regex-based menu parsing fails.

        Args:
            transcript: The IVR audio transcript.
            intent: What the user wants to accomplish.
            previous_selections: DTMF digits already pressed in this call.

        Returns:
            {"digit": "3", "reason": "Option 3 is for card cancellation", "confidence": 0.85}

            "digit" is a string when the model chose one and None when it
            answered null. If the response could not be parsed, the
            {"error": ...} dict from chat_json is returned unchanged.
        """
        system = (
            "You are an expert at navigating phone menus (IVR systems). "
            "Given an IVR transcript and the caller's intent, determine "
            "which menu option (DTMF digit) to press.\n\n"
            "Rules:\n"
            "- If there's a direct match for the intent, choose it.\n"
            "- If no direct match, choose 'speak to representative' or 'agent' option.\n"
            "- If menu says 'press 0 for operator', that's always a safe fallback.\n"
            "- Return the single digit to press.\n"
            "- If you truly can't determine the right option, return digit: null.\n"
        )

        context = f"IVR Transcript:\n{transcript}\n\n"
        context += f"Caller's Intent: {intent}\n"
        if previous_selections:
            # str() each entry so integer digits do not break the join.
            pressed = ", ".join(str(d) for d in previous_selections)
            context += f"Already pressed: {pressed}\n"
        context += "\nWhich digit should be pressed? Return JSON."

        result = await self.chat_json(context, system=system)

        # Normalize response: accept common alternative key names.
        if "digit" not in result:
            for alias in ("option", "press", "choice", "dtmf"):
                if alias in result:
                    result["digit"] = result[alias]
                    break

        # Models often answer with a bare number; callers expect a string.
        # A null answer stays None instead of becoming the string "None".
        if result.get("digit") is not None:
            result["digit"] = str(result["digit"]).strip()

        return result

    async def detect_human_speech(
        self,
        transcript: str,
        context: str = "",
    ) -> dict[str, Any]:
        """
        Analyze a transcript to determine if a human agent is speaking.

        Used as a secondary check when audio classifier detects speech
        but we need to distinguish between IVR prompts and a live human.

        Args:
            transcript: The transcript text to classify.
            context: Optional extra context appended to the prompt.

        Returns:
            {"is_human": true, "confidence": 0.9, "reason": "Agent greeting detected"}
        """
        system = (
            "You are analyzing a phone call transcript to determine if "
            "a live human agent is speaking (vs an automated IVR system).\n\n"
            "Human indicators:\n"
            "- Personal greeting ('Hi, my name is...')\n"
            "- Asking for account details\n"
            "- Conversational tone, filler words\n"
            "- Acknowledging hold time ('Thanks for waiting')\n"
            "\nIVR indicators:\n"
            "- 'Press N for...', 'Say...'\n"
            "- Robotic phrasing\n"
            "- Menu options\n"
            "- 'Your call is important to us'\n"
        )

        prompt = f"Transcript:\n{transcript}\n"
        if context:
            prompt += f"\nContext: {context}\n"
        prompt += "\nIs this a live human agent? Return JSON."

        return await self.chat_json(prompt, system=system)

    async def summarize_call(
        self,
        transcript_chunks: list[str],
        intent: str,
        duration_seconds: int,
    ) -> dict[str, Any]:
        """
        Generate a call summary from transcript chunks.

        Used for call history and analytics.

        Args:
            transcript_chunks: Transcript pieces, joined with newlines in order.
            intent: What the caller wanted to accomplish.
            duration_seconds: Call duration, included in the prompt.

        Returns:
            {"summary": "...", "outcome": "resolved|unresolved|transferred",
             "key_info": [...], "sentiment": "positive|neutral|negative"}
        """
        system = (
            "Summarize this phone call concisely. Include:\n"
            "- What the caller wanted\n"
            "- What happened (IVR navigation, hold time, agent interaction)\n"
            "- The outcome\n"
            "Return as JSON with: summary, outcome, key_info (list), sentiment."
        )

        full_transcript = "\n".join(transcript_chunks)
        prompt = (
            f"Caller's intent: {intent}\n"
            f"Call duration: {duration_seconds} seconds\n\n"
            f"Full transcript:\n{full_transcript}\n\n"
            "Summarize this call."
        )

        return await self.chat_json(prompt, system=system)

    # ================================================================
    # Internal
    # ================================================================

    async def _complete(
        self,
        messages: list[dict[str, str]],
        temperature: float = 0.3,
        max_tokens: int = 1024,
    ) -> str:
        """
        Execute a chat completion request.

        POSTs to /chat/completions and returns the first choice's message
        content. Updates the request, token, error and latency counters.

        Returns:
            The response text. "" when there are no choices, the content is
            null/missing, or the request failed (failures are logged).
        """
        self._total_requests += 1
        start = time.monotonic()

        try:
            payload = {
                "model": self.model,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens,
            }

            response = await self._client.post("/chat/completions", json=payload)
            response.raise_for_status()
            data = response.json()

            # Track token usage. `or {}` / `or 0` tolerate explicit nulls.
            usage = data.get("usage") or {}
            self._total_tokens += usage.get("total_tokens") or 0

            # Track latency (successful requests only). Seed the moving
            # average with the first sample so it is not biased toward 0.
            elapsed_ms = (time.monotonic() - start) * 1000
            if self._avg_latency_ms == 0.0:
                self._avg_latency_ms = elapsed_ms
            else:
                self._avg_latency_ms = (
                    self._avg_latency_ms * 0.9 + elapsed_ms * 0.1
                )

            # Extract response text. "message" or "content" may be null, so
            # coalesce to keep the declared `str` return type.
            choices = data.get("choices") or []
            if choices:
                message = choices[0].get("message") or {}
                return message.get("content") or ""
            return ""

        except httpx.HTTPStatusError as e:
            self._total_errors += 1
            logger.error(
                "LLM API error: %s %s",
                e.response.status_code,
                e.response.text[:200],
            )
            return ""
        except httpx.TimeoutException:
            self._total_errors += 1
            logger.error("LLM API timeout after %ss", self.timeout)
            return ""
        except Exception as e:
            # Deliberate best-effort boundary: callers get "" instead of a
            # raised exception for any other failure.
            self._total_errors += 1
            logger.error("LLM client error: %s", e)
            return ""

    @staticmethod
    def _parse_json_response(text: str) -> dict[str, Any]:
        """
        Parse JSON from LLM response, handling common formatting issues.

        Tries, in order: the whole text, the contents of each markdown code
        block, and the span from the first "{" to the last "}". Only a JSON
        object is accepted at each step; arrays and scalars are skipped so
        the return value is always a dict.

        Returns:
            The parsed dict, or {"error": "Failed to parse JSON response",
            "raw": <first 500 chars>} if no JSON object could be extracted.
        """

        def loads_object(candidate: str) -> Optional[dict[str, Any]]:
            """Return the parsed value if it is a JSON object, else None."""
            try:
                value = json.loads(candidate)
            except json.JSONDecodeError:
                return None
            return value if isinstance(value, dict) else None

        text = (text or "").strip()

        # Try direct parse
        parsed = loads_object(text)
        if parsed is not None:
            return parsed

        # Try extracting from markdown code block
        if "```" in text:
            # Splitting on the fence puts code-block contents at odd indices.
            for content in text.split("```")[1::2]:
                content = content.strip()
                # Remove optional language tag (case-insensitive)
                if content[:4].lower() == "json":
                    content = content[4:].strip()
                parsed = loads_object(content)
                if parsed is not None:
                    return parsed

        # Try finding JSON object in the text
        brace_start = text.find("{")
        brace_end = text.rfind("}")
        if brace_start != -1 and brace_end > brace_start:
            parsed = loads_object(text[brace_start : brace_end + 1])
            if parsed is not None:
                return parsed

        logger.warning("Failed to parse JSON from LLM response: %s", text[:200])
        return {"error": "Failed to parse JSON response", "raw": text[:500]}

    # ================================================================
    # Stats
    # ================================================================

    @property
    def stats(self) -> dict:
        """
        Snapshot of usage counters and client configuration.

        avg_latency_ms is an exponential moving average over successful
        requests, rounded to one decimal place.
        """
        return {
            "total_requests": self._total_requests,
            "total_tokens": self._total_tokens,
            "total_errors": self._total_errors,
            "avg_latency_ms": round(self._avg_latency_ms, 1),
            "model": self.model,
            "base_url": self.base_url,
        }