Complete project scaffolding and core implementation of an AI-powered telephony system that calls companies, navigates IVR menus, waits on hold, and transfers to the user when a human answers.

Key components:
- FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces
- SIP/VoIP call management via PJSUA2 with RTP audio streaming
- LLM-powered IVR navigation using OpenAI/Anthropic with tool calling
- Hold detection service combining audio analysis and silence detection
- Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines
- Call recording with per-channel and mixed audio capture
- Event bus (asyncio pub/sub) for real-time client updates
- Web dashboard with live call monitoring
- SQLite persistence via SQLAlchemy with call history and analytics
- Notification support (email, SMS, webhook, desktop)
- Docker Compose deployment with Opal VoIP and Opal Media containers
- Comprehensive test suite with unit, integration, and E2E tests
- Simplified .gitignore and full project documentation in README
18 lines
530 B
Python
"""
API Dependencies: shared dependency injection for all routes.
"""

from fastapi import Depends, HTTPException, Request
from sqlalchemy.ext.asyncio import AsyncSession

from core.gateway import AIPSTNGateway
from db.database import get_db


def get_gateway(request: Request) -> AIPSTNGateway:
    """Get the gateway instance from app state."""
    gateway = getattr(request.app.state, "gateway", None)
    if gateway is None:
        raise HTTPException(status_code=503, detail="Gateway not initialized")
    return gateway
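
The lookup in `get_gateway` can be exercised without starting a server. The following is a minimal, stdlib-only sketch of that logic, not the project's own test code: `lookup_gateway`, `fake_request`, and `ServiceUnavailable` are hypothetical names introduced here, with `ServiceUnavailable` standing in for `fastapi.HTTPException(status_code=503)` and `SimpleNamespace` standing in for the real `Request` object.

```python
from types import SimpleNamespace


class ServiceUnavailable(Exception):
    """Hypothetical stand-in for fastapi.HTTPException(status_code=503)."""

    status_code = 503


def lookup_gateway(request):
    """Mirror of get_gateway: read the gateway from app state or fail."""
    gateway = getattr(request.app.state, "gateway", None)
    if gateway is None:
        raise ServiceUnavailable("Gateway not initialized")
    return gateway


def fake_request(**state):
    """Build a stand-in request whose app.state carries the given attributes."""
    return SimpleNamespace(app=SimpleNamespace(state=SimpleNamespace(**state)))


# Gateway present: the object stored on app state is returned unchanged.
sentinel = object()
found = lookup_gateway(fake_request(gateway=sentinel))

# Gateway missing: the lookup raises instead of returning None.
error = None
try:
    lookup_gateway(fake_request())
except ServiceUnavailable as exc:
    error = exc
```

Using `getattr` with a `None` default means a request that arrives before the gateway is attached to app state gets a clear 503-style failure rather than an `AttributeError`.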