Files
hold-slayer/docs/core-engine.md
Robert Helewka ecf37658ce feat: add initial Hold Slayer AI telephony gateway implementation
Complete project scaffolding and core implementation of an AI-powered
telephony system that calls companies, navigates IVR menus, waits on
hold, and transfers to the user when a human answers.

Key components:
- FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces
- SIP/VoIP call management via PJSUA2 with RTP audio streaming
- LLM-powered IVR navigation using OpenAI/Anthropic with tool calling
- Hold detection service combining audio analysis and silence detection
- Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines
- Call recording with per-channel and mixed audio capture
- Event bus (asyncio pub/sub) for real-time client updates
- Web dashboard with live call monitoring
- SQLite persistence via SQLAlchemy with call history and analytics
- Notification support (email, SMS, webhook, desktop)
- Docker Compose deployment with Opal VoIP and Opal Media containers
- Comprehensive test suite with unit, integration, and E2E tests
- Simplified .gitignore and full project documentation in README
2026-03-21 19:23:26 +00:00

8.8 KiB

Core Engine

The core engine provides the foundational infrastructure: SIP call control, media handling, call state management, and event distribution.

SIP Engine (core/sip_engine.py + core/sippy_engine.py)

Abstract Interface

All SIP operations go through the SIPEngine abstract base class, which defines the contract:

class SIPEngine(ABC):
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def make_call(self, to_uri: str, from_uri: str = None) -> str: ...
    async def hangup(self, call_id: str) -> None: ...
    async def send_dtmf(self, call_id: str, digits: str) -> None: ...
    async def bridge(self, call_id_a: str, call_id_b: str) -> None: ...
    async def transfer(self, call_id: str, to_uri: str) -> None: ...
    async def register(self, ...) -> bool: ...
    async def get_trunk_status(self) -> TrunkStatus: ...

This abstraction allows:

  • SippyEngine — Production implementation using Sippy B2BUA
  • MockSIPEngine — Test implementation that simulates calls in memory

Sippy B2BUA Engine

The SippyEngine wraps Sippy B2BUA for SIP signaling:

class SippyEngine(SIPEngine):
    """
    Production SIP engine using Sippy B2BUA.
    
    Sippy runs its own event loop in a daemon thread.
    All async methods bridge to Sippy via run_in_executor().
    """

Key internals:

Class Purpose
SipCallLeg Tracks one leg of a call (call-id, state, RTP endpoint, SDP)
SipBridge Two bridged call legs (outbound + device)
SippyCallController Handles Sippy callbacks (INVITE received, BYE received, DTMF, etc.)

Call lifecycle:

make_call("sip:+18005551234@trunk")
  │
  ├── Create SipCallLeg (state=TRYING)
  ├── Sippy: send INVITE
  ├── Sippy callback: 180 Ringing → state=RINGING
  ├── Sippy callback: 200 OK → state=CONNECTED
  │   └── Extract RTP endpoint from SDP
  │       └── MediaPipeline.add_stream(rtp_host, rtp_port)
  └── Return call_id

send_dtmf(call_id, "1")
  └── Sippy: send RFC 2833 DTMF or SIP INFO

bridge(call_id_a, call_id_b)
  ├── Create SipBridge(leg_a, leg_b)
  └── MediaPipeline.bridge_streams(stream_a, stream_b)

hangup(call_id)
  ├── Sippy: send BYE
  ├── MediaPipeline.remove_stream()
  └── Cleanup SipCallLeg

Graceful fallback: If Sippy B2BUA is not installed, the engine falls back to mock mode with a warning — useful for development and testing without a SIP stack.

Trunk Registration

The engine registers with your SIP trunk provider on startup:

await engine.register(
    registrar="sip.yourprovider.com",
    username="your_username",
    password="your_password",
    realm="sip.yourprovider.com",
)

Registration is refreshed automatically. get_trunk_status() returns the current registration state and health.

Media Pipeline (core/media_pipeline.py)

The media pipeline uses PJSUA2 for all RTP audio handling:

Key Classes

Class Purpose
AudioTap Extracts audio frames from a stream into an async queue (for classifier/STT)
MediaStream Wraps a single RTP stream (transport port, conference slot, optional tap + recording)
MediaPipeline Main orchestrator — manages all streams, bridging, recording

Operations

# Add a new RTP stream (called when SIP call connects)
stream_id = await pipeline.add_stream(rtp_host, rtp_port, codec="PCMU")

# Tap audio for real-time analysis
tap = await pipeline.tap_stream(stream_id)
async for frame in tap:
    classification = classifier.classify(frame)

# Bridge two streams (transfer)
await pipeline.bridge_streams(stream_a, stream_b)

# Record a stream to WAV
await pipeline.start_recording(stream_id, "/path/to/recording.wav")
await pipeline.stop_recording(stream_id)

# Play a tone (e.g., ringback to caller)
await pipeline.play_tone(stream_id, frequency=440, duration_ms=2000)

