hold-slayer/docs/core-engine.md
Robert Helewka ecf37658ce feat: add initial Hold Slayer AI telephony gateway implementation
Complete project scaffolding and core implementation of an AI-powered
telephony system that calls companies, navigates IVR menus, waits on
hold, and transfers to the user when a human answers.

Key components:
- FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces
- SIP/VoIP call management via PJSUA2 with RTP audio streaming
- LLM-powered IVR navigation using OpenAI/Anthropic with tool calling
- Hold detection service combining audio analysis and silence detection
- Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines
- Call recording with per-channel and mixed audio capture
- Event bus (asyncio pub/sub) for real-time client updates
- Web dashboard with live call monitoring
- SQLite persistence via SQLAlchemy with call history and analytics
- Notification support (email, SMS, webhook, desktop)
- Docker Compose deployment with Opal VoIP and Opal Media containers
- Comprehensive test suite with unit, integration, and E2E tests
- Simplified .gitignore and full project documentation in README
2026-03-21 19:23:26 +00:00

# Core Engine
The core engine provides the foundational infrastructure: SIP call control, media handling, call state management, and event distribution.
## SIP Engine (`core/sip_engine.py` + `core/sippy_engine.py`)
### Abstract Interface
All SIP operations go through the `SIPEngine` abstract base class, which defines the contract:
```python
class SIPEngine(ABC):
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def make_call(self, to_uri: str, from_uri: Optional[str] = None) -> str: ...
    async def hangup(self, call_id: str) -> None: ...
    async def send_dtmf(self, call_id: str, digits: str) -> None: ...
    async def bridge(self, call_id_a: str, call_id_b: str) -> None: ...
    async def transfer(self, call_id: str, to_uri: str) -> None: ...
    async def register(self, ...) -> bool: ...
    async def get_trunk_status(self) -> TrunkStatus: ...
```
Two implementations satisfy this contract:
- **`SippyEngine`** — production implementation backed by Sippy B2BUA
- **`MockSIPEngine`** — test implementation that simulates calls in memory
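A mock engine of this shape can be little more than an in-memory dict keyed by call ID. The sketch below is illustrative — it covers only a subset of the interface, and its internals are hypothetical, not the project's actual `MockSIPEngine`:

```python
# Illustrative mock engine: simulates calls entirely in memory.
# Field names inside the per-call dict are hypothetical.
import asyncio
import uuid
from typing import Optional

class MockSIPEngine:
    def __init__(self):
        self.calls: dict[str, dict] = {}  # call_id -> {"to", "state", "dtmf"}

    async def start(self) -> None:
        pass  # nothing to boot in mock mode

    async def make_call(self, to_uri: str, from_uri: Optional[str] = None) -> str:
        call_id = str(uuid.uuid4())
        # A mock can skip TRYING/RINGING and "answer" immediately.
        self.calls[call_id] = {"to": to_uri, "state": "CONNECTED", "dtmf": ""}
        return call_id

    async def send_dtmf(self, call_id: str, digits: str) -> None:
        self.calls[call_id]["dtmf"] += digits  # record instead of transmitting

    async def hangup(self, call_id: str) -> None:
        self.calls[call_id]["state"] = "ENDED"

async def demo():
    engine = MockSIPEngine()
    await engine.start()
    cid = await engine.make_call("sip:+18005551234@trunk")
    await engine.send_dtmf(cid, "1")
    await engine.hangup(cid)
    return engine.calls[cid]

state = asyncio.run(demo())
```

Because tests exercise the same `SIPEngine` surface, the rest of the gateway cannot tell the two implementations apart.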
### Sippy B2BUA Engine
The `SippyEngine` wraps Sippy B2BUA for SIP signaling:
```python
class SippyEngine(SIPEngine):
    """
    Production SIP engine using Sippy B2BUA.

    Sippy runs its own event loop in a daemon thread.
    All async methods bridge to Sippy via run_in_executor().
    """
```
**Key internals:**
| Class | Purpose |
|-------|---------|
| `SipCallLeg` | Tracks one leg of a call (call-id, state, RTP endpoint, SDP) |
| `SipBridge` | Two bridged call legs (outbound + device) |
| `SippyCallController` | Handles Sippy callbacks (INVITE received, BYE received, DTMF, etc.) |
**Call lifecycle:**
```
make_call("sip:+18005551234@trunk")
  ├── Create SipCallLeg (state=TRYING)
  ├── Sippy: send INVITE
  ├── Sippy callback: 180 Ringing → state=RINGING
  ├── Sippy callback: 200 OK → state=CONNECTED
  │     ├── Extract RTP endpoint from SDP
  │     └── MediaPipeline.add_stream(rtp_host, rtp_port)
  └── Return call_id

send_dtmf(call_id, "1")
  └── Sippy: send RFC 2833 DTMF or SIP INFO

bridge(call_id_a, call_id_b)
  ├── Create SipBridge(leg_a, leg_b)
  └── MediaPipeline.bridge_streams(stream_a, stream_b)

hangup(call_id)
  ├── Sippy: send BYE
  ├── MediaPipeline.remove_stream()
  └── Cleanup SipCallLeg
```
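The leg-state progression in the diagram can be captured as a small transition table. This is purely illustrative — the `TRANSITIONS` map and `advance` helper below are not part of the engine:

```python
# Hypothetical sketch of the call-leg state machine shown above.
# A 180 Ringing may be skipped, so TRYING can jump straight to CONNECTED.
TRANSITIONS = {
    "TRYING": {"RINGING", "CONNECTED", "ENDED"},
    "RINGING": {"CONNECTED", "ENDED"},
    "CONNECTED": {"ENDED"},
    "ENDED": set(),
}

def advance(state: str, new_state: str) -> str:
    """Return the new state, rejecting any transition the table forbids."""
    if new_state not in TRANSITIONS[state]:
        raise ValueError(f"illegal transition {state} -> {new_state}")
    return new_state

# A normal call: INVITE -> 180 Ringing -> 200 OK -> BYE
s = "TRYING"
for nxt in ("RINGING", "CONNECTED", "ENDED"):
    s = advance(s, nxt)
```

Keeping transitions explicit like this makes Sippy callback handlers easy to sanity-check: a stray 200 OK on an ended leg fails loudly instead of corrupting state.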
**Graceful fallback:** If Sippy B2BUA is not installed, the engine falls back to mock mode with a warning — useful for development and testing without a SIP stack.
### Trunk Registration
The engine registers with your SIP trunk provider on startup:
```python
await engine.register(
    registrar="sip.yourprovider.com",
    username="your_username",
    password="your_password",
    realm="sip.yourprovider.com",
)
```
Registration is refreshed automatically. `get_trunk_status()` returns the current registration state and health.
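One common shape for that automatic refresh is a background task that re-registers on an interval and retries quickly on failure. The sketch below is an assumption about that shape — the `keep_registered` helper and `register_once` callable are hypothetical, not the engine's actual code:

```python
import asyncio

async def keep_registered(register_once, interval: float, stop: asyncio.Event) -> int:
    """Re-run registration until `stop` is set; return the attempt count.

    `register_once` is any coroutine function returning True on success.
    `interval` would normally be derived from the REGISTER expiry
    (e.g. half the granted Expires value).
    """
    attempts = 0
    while not stop.is_set():
        ok = await register_once()
        attempts += 1
        if not ok:
            await asyncio.sleep(min(interval, 5.0))  # quick retry on failure
            continue
        try:
            # Sleep until the next refresh, but wake early on shutdown.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass
    return attempts

async def demo():
    stop = asyncio.Event()

    async def register_once():
        return True  # stand-in for the real REGISTER exchange

    task = asyncio.create_task(keep_registered(register_once, 0.01, stop))
    await asyncio.sleep(0.05)
    stop.set()
    return await task

attempts = asyncio.run(demo())
```

Waking on the `stop` event (rather than a plain `sleep`) keeps shutdown prompt even with long refresh intervals.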
## Media Pipeline (`core/media_pipeline.py`)
The media pipeline uses PJSUA2 for all RTP audio handling:
### Key Classes
| Class | Purpose |
|-------|---------|
| `AudioTap` | Extracts audio frames from a stream into an async queue (for classifier/STT) |
| `MediaStream` | Wraps a single RTP stream (transport port, conference slot, optional tap + recording) |
| `MediaPipeline` | Main orchestrator — manages all streams, bridging, recording |
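An `AudioTap` of this shape is essentially an `asyncio.Queue` exposed as an async iterator. The sketch below works under that assumption; the sentinel-based `close()` and frame-dropping `push()` are illustrative details, not the pipeline's actual implementation:

```python
import asyncio

_CLOSED = object()  # sentinel marking end of stream

class AudioTap:
    """Async-iterable queue of audio frames, as the table above describes."""

    def __init__(self, maxsize: int = 100):
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)

    def push(self, frame: bytes) -> None:
        # Called from the media side; drop frames if the consumer lags
        # rather than blocking the audio path.
        try:
            self._queue.put_nowait(frame)
        except asyncio.QueueFull:
            pass

    def close(self) -> None:
        self._queue.put_nowait(_CLOSED)

    def __aiter__(self):
        return self

    async def __anext__(self) -> bytes:
        frame = await self._queue.get()
        if frame is _CLOSED:
            raise StopAsyncIteration
        return frame

async def demo():
    tap = AudioTap()
    tap.push(b"\x00" * 160)  # one 20 ms PCMU frame at 8 kHz
    tap.push(b"\x01" * 160)
    tap.close()
    return [frame async for frame in tap]

frames = asyncio.run(demo())
```

Dropping frames on overflow is a deliberate trade-off: a stalled classifier should degrade analysis quality, not stall the call's audio.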
### Operations
```python
# Add a new RTP stream (called when a SIP call connects)
stream_id = await pipeline.add_stream(rtp_host, rtp_port, codec="PCMU")

# Tap audio for real-time analysis
tap = await pipeline.tap_stream(stream_id)
async for frame in tap:
    classification = classifier.classify(frame)

# Bridge two streams (transfer)
await pipeline.bridge_streams(stream_a, stream_b)

# Record a stream to WAV
await pipeline.start_recording(stream_id, "/path/to/recording.wav")
await pipeline.stop_recording(stream_id)

# Play a tone (e.g., ringback to the caller)
await pipeline.play_tone(stream_id, frequency=440, duration_ms=2000)

# Clean up
await pipeline.remove_stream(stream_id)
```
### Conference Bridge
PJSUA2's conference bridge is central to the architecture. Every stream gets a conference slot, and bridging is done by connecting slots:
```
Conference Bridge
├── Slot 0: Outbound call (to company)
├── Slot 1: AudioTap (classifier + STT reads from here)
├── Slot 2: Recording port
├── Slot 3: Device call (your phone, after transfer)
└── Slot 4: Tone generator
Bridge: Slot 0 ↔ Slot 3 (company ↔ your phone)
Tap: Slot 0 → Slot 1 (company audio → classifier)
Record: Slot 0 → Slot 2 (company audio → WAV file)
```
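The slot wiring above amounts to a directed graph of (source, sink) connections. The sketch below is hypothetical bookkeeping only — in the real pipeline, PJSUA2's conference bridge performs the actual audio routing:

```python
# Hypothetical conference-bridge bookkeeping: slots are ints, connections
# are directed (source -> sink), and a full-duplex bridge is two connections.
class ConferenceBridge:
    def __init__(self):
        self.connections: set[tuple[int, int]] = set()

    def connect(self, src: int, sink: int) -> None:
        """One-way audio: frames from `src` are mixed into `sink`."""
        self.connections.add((src, sink))

    def bridge(self, a: int, b: int) -> None:
        """Two-way bridge: audio flows in both directions."""
        self.connect(a, b)
        self.connect(b, a)

bridge = ConferenceBridge()
bridge.bridge(0, 3)   # company <-> your phone
bridge.connect(0, 1)  # company audio -> classifier tap (one-way)
bridge.connect(0, 2)  # company audio -> recording port (one-way)
```

Modeling taps and recordings as one-way edges is what lets the classifier and recorder listen to the company leg without ever injecting audio into the call.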
### Null Audio Device
The pipeline uses PJSUA2's null audio device — no sound card required. This is essential for headless server deployment.
## Call Manager (`core/call_manager.py`)
Tracks all active calls and their state:
```python
class CallManager:
    async def create_call(self, number, mode, intent, ...) -> ActiveCall
    async def get_call(self, call_id) -> Optional[ActiveCall]
    async def update_status(self, call_id, status) -> None
    async def end_call(self, call_id, reason) -> None
    async def add_transcript(self, call_id, text, speaker) -> None
    def active_call_count(self) -> int
    def get_all_active(self) -> list[ActiveCall]
```
**ActiveCall state:**
```python
@dataclass
class ActiveCall:
    call_id: str
    number: str
    mode: CallMode      # direct, hold_slayer, ai_assisted
    status: CallStatus  # trying, ringing, connected, on_hold, transferring, ended
    intent: Optional[str]
    device: Optional[str]
    call_flow_id: Optional[str]

    # Timing
    started_at: datetime
    connected_at: Optional[datetime]
    hold_started_at: Optional[datetime]
    ended_at: Optional[datetime]

    # Audio classification
    current_audio_type: Optional[AudioClassification]
    classification_history: list[ClassificationResult]

    # Transcript
    transcript_chunks: list[TranscriptChunk]

    # Services
    services: dict[str, bool]  # recording, transcription, etc.
```
The CallManager publishes events to the EventBus on every state change.
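That publish-on-change contract can be sketched as follows. The trimmed `ActiveCall` stub, the dict-shaped events, and the injected `publish` callable are simplifications for illustration, not the real classes:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class ActiveCall:  # trimmed stub of the dataclass above
    call_id: str
    status: str = "trying"

class CallManager:
    """Sketch: every state change is also published to the event bus."""

    def __init__(self, publish):
        self._publish = publish  # async callable, e.g. event_bus.publish
        self._calls: dict[str, ActiveCall] = {}

    async def create_call(self, call_id: str) -> ActiveCall:
        call = ActiveCall(call_id)
        self._calls[call_id] = call
        await self._publish({"type": "CALL_STARTED", "call_id": call_id})
        return call

    async def update_status(self, call_id: str, status: str) -> None:
        self._calls[call_id].status = status
        await self._publish({"type": f"CALL_{status.upper()}", "call_id": call_id})

async def demo():
    events = []

    async def publish(event):
        events.append(event)

    mgr = CallManager(publish)
    await mgr.create_call("c1")
    await mgr.update_status("c1", "connected")
    return events

events = asyncio.run(demo())
```

Routing every mutation through the manager is what guarantees dashboards and subscribers never see stale state.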
## Event Bus (`core/event_bus.py`)
Pure asyncio pub/sub connecting all components:
```python
class EventBus:
    async def publish(self, event: GatewayEvent) -> None
    def subscribe(self, event_types: set[EventType] = None) -> EventSubscription

    @property
    def recent_events(self) -> list[GatewayEvent]

    @property
    def subscriber_count(self) -> int
```
### EventSubscription
Subscriptions are async iterators:
```python
subscription = event_bus.subscribe(event_types={EventType.HUMAN_DETECTED})

async for event in subscription:
    print(f"Human detected on call {event.call_id}!")

# When done:
subscription.close()
```
### How it works
1. Each `subscribe()` creates an `asyncio.Queue` for that subscriber
2. `publish()` does `put_nowait()` on every subscriber's queue
3. Full queues (dead subscribers) are automatically cleaned up
4. Optional type filtering — only receive events you care about
5. Event history (last 1000) for late joiners
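The queue-per-subscriber pattern above can be sketched in a few lines. This is a simplified illustration — fixed queue size, dict events instead of `GatewayEvent`, raw queues instead of `EventSubscription` — not the project's actual `EventBus`:

```python
import asyncio
from typing import Optional

class EventBus:
    """Minimal sketch of the queue-per-subscriber pattern described above."""

    def __init__(self, history_size: int = 1000):
        self._subscribers: list[tuple[asyncio.Queue, Optional[set]]] = []
        self.recent_events: list[dict] = []  # history for late joiners
        self._history_size = history_size

    def subscribe(self, event_types: Optional[set] = None) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue(maxsize=256)
        self._subscribers.append((queue, event_types))
        return queue

    async def publish(self, event: dict) -> None:
        self.recent_events = (self.recent_events + [event])[-self._history_size:]
        dead = []
        for queue, types in self._subscribers:
            if types is not None and event["type"] not in types:
                continue  # type filtering: skip events this subscriber ignores
            try:
                queue.put_nowait(event)
            except asyncio.QueueFull:
                dead.append((queue, types))  # full queue = dead subscriber
        for sub in dead:
            self._subscribers.remove(sub)

async def demo():
    bus = EventBus()
    q = bus.subscribe(event_types={"HUMAN_DETECTED"})
    await bus.publish({"type": "HOLD_DETECTED", "call_id": "c1"})
    await bus.publish({"type": "HUMAN_DETECTED", "call_id": "c1"})
    return q.get_nowait(), len(bus.recent_events)

event, history = asyncio.run(demo())
```

Because `put_nowait()` never blocks, a slow WebSocket client can never back-pressure the call pipeline — it simply gets evicted.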
### Event Types
See [models/events.py](../models/events.py) for the full list. Key categories:
| Category | Events |
|----------|--------|
| Call Lifecycle | `CALL_STARTED`, `CALL_RINGING`, `CALL_CONNECTED`, `CALL_ENDED`, `CALL_FAILED` |
| Hold Slayer | `HOLD_DETECTED`, `HUMAN_DETECTED`, `TRANSFER_STARTED`, `TRANSFER_COMPLETE` |
| IVR Navigation | `IVR_STEP`, `IVR_DTMF_SENT`, `IVR_MENU_DETECTED`, `IVR_EXPLORATION` |
| Audio | `AUDIO_CLASSIFIED`, `TRANSCRIPT_CHUNK`, `RECORDING_STARTED`, `RECORDING_STOPPED` |
| Device | `DEVICE_REGISTERED`, `DEVICE_UNREGISTERED`, `DEVICE_RINGING` |
| System | `GATEWAY_STARTED`, `GATEWAY_STOPPED`, `TRUNK_REGISTERED`, `TRUNK_FAILED` |
## Gateway (`core/gateway.py`)
The top-level orchestrator that owns and wires all components:
```python
class AIPSTNGateway:
    def __init__(self, settings: Settings):
        self.event_bus = EventBus()
        self.call_manager = CallManager(self.event_bus)
        self.sip_engine = SippyEngine(settings, self.event_bus)
        self.media_pipeline = MediaPipeline(settings)
        self.llm_client = LLMClient(...)
        self.transcription = TranscriptionService(...)
        self.classifier = AudioClassifier()
        self.hold_slayer = HoldSlayer(...)
        self.recording = RecordingService(...)
        self.analytics = CallAnalytics(...)
        self.notification = NotificationService(...)
        self.call_flow_learner = CallFlowLearner(...)

    async def start(self) -> None: ...   # Start all services
    async def stop(self) -> None: ...    # Graceful shutdown
    async def make_call(self, ...) -> ActiveCall: ...
    async def end_call(self, call_id) -> None: ...
```
The gateway is created once at application startup (in `main.py` lifespan) and injected into FastAPI routes via dependency injection (`api/deps.py`).
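The startup/shutdown contract can be sketched with an `asynccontextmanager`, mirroring the shape of a FastAPI lifespan hook without depending on FastAPI itself. The `Gateway` stub and module-level handle below are stand-ins, not the real `AIPSTNGateway` or `api/deps.py`:

```python
import asyncio
from contextlib import asynccontextmanager

class Gateway:
    """Stand-in for AIPSTNGateway with just the lifecycle surface."""

    def __init__(self):
        self.started = False

    async def start(self):
        self.started = True   # would bring up SIP engine, media pipeline, services

    async def stop(self):
        self.started = False  # would drain calls and shut services down

gateway = None  # module-level handle, as a deps module might expose it

@asynccontextmanager
async def lifespan(app):
    global gateway
    gateway = Gateway()
    await gateway.start()
    try:
        yield  # application serves requests while suspended here
    finally:
        await gateway.stop()  # graceful shutdown even on errors

async def demo():
    results = []
    async with lifespan(app=None):
        results.append(gateway.started)  # True while the app is "running"
    results.append(gateway.started)      # False after shutdown
    return results

states = asyncio.run(demo())
```

The `try`/`finally` around the `yield` is the important part: shutdown runs even if the application exits with an exception.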