"""
MCP Server — AI assistant tools and resources for gateway control.

Any MCP-compatible AI assistant can use these tools to:
- Place calls and launch the Hold Slayer
- Check call status
- Manage call flows
- Search transcripts
- Control devices

Example from an AI assistant:
    "Call Chase Bank and dispute the charge from Amazon on Dec 15th"
    → make_call("+18005551234", "hold_slayer", "dispute Amazon charge Dec 15th", "chase-bank-main")
"""

import json
import logging
from typing import Optional

from fastmcp import FastMCP

from core.gateway import AIPSTNGateway

logger = logging.getLogger(__name__)


def create_mcp_server(gateway: AIPSTNGateway) -> FastMCP:
    """Create and configure the MCP server with all tools and resources.

    Every tool and resource is a closure over ``gateway``; they are
    registered on a fresh ``FastMCP`` instance named "Hold Slayer Gateway".
    Tools report failures as human-readable strings instead of raising, so
    unexpected exceptions are additionally written to the module logger.

    Args:
        gateway: The gateway instance the tools operate on.

    Returns:
        The configured ``FastMCP`` server.
    """
    mcp = FastMCP("Hold Slayer Gateway")

    # ================================================================
    # Tools
    # ================================================================

    @mcp.tool()
    async def make_call(
        number: str,
        mode: str = "direct",
        intent: str = "",
        call_flow_id: str = "",
        device: str = "",
    ) -> str:
        """
        Place an outbound phone call.

        Args:
            number: Phone number to call (E.164 format, e.g., +18005551234)
            mode: "direct" (connect immediately), "hold_slayer" (navigate IVR + wait on hold),
                or "ai_assisted". Unrecognized values fall back to "direct".
            intent: What you need — used by hold_slayer to navigate IVR menus
                (e.g., "dispute a charge", "cancel my card")
            call_flow_id: Optional stored call flow ID to follow (e.g., "chase-bank-main")
            device: Target device to ring/transfer to (e.g., "sip_phone", "cell")

        Returns:
            Call ID and status
        """
        # Imported lazily, as in the rest of this module's tools.
        from models.call import CallMode

        mode_map = {
            "direct": CallMode.DIRECT,
            "hold_slayer": CallMode.HOLD_SLAYER,
            "ai_assisted": CallMode.AI_ASSISTED,
        }

        if mode in mode_map:
            mode_label = mode
        else:
            # The fallback to DIRECT is kept, but it must be visible: the
            # reply previously echoed the requested mode even though the
            # call was actually placed in direct mode.
            logger.warning(
                "make_call: unrecognized mode %r, falling back to direct", mode
            )
            mode_label = f"direct (unrecognized mode '{mode}', fell back to direct)"

        call = await gateway.make_call(
            number=number,
            mode=mode_map.get(mode, CallMode.DIRECT),
            # Empty strings mean "not provided" and are passed on as None.
            intent=intent or None,
            call_flow_id=call_flow_id or None,
            device=device or None,
        )

        return (
            f"Call {call.id} initiated.\n"
            f" Number: {number}\n"
            f" Mode: {mode_label}\n"
            f" Status: {call.status.value}\n"
            f" Intent: {intent or 'N/A'}\n"
            f" Call Flow: {call_flow_id or 'exploration mode'}"
        )

    @mcp.tool()
    async def get_call_status(call_id: str) -> str:
        """
        Get the current status of a call.

        Shows: status, duration, hold time, current audio type, recent transcript.
        """
        call = gateway.get_call(call_id)
        if not call:
            return f"Call {call_id} not found. It may have already ended."

        transcript = call.transcript
        if not transcript:
            transcript_tail = "No transcript yet"
        elif len(transcript) > 300:
            # Only mark the text as truncated when it really was cut.
            transcript_tail = f"...{transcript[-300:]}"
        else:
            transcript_tail = transcript

        return (
            f"Call {call_id}:\n"
            f" Number: {call.remote_number}\n"
            f" Status: {call.status.value}\n"
            f" Mode: {call.mode.value}\n"
            f" Duration: {call.duration}s\n"
            f" Hold Time: {call.hold_time}s\n"
            f" Audio Type: {call.current_classification.value}\n"
            f" Intent: {call.intent or 'N/A'}\n"
            f" Current Step: {call.current_step_id or 'N/A'}\n"
            f" Transcript (last 300 chars): {transcript_tail}"
        )

    @mcp.tool()
    async def transfer_call(call_id: str, device: str) -> str:
        """
        Transfer an active call to a specific device.

        Args:
            call_id: The call to transfer
            device: Target device ID (e.g., "sip_phone", "cell")
        """
        try:
            await gateway.transfer_call(call_id, device)
            return f"Call {call_id} transferred to {device}."
        except ValueError as e:
            return f"Transfer failed: {e}"

    @mcp.tool()
    async def hangup(call_id: str) -> str:
        """Hang up a call."""
        try:
            await gateway.hangup_call(call_id)
            return f"Call {call_id} hung up."
        except ValueError as e:
            return f"Hangup failed: {e}"

    @mcp.tool()
    async def list_active_calls() -> str:
        """List all currently active calls with their status."""
        calls = gateway.call_manager.active_calls
        if not calls:
            return "No active calls."

        lines = ["Active calls:"]
        lines.extend(
            f" {call.id}: {call.remote_number} "
            f"({call.status.value}, {call.duration}s, "
            f"hold: {call.hold_time}s, "
            f"audio: {call.current_classification.value})"
            for call in calls.values()
        )
        return "\n".join(lines)

    @mcp.tool()
    async def get_call_flow(phone_number: str) -> str:
        """
        Look up a stored call flow for a phone number.

        Returns the IVR navigation tree if one exists.
        """
        from db.database import StoredCallFlow, get_session_factory
        from sqlalchemy import select

        try:
            factory = get_session_factory()
            async with factory() as session:
                result = await session.execute(
                    select(StoredCallFlow).where(
                        StoredCallFlow.phone_number == phone_number
                    )
                )
                # Flow IDs are derived from the flow name (see
                # create_call_flow), so several flows may share a number.
                # first() returns one of them instead of raising the way
                # scalar_one_or_none() does on multiple rows.
                row = result.scalars().first()
                if not row:
                    return f"No stored call flow for {phone_number}."

                # Guard against missing steps, matching the call-flows resource.
                step_count = len(row.steps) if row.steps else 0
                avg_hold = f"{row.avg_hold_time}s" if row.avg_hold_time else "unknown"

                return (
                    f"Call Flow: {row.name}\n"
                    f" Phone: {row.phone_number}\n"
                    f" Description: {row.description}\n"
                    f" Steps: {step_count}\n"
                    f" Avg Hold Time: {avg_hold}\n"
                    f" Success Rate: {row.success_rate or 'unknown'}\n"
                    f" Times Used: {row.times_used or 0}\n"
                    f" Last Used: {row.last_used or 'never'}\n"
                    f" Notes: {row.notes or 'none'}\n"
                    f" Flow ID: {row.id}"
                )
        except Exception as e:
            logger.exception("get_call_flow failed for %s", phone_number)
            return f"Error looking up call flow: {e}"

    @mcp.tool()
    async def create_call_flow(
        name: str,
        phone_number: str,
        steps_json: str,
        notes: str = "",
    ) -> str:
        """
        Store a new IVR call flow for a phone number.

        The hold slayer will follow this tree instead of exploring blind.

        Args:
            name: Human-readable name (e.g., "Chase Bank - Main Customer Service")
            phone_number: Phone number in E.164 format
            steps_json: JSON array of call flow steps. Each step has:
                - id: unique step identifier
                - description: what this step does
                - action: "dtmf", "speak", "wait", "listen", "hold", or "transfer"
                - action_value: DTMF digits, speech text, or device target
                - expect: regex/keywords for what you expect to hear
                - timeout: seconds to wait
                - next_step: ID of next step on success
                - fallback_step: ID of step if unexpected response
                - notes: any helpful notes
            notes: General notes about this call flow
        """
        from slugify import slugify as do_slugify
        from db.database import StoredCallFlow, get_session_factory

        try:
            steps = json.loads(steps_json)
            if not isinstance(steps, list):
                # Valid JSON is not enough: the flow is documented (and later
                # counted) as an array of steps.
                return "Error: steps_json must be a JSON array of steps."

            flow_id = do_slugify(name)
            if not flow_id:
                # An empty slug would be stored as an empty flow ID.
                return "Error: could not derive a flow ID from the given name."

            factory = get_session_factory()
            async with factory() as session:
                db_flow = StoredCallFlow(
                    id=flow_id,
                    name=name,
                    phone_number=phone_number,
                    description="Created by AI assistant",
                    steps=steps,
                    notes=notes or None,
                    tags=["ai-created"],
                )
                session.add(db_flow)
                await session.commit()

            return f"Call flow '{name}' saved for {phone_number} (ID: {flow_id})"
        except json.JSONDecodeError:
            return "Error: steps_json must be valid JSON."
        except Exception as e:
            logger.exception("create_call_flow failed for %r", name)
            return f"Error creating call flow: {e}"

    @mcp.tool()
    async def send_dtmf(call_id: str, digits: str) -> str:
        """
        Send DTMF tones on an active call.

        Args:
            call_id: The call to send tones on
            digits: DTMF digits to send (e.g., "1", "2", "123#")
        """
        call = gateway.get_call(call_id)
        if not call:
            return f"Call {call_id} not found."

        # NOTE(review): this reads the call manager's private leg map
        # (leg ID -> call ID); a public lookup would be preferable if
        # the call manager offers one.
        for leg_id, cid in gateway.call_manager._call_legs.items():
            if cid == call_id:
                # Tones go out on the first leg mapped to this call.
                await gateway.sip_engine.send_dtmf(leg_id, digits)
                return f"Sent DTMF '{digits}' on call {call_id}."

        return f"No active SIP leg found for call {call_id}."

    @mcp.tool()
    async def get_call_transcript(call_id: str) -> str:
        """
        Get the full transcript for an active or recent call.

        Returns the complete transcript text.
        """
        call = gateway.get_call(call_id)
        if not call:
            return f"Call {call_id} not found."
        if not call.transcript:
            return f"No transcript yet for call {call_id}."

        return (
            f"Transcript for call {call_id} "
            f"({call.remote_number}, {call.duration}s):\n\n"
            f"{call.transcript}"
        )

    @mcp.tool()
    async def get_call_recording(call_id: str) -> str:
        """
        Get info about a call's recording.

        Returns the recording file path and status.
        """
        from db.database import CallRecord, get_session_factory
        from sqlalchemy import select

        try:
            factory = get_session_factory()
            async with factory() as session:
                result = await session.execute(
                    select(CallRecord).where(CallRecord.id == call_id)
                )
                record = result.scalar_one_or_none()
                if not record:
                    return f"No record found for call {call_id}."
                if not record.recording_path:
                    return f"Call {call_id} has no recording."

                return (
                    f"Recording for call {call_id}:\n"
                    f" Path: {record.recording_path}\n"
                    f" Duration: {record.duration}s\n"
                    f" Number: {record.remote_number}"
                )
        except Exception as e:
            logger.exception("get_call_recording failed for %s", call_id)
            return f"Error looking up recording: {e}"

    @mcp.tool()
    async def get_call_summary(call_id: str) -> str:
        """
        Get an AI-generated summary and action items for a call.

        Returns the summary, action items, and sentiment analysis.
        """
        from db.database import CallRecord, get_session_factory
        from sqlalchemy import select

        try:
            factory = get_session_factory()
            async with factory() as session:
                result = await session.execute(
                    select(CallRecord).where(CallRecord.id == call_id)
                )
                record = result.scalar_one_or_none()
                if not record:
                    return f"No record found for call {call_id}."

                lines = [
                    f"Call Summary for {call_id}:",
                    f" Number: {record.remote_number}",
                    f" Status: {record.status}",
                    f" Duration: {record.duration}s",
                    f" Hold Time: {record.hold_time}s",
                ]

                if record.summary:
                    lines.append(f"\n Summary: {record.summary}")
                else:
                    lines.append("\n Summary: Not yet generated")

                if record.action_items:
                    lines.append("\n Action Items:")
                    lines.extend(f" • {item}" for item in record.action_items)

                if record.sentiment:
                    lines.append(f"\n Sentiment: {record.sentiment}")

                return "\n".join(lines)
        except Exception as e:
            logger.exception("get_call_summary failed for %s", call_id)
            return f"Error looking up call summary: {e}"

    @mcp.tool()
    async def search_call_history(
        phone_number: str = "",
        intent: str = "",
        limit: int = 10,
    ) -> str:
        """
        Search past call records.

        Args:
            phone_number: Filter by phone number (partial match)
            intent: Filter by intent text (partial match)
            limit: Max results to return (default 10, minimum 1)
        """
        from db.database import CallRecord, get_session_factory
        from sqlalchemy import select

        try:
            factory = get_session_factory()
            async with factory() as session:
                query = select(CallRecord)
                if phone_number:
                    query = query.where(
                        CallRecord.remote_number.contains(phone_number)
                    )
                if intent:
                    query = query.where(CallRecord.intent.icontains(intent))

                # A non-positive LIMIT means "no limit" or an error depending
                # on the database backend, so floor it at one row.
                query = query.order_by(CallRecord.started_at.desc()).limit(
                    max(1, limit)
                )

                result = await session.execute(query)
                records = result.scalars().all()
                if not records:
                    return "No matching call records found."

                lines = [f"Call History ({len(records)} records):"]
                lines.extend(
                    f" {r.id}: {r.remote_number} "
                    f"({r.status}, {r.duration}s, "
                    f"hold: {r.hold_time}s) "
                    f"— {r.intent or 'no intent'} "
                    f"[{r.started_at}]"
                    for r in records
                )
                return "\n".join(lines)
        except Exception as e:
            logger.exception("search_call_history failed")
            return f"Error searching call history: {e}"

    @mcp.tool()
    async def learn_call_flow(call_id: str, name: str = "") -> str:
        """
        Learn a call flow from a completed call's event history.

        Analyzes the IVR navigation events from a call to build a
        reusable call flow for next time.

        Args:
            call_id: The call to learn from
            name: Optional name for the flow (auto-generated if empty)
        """
        from services.call_flow_learner import CallFlowLearner

        try:
            learner = CallFlowLearner(gateway.event_bus, gateway.settings)
            flow = await learner.learn_from_call(call_id, name or None)
            if flow:
                return (
                    f"Learned call flow '{flow.name}' from call {call_id}:\n"
                    f" Phone: {flow.phone_number}\n"
                    f" Steps: {len(flow.steps)}\n"
                    f" Flow ID: {flow.id}"
                )
            return f"Could not learn a call flow from call {call_id}. Not enough IVR navigation data."
        except Exception as e:
            logger.exception("learn_call_flow failed for %s", call_id)
            return f"Error learning call flow: {e}"

    @mcp.tool()
    async def list_devices() -> str:
        """List all registered devices and their online/offline status."""
        devices = gateway.devices
        if not devices:
            return "No devices registered."

        lines = ["Registered devices:"]
        for d in devices.values():
            status = "🟢 Online" if d.is_online else "🔴 Offline"
            lines.append(f" {d.id}: {d.name} ({d.type.value}) - {status}")
        return "\n".join(lines)

    @mcp.tool()
    async def gateway_status() -> str:
        """Get full gateway status — trunk, devices, active calls, uptime."""
        status = await gateway.status()
        # Tolerate a missing or null trunk entry, the same way the device
        # list below tolerates a missing "devices" key.
        trunk = status.get("trunk") or {}

        lines = [
            "🔥 Hold Slayer Gateway Status",
            f" Uptime: {status['uptime'] or 0}s",
            f" SIP Trunk: {'✅ registered' if trunk.get('registered') else '❌ not registered'}",
            f" Active Calls: {status['active_calls']}",
            f" Event Subscribers: {status['event_subscribers']}",
            " Devices:",
        ]
        for dev_id, info in status.get("devices", {}).items():
            online = "🟢" if info.get("online") else "🔴"
            lines.append(f" {online} {info.get('name', dev_id)}")
        return "\n".join(lines)

    # ================================================================
    # Resources
    # ================================================================

    @mcp.resource("gateway://status")
    async def resource_gateway_status() -> str:
        """Current gateway status — trunk, devices, active calls."""
        status = await gateway.status()
        return json.dumps(status, default=str, indent=2)

    @mcp.resource("gateway://call-flows")
    async def resource_call_flows() -> str:
        """List all stored call flows."""
        from db.database import StoredCallFlow, get_session_factory
        from sqlalchemy import select

        try:
            factory = get_session_factory()
            async with factory() as session:
                result = await session.execute(select(StoredCallFlow))
                rows = result.scalars().all()
                flows = [
                    {
                        "id": r.id,
                        "name": r.name,
                        "phone_number": r.phone_number,
                        "steps": len(r.steps) if r.steps else 0,
                        "avg_hold_time": r.avg_hold_time,
                        "times_used": r.times_used,
                    }
                    for r in rows
                ]
                return json.dumps(flows, default=str, indent=2)
        except Exception as e:
            logger.exception("resource gateway://call-flows failed")
            return json.dumps({"error": str(e)})

    @mcp.resource("gateway://active-calls")
    async def resource_active_calls() -> str:
        """All currently active calls."""
        calls = gateway.call_manager.active_calls
        return json.dumps(
            [c.summary() for c in calls.values()],
            default=str,
            indent=2,
        )

    return mcp