Complete project scaffolding and core implementation of an AI-powered telephony system that calls companies, navigates IVR menus, waits on hold, and transfers to the user when a human answers. Key components: - FastAPI server with REST API, WebSocket, and MCP (SSE) interfaces - SIP/VoIP call management via PJSUA2 with RTP audio streaming - LLM-powered IVR navigation using OpenAI/Anthropic with tool calling - Hold detection service combining audio analysis and silence detection - Real-time STT (Whisper/Deepgram) and TTS (OpenAI/Piper) pipelines - Call recording with per-channel and mixed audio capture - Event bus (asyncio pub/sub) for real-time client updates - Web dashboard with live call monitoring - SQLite persistence via SQLAlchemy with call history and analytics - Notification support (email, SMS, webhook, desktop) - Docker Compose deployment with Opal VoIP and Opal Media containers - Comprehensive test suite with unit, integration, and E2E tests - Simplified .gitignore and full project documentation in README
558 lines
18 KiB
Python
558 lines
18 KiB
Python
"""
|
|
Tests for the intelligence layer services:
|
|
- LLMClient
|
|
- NotificationService
|
|
- RecordingService
|
|
- CallAnalytics
|
|
- CallFlowLearner
|
|
"""
|
|
|
|
import asyncio
|
|
from datetime import datetime
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from config import Settings
|
|
from core.event_bus import EventBus
|
|
from models.events import EventType, GatewayEvent
|
|
|
|
|
|
# ============================================================
|
|
# LLM Client Tests
|
|
# ============================================================
|
|
|
|
|
|
class TestLLMClient:
|
|
"""Test the LLM client with mocked HTTP responses."""
|
|
|
|
def _make_client(self):
|
|
from services.llm_client import LLMClient
|
|
|
|
return LLMClient(
|
|
base_url="http://localhost:11434/v1",
|
|
model="llama3",
|
|
api_key="not-needed",
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_init(self):
|
|
client = self._make_client()
|
|
assert client.model == "llama3"
|
|
assert client._total_requests == 0
|
|
assert client._total_errors == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_stats(self):
|
|
client = self._make_client()
|
|
stats = client.stats
|
|
assert stats["total_requests"] == 0
|
|
assert stats["total_errors"] == 0
|
|
assert stats["model"] == "llama3"
|
|
assert stats["avg_latency_ms"] == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_chat_request_format(self):
|
|
"""Verify the HTTP request is formatted correctly."""
|
|
client = self._make_client()
|
|
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.raise_for_status = MagicMock()
|
|
mock_response.json.return_value = {
|
|
"choices": [{"message": {"content": "Hello!"}}],
|
|
"usage": {"total_tokens": 10},
|
|
}
|
|
|
|
with patch.object(client._client, "post", new_callable=AsyncMock) as mock_post:
|
|
mock_post.return_value = mock_response
|
|
result = await client.chat("Say hello", system="Hi")
|
|
assert result == "Hello!"
|
|
assert client._total_requests == 1
|
|
|
|
# Verify the request body
|
|
call_args = mock_post.call_args
|
|
body = call_args[1]["json"]
|
|
assert body["model"] == "llama3"
|
|
assert len(body["messages"]) == 2
|
|
assert body["messages"][0]["role"] == "system"
|
|
assert body["messages"][1]["role"] == "user"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_chat_json_parsing(self):
|
|
"""Verify JSON response parsing works."""
|
|
client = self._make_client()
|
|
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.raise_for_status = MagicMock()
|
|
mock_response.json.return_value = {
|
|
"choices": [{"message": {"content": '{"action": "press_1", "confidence": 0.9}'}}],
|
|
"usage": {"total_tokens": 20},
|
|
}
|
|
|
|
with patch.object(client._client, "post", new_callable=AsyncMock) as mock_post:
|
|
mock_post.return_value = mock_response
|
|
result = await client.chat_json("Analyze menu", system="Press 1 for billing")
|
|
assert result is not None
|
|
assert result["action"] == "press_1"
|
|
assert result["confidence"] == 0.9
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_chat_json_markdown_extraction(self):
|
|
"""Verify JSON extraction from markdown code blocks."""
|
|
client = self._make_client()
|
|
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.raise_for_status = MagicMock()
|
|
mock_response.json.return_value = {
|
|
"choices": [
|
|
{
|
|
"message": {
|
|
"content": 'Here is the result:\n```json\n{"key": "value"}\n```'
|
|
}
|
|
}
|
|
],
|
|
"usage": {"total_tokens": 15},
|
|
}
|
|
|
|
with patch.object(client._client, "post", new_callable=AsyncMock) as mock_post:
|
|
mock_post.return_value = mock_response
|
|
result = await client.chat_json("Parse this", system="test")
|
|
assert result is not None
|
|
assert result["key"] == "value"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_chat_http_error_returns_empty(self):
|
|
"""Verify HTTP errors return empty string gracefully."""
|
|
client = self._make_client()
|
|
|
|
with patch.object(client._client, "post", new_callable=AsyncMock) as mock_post:
|
|
mock_post.side_effect = Exception("Connection refused")
|
|
result = await client.chat("test", system="test")
|
|
assert result == ""
|
|
assert client._total_errors == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_analyze_ivr_menu(self):
|
|
"""Verify IVR menu analysis formats correctly."""
|
|
client = self._make_client()
|
|
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_response.raise_for_status = MagicMock()
|
|
mock_response.json.return_value = {
|
|
"choices": [
|
|
{
|
|
"message": {
|
|
"content": '{"action": "press_2", "digit": "2", "confidence": 0.85, "reason": "Option 2 is billing"}'
|
|
}
|
|
}
|
|
],
|
|
"usage": {"total_tokens": 30},
|
|
}
|
|
|
|
with patch.object(client._client, "post", new_callable=AsyncMock) as mock_post:
|
|
mock_post.return_value = mock_response
|
|
result = await client.analyze_ivr_menu(
|
|
transcript="Press 1 for sales, press 2 for billing",
|
|
intent="dispute a charge",
|
|
previous_selections=["1"],
|
|
)
|
|
assert result is not None
|
|
assert result["digit"] == "2"
|
|
|
|
|
|
# ============================================================
|
|
# Notification Service Tests
|
|
# ============================================================
|
|
|
|
|
|
class TestNotificationService:
|
|
"""Test notification routing and deduplication."""
|
|
|
|
def _make_service(self):
|
|
from services.notification import NotificationService
|
|
|
|
event_bus = EventBus()
|
|
settings = Settings()
|
|
svc = NotificationService(event_bus, settings)
|
|
return svc, event_bus
|
|
|
|
def test_init(self):
|
|
svc, _ = self._make_service()
|
|
assert svc._notified == {}
|
|
|
|
def test_event_to_notification_human_detected(self):
|
|
from services.notification import NotificationPriority
|
|
|
|
svc, _ = self._make_service()
|
|
event = GatewayEvent(
|
|
type=EventType.HUMAN_DETECTED,
|
|
call_id="call_123",
|
|
data={"confidence": 0.95},
|
|
message="Human detected!",
|
|
)
|
|
notification = svc._event_to_notification(event)
|
|
assert notification is not None
|
|
assert notification.priority == NotificationPriority.CRITICAL
|
|
assert "Human" in notification.title
|
|
|
|
def test_event_to_notification_hold_detected(self):
|
|
from services.notification import NotificationPriority
|
|
|
|
svc, _ = self._make_service()
|
|
event = GatewayEvent(
|
|
type=EventType.HOLD_DETECTED,
|
|
call_id="call_123",
|
|
data={},
|
|
message="On hold",
|
|
)
|
|
notification = svc._event_to_notification(event)
|
|
assert notification is not None
|
|
assert notification.priority == NotificationPriority.NORMAL
|
|
|
|
def test_event_to_notification_skip_transcript(self):
|
|
svc, _ = self._make_service()
|
|
event = GatewayEvent(
|
|
type=EventType.TRANSCRIPT_CHUNK,
|
|
call_id="call_123",
|
|
data={"text": "hello"},
|
|
)
|
|
notification = svc._event_to_notification(event)
|
|
assert notification is None # Transcripts don't generate notifications
|
|
|
|
def test_event_to_notification_call_ended_cleanup(self):
|
|
svc, _ = self._make_service()
|
|
# Simulate some tracking data
|
|
svc._notified["call_123"] = {"some_event"}
|
|
|
|
event = GatewayEvent(
|
|
type=EventType.CALL_ENDED,
|
|
call_id="call_123",
|
|
data={},
|
|
)
|
|
notification = svc._event_to_notification(event)
|
|
assert notification is not None
|
|
assert "call_123" not in svc._notified # Cleaned up
|
|
|
|
def test_event_to_notification_call_failed(self):
|
|
from services.notification import NotificationPriority
|
|
|
|
svc, _ = self._make_service()
|
|
event = GatewayEvent(
|
|
type=EventType.CALL_FAILED,
|
|
call_id="call_123",
|
|
data={},
|
|
message="Connection timed out",
|
|
)
|
|
notification = svc._event_to_notification(event)
|
|
assert notification is not None
|
|
assert notification.priority == NotificationPriority.HIGH
|
|
assert "Connection timed out" in notification.message
|
|
|
|
|
|
# ============================================================
|
|
# Recording Service Tests
|
|
# ============================================================
|
|
|
|
|
|
class TestRecordingService:
|
|
"""Test recording lifecycle."""
|
|
|
|
def _make_service(self):
|
|
from services.recording import RecordingService
|
|
|
|
return RecordingService(storage_dir="/tmp/test_recordings")
|
|
|
|
def test_init(self):
|
|
svc = self._make_service()
|
|
assert svc._active_recordings == {}
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_recording_path_generation(self):
|
|
"""Verify recording paths are organized by date."""
|
|
svc = self._make_service()
|
|
await svc.start() # Creates storage dir
|
|
|
|
session = await svc.start_recording(call_id="call_abc123")
|
|
assert "call_abc123" in session.filepath_mixed
|
|
# Should include date-based directory
|
|
today = datetime.now().strftime("%Y-%m-%d")
|
|
assert today in session.filepath_mixed
|
|
|
|
# Clean up
|
|
await svc.stop_recording("call_abc123")
|
|
|
|
|
|
# ============================================================
|
|
# Call Analytics Tests
|
|
# ============================================================
|
|
|
|
|
|
class TestCallAnalytics:
|
|
"""Test analytics tracking."""
|
|
|
|
def _make_service(self):
|
|
from services.call_analytics import CallAnalytics
|
|
|
|
return CallAnalytics(max_history=1000)
|
|
|
|
def test_init(self):
|
|
svc = self._make_service()
|
|
assert svc._call_records == []
|
|
assert svc.total_calls_recorded == 0
|
|
|
|
def test_get_summary_empty(self):
|
|
svc = self._make_service()
|
|
summary = svc.get_summary(hours=24)
|
|
assert summary["total_calls"] == 0
|
|
assert summary["success_rate"] == 0.0
|
|
|
|
def test_get_company_stats_unknown(self):
|
|
svc = self._make_service()
|
|
stats = svc.get_company_stats("+18005551234")
|
|
assert stats["total_calls"] == 0
|
|
|
|
def test_get_top_numbers_empty(self):
|
|
svc = self._make_service()
|
|
top = svc.get_top_numbers(limit=5)
|
|
assert top == []
|
|
|
|
def test_get_hold_time_trend(self):
|
|
svc = self._make_service()
|
|
trend = svc.get_hold_time_trend(days=7)
|
|
assert len(trend) == 7
|
|
assert all(t["call_count"] == 0 for t in trend)
|
|
|
|
|
|
# ============================================================
|
|
# Call Flow Learner Tests
|
|
# ============================================================
|
|
|
|
|
|
class TestCallFlowLearner:
|
|
"""Test call flow learning from exploration data."""
|
|
|
|
def _make_learner(self):
|
|
from services.call_flow_learner import CallFlowLearner
|
|
|
|
return CallFlowLearner(llm_client=None)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_build_flow_from_discoveries(self):
|
|
"""Test building a call flow from exploration discoveries."""
|
|
learner = self._make_learner()
|
|
|
|
discoveries = [
|
|
{
|
|
"audio_type": "ivr_prompt",
|
|
"transcript": "Press 1 for billing, press 2 for sales",
|
|
"action_taken": {"dtmf": "1"},
|
|
},
|
|
{
|
|
"audio_type": "ivr_prompt",
|
|
"transcript": "Press 3 to speak to an agent",
|
|
"action_taken": {"dtmf": "3"},
|
|
},
|
|
{
|
|
"audio_type": "music",
|
|
"transcript": "",
|
|
"action_taken": None,
|
|
},
|
|
{
|
|
"audio_type": "live_human",
|
|
"transcript": "Hi, thanks for calling. How can I help?",
|
|
"action_taken": None,
|
|
},
|
|
]
|
|
|
|
flow = await learner.build_flow(
|
|
phone_number="+18005551234",
|
|
discovered_steps=discoveries,
|
|
intent="cancel my card",
|
|
company_name="Test Bank",
|
|
)
|
|
|
|
assert flow is not None
|
|
assert flow.phone_number == "+18005551234"
|
|
assert "Test Bank" in flow.name
|
|
assert len(flow.steps) == 4 # IVR, IVR, hold, human
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_build_flow_no_discoveries(self):
|
|
"""Test that build_flow returns empty flow when no meaningful data."""
|
|
learner = self._make_learner()
|
|
flow = await learner.build_flow(
|
|
phone_number="+18005551234",
|
|
discovered_steps=[],
|
|
)
|
|
assert flow is not None
|
|
assert len(flow.steps) == 0
|
|
assert "empty" in [t.lower() for t in flow.tags]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_merge_discoveries(self):
|
|
"""Test merging new discoveries into existing flow."""
|
|
learner = self._make_learner()
|
|
|
|
# Build initial flow
|
|
initial_steps = [
|
|
{
|
|
"audio_type": "ivr_prompt",
|
|
"transcript": "Press 1 for billing",
|
|
"action_taken": {"dtmf": "1"},
|
|
},
|
|
{
|
|
"audio_type": "music",
|
|
"transcript": "",
|
|
"action_taken": None,
|
|
},
|
|
]
|
|
flow = await learner.build_flow(
|
|
phone_number="+18005551234",
|
|
discovered_steps=initial_steps,
|
|
intent="billing inquiry",
|
|
)
|
|
original_step_count = len(flow.steps)
|
|
assert original_step_count == 2
|
|
|
|
# Merge new discoveries
|
|
new_steps = [
|
|
{
|
|
"audio_type": "ivr_prompt",
|
|
"transcript": "Press 1 for billing",
|
|
"action_taken": {"dtmf": "1"},
|
|
},
|
|
{
|
|
"audio_type": "music",
|
|
"transcript": "",
|
|
"action_taken": None,
|
|
},
|
|
{
|
|
"audio_type": "live_human",
|
|
"transcript": "Hello, billing department",
|
|
"action_taken": None,
|
|
},
|
|
]
|
|
|
|
merged = await learner.merge_discoveries(
|
|
existing_flow=flow,
|
|
new_steps=new_steps,
|
|
intent="billing inquiry",
|
|
)
|
|
|
|
assert merged is not None
|
|
assert merged.times_used == 2 # Incremented
|
|
assert merged.last_used is not None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_discovery_to_step_types(self):
|
|
"""Test that different audio types produce correct step actions."""
|
|
from models.call_flow import ActionType
|
|
|
|
learner = self._make_learner()
|
|
|
|
# IVR prompt with DTMF
|
|
step = learner._discovery_to_step(
|
|
{"audio_type": "ivr_prompt", "transcript": "Press 1", "action_taken": {"dtmf": "1"}},
|
|
0, [],
|
|
)
|
|
assert step is not None
|
|
assert step.action == ActionType.DTMF
|
|
assert step.action_value == "1"
|
|
|
|
# Hold music
|
|
step = learner._discovery_to_step(
|
|
{"audio_type": "music", "transcript": "", "action_taken": None},
|
|
1, [],
|
|
)
|
|
assert step is not None
|
|
assert step.action == ActionType.HOLD
|
|
|
|
# Live human
|
|
step = learner._discovery_to_step(
|
|
{"audio_type": "live_human", "transcript": "Hello", "action_taken": None},
|
|
2, [],
|
|
)
|
|
assert step is not None
|
|
assert step.action == ActionType.TRANSFER
|
|
|
|
|
|
# ============================================================
|
|
# EventBus Integration Tests
|
|
# ============================================================
|
|
|
|
|
|
class TestEventBusIntegration:
|
|
"""Test EventBus with real async producers/consumers."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_multiple_subscribers(self):
|
|
"""Multiple subscribers each get all events."""
|
|
bus = EventBus()
|
|
sub1 = bus.subscribe()
|
|
sub2 = bus.subscribe()
|
|
|
|
event = GatewayEvent(
|
|
type=EventType.CALL_INITIATED,
|
|
call_id="call_1",
|
|
data={},
|
|
)
|
|
await bus.publish(event)
|
|
|
|
e1 = await asyncio.wait_for(sub1.__anext__(), timeout=1.0)
|
|
e2 = await asyncio.wait_for(sub2.__anext__(), timeout=1.0)
|
|
|
|
assert e1.call_id == "call_1"
|
|
assert e2.call_id == "call_1"
|
|
assert bus.subscriber_count == 2
|
|
|
|
# Unsubscribe using .close() which passes the internal entry tuple
|
|
sub1.close()
|
|
sub2.close()
|
|
assert bus.subscriber_count == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_event_history_limit(self):
|
|
"""Event history respects max size."""
|
|
bus = EventBus(max_history=5)
|
|
|
|
for i in range(10):
|
|
await bus.publish(
|
|
GatewayEvent(
|
|
type=EventType.IVR_STEP,
|
|
call_id=f"call_{i}",
|
|
data={},
|
|
)
|
|
)
|
|
|
|
# recent_events is a property, not a method
|
|
history = bus.recent_events
|
|
assert len(history) == 5
|
|
# Should have the most recent 5
|
|
assert history[-1].call_id == "call_9"
|
|
assert history[0].call_id == "call_5"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_event_type_filtering(self):
|
|
"""Subscribers can filter by event type."""
|
|
bus = EventBus()
|
|
# Only subscribe to hold-related events
|
|
sub = bus.subscribe(event_types={EventType.HOLD_DETECTED, EventType.HUMAN_DETECTED})
|
|
|
|
# Publish multiple event types
|
|
await bus.publish(GatewayEvent(type=EventType.CALL_INITIATED, call_id="c1", data={}))
|
|
await bus.publish(GatewayEvent(type=EventType.HOLD_DETECTED, call_id="c1", data={}))
|
|
await bus.publish(GatewayEvent(type=EventType.IVR_STEP, call_id="c1", data={}))
|
|
await bus.publish(GatewayEvent(type=EventType.HUMAN_DETECTED, call_id="c1", data={}))
|
|
|
|
# Should only receive the 2 matching events
|
|
e1 = await asyncio.wait_for(sub.__anext__(), timeout=1.0)
|
|
e2 = await asyncio.wait_for(sub.__anext__(), timeout=1.0)
|
|
assert e1.type == EventType.HOLD_DETECTED
|
|
assert e2.type == EventType.HUMAN_DETECTED
|
|
|
|
sub.close()
|