Files
hold-slayer/main.py
Robert Helewka ecf37658ce feat: add initial Hold Slayer AI telephony gateway implementation
Complete project scaffolding and core implementation of an AI-powered
telephony system that calls companies, navigates IVR menus, waits on
hold, and transfers to the user when a human answers.

Key components:
- FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces
- SIP/VoIP call management via PJSUA2 with RTP audio streaming
- LLM-powered IVR navigation using OpenAI/Anthropic with tool calling
- Hold detection service combining audio analysis and silence detection
- Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines
- Call recording with per-channel and mixed audio capture
- Event bus (asyncio pub/sub) for real-time client updates
- Web dashboard with live call monitoring
- SQLite persistence via SQLAlchemy with call history and analytics
- Notification support (email, SMS, webhook, desktop)
- Docker Compose deployment with Opal VoIP and Opal Media containers
- Comprehensive test suite with unit, integration, and E2E tests
- Simplified .gitignore and full project documentation in README
2026-03-21 19:23:26 +00:00

231 lines
7.5 KiB
Python

"""
Hold Slayer Gateway — FastAPI Application Entry Point.
Your personal AI-powered telephony platform.
Navigates IVRs, waits on hold, and connects you when a human answers.
Usage:
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
# Or directly:
python main.py
"""
import logging
import sys
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from api import calls, call_flows, devices, websocket
from config import get_settings
from core.gateway import AIPSTNGateway
from db.database import close_db, init_db
from mcp_server.server import create_mcp_server
# Root logging setup: INFO and above are written to stdout using a compact,
# pipe-separated layout with time-of-day timestamps.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    datefmt="%H:%M:%S",
    format="%(asctime)s | %(levelname)-7s | %(name)s | %(message)s",
)

# Module-scoped logger used by everything defined in this file.
logger = logging.getLogger(__name__)
def _handle_db_error(exc: Exception) -> None:
    """Log a clear, human-readable database error and exit the process.

    The exception chain is followed from *exc* down to its deepest link
    (``__cause__`` first, falling back to ``__context__``). Every link is
    matched against a few known failure signatures -- wrong password,
    missing database, refused connection, unresolvable hostname -- so a
    driver error that is wrapped more than once is still recognised.

    Args:
        exc: The exception raised while initialising the database.

    Raises:
        SystemExit: Always, with exit status 1, once the message is logged.
    """
    # Collect the whole chain, outermost exception first. The id() guard
    # stops the walk if the chain ever loops back on itself.
    chain = []
    visited = set()
    link = exc
    while link is not None and id(link) not in visited:
        visited.add(id(link))
        chain.append(link)
        cause = getattr(link, "__cause__", None)
        link = cause if cause is not None else getattr(link, "__context__", None)

    type_names = [type(item).__name__ for item in chain]
    messages = [str(item) for item in chain]

    # Detail text shown to the user: the deepest non-empty message, or the
    # deepest type name when no link in the chain carries a message.
    root_msg = next((text for text in reversed(messages) if text), type_names[-1])

    def _type_has(fragment):
        """Return True if any exception type name in the chain contains *fragment*."""
        return any(fragment in name for name in type_names)

    def _msg_has(fragment):
        """Return True if any exception message in the chain contains *fragment*."""
        return any(fragment in text for text in messages)

    if _type_has("InvalidPasswordError") or _msg_has("password authentication failed"):
        logger.critical(
            "\n"
            "❌ Database authentication failed — wrong password.\n"
            " The password in DATABASE_URL does not match the PostgreSQL user.\n"
            " Fix DATABASE_URL in your .env file and restart.\n"
            " Default: DATABASE_URL=postgresql+asyncpg://holdslayer:changeme@localhost:5432/holdslayer"
        )
    elif _type_has("InvalidCatalogNameError") or _msg_has("does not exist"):
        logger.critical(
            "\n"
            "❌ Database does not exist.\n"
            " Create it first: createdb holdslayer\n"
            " Or update DATABASE_URL in your .env file."
        )
    elif (
        _msg_has("Connection refused")
        # This signature is matched case-insensitively.
        or any("could not connect" in text.lower() for text in messages)
    ):
        logger.critical(
            "\n"
            "\u274c Cannot reach PostgreSQL \u2014 connection refused.\n"
            " Is PostgreSQL running? Check DATABASE_URL in your .env file."
        )
    elif _msg_has("nodename nor servname") or _msg_has("Name or service not known"):
        logger.critical(
            "\n"
            "❌ Cannot resolve the database hostname.\n"
            f" Check the host in DATABASE_URL in your .env file. (detail: {root_msg})"
        )
    else:
        logger.critical(
            f"\n❌ Database initialisation failed: {root_msg}\n"
            " Check DATABASE_URL in your .env file."
        )

    # Startup cannot continue without a database.
    sys.exit(1)
