""" Tests for the intelligence layer services: - LLMClient - NotificationService - RecordingService - CallAnalytics - CallFlowLearner """ import asyncio from datetime import datetime from unittest.mock import AsyncMock, MagicMock, patch import pytest from config import Settings from core.event_bus import EventBus from models.events import EventType, GatewayEvent # ============================================================ # LLM Client Tests # ============================================================ class TestLLMClient: """Test the LLM client with mocked HTTP responses.""" def _make_client(self): from services.llm_client import LLMClient return LLMClient( base_url="http://localhost:11434/v1", model="llama3", api_key="not-needed", ) @pytest.mark.asyncio async def test_init(self): client = self._make_client() assert client.model == "llama3" assert client._total_requests == 0 assert client._total_errors == 0 @pytest.mark.asyncio async def test_stats(self): client = self._make_client() stats = client.stats assert stats["total_requests"] == 0 assert stats["total_errors"] == 0 assert stats["model"] == "llama3" assert stats["avg_latency_ms"] == 0 @pytest.mark.asyncio async def test_chat_request_format(self): """Verify the HTTP request is formatted correctly.""" client = self._make_client() mock_response = MagicMock() mock_response.status_code = 200 mock_response.raise_for_status = MagicMock() mock_response.json.return_value = { "choices": [{"message": {"content": "Hello!"}}], "usage": {"total_tokens": 10}, } with patch.object(client._client, "post", new_callable=AsyncMock) as mock_post: mock_post.return_value = mock_response result = await client.chat("Say hello", system="Hi") assert result == "Hello!" assert client._total_requests == 1 # Verify the request body call_args = mock_post.call_args body = call_args[1]["json"] assert body["model"] == "llama3" assert len(body["messages"]) == 2 assert body["messages"][0]["role"] == "system" assert body["messages"][1]["role"] == "user" @pytest.mark.asyncio async def test_chat_json_parsing(self): """Verify JSON response parsing works.""" client = self._make_client() mock_response = MagicMock() mock_response.status_code = 200 mock_response.raise_for_status = MagicMock() mock_response.json.return_value = { "choices": [{"message": {"content": '{"action": "press_1", "confidence": 0.9}'}}], "usage": {"total_tokens": 20}, } with patch.object(client._client, "post", new_callable=AsyncMock) as mock_post: mock_post.return_value = mock_response result = await client.chat_json("Analyze menu", system="Press 1 for billing") assert result is not None assert result["action"] == "press_1" assert result["confidence"] == 0.9 @pytest.mark.asyncio async def test_chat_json_markdown_extraction(self): """Verify JSON extraction from markdown code blocks.""" client = self._make_client() mock_response = MagicMock() mock_response.status_code = 200 mock_response.raise_for_status = MagicMock() mock_response.json.return_value = { "choices": [ { "message": { "content": 'Here is the result:\n```json\n{"key": "value"}\n```' } } ], "usage": {"total_tokens": 15}, } with patch.object(client._client, "post", new_callable=AsyncMock) as mock_post: mock_post.return_value = mock_response result = await client.chat_json("Parse this", system="test") assert result is not None assert result["key"] == "value" @pytest.mark.asyncio async def test_chat_http_error_returns_empty(self): """Verify HTTP errors return empty string gracefully.""" client = self._make_client() with patch.object(client._client, "post", new_callable=AsyncMock) as mock_post: mock_post.side_effect = Exception("Connection refused") result = await client.chat("test", system="test") assert result == "" assert client._total_errors == 1 @pytest.mark.asyncio async def test_analyze_ivr_menu(self): """Verify IVR menu analysis formats correctly.""" client = self._make_client() mock_response = MagicMock() mock_response.status_code = 200 mock_response.raise_for_status = MagicMock() mock_response.json.return_value = { "choices": [ { "message": { "content": '{"action": "press_2", "digit": "2", "confidence": 0.85, "reason": "Option 2 is billing"}' } } ], "usage": {"total_tokens": 30}, } with patch.object(client._client, "post", new_callable=AsyncMock) as mock_post: mock_post.return_value = mock_response result = await client.analyze_ivr_menu( transcript="Press 1 for sales, press 2 for billing", intent="dispute a charge", previous_selections=["1"], ) assert result is not None assert result["digit"] == "2" # ============================================================ # Notification Service Tests # ============================================================ class TestNotificationService: """Test notification routing and deduplication.""" def _make_service(self): from services.notification import NotificationService event_bus = EventBus() settings = Settings() svc = NotificationService(event_bus, settings) return svc, event_bus def test_init(self): svc, _ = self._make_service() assert svc._notified == {} def test_event_to_notification_human_detected(self): from services.notification import NotificationPriority svc, _ = self._make_service() event = GatewayEvent( type=EventType.HUMAN_DETECTED, call_id="call_123", data={"confidence": 0.95}, message="Human detected!", ) notification = svc._event_to_notification(event) assert notification is not None assert notification.priority == NotificationPriority.CRITICAL assert "Human" in notification.title def test_event_to_notification_hold_detected(self): from services.notification import NotificationPriority svc, _ = self._make_service() event = GatewayEvent( type=EventType.HOLD_DETECTED, call_id="call_123", data={}, message="On hold", ) notification = svc._event_to_notification(event) assert notification is not None assert notification.priority == NotificationPriority.NORMAL def test_event_to_notification_skip_transcript(self): svc, _ = self._make_service() event = GatewayEvent( type=EventType.TRANSCRIPT_CHUNK, call_id="call_123", data={"text": "hello"}, ) notification = svc._event_to_notification(event) assert notification is None # Transcripts don't generate notifications def test_event_to_notification_call_ended_cleanup(self): svc, _ = self._make_service() # Simulate some tracking data svc._notified["call_123"] = {"some_event"} event = GatewayEvent( type=EventType.CALL_ENDED, call_id="call_123", data={}, ) notification = svc._event_to_notification(event) assert notification is not None assert "call_123" not in svc._notified # Cleaned up def test_event_to_notification_call_failed(self): from services.notification import NotificationPriority svc, _ = self._make_service() event = GatewayEvent( type=EventType.CALL_FAILED, call_id="call_123", data={}, message="Connection timed out", ) notification = svc._event_to_notification(event) assert notification is not None assert notification.priority == NotificationPriority.HIGH assert "Connection timed out" in notification.message # ============================================================ # Recording Service Tests # ============================================================ class TestRecordingService: """Test recording lifecycle.""" def _make_service(self): from services.recording import RecordingService return RecordingService(storage_dir="/tmp/test_recordings") def test_init(self): svc = self._make_service() assert svc._active_recordings == {} @pytest.mark.asyncio async def test_recording_path_generation(self): """Verify recording paths are organized by date.""" svc = self._make_service() await svc.start() # Creates storage dir session = await svc.start_recording(call_id="call_abc123") assert "call_abc123" in session.filepath_mixed # Should include date-based directory today = datetime.now().strftime("%Y-%m-%d") assert today in session.filepath_mixed # Clean up await svc.stop_recording("call_abc123") # ============================================================ # Call Analytics Tests # ============================================================ class TestCallAnalytics: """Test analytics tracking.""" def _make_service(self): from services.call_analytics import CallAnalytics return CallAnalytics(max_history=1000) def test_init(self): svc = self._make_service() assert svc._call_records == [] assert svc.total_calls_recorded == 0 def test_get_summary_empty(self): svc = self._make_service() summary = svc.get_summary(hours=24) assert summary["total_calls"] == 0 assert summary["success_rate"] == 0.0 def test_get_company_stats_unknown(self): svc = self._make_service() stats = svc.get_company_stats("+18005551234") assert stats["total_calls"] == 0 def test_get_top_numbers_empty(self): svc = self._make_service() top = svc.get_top_numbers(limit=5) assert top == [] def test_get_hold_time_trend(self): svc = self._make_service() trend = svc.get_hold_time_trend(days=7) assert len(trend) == 7 assert all(t["call_count"] == 0 for t in trend) # ============================================================ # Call Flow Learner Tests # ============================================================ class TestCallFlowLearner: """Test call flow learning from exploration data.""" def _make_learner(self): from services.call_flow_learner import CallFlowLearner return CallFlowLearner(llm_client=None) @pytest.mark.asyncio async def test_build_flow_from_discoveries(self): """Test building a call flow from exploration discoveries.""" learner = self._make_learner() discoveries = [ { "audio_type": "ivr_prompt", "transcript": "Press 1 for billing, press 2 for sales", "action_taken": {"dtmf": "1"}, }, { "audio_type": "ivr_prompt", "transcript": "Press 3 to speak to an agent", "action_taken": {"dtmf": "3"}, }, { "audio_type": "music", "transcript": "", "action_taken": None, }, { "audio_type": "live_human", "transcript": "Hi, thanks for calling. How can I help?", "action_taken": None, }, ] flow = await learner.build_flow( phone_number="+18005551234", discovered_steps=discoveries, intent="cancel my card", company_name="Test Bank", ) assert flow is not None assert flow.phone_number == "+18005551234" assert "Test Bank" in flow.name assert len(flow.steps) == 4 # IVR, IVR, hold, human @pytest.mark.asyncio async def test_build_flow_no_discoveries(self): """Test that build_flow returns empty flow when no meaningful data.""" learner = self._make_learner() flow = await learner.build_flow( phone_number="+18005551234", discovered_steps=[], ) assert flow is not None assert len(flow.steps) == 0 assert "empty" in [t.lower() for t in flow.tags] @pytest.mark.asyncio async def test_merge_discoveries(self): """Test merging new discoveries into existing flow.""" learner = self._make_learner() # Build initial flow initial_steps = [ { "audio_type": "ivr_prompt", "transcript": "Press 1 for billing", "action_taken": {"dtmf": "1"}, }, { "audio_type": "music", "transcript": "", "action_taken": None, }, ] flow = await learner.build_flow( phone_number="+18005551234", discovered_steps=initial_steps, intent="billing inquiry", ) original_step_count = len(flow.steps) assert original_step_count == 2 # Merge new discoveries new_steps = [ { "audio_type": "ivr_prompt", "transcript": "Press 1 for billing", "action_taken": {"dtmf": "1"}, }, { "audio_type": "music", "transcript": "", "action_taken": None, }, { "audio_type": "live_human", "transcript": "Hello, billing department", "action_taken": None, }, ] merged = await learner.merge_discoveries( existing_flow=flow, new_steps=new_steps, intent="billing inquiry", ) assert merged is not None assert merged.times_used == 2 # Incremented assert merged.last_used is not None @pytest.mark.asyncio async def test_discovery_to_step_types(self): """Test that different audio types produce correct step actions.""" from models.call_flow import ActionType learner = self._make_learner() # IVR prompt with DTMF step = learner._discovery_to_step( {"audio_type": "ivr_prompt", "transcript": "Press 1", "action_taken": {"dtmf": "1"}}, 0, [], ) assert step is not None assert step.action == ActionType.DTMF assert step.action_value == "1" # Hold music step = learner._discovery_to_step( {"audio_type": "music", "transcript": "", "action_taken": None}, 1, [], ) assert step is not None assert step.action == ActionType.HOLD # Live human step = learner._discovery_to_step( {"audio_type": "live_human", "transcript": "Hello", "action_taken": None}, 2, [], ) assert step is not None assert step.action == ActionType.TRANSFER # ============================================================ # EventBus Integration Tests # ============================================================ class TestEventBusIntegration: """Test EventBus with real async producers/consumers.""" @pytest.mark.asyncio async def test_multiple_subscribers(self): """Multiple subscribers each get all events.""" bus = EventBus() sub1 = bus.subscribe() sub2 = bus.subscribe() event = GatewayEvent( type=EventType.CALL_INITIATED, call_id="call_1", data={}, ) await bus.publish(event) e1 = await asyncio.wait_for(sub1.__anext__(), timeout=1.0) e2 = await asyncio.wait_for(sub2.__anext__(), timeout=1.0) assert e1.call_id == "call_1" assert e2.call_id == "call_1" assert bus.subscriber_count == 2 # Unsubscribe using .close() which passes the internal entry tuple sub1.close() sub2.close() assert bus.subscriber_count == 0 @pytest.mark.asyncio async def test_event_history_limit(self): """Event history respects max size.""" bus = EventBus(max_history=5) for i in range(10): await bus.publish( GatewayEvent( type=EventType.IVR_STEP, call_id=f"call_{i}", data={}, ) ) # recent_events is a property, not a method history = bus.recent_events assert len(history) == 5 # Should have the most recent 5 assert history[-1].call_id == "call_9" assert history[0].call_id == "call_5" @pytest.mark.asyncio async def test_event_type_filtering(self): """Subscribers can filter by event type.""" bus = EventBus() # Only subscribe to hold-related events sub = bus.subscribe(event_types={EventType.HOLD_DETECTED, EventType.HUMAN_DETECTED}) # Publish multiple event types await bus.publish(GatewayEvent(type=EventType.CALL_INITIATED, call_id="c1", data={})) await bus.publish(GatewayEvent(type=EventType.HOLD_DETECTED, call_id="c1", data={})) await bus.publish(GatewayEvent(type=EventType.IVR_STEP, call_id="c1", data={})) await bus.publish(GatewayEvent(type=EventType.HUMAN_DETECTED, call_id="c1", data={})) # Should only receive the 2 matching events e1 = await asyncio.wait_for(sub.__anext__(), timeout=1.0) e2 = await asyncio.wait_for(sub.__anext__(), timeout=1.0) assert e1.type == EventType.HOLD_DETECTED assert e2.type == EventType.HUMAN_DETECTED sub.close()