Complete project scaffolding and core implementation of an AI-powered telephony system that calls companies, navigates IVR menus, waits on hold, and transfers to the user when a human answers. Key components: - FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces - SIP/VoIP call management via PJSUA2 with RTP audio streaming - LLM-powered IVR navigation using OpenAI/Anthropic with tool calling - Hold detection service combining audio analysis and silence detection - Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines - Call recording with per-channel and mixed audio capture - Event bus (asyncio pub/sub) for real-time client updates - Web dashboard with live call monitoring - SQLite persistence via SQLAlchemy with call history and analytics - Notification support (email, SMS, webhook, desktop) - Docker Compose deployment with Opal VoIP and Opal Media containers - Comprehensive test suite with unit, integration, and E2E tests - Simplified .gitignore and full project documentation in README
231 lines
7.5 KiB
Python
231 lines
7.5 KiB
Python
"""
Hold Slayer Gateway — FastAPI Application Entry Point.

Your personal AI-powered telephony platform.
Navigates IVRs, waits on hold, and connects you when a human answers.

Usage:
    uvicorn main:app --host 0.0.0.0 --port 8000 --reload

    # Or directly:
    python main.py
"""
|
|
|
|
import logging
|
|
import sys
|
|
from contextlib import asynccontextmanager
|
|
|
|
from fastapi import FastAPI
|
|
from fastapi.staticfiles import StaticFiles
|
|
|
|
from api import calls, call_flows, devices, websocket
|
|
from config import get_settings
|
|
from core.gateway import AIPSTNGateway
|
|
from db.database import close_db, init_db
|
|
from mcp_server.server import create_mcp_server
|
|
|
|
# Root logging setup: INFO and above go to stdout, one line per record in the
# form "HH:MM:SS | LEVEL | logger-name | message".
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    datefmt="%H:%M:%S",
    format="%(asctime)s | %(levelname)-7s | %(name)s | %(message)s",
)

# Module-level logger used by everything below in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _handle_db_error(exc: Exception) -> None:
    """Log a clear, human-readable database error and exit the process.

    The exception raised by the database layer usually wraps the driver's
    real error, sometimes several levels deep.  This function follows the
    whole ``__cause__`` / ``__context__`` chain, classifies the failure by
    exception type and message, logs one CRITICAL record with a hint on how
    to fix it, and terminates the process.

    Args:
        exc: The exception caught while initialising the database.

    Raises:
        SystemExit: Always, with exit code 1, after the message is logged.
    """
    # Imported locally so this block does not depend on a new file-level import.
    import socket

    # Collect every exception in the chain, outermost first.  The id() set
    # guards against a cyclic chain turning this into an infinite loop.
    chain = []
    visited = set()
    current = exc
    while current is not None and id(current) not in visited:
        visited.add(id(current))
        chain.append(current)
        current = current.__cause__ or current.__context__

    type_names = [type(err).__name__ for err in chain]
    messages = [str(err) for err in chain]

    def type_matches(fragment: str) -> bool:
        """Return True if any exception class name contains *fragment*."""
        return any(fragment in name for name in type_names)

    def message_contains(fragment: str) -> bool:
        """Return True if any exception message contains *fragment*."""
        return any(fragment in message for message in messages)

    # Most specific non-empty message available for the "detail" fields:
    # prefer the deepest exception, fall back to its class name.
    detail = next(
        (message for message in reversed(messages) if message),
        type_names[-1],
    )

    if type_matches("InvalidPasswordError") or message_contains(
        "password authentication failed"
    ):
        logger.critical(
            "\n"
            "❌ Database authentication failed — wrong password.\n"
            " The password in DATABASE_URL does not match the PostgreSQL user.\n"
            " Fix DATABASE_URL in your .env file and restart.\n"
            " Default: DATABASE_URL=postgresql+asyncpg://holdslayer:changeme@localhost:5432/holdslayer"
        )
    elif type_matches("InvalidCatalogNameError") or any(
        # Require "database" in the same message so that a missing role or
        # table ("... does not exist") is not reported as a missing database.
        "database" in message and "does not exist" in message
        for message in messages
    ):
        logger.critical(
            "\n"
            "❌ Database does not exist.\n"
            " Create it first: createdb holdslayer\n"
            " Or update DATABASE_URL in your .env file."
        )
    elif (
        any(isinstance(err, ConnectionRefusedError) for err in chain)
        or any("connection refused" in message.lower() for message in messages)
        or any("could not connect" in message.lower() for message in messages)
    ):
        logger.critical(
            "\n"
            "\u274c Cannot reach PostgreSQL \u2014 connection refused.\n"
            " Is PostgreSQL running? Check DATABASE_URL in your .env file."
        )
    elif (
        # socket.gaierror covers name-resolution failures on every platform;
        # the string checks are kept for errors that were re-wrapped as text.
        any(isinstance(err, socket.gaierror) for err in chain)
        or message_contains("nodename nor servname")
        or message_contains("Name or service not known")
    ):
        logger.critical(
            "\n"
            "❌ Cannot resolve the database hostname.\n"
            " Check the host in DATABASE_URL in your .env file. (detail: %s)",
            detail,
        )
    else:
        logger.critical(
            "\n❌ Database initialisation failed: %s\n"
            " Check DATABASE_URL in your .env file.",
            detail,
        )

    sys.exit(1)
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application startup and shutdown.

    Startup: initialise the database, start the telephony gateway, start the
    notification and recording services, create the analytics, flow-learner
    and MCP objects, and publish all of them on ``app.state``.

    Shutdown: stop the notification service, stop the gateway, then close the
    database.  Teardown steps are registered on an ``AsyncExitStack`` as soon
    as the matching startup step succeeds, so they also run when a later
    startup step fails or when an earlier teardown step raises.

    Args:
        app: The FastAPI application whose ``state`` receives the services.

    Raises:
        SystemExit: If database initialisation fails (via ``_handle_db_error``).
    """
    # Local import: keeps this block self-contained without touching the
    # file-level import section.
    from contextlib import AsyncExitStack

    settings = get_settings()

    # Initialize database
    logger.info("Initializing database...")
    try:
        await init_db()
    except Exception as e:
        # Logs a readable diagnosis and exits the process.
        _handle_db_error(e)

    async with AsyncExitStack() as cleanup:
        # Callbacks run in reverse registration order, so the database is
        # closed last, after the services that may still use it.
        cleanup.push_async_callback(close_db)

        # Boot the telephony engine
        gateway = AIPSTNGateway.from_config()
        await gateway.start()
        cleanup.push_async_callback(gateway.stop)
        app.state.gateway = gateway

        # Start auxiliary services (imports kept function-scoped as in the
        # original; presumably to avoid import cycles — TODO confirm).
        from services.notification import NotificationService
        from services.recording import RecordingService
        from services.call_analytics import CallAnalytics
        from services.call_flow_learner import CallFlowLearner

        notification_svc = NotificationService(gateway.event_bus, settings)
        await notification_svc.start()
        cleanup.push_async_callback(notification_svc.stop)
        app.state.notification_service = notification_svc

        # NOTE(review): the recording service is started but never stopped on
        # shutdown — confirm whether RecordingService exposes a stop() that
        # should be registered here.
        recording_svc = RecordingService()
        await recording_svc.start()
        app.state.recording_service = recording_svc

        analytics_svc = CallAnalytics()
        app.state.analytics_service = analytics_svc

        flow_learner = CallFlowLearner()
        app.state.flow_learner = flow_learner

        # Create the MCP server and keep a reference on app.state.  It is not
        # mounted as a route in this function.
        mcp = create_mcp_server(gateway)
        app.state.mcp = mcp

        logger.info("=" * 60)
        logger.info("🔥 Hold Slayer Gateway is LIVE")
        # Show a usable URL — 0.0.0.0 is the bind address, not a browser URL
        display_host = "localhost" if settings.host in ("0.0.0.0", "::") else settings.host
        # When launched via `uvicorn main:app --port XXXX`, the CLI --port arg
        # takes precedence over settings.port (which comes from .env).  Both
        # the `--port 8001` and `--port=8001` spellings are recognised; the
        # last valid occurrence wins.
        display_port = settings.port
        argv = sys.argv
        for i, arg in enumerate(argv):
            if arg in ("--port", "-p") and i + 1 < len(argv):
                candidate = argv[i + 1]
            elif arg.startswith("--port="):
                candidate = arg.partition("=")[2]
            else:
                continue
            try:
                display_port = int(candidate)
            except ValueError:
                # Not a number: keep the previously determined port.
                pass
        logger.info(" API: http://%s:%s", display_host, display_port)
        logger.info(" API Docs: http://%s:%s/docs", display_host, display_port)
        logger.info(" WebSocket: ws://%s:%s/ws/events", display_host, display_port)
        logger.info(" MCP: Available via FastMCP")
        logger.info("=" * 60)

        try:
            yield
        finally:
            # Shutdown: the registered callbacks run when the stack exits.
            logger.info("Shutting down Hold Slayer Gateway...")

    # Only reached when every teardown step completed without raising.
    logger.info("Gateway shut down cleanly. 👋")
|
|
|
|
|
|
# The ASGI application object.  `lifespan` (defined above) performs startup
# and shutdown; the description is rendered as Markdown on the docs page.
app = FastAPI(
    title="Hold Slayer Gateway",
    version="0.1.0",
    lifespan=lifespan,
    description=(
        "🗡️ AI PSTN Gateway — Navigate IVRs, wait on hold, "
        "and connect you when a human answers.\n\n"
        "## Quick Start\n"
        "1. **POST /api/calls/hold-slayer** — Launch the Hold Slayer\n"
        "2. **GET /api/calls/{call_id}** — Check call status\n"
        "3. **WS /ws/events** — Real-time event stream\n"
        "4. **GET /api/call-flows** — Manage stored IVR trees\n"
    ),
)
|
|
|
|
# === API Routes ===
# REST resources live under /api/*; the event stream router lives under /ws.
app.include_router(
    calls.router,
    prefix="/api/calls",
    tags=["Calls"],
)
app.include_router(
    call_flows.router,
    prefix="/api/call-flows",
    tags=["Call Flows"],
)
app.include_router(
    devices.router,
    prefix="/api/devices",
    tags=["Devices"],
)
app.include_router(
    websocket.router,
    prefix="/ws",
    tags=["WebSocket"],
)
|
|
|
|
|
|
# === Root Endpoint ===
@app.get("/", tags=["System"])
async def root():
    """Return the gateway's name, version and a quick status summary.

    While no gateway is present on ``app.state`` the status is "starting".
    Once it is, the status is "running" and the uptime, active-call and
    trunk values reported by ``gateway.status()`` are included.
    """
    payload = {
        "name": "Hold Slayer Gateway",
        "version": "0.1.0",
    }

    gateway = getattr(app.state, "gateway", None)
    if not gateway:
        # Startup has not published the gateway yet.
        payload["status"] = "starting"
        return payload

    snapshot = await gateway.status()
    payload["status"] = "running"
    for key in ("uptime", "active_calls", "trunk"):
        payload[key] = snapshot[key]
    return payload
|
|
|
|
|
|
@app.get("/health", tags=["System"])
async def health():
    """Report gateway, SIP engine and SIP trunk health.

    The overall status is "healthy" only when a gateway exists and its SIP
    engine reports ready; otherwise it is "degraded".  Trunk details come
    from the SIP engine's trunk status, or default to unregistered when no
    gateway is available.
    """
    gateway = getattr(app.state, "gateway", None)

    # Readiness requires an existing gateway whose SIP engine says it is ready.
    if gateway is None:
        ready = False
    else:
        ready = await gateway.sip_engine.is_ready()

    # Without a gateway there is no trunk to query; report it as unregistered.
    if gateway:
        trunk_status = await gateway.sip_engine.get_trunk_status()
    else:
        trunk_status = {"registered": False}

    sip_trunk = {
        "registered": trunk_status.get("registered", False),
        "host": trunk_status.get("host"),
        "mock": trunk_status.get("mock", False),
        "reason": trunk_status.get("reason"),
    }

    if ready:
        overall, engine_state = "healthy", "ready"
    else:
        overall, engine_state = "degraded", "not ready"

    return {
        "status": overall,
        "gateway": "ready" if gateway else "not initialized",
        "sip_engine": engine_state,
        "sip_trunk": sip_trunk,
    }
|
|
|
|
|
|
# Direct execution (`python main.py`): run the app under uvicorn using the
# host, port, reload flag and log level from the application settings.
if __name__ == "__main__":
    import uvicorn

    settings = get_settings()

    # uvicorn looks string log levels up by their lowercase names, so a value
    # such as "INFO" from the environment would fail at startup.  Normalise
    # strings and pass any other type (e.g. a numeric level) through untouched.
    log_level = settings.log_level
    if isinstance(log_level, str):
        log_level = log_level.lower()

    uvicorn.run(
        "main:app",
        host=settings.host,
        port=settings.port,
        reload=settings.debug,
        log_level=log_level,
    )
|