def _resolve_display_port(argv, default_port):
    """Return the port to show in the startup banner.

    Scans *argv* for ``--port N``, ``-p N`` or ``--port=N``. The last
    occurrence whose value parses as an integer wins; if none does,
    *default_port* is returned unchanged.
    """
    port = default_port
    for idx, arg in enumerate(argv):
        if arg in ("--port", "-p"):
            if idx + 1 >= len(argv):
                continue  # flag given as the final argument, no value follows
            candidate = argv[idx + 1]
        elif arg.startswith("--port="):
            candidate = arg.partition("=")[2]
        else:
            continue
        try:
            port = int(candidate)
        except ValueError:
            # Best effort only: a non-numeric value keeps the previous port.
            pass
    return port


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run application startup before serving and shutdown afterwards.

    Startup initialises the database, starts the gateway, starts the
    notification and recording services, creates the analytics, flow-learner
    and MCP server objects, and stores all of them on ``app.state``.

    Shutdown stops the notification service and the gateway and closes the
    database. It runs in a ``finally`` block so that it still happens when an
    exception is thrown into the context manager at the ``yield`` point.

    Args:
        app: The FastAPI application whose ``state`` receives the services.
    """
    settings = get_settings()

    # Initialize database
    logger.info("Initializing database...")
    try:
        await init_db()
    except Exception as e:
        # Logs a readable diagnosis and terminates the process via sys.exit(1).
        _handle_db_error(e)

    # Boot the telephony engine
    gateway = AIPSTNGateway.from_config()
    await gateway.start()
    app.state.gateway = gateway

    # Start auxiliary services (imported here rather than at module top).
    from services.notification import NotificationService
    from services.recording import RecordingService
    from services.call_analytics import CallAnalytics
    from services.call_flow_learner import CallFlowLearner

    notification_svc = NotificationService(gateway.event_bus, settings)
    await notification_svc.start()
    app.state.notification_service = notification_svc

    # NOTE(review): recording_svc is started here but never stopped during
    # shutdown -- confirm whether RecordingService needs an explicit stop.
    recording_svc = RecordingService()
    await recording_svc.start()
    app.state.recording_service = recording_svc

    analytics_svc = CallAnalytics()
    app.state.analytics_service = analytics_svc

    flow_learner = CallFlowLearner()
    app.state.flow_learner = flow_learner

    # Create the MCP server and keep a reference on app.state.
    mcp = create_mcp_server(gateway)
    app.state.mcp = mcp

    logger.info("=" * 60)
    logger.info("🔥 Hold Slayer Gateway is LIVE")
    # Show a usable URL — 0.0.0.0 is the bind address, not a browser URL
    display_host = "localhost" if settings.host in ("0.0.0.0", "::") else settings.host
    # When launched via `uvicorn main:app --port XXXX`, the CLI --port arg
    # takes precedence over settings.port (which comes from .env).
    display_port = _resolve_display_port(sys.argv, settings.port)
    logger.info(f" API: http://{display_host}:{display_port}")
    logger.info(f" API Docs: http://{display_host}:{display_port}/docs")
    logger.info(f" WebSocket: ws://{display_host}:{display_port}/ws/events")
    logger.info(f" MCP: Available via FastMCP")
    logger.info("=" * 60)

    try:
        yield
    finally:
        # Shutdown. Each step is nested in its own try/finally so a failure
        # in one step does not prevent the remaining resources from being
        # released; the first error still propagates to the caller.
        logger.info("Shutting down Hold Slayer Gateway...")
        try:
            await notification_svc.stop()
        finally:
            try:
                await gateway.stop()
            finally:
                await close_db()
        # Only reached when every shutdown step succeeded.
        logger.info("Gateway shut down cleanly. 👋")
# Application object. Startup and shutdown work is delegated to ``lifespan``;
# the description below is the Markdown passed to FastAPI as the API description.
app = FastAPI(
    lifespan=lifespan,
    version="0.1.0",
    title="Hold Slayer Gateway",
    description=(
        "🗡️ AI PSTN Gateway — Navigate IVRs, wait on hold, "
        "and connect you when a human answers.\n\n"
        "## Quick Start\n"
        "1. **POST /api/calls/hold-slayer** — Launch the Hold Slayer\n"
        "2. **GET /api/calls/{call_id}** — Check call status\n"
        "3. **WS /ws/events** — Real-time event stream\n"
        "4. **GET /api/call-flows** — Manage stored IVR trees\n"
    ),
)

# === API Routes ===
# Three routers are registered under /api/*, and the websocket router under /ws.
app.include_router(
    calls.router,
    prefix="/api/calls",
    tags=["Calls"],
)
app.include_router(
    call_flows.router,
    prefix="/api/call-flows",
    tags=["Call Flows"],
)
app.include_router(
    devices.router,
    prefix="/api/devices",
    tags=["Devices"],
)
app.include_router(
    websocket.router,
    prefix="/ws",
    tags=["WebSocket"],
)
# === Root Endpoint ===
@app.get("/", tags=["System"])
async def root():
    """Return the gateway's name, version and a quick status summary.

    While no gateway is present on ``app.state`` the status is reported as
    ``"starting"``; otherwise the reply also carries the uptime, active call
    count and trunk details taken from ``gateway.status()``.
    """
    identity = {
        "name": "Hold Slayer Gateway",
        "version": "0.1.0",
    }

    gw = getattr(app.state, "gateway", None)
    if not gw:
        # Gateway is not available yet.
        return {**identity, "status": "starting"}

    snapshot = await gw.status()
    return {
        **identity,
        "status": "running",
        "uptime": snapshot["uptime"],
        "active_calls": snapshot["active_calls"],
        "trunk": snapshot["trunk"],
    }
@app.get("/health", tags=["System"])
async def health():
    """Report gateway, SIP engine and SIP trunk health.

    The overall status is ``"healthy"`` when the SIP engine reports ready and
    ``"degraded"`` otherwise. Without a gateway, the trunk is reported as not
    registered.
    """
    gw = getattr(app.state, "gateway", None)

    # Readiness is only queried when a gateway object exists.
    if gw is None:
        ready = False
    else:
        ready = await gw.sip_engine.is_ready()

    # The trunk lookup uses a truthiness check on the gateway.
    if gw:
        trunk = await gw.sip_engine.get_trunk_status()
    else:
        trunk = {"registered": False}

    # Field name -> value to report when the trunk status omits that field.
    trunk_defaults = (
        ("registered", False),
        ("host", None),
        ("mock", False),
        ("reason", None),
    )
    trunk_summary = {field: trunk.get(field, fallback) for field, fallback in trunk_defaults}

    return {
        "status": "healthy" if ready else "degraded",
        "gateway": "ready" if gw else "not initialized",
        "sip_engine": "ready" if ready else "not ready",
        "sip_trunk": trunk_summary,
    }
if __name__ == "__main__":
    # Script entry point (`python main.py`): serve the app with uvicorn,
    # taking host, port, reload flag and log level from the settings object.
    import uvicorn

    settings = get_settings()
    uvicorn.run(
        "main:app",
        log_level=settings.log_level,
        reload=settings.debug,
        port=settings.port,
        host=settings.host,
    )