# Core Engine

The core engine provides the foundational infrastructure: SIP call control, media handling, call state management, and event distribution.

## SIP Engine (`core/sip_engine.py` + `core/sippy_engine.py`)

### Abstract Interface

All SIP operations go through the `SIPEngine` abstract base class, which defines the contract:

```python
from abc import ABC
from typing import Optional


class SIPEngine(ABC):
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def make_call(self, to_uri: str, from_uri: Optional[str] = None) -> str: ...
    async def hangup(self, call_id: str) -> None: ...
    async def send_dtmf(self, call_id: str, digits: str) -> None: ...
    async def bridge(self, call_id_a: str, call_id_b: str) -> None: ...
    async def transfer(self, call_id: str, to_uri: str) -> None: ...
    async def register(self, ...) -> bool: ...
    async def get_trunk_status(self) -> TrunkStatus: ...
```

This abstraction allows two interchangeable implementations:

- **`SippyEngine`**: production implementation using Sippy B2BUA
- **`MockSIPEngine`**: test implementation that simulates calls in memory

### Sippy B2BUA Engine

The `SippyEngine` wraps Sippy B2BUA for SIP signaling:

```python
class SippyEngine(SIPEngine):
    """
    Production SIP engine using Sippy B2BUA.

    Sippy runs its own event loop in a daemon thread.
    All async methods bridge to Sippy via run_in_executor().
    """
```

**Key internals:**

| Class | Purpose |
|-------|---------|
| `SipCallLeg` | Tracks one leg of a call (call-id, state, RTP endpoint, SDP) |
| `SipBridge` | Two bridged call legs (outbound + device) |
| `SippyCallController` | Handles Sippy callbacks (INVITE received, BYE received, DTMF, etc.) |

**Call lifecycle:**

```
make_call("sip:+18005551234@trunk")
  │
  ├── Create SipCallLeg (state=TRYING)
  ├── Sippy: send INVITE
  ├── Sippy callback: 180 Ringing → state=RINGING
  ├── Sippy callback: 200 OK → state=CONNECTED
  │     └── Extract RTP endpoint from SDP
  │           └── MediaPipeline.add_stream(rtp_host, rtp_port)
  └── Return call_id

send_dtmf(call_id, "1")
  └── Sippy: send RFC 2833 DTMF or SIP INFO

bridge(call_id_a, call_id_b)
  ├── Create SipBridge(leg_a, leg_b)
  └── MediaPipeline.bridge_streams(stream_a, stream_b)

hangup(call_id)
  ├── Sippy: send BYE
  ├── MediaPipeline.remove_stream()
  └── Cleanup SipCallLeg
```

**Graceful fallback:** If Sippy B2BUA is not installed, the engine falls back to mock mode with a warning, which is useful for development and testing without a SIP stack.

### Trunk Registration

The engine registers with your SIP trunk provider on startup:

```python
await engine.register(
    registrar="sip.yourprovider.com",
    username="your_username",
    password="your_password",
    realm="sip.yourprovider.com",
)
```

Registration is refreshed automatically. `get_trunk_status()` returns the current registration state and health.
## Media Pipeline (`core/media_pipeline.py`)

The media pipeline uses PJSUA2 for all RTP audio handling.

### Key Classes

| Class | Purpose |
|-------|---------|
| `AudioTap` | Extracts audio frames from a stream into an async queue (for classifier/STT) |
| `MediaStream` | Wraps a single RTP stream (transport port, conference slot, optional tap + recording) |
| `MediaPipeline` | Main orchestrator: manages all streams, bridging, and recording |

### Operations

```python
# Add a new RTP stream (called when the SIP call connects)
stream_id = await pipeline.add_stream(rtp_host, rtp_port, codec="PCMU")

# Tap audio for real-time analysis
tap = await pipeline.tap_stream(stream_id)
async for frame in tap:
    classification = classifier.classify(frame)

# Bridge two streams (transfer)
await pipeline.bridge_streams(stream_a, stream_b)

# Record a stream to WAV
await pipeline.start_recording(stream_id, "/path/to/recording.wav")
await pipeline.stop_recording(stream_id)

# Play a tone (e.g., ringback to the caller)
await pipeline.play_tone(stream_id, frequency=440, duration_ms=2000)

# Clean up
await pipeline.remove_stream(stream_id)
```

### Conference Bridge

PJSUA2's conference bridge is central to the architecture. Every stream gets a conference slot, and bridging is done by connecting slots:

```
Conference Bridge
├── Slot 0: Outbound call (to company)
├── Slot 1: AudioTap (classifier + STT reads from here)
├── Slot 2: Recording port
├── Slot 3: Device call (your phone, after transfer)
└── Slot 4: Tone generator

Bridge: Slot 0 ↔ Slot 3 (company ↔ your phone)
Tap:    Slot 0 → Slot 1 (company audio → classifier)
Record: Slot 0 → Slot 2 (company audio → WAV file)
```

### Null Audio Device

The pipeline uses PJSUA2's null audio device, so no sound card is required. This is essential for headless server deployment.

## Call Manager (`core/call_manager.py`)

Tracks all active calls and their state:

```python
class CallManager:
    async def create_call(self, number, mode, intent, ...) -> ActiveCall
    async def get_call(self, call_id) -> Optional[ActiveCall]
    async def update_status(self, call_id, status) -> None
    async def end_call(self, call_id, reason) -> None
    async def add_transcript(self, call_id, text, speaker) -> None
    def active_call_count(self) -> int
    def get_all_active(self) -> list[ActiveCall]
```

**ActiveCall state:**

```python
@dataclass
class ActiveCall:
    call_id: str
    number: str
    mode: CallMode  # direct, hold_slayer, ai_assisted
    status: CallStatus  # trying, ringing, connected, on_hold, transferring, ended
    intent: Optional[str]
    device: Optional[str]
    call_flow_id: Optional[str]

    # Timing
    started_at: datetime
    connected_at: Optional[datetime]
    hold_started_at: Optional[datetime]
    ended_at: Optional[datetime]

    # Audio classification
    current_audio_type: Optional[AudioClassification]
    classification_history: list[ClassificationResult]

    # Transcript
    transcript_chunks: list[TranscriptChunk]

    # Services
    services: dict[str, bool]  # recording, transcription, etc.
```

The CallManager publishes events to the EventBus on every state change.

## Event Bus (`core/event_bus.py`)

A pure-asyncio pub/sub bus connecting all components:

```python
class EventBus:
    async def publish(self, event: GatewayEvent) -> None
    def subscribe(self, event_types: Optional[set[EventType]] = None) -> EventSubscription

    @property
    def recent_events(self) -> list[GatewayEvent]

    @property
    def subscriber_count(self) -> int
```

### EventSubscription

Subscriptions are async iterators:

```python
subscription = event_bus.subscribe(event_types={EventType.HUMAN_DETECTED})
try:
    async for event in subscription:
        print(f"Human detected on call {event.call_id}!")
finally:
    # Always release the subscription when done
    subscription.close()
```

### How it works

1. Each `subscribe()` call creates an `asyncio.Queue` for that subscriber.
2. `publish()` calls `put_nowait()` on every subscriber's queue, so a slow subscriber never makes the publisher wait.
3. A subscriber whose queue is full is treated as dead and removed automatically.
4. Optional type filtering: a subscriber receives only the event types it asked for.
5. The last 1000 events are kept as history for late joiners.

### Event Types

See [models/events.py](../models/events.py) for the full list. Key categories:

| Category | Events |
|----------|--------|
| Call Lifecycle | `CALL_STARTED`, `CALL_RINGING`, `CALL_CONNECTED`, `CALL_ENDED`, `CALL_FAILED` |
| Hold Slayer | `HOLD_DETECTED`, `HUMAN_DETECTED`, `TRANSFER_STARTED`, `TRANSFER_COMPLETE` |
| IVR Navigation | `IVR_STEP`, `IVR_DTMF_SENT`, `IVR_MENU_DETECTED`, `IVR_EXPLORATION` |
| Audio | `AUDIO_CLASSIFIED`, `TRANSCRIPT_CHUNK`, `RECORDING_STARTED`, `RECORDING_STOPPED` |
| Device | `DEVICE_REGISTERED`, `DEVICE_UNREGISTERED`, `DEVICE_RINGING` |
| System | `GATEWAY_STARTED`, `GATEWAY_STOPPED`, `TRUNK_REGISTERED`, `TRUNK_FAILED` |

## Gateway (`core/gateway.py`)

The top-level orchestrator that owns and wires together all components:

```python
class AIPSTNGateway:
    def __init__(self, settings: Settings):
        self.event_bus = EventBus()
        self.call_manager = CallManager(self.event_bus)
        self.sip_engine = SippyEngine(settings, self.event_bus)
        self.media_pipeline = MediaPipeline(settings)
        self.llm_client = LLMClient(...)
        self.transcription = TranscriptionService(...)
        self.classifier = AudioClassifier()
        self.hold_slayer = HoldSlayer(...)
        self.recording = RecordingService(...)
        self.analytics = CallAnalytics(...)
        self.notification = NotificationService(...)
        self.call_flow_learner = CallFlowLearner(...)

    async def start(self) -> None: ...  # Start all services
    async def stop(self) -> None: ...  # Graceful shutdown
    async def make_call(self, ...) -> ActiveCall: ...
    async def end_call(self, call_id) -> None: ...
```

The gateway is created once at application startup (in the `main.py` lifespan) and injected into FastAPI routes via dependency injection (`api/deps.py`).
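One common way to structure `start()` and `stop()` for an orchestrator like this is to bring components up in dependency order, tear them down in reverse, and roll back if any component fails to start. The sketch below assumes every component exposes async `start()`/`stop()` methods (the document only shows these on `SIPEngine` and the gateway itself); `Service`, `Lifecycle`, and the demo component names are hypothetical and not part of the codebase.

```python
import asyncio
import logging
from typing import List, Protocol

log = logging.getLogger(__name__)


class Service(Protocol):
    # Assumed shape: every component exposes async start()/stop().
    async def start(self) -> None: ...
    async def stop(self) -> None: ...


class Lifecycle:
    """Hypothetical helper: start services in order, stop them in reverse."""

    def __init__(self, services: List[Service]) -> None:
        self._services = services
        self._started: List[Service] = []

    async def start(self) -> None:
        try:
            for svc in self._services:
                await svc.start()
                self._started.append(svc)  # only track what actually came up
        except Exception:
            # Roll back whatever already started, then re-raise the failure.
            await self.stop()
            raise

    async def stop(self) -> None:
        # Reverse order: dependents shut down before their dependencies.
        while self._started:
            svc = self._started.pop()
            try:
                await svc.stop()
            except Exception:
                # Keep shutting down the rest even if one stop() fails.
                log.exception("stop() failed for %r", svc)


class _Named:
    # Stand-in component that just reports its lifecycle calls.
    def __init__(self, name: str) -> None:
        self.name = name

    async def start(self) -> None:
        print("start", self.name)

    async def stop(self) -> None:
        print("stop", self.name)


async def _demo() -> None:
    lifecycle = Lifecycle([_Named("event_bus"), _Named("sip_engine"), _Named("media_pipeline")])
    await lifecycle.start()
    await lifecycle.stop()


if __name__ == "__main__":
    asyncio.run(_demo())
```

The demo starts the three stand-ins in list order and stops them in the opposite order. Tracking only successfully started services means a failed startup never calls `stop()` on a component that was never started.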