# Clean up
await pipeline.remove_stream(stream_id)

Conference Bridge

PJSUA2's conference bridge is central to the architecture. Every stream gets a conference slot, and bridging is done by connecting slots:

Conference Bridge
├── Slot 0: Outbound call (to company)
├── Slot 1: AudioTap (classifier + STT reads from here)
├── Slot 2: Recording port
├── Slot 3: Device call (your phone, after transfer)
└── Slot 4: Tone generator

Bridge: Slot 0 ↔ Slot 3  (company ↔ your phone)
Tap:    Slot 0 → Slot 1  (company audio → classifier)
Record: Slot 0 → Slot 2  (company audio → WAV file)

Null Audio Device

The pipeline uses PJSUA2's null audio device — no sound card required. This is essential for headless server deployment.

Call Manager (core/call_manager.py)

Tracks all active calls and their state:

class CallManager:
    async def create_call(self, number, mode, intent, ...) -> ActiveCall
    async def get_call(self, call_id) -> Optional[ActiveCall]
    async def update_status(self, call_id, status) -> None
    async def end_call(self, call_id, reason) -> None
    async def add_transcript(self, call_id, text, speaker) -> None
    def active_call_count(self) -> int
    def get_all_active(self) -> list[ActiveCall]

ActiveCall state:

@dataclass
class ActiveCall:
    call_id: str
    number: str
    mode: CallMode          # direct, hold_slayer, ai_assisted
    status: CallStatus      # trying, ringing, connected, on_hold, transferring, ended
    intent: Optional[str]
    device: Optional[str]
    call_flow_id: Optional[str]
    
    # Timing
    started_at: datetime
    connected_at: Optional[datetime]
    hold_started_at: Optional[datetime]
    ended_at: Optional[datetime]
    
    # Audio classification
    current_audio_type: Optional[AudioClassification]
    classification_history: list[ClassificationResult]
    
    # Transcript
    transcript_chunks: list[TranscriptChunk]
    
    # Services
    services: dict[str, bool]  # recording, transcription, etc.

The CallManager publishes events to the EventBus on every state change.

Event Bus (core/event_bus.py)

Pure asyncio pub/sub connecting all components:

class EventBus:
    async def publish(self, event: GatewayEvent) -> None
    def subscribe(self, event_types: set[EventType] = None) -> EventSubscription
    @property
    def recent_events(self) -> list[GatewayEvent]
    @property
    def subscriber_count(self) -> int

EventSubscription

Subscriptions are async iterators:

subscription = event_bus.subscribe(event_types={EventType.HUMAN_DETECTED})

async for event in subscription:
    print(f"Human detected on call {event.call_id}!")

# When done:
subscription.close()

How it works

  1. Each subscribe() creates an asyncio.Queue for that subscriber
  2. publish() does put_nowait() on every subscriber's queue
  3. Full queues (dead subscribers) are automatically cleaned up
  4. Optional type filtering — only receive events you care about
  5. Event history (last 1000) for late joiners

Event Types

See models/events.py for the full list. Key categories:

Category Events
Call Lifecycle CALL_STARTED, CALL_RINGING, CALL_CONNECTED, CALL_ENDED, CALL_FAILED
Hold Slayer HOLD_DETECTED, HUMAN_DETECTED, TRANSFER_STARTED, TRANSFER_COMPLETE
IVR Navigation IVR_STEP, IVR_DTMF_SENT, IVR_MENU_DETECTED, IVR_EXPLORATION
Audio AUDIO_CLASSIFIED, TRANSCRIPT_CHUNK, RECORDING_STARTED, RECORDING_STOPPED
Device DEVICE_REGISTERED, DEVICE_UNREGISTERED, DEVICE_RINGING
System GATEWAY_STARTED, GATEWAY_STOPPED, TRUNK_REGISTERED, TRUNK_FAILED

Gateway (core/gateway.py)

The top-level orchestrator that owns and wires all components:

class AIPSTNGateway:
    def __init__(self, settings: Settings):
        self.event_bus = EventBus()
        self.call_manager = CallManager(self.event_bus)
        self.sip_engine = SippyEngine(settings, self.event_bus)
        self.media_pipeline = MediaPipeline(settings)
        self.llm_client = LLMClient(...)
        self.transcription = TranscriptionService(...)
        self.classifier = AudioClassifier()
        self.hold_slayer = HoldSlayer(...)
        self.recording = RecordingService(...)
        self.analytics = CallAnalytics(...)
        self.notification = NotificationService(...)
        self.call_flow_learner = CallFlowLearner(...)
    
    async def start(self) -> None: ...   # Start all services
    async def stop(self) -> None: ...    # Graceful shutdown
    async def make_call(self, ...) -> ActiveCall: ...
    async def end_call(self, call_id) -> None: ...

The gateway is created once at application startup (in main.py lifespan) and injected into FastAPI routes via dependency injection (api/deps.py).