Files
hold-slayer/api/call_flows.py
Robert Helewka ecf37658ce feat: add initial Hold Slayer AI telephony gateway implementation
Complete project scaffolding and core implementation of an AI-powered
telephony system that calls companies, navigates IVR menus, waits on
hold, and transfers to the user when a human answers.

Key components:
- FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces
- SIP/VoIP call management via PJSUA2 with RTP audio streaming
- LLM-powered IVR navigation using OpenAI/Anthropic with tool calling
- Hold detection service combining audio analysis and silence detection
- Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines
- Call recording with per-channel and mixed audio capture
- Event bus (asyncio pub/sub) for real-time client updates
- Web dashboard with live call monitoring
- SQLite persistence via SQLAlchemy with call history and analytics
- Notification support (email, SMS, webhook, desktop)
- Docker Compose deployment with Opal VoIP and Opal Media containers
- Comprehensive test suite with unit, integration, and E2E tests
- Simplified .gitignore and full project documentation in README
2026-03-21 19:23:26 +00:00

215 lines
5.9 KiB
Python

"""
Call Flows API — Store and manage IVR navigation trees.
The system gets smarter every time you call somewhere.
"""
import uuid
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException
from slugify import slugify
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from api.deps import get_gateway
from core.gateway import AIPSTNGateway
from db.database import StoredCallFlow, get_db
from models.call_flow import (
CallFlow,
CallFlowCreate,
CallFlowStep,
CallFlowSummary,
CallFlowUpdate,
)
router = APIRouter()
@router.post("/", response_model=CallFlow)
async def create_call_flow(
flow: CallFlowCreate,
db: AsyncSession = Depends(get_db),
):
"""Store a new call flow for a phone number."""
flow_id = slugify(flow.name)
# Check if ID already exists
existing = await db.execute(
select(StoredCallFlow).where(StoredCallFlow.id == flow_id)
)
if existing.scalar_one_or_none():
raise HTTPException(
status_code=409,
detail=f"Call flow '{flow_id}' already exists. Use PUT to update.",
)
db_flow = StoredCallFlow(
id=flow_id,
name=flow.name,
phone_number=flow.phone_number,
description=flow.description,
steps=[s.model_dump() for s in flow.steps],
tags=flow.tags,
notes=flow.notes,
last_verified=datetime.now(),
)
db.add(db_flow)
await db.flush()
return CallFlow(
id=flow_id,
name=flow.name,
phone_number=flow.phone_number,
description=flow.description,
steps=flow.steps,
tags=flow.tags,
notes=flow.notes,
last_verified=datetime.now(),
)
@router.get("/", response_model=list[CallFlowSummary])
async def list_call_flows(
db: AsyncSession = Depends(get_db),
):
"""List all stored call flows."""
result = await db.execute(select(StoredCallFlow))
rows = result.scalars().all()
return [
CallFlowSummary(
id=row.id,
name=row.name,
phone_number=row.phone_number,
description=row.description or "",
step_count=len(row.steps) if row.steps else 0,
avg_hold_time=row.avg_hold_time,
success_rate=row.success_rate,
last_used=row.last_used,
times_used=row.times_used or 0,
tags=row.tags or [],
)
for row in rows
]
@router.get("/{flow_id}", response_model=CallFlow)
async def get_call_flow(
flow_id: str,
db: AsyncSession = Depends(get_db),
):
"""Get a stored call flow by ID."""
result = await db.execute(
select(StoredCallFlow).where(StoredCallFlow.id == flow_id)
)
row = result.scalar_one_or_none()
if not row:
raise HTTPException(status_code=404, detail=f"Call flow '{flow_id}' not found")
return CallFlow(
id=row.id,
name=row.name,
phone_number=row.phone_number,
description=row.description or "",
steps=[CallFlowStep(**s) for s in row.steps],
tags=row.tags or [],
notes=row.notes,
avg_hold_time=row.avg_hold_time,
success_rate=row.success_rate,
last_used=row.last_used,
times_used=row.times_used or 0,
)
@router.get("/by-number/{phone_number}", response_model=CallFlow)
async def get_flow_for_number(
phone_number: str,
db: AsyncSession = Depends(get_db),
):
"""Look up stored call flow by phone number."""
result = await db.execute(
select(StoredCallFlow).where(StoredCallFlow.phone_number == phone_number)
)
row = result.scalar_one_or_none()
if not row:
raise HTTPException(
status_code=404,
detail=f"No call flow found for {phone_number}",
)
return CallFlow(
id=row.id,
name=row.name,
phone_number=row.phone_number,
description=row.description or "",
steps=[CallFlowStep(**s) for s in row.steps],
tags=row.tags or [],
notes=row.notes,
avg_hold_time=row.avg_hold_time,
success_rate=row.success_rate,
last_used=row.last_used,
times_used=row.times_used or 0,
)
@router.put("/{flow_id}", response_model=CallFlow)
async def update_call_flow(
flow_id: str,
update: CallFlowUpdate,
db: AsyncSession = Depends(get_db),
):
"""Update an existing call flow."""
result = await db.execute(
select(StoredCallFlow).where(StoredCallFlow.id == flow_id)
)
row = result.scalar_one_or_none()
if not row:
raise HTTPException(status_code=404, detail=f"Call flow '{flow_id}' not found")
if update.name is not None:
row.name = update.name
if update.description is not None:
row.description = update.description
if update.steps is not None:
row.steps = [s.model_dump() for s in update.steps]
if update.tags is not None:
row.tags = update.tags
if update.notes is not None:
row.notes = update.notes
if update.last_verified is not None:
row.last_verified = update.last_verified
await db.flush()
return CallFlow(
id=row.id,
name=row.name,
phone_number=row.phone_number,
description=row.description or "",
steps=[CallFlowStep(**s) for s in row.steps],
tags=row.tags or [],
notes=row.notes,
avg_hold_time=row.avg_hold_time,
success_rate=row.success_rate,
last_used=row.last_used,
times_used=row.times_used or 0,
)
@router.delete("/{flow_id}")
async def delete_call_flow(
flow_id: str,
db: AsyncSession = Depends(get_db),
):
"""Delete a stored call flow."""
result = await db.execute(
select(StoredCallFlow).where(StoredCallFlow.id == flow_id)
)
row = result.scalar_one_or_none()
if not row:
raise HTTPException(status_code=404, detail=f"Call flow '{flow_id}' not found")
await db.delete(row)
return {"status": "deleted", "flow_id": flow_id}