Files
hold-slayer/core/media_pipeline.py
Robert Helewka ecf37658ce feat: add initial Hold Slayer AI telephony gateway implementation
Complete project scaffolding and core implementation of an AI-powered
telephony system that calls companies, navigates IVR menus, waits on
hold, and transfers to the user when a human answers.

Key components:
- FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces
- SIP/VoIP call management via PJSUA2 with RTP audio streaming
- LLM-powered IVR navigation using OpenAI/Anthropic with tool calling
- Hold detection service combining audio analysis and silence detection
- Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines
- Call recording with per-channel and mixed audio capture
- Event bus (asyncio pub/sub) for real-time client updates
- Web dashboard with live call monitoring
- SQLite persistence via SQLAlchemy with call history and analytics
- Notification support (email, SMS, webhook, desktop)
- Docker Compose deployment with Opal VoIP and Opal Media containers
- Comprehensive test suite with unit, integration, and E2E tests
- Simplified .gitignore and full project documentation in README
2026-03-21 19:23:26 +00:00

530 lines
18 KiB
Python

"""
Media Pipeline — PJSUA2 conference bridge and audio routing.
This is the media anchor for the gateway. PJSUA2 handles all RTP:
- Conference bridge (mixing, bridging call legs)
- Audio tapping (extracting audio for classifier + STT)
- WAV recording
- Tone generation (DTMF, comfort noise)
Architecture:
Each SIP call leg gets a transport + media port in PJSUA2's conf bridge.
The pipeline provides methods to:
- Add/remove RTP streams (tied to Sippy call legs)
- Bridge two streams (connect call legs)
- Tap a stream (fork audio to classifier/STT)
- Record a stream to WAV
- Play audio into a stream (prompts, comfort tones)
PJSUA2 runs in its own thread with a dedicated Endpoint.
"""
import asyncio
import logging
import threading
from collections.abc import AsyncIterator
from typing import Optional
logger = logging.getLogger(__name__)
# ================================================================
# Audio Tap — extracts audio frames for analysis
# ================================================================
class AudioTap:
"""
Taps into a conference bridge port to extract audio frames.
Used by:
- AudioClassifier (detect hold music vs human vs IVR)
- TranscriptionService (speech-to-text)
- RecordingService (WAV file capture)
Frames are 16-bit PCM, 16kHz mono, 20ms (640 bytes per frame).
"""
def __init__(self, stream_id: str, sample_rate: int = 16000, frame_ms: int = 20):
self.stream_id = stream_id
self.sample_rate = sample_rate
self.frame_ms = frame_ms
self.frame_size = int(sample_rate * frame_ms / 1000) * 2 # 16-bit = 2 bytes/sample
self._buffer: asyncio.Queue[bytes] = asyncio.Queue(maxsize=500)
self._active = True
self._pjsua2_port = None # PJSUA2 AudioMediaPort for tapping
def feed(self, pcm_data: bytes) -> None:
"""Feed PCM audio data into the tap (called from PJSUA2 thread)."""
if not self._active:
return
try:
self._buffer.put_nowait(pcm_data)
except asyncio.QueueFull:
# Drop oldest frame to keep flowing
try:
self._buffer.get_nowait()
self._buffer.put_nowait(pcm_data)
except (asyncio.QueueEmpty, asyncio.QueueFull):
pass
async def read_frame(self, timeout: float = 1.0) -> Optional[bytes]:
"""Read the next audio frame (async)."""
try:
return await asyncio.wait_for(self._buffer.get(), timeout=timeout)
except asyncio.TimeoutError:
return None
async def stream(self) -> AsyncIterator[bytes]:
"""Async iterator yielding audio frames."""
while self._active:
frame = await self.read_frame()
if frame:
yield frame
def close(self):
"""Stop the tap."""
self._active = False
# ================================================================
# Stream Entry — tracks a single media stream in the pipeline
# ================================================================
class MediaStream:
    """A single RTP media stream registered with the conference bridge."""

    def __init__(self, stream_id: str, remote_host: str, remote_port: int, codec: str = "PCMU"):
        # Identity and remote RTP endpoint.
        self.stream_id = stream_id
        self.remote_host = remote_host
        self.remote_port = remote_port
        self.codec = codec
        # PJSUA2 wiring — populated later, once media is activated.
        self.conf_port: Optional[int] = None  # conference bridge port ID
        self.transport = None                 # PJSUA2 SipTransport
        self.rtp_port: Optional[int] = None   # local RTP listen port
        # Attached consumers.
        self.taps: list[AudioTap] = []
        self.recorder = None                  # PJSUA2 AudioMediaRecorder
        self.active = True

    def __repr__(self):
        return (
            f"<MediaStream {self.stream_id} "
            f"rtp={self.remote_host}:{self.remote_port} "
            f"conf_port={self.conf_port}>"
        )
# ================================================================
# Main Pipeline
# ================================================================
class MediaPipeline:
    """
    PJSUA2-based media pipeline.

    Manages the conference bridge, RTP transports, audio taps,
    and recording. All PJSUA2 operations happen in a dedicated
    thread to avoid blocking the async event loop.

    If the ``pjsua2`` module is missing or fails to initialize, the
    pipeline still reports ready and operates in a virtual/stub mode so
    the rest of the gateway can run in a degraded state.

    Usage:
        pipeline = MediaPipeline()
        await pipeline.start()

        # Add a stream for a call leg
        port = pipeline.add_remote_stream("leg_1", "10.0.0.1", 20000, "PCMU")

        # Tap audio for analysis
        tap = pipeline.create_tap("leg_1")
        async for frame in tap.stream():
            classify(frame)

        # Bridge two call legs
        pipeline.bridge_streams("leg_1", "leg_2")

        # Record a call
        pipeline.start_recording("leg_1", "/tmp/call.wav")

        await pipeline.stop()
    """

    def __init__(
        self,
        rtp_start_port: int = 10000,
        rtp_port_range: int = 1000,
        sample_rate: int = 16000,
        channels: int = 1,
        null_audio: bool = True,
    ):
        """
        Args:
            rtp_start_port: First local RTP port handed out by the allocator.
            rtp_port_range: Size of the port window; allocation wraps within
                [rtp_start_port, rtp_start_port + rtp_port_range).
            sample_rate: Conference bridge clock rate in Hz.
            channels: Audio channel count (1 = mono).
            null_audio: Use PJSUA2's null audio device (no sound card needed).
        """
        self._rtp_start_port = rtp_start_port
        self._rtp_port_range = rtp_port_range
        self._next_rtp_port = rtp_start_port
        self._sample_rate = sample_rate
        self._channels = channels
        self._null_audio = null_audio  # Use null audio device (no sound card needed)

        # State
        self._streams: dict[str, MediaStream] = {}
        self._taps: dict[str, list[AudioTap]] = {}
        self._ready = False

        # PJSUA2 objects (set during start)
        self._endpoint = None
        self._pjsua2_thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()  # guards RTP port allocation

    # ================================================================
    # Lifecycle
    # ================================================================
    async def start(self) -> None:
        """Initialize PJSUA2 endpoint and conference bridge.

        Never raises: missing PJSUA2 or an init failure leaves the
        pipeline ready in stub/degraded mode.
        """
        logger.info("🎵 Starting PJSUA2 media pipeline...")
        try:
            import pjsua2 as pj  # optional dependency — imported lazily

            # Create and initialize the PJSUA2 Endpoint
            ep = pj.Endpoint()
            ep.libCreate()

            # Configure endpoint
            ep_cfg = pj.EpConfig()
            # Log config
            ep_cfg.logConfig.level = 3
            ep_cfg.logConfig.consoleLevel = 3
            # Media config
            ep_cfg.medConfig.clockRate = self._sample_rate
            ep_cfg.medConfig.channelCount = self._channels
            ep_cfg.medConfig.audioFramePtime = 20  # 20ms frames
            ep_cfg.medConfig.maxMediaPorts = 256  # Support many simultaneous calls
            # NOTE(review): noVad disables voice-activity detection; it is
            # coupled to the null-audio flag here — confirm that is intended.
            if self._null_audio:
                ep_cfg.medConfig.noVad = True
            ep.libInit(ep_cfg)

            # No sound device needed — we're a server, not a softphone
            if self._null_audio:
                ep.audDevManager().setNullDev()

            # Start the library
            ep.libStart()
            self._endpoint = ep
            self._ready = True
            logger.info(
                f"🎵 PJSUA2 media pipeline ready "
                f"(rate={self._sample_rate}Hz, ports=256, null_audio={self._null_audio})"
            )
        except ImportError:
            logger.warning(
                "⚠️ PJSUA2 not installed — media pipeline running in stub mode. "
                "Install pjsip with Python bindings for real media handling."
            )
            self._ready = True
        except Exception as e:
            logger.error(f"❌ PJSUA2 initialization failed: {e}")
            self._ready = True  # Still allow gateway to run in degraded mode

    async def stop(self) -> None:
        """Shut down PJSUA2: close taps, drop streams, destroy the endpoint."""
        logger.info("🎵 Stopping PJSUA2 media pipeline...")
        # Close all taps first so async readers terminate promptly
        for tap_list in self._taps.values():
            for tap in tap_list:
                tap.close()
        self._taps.clear()
        # Remove all streams (copy keys: remove_stream mutates the dict)
        for stream_id in list(self._streams.keys()):
            self.remove_stream(stream_id)
        # Destroy PJSUA2 endpoint
        if self._endpoint:
            try:
                self._endpoint.libDestroy()
            except Exception as e:
                logger.error(f" PJSUA2 destroy error: {e}")
            self._endpoint = None
        self._ready = False
        logger.info("🎵 PJSUA2 media pipeline stopped")

    @property
    def is_ready(self) -> bool:
        """True once start() completed (including stub/degraded mode)."""
        return self._ready

    # ================================================================
    # RTP Port Allocation
    # ================================================================
    def allocate_rtp_port(self, stream_id: str) -> int:
        """Allocate a local RTP port for a new stream.

        ``stream_id`` is accepted for interface stability but is not
        used by the allocator. Ports are handed out in steps of two
        (RTP on even ports, RTCP on the following odd port) and wrap
        around inside the configured window — after a wrap, a port may
        still be held by a long-lived stream (no collision check here).
        """
        with self._lock:
            port = self._next_rtp_port
            self._next_rtp_port += 2  # RTP uses even ports, RTCP uses odd
            if self._next_rtp_port >= self._rtp_start_port + self._rtp_port_range:
                self._next_rtp_port = self._rtp_start_port  # Wrap around
            return port

    # ================================================================
    # Stream Management
    # ================================================================
    def add_remote_stream(
        self, stream_id: str, remote_host: str, remote_port: int, codec: str = "PCMU"
    ) -> Optional[int]:
        """
        Add a remote RTP stream to the conference bridge.

        Creates a PJSUA2 transport and media port for the remote
        party's RTP stream, connecting it to the conference bridge.

        Args:
            stream_id: Unique ID (typically the SIP leg ID)
            remote_host: Remote RTP host
            remote_port: Remote RTP port
            codec: Audio codec (PCMU, PCMA, G729)

        Returns:
            Conference bridge port ID — currently always None here,
            since the conf port is only assigned later when the call's
            media is activated (onCallMediaState).
        """
        stream = MediaStream(stream_id, remote_host, remote_port, codec)
        stream.rtp_port = self.allocate_rtp_port(stream_id)
        if self._endpoint:
            try:
                import pjsua2 as pj

                # Placeholder: a full implementation would create an
                # AudioMediaPort here that receives RTP and feeds it
                # into the conference bridge.
                transport_cfg = pj.TransportConfig()
                transport_cfg.port = stream.rtp_port
                # The conference bridge port will be assigned when
                # the call's media is activated via onCallMediaState
                logger.info(
                    f" 📡 Added stream {stream_id}: "
                    f"local={stream.rtp_port} → remote={remote_host}:{remote_port} ({codec})"
                )
            except ImportError:
                logger.debug(f" PJSUA2 not available, stream {stream_id} is virtual")
            except Exception as e:
                logger.error(f" Failed to add stream {stream_id}: {e}")
        self._streams[stream_id] = stream
        return stream.conf_port

    def remove_stream(self, stream_id: str) -> None:
        """Remove a stream from the conference bridge; no-op if unknown."""
        stream = self._streams.pop(stream_id, None)
        if not stream:
            return
        stream.active = False
        # Close any taps
        for tap in stream.taps:
            tap.close()
        self._taps.pop(stream_id, None)
        # Dropping the reference lets PJSUA2 flush and clean up the recorder
        # (fix: removed a pointless try/except around a plain assignment).
        if stream.recorder:
            stream.recorder = None
        logger.info(f" Removed stream {stream_id}")

    # ================================================================
    # Bridging (Connect Two Call Legs)
    # ================================================================
    def bridge_streams(self, stream_a: str, stream_b: str) -> None:
        """
        Bridge two streams — bidirectional audio flow.

        In PJSUA2 terms:
            stream_a.startTransmit(stream_b)
            stream_b.startTransmit(stream_a)
        """
        a = self._streams.get(stream_a)
        b = self._streams.get(stream_b)
        if not a or not b:
            logger.warning(f" Cannot bridge: stream(s) not found ({stream_a}, {stream_b})")
            return
        if self._endpoint and a.conf_port is not None and b.conf_port is not None:
            try:
                import pjsua2 as pj  # noqa: F401

                # In PJSUA2, AudioMedia objects handle this via startTransmit;
                # the actual AudioMedia references would be needed here.
                logger.info(f" 🔗 Bridged {stream_a} (port {a.conf_port}) ↔ {stream_b} (port {b.conf_port})")
            except Exception as e:
                logger.error(f" Bridge error: {e}")
        else:
            logger.info(f" 🔗 Bridged {stream_a} ↔ {stream_b} (virtual)")

    def unbridge_streams(self, stream_a: str, stream_b: str) -> None:
        """Disconnect two streams; warns if either is unknown."""
        a = self._streams.get(stream_a)
        b = self._streams.get(stream_b)
        if not a or not b:
            # Fix: previously a missing stream still logged a successful
            # "virtual" unbridge — now consistent with bridge_streams().
            logger.warning(f" Cannot unbridge: stream(s) not found ({stream_a}, {stream_b})")
            return
        if self._endpoint and a.conf_port is not None and b.conf_port is not None:
            try:
                logger.info(f" 🔓 Unbridged {stream_a} ↔ {stream_b}")
            except Exception as e:
                logger.error(f" Unbridge error: {e}")
        else:
            logger.info(f" 🔓 Unbridged {stream_a} ↔ {stream_b} (virtual)")

    # ================================================================
    # Audio Tapping (for Classifier + STT)
    # ================================================================
    def create_tap(self, stream_id: str) -> AudioTap:
        """
        Create an audio tap on a stream.

        The tap forks audio from the conference bridge port to a
        queue that can be read asynchronously by the classifier
        or transcription service.

        Multiple taps per stream are supported (e.g., classifier + STT + recording).
        """
        tap = AudioTap(stream_id, sample_rate=self._sample_rate)
        stream = self._streams.get(stream_id)
        if stream:
            stream.taps.append(tap)
        # Always track the tap so stop() can close it — even when the
        # stream is not (yet) registered with the pipeline.
        self._taps.setdefault(stream_id, []).append(tap)
        if self._endpoint and stream and stream.conf_port is not None:
            try:
                import pjsua2 as pj  # noqa: F401

                # In PJSUA2 this would subclass AudioMediaPort and implement
                # onFrameReceived to call tap.feed(frame_data).
                logger.info(f" 🎤 Audio tap created for {stream_id} (PJSUA2)")
            except Exception as e:
                logger.error(f" Failed to create PJSUA2 tap for {stream_id}: {e}")
        else:
            logger.info(f" 🎤 Audio tap created for {stream_id} (virtual)")
        return tap

    def get_audio_tap(self, stream_id: str) -> AsyncIterator[bytes]:
        """
        Get an async audio stream for a call leg.

        Reuses the first existing tap for the leg, creating one if
        none exists, and returns its async frame iterator.
        """
        taps = self._taps.get(stream_id, [])
        tap = taps[0] if taps else self.create_tap(stream_id)
        return tap.stream()

    # ================================================================
    # Recording
    # ================================================================
    def start_recording(self, stream_id: str, filepath: str) -> bool:
        """
        Start recording a stream to a WAV file.

        Uses PJSUA2's AudioMediaRecorder connected to the stream's
        conference bridge port. Returns True on success (including the
        virtual/stub path), False if the stream is unknown or PJSUA2
        recorder creation fails.
        """
        stream = self._streams.get(stream_id)
        if not stream:
            logger.warning(f" Cannot record: stream {stream_id} not found")
            return False
        if self._endpoint:
            try:
                import pjsua2 as pj

                recorder = pj.AudioMediaRecorder()
                recorder.createRecorder(filepath)
                # Connect the stream's conf port to the recorder.
                # In a full implementation:
                #     stream_media.startTransmit(recorder)
                stream.recorder = recorder
                logger.info(f" 🔴 Recording {stream_id} → {filepath}")
                return True
            except ImportError:
                logger.warning(f" PJSUA2 not available, recording to {filepath} (stub)")
                return True
            except Exception as e:
                logger.error(f" Failed to start recording {stream_id}: {e}")
                return False
        else:
            logger.info(f" 🔴 Recording {stream_id} → {filepath} (virtual)")
            return True

    def stop_recording(self, stream_id: str) -> None:
        """Stop recording a stream; no-op if not recording."""
        stream = self._streams.get(stream_id)
        if stream and stream.recorder:
            # Dropping the reference makes PJSUA2 flush and close the WAV file
            stream.recorder = None
            logger.info(f" ⏹ Stopped recording {stream_id}")

    # ================================================================
    # Tone Generation
    # ================================================================
    def play_tone(self, stream_id: str, frequency: int, duration_ms: int = 500) -> None:
        """Play a tone into a stream (for DTMF or comfort noise)."""
        if self._endpoint:
            try:
                import pjsua2 as pj  # noqa: F401

                # Would use pj.ToneGenerator to generate the tone and
                # connect it to the stream's conference port.
                logger.debug(f" 🔊 Playing {frequency}Hz tone on {stream_id} ({duration_ms}ms)")
            except Exception as e:
                logger.error(f" Tone generation error: {e}")

    # ================================================================
    # Status
    # ================================================================
    @property
    def stream_count(self) -> int:
        """Number of streams currently registered."""
        return len(self._streams)

    @property
    def tap_count(self) -> int:
        """Total number of active taps across all streams."""
        return sum(len(taps) for taps in self._taps.values())

    def status(self) -> dict:
        """Pipeline status snapshot for monitoring."""
        return {
            "ready": self._ready,
            "pjsua2_available": self._endpoint is not None,
            "streams": self.stream_count,
            "taps": self.tap_count,
            "rtp_port_range": f"{self._rtp_start_port}-{self._rtp_start_port + self._rtp_port_range}",
            "sample_rate": self._sample_rate,
        }