""" Call Analytics Service — Tracks call metrics and generates insights. Monitors call patterns, hold times, success rates, and IVR navigation efficiency. Provides data for the dashboard and API. """ import logging from collections import defaultdict from datetime import datetime, timedelta from typing import Any, Optional from models.call import ActiveCall, AudioClassification, CallMode, CallStatus logger = logging.getLogger(__name__) class CallAnalytics: """ In-memory call analytics engine. Tracks: - Call success/failure rates - Hold time statistics (avg, min, max, p95) - IVR navigation efficiency - Human detection accuracy - Per-number/company patterns - Time-of-day patterns In production, this would be backed by TimescaleDB or similar. For now, we keep rolling windows in memory. """ def __init__(self, max_history: int = 10000): self._max_history = max_history self._call_records: list[CallRecord] = [] self._company_stats: dict[str, CompanyStats] = defaultdict(CompanyStats) # ================================================================ # Record Calls # ================================================================ def record_call(self, call: ActiveCall) -> None: """ Record a completed call for analytics. Called when a call ends (from CallManager). 
""" record = CallRecord( call_id=call.id, remote_number=call.remote_number, mode=call.mode, status=call.status, intent=call.intent, started_at=call.created_at, duration_seconds=call.duration, hold_time_seconds=call.hold_time, classification_history=[ r.audio_type.value for r in call.classification_history ], transcript_chunks=list(call.transcript_chunks), services=list(call.services), ) self._call_records.append(record) # Trim history if len(self._call_records) > self._max_history: self._call_records = self._call_records[-self._max_history :] # Update company stats company_key = self._normalize_number(call.remote_number) self._company_stats[company_key].update(record) logger.debug( f"📊 Recorded call {call.id}: " f"{call.status.value}, {call.duration}s, hold={call.hold_time}s" ) # ================================================================ # Aggregate Stats # ================================================================ def get_summary(self, hours: int = 24) -> dict[str, Any]: """Get summary statistics for the last N hours.""" cutoff = datetime.now() - timedelta(hours=hours) recent = [r for r in self._call_records if r.started_at >= cutoff] if not recent: return { "period_hours": hours, "total_calls": 0, "success_rate": 0.0, "avg_hold_time": 0.0, "avg_duration": 0.0, } total = len(recent) successful = sum(1 for r in recent if r.status in ( CallStatus.COMPLETED, CallStatus.BRIDGED, CallStatus.HUMAN_DETECTED )) failed = sum(1 for r in recent if r.status == CallStatus.FAILED) hold_times = [r.hold_time_seconds for r in recent if r.hold_time_seconds > 0] durations = [r.duration_seconds for r in recent if r.duration_seconds > 0] hold_slayer_calls = [r for r in recent if r.mode == CallMode.HOLD_SLAYER] hold_slayer_success = sum( 1 for r in hold_slayer_calls if r.status in (CallStatus.BRIDGED, CallStatus.HUMAN_DETECTED) ) return { "period_hours": hours, "total_calls": total, "successful": successful, "failed": failed, "success_rate": round(successful / total, 3) if 
total else 0.0, "avg_duration": round(sum(durations) / len(durations), 1) if durations else 0.0, "max_duration": max(durations) if durations else 0, "hold_time": { "avg": round(sum(hold_times) / len(hold_times), 1) if hold_times else 0.0, "min": min(hold_times) if hold_times else 0, "max": max(hold_times) if hold_times else 0, "p95": self._percentile(hold_times, 95) if hold_times else 0, "total": sum(hold_times), }, "hold_slayer": { "total": len(hold_slayer_calls), "success": hold_slayer_success, "success_rate": round( hold_slayer_success / len(hold_slayer_calls), 3 ) if hold_slayer_calls else 0.0, }, "by_mode": self._group_by_mode(recent), "by_hour": self._group_by_hour(recent), } def get_company_stats(self, number: str) -> dict[str, Any]: """Get stats for a specific company/number.""" key = self._normalize_number(number) stats = self._company_stats.get(key) if not stats: return {"number": number, "total_calls": 0} return stats.to_dict(number) def get_top_numbers(self, limit: int = 10) -> list[dict[str, Any]]: """Get the most-called numbers with their stats.""" sorted_stats = sorted( self._company_stats.items(), key=lambda x: x[1].total_calls, reverse=True, )[:limit] return [stats.to_dict(number) for number, stats in sorted_stats] # ================================================================ # Hold Time Trends # ================================================================ def get_hold_time_trend( self, number: Optional[str] = None, days: int = 7, ) -> list[dict]: """ Get hold time trend data for graphing. Returns daily average hold times for the last N days. 
""" cutoff = datetime.now() - timedelta(days=days) records = [r for r in self._call_records if r.started_at >= cutoff] if number: key = self._normalize_number(number) records = [r for r in records if self._normalize_number(r.remote_number) == key] # Group by day by_day: dict[str, list[int]] = defaultdict(list) for r in records: day = r.started_at.strftime("%Y-%m-%d") if r.hold_time_seconds > 0: by_day[day].append(r.hold_time_seconds) trend = [] for i in range(days): date = (datetime.now() - timedelta(days=days - 1 - i)).strftime("%Y-%m-%d") times = by_day.get(date, []) trend.append({ "date": date, "avg_hold_time": round(sum(times) / len(times), 1) if times else 0, "call_count": len(times), "max_hold_time": max(times) if times else 0, }) return trend # ================================================================ # Helpers # ================================================================ @staticmethod def _normalize_number(number: str) -> str: """Normalize phone number for grouping.""" # Strip formatting, keep last 10 digits digits = "".join(c for c in number if c.isdigit()) return digits[-10:] if len(digits) >= 10 else digits @staticmethod def _percentile(values: list, pct: int) -> float: """Calculate percentile value.""" if not values: return 0.0 sorted_vals = sorted(values) idx = int(len(sorted_vals) * pct / 100) idx = min(idx, len(sorted_vals) - 1) return float(sorted_vals[idx]) @staticmethod def _group_by_mode(records: list["CallRecord"]) -> dict[str, int]: """Group call counts by mode.""" by_mode: dict[str, int] = defaultdict(int) for r in records: by_mode[r.mode.value] += 1 return dict(by_mode) @staticmethod def _group_by_hour(records: list["CallRecord"]) -> dict[int, int]: """Group call counts by hour of day.""" by_hour: dict[int, int] = defaultdict(int) for r in records: by_hour[r.started_at.hour] += 1 return dict(sorted(by_hour.items())) @property def total_calls_recorded(self) -> int: return len(self._call_records) # 
# ================================================================
# Data Models
# ================================================================


class CallRecord:
    """A completed call record for analytics."""

    def __init__(
        self,
        call_id: str,
        remote_number: str,
        mode: CallMode,
        status: CallStatus,
        intent: Optional[str] = None,
        started_at: Optional[datetime] = None,
        duration_seconds: int = 0,
        hold_time_seconds: int = 0,
        classification_history: Optional[list[str]] = None,
        transcript_chunks: Optional[list[str]] = None,
        services: Optional[list[str]] = None,
    ):
        """
        Store a snapshot of a finished call.

        Args:
            call_id: Identifier of the call.
            remote_number: Number on the other end of the call.
            mode: Mode the call ran in.
            status: Final status of the call.
            intent: Optional intent label for the call.
            started_at: When the call started; defaults to the current
                time when omitted.
            duration_seconds: Total call duration.
            hold_time_seconds: Time spent on hold.
            classification_history: Audio classification labels; defaults
                to an empty list.
            transcript_chunks: Transcript pieces; defaults to an empty list.
            services: Services associated with the call; defaults to an
                empty list.
        """
        self.call_id = call_id
        self.remote_number = remote_number
        self.mode = mode
        self.status = status
        self.intent = intent
        self.started_at = started_at or datetime.now()
        self.duration_seconds = duration_seconds
        self.hold_time_seconds = hold_time_seconds
        # `or []` gives every record its own fresh list when none is passed.
        self.classification_history = classification_history or []
        self.transcript_chunks = transcript_chunks or []
        self.services = services or []


class CompanyStats:
    """Aggregated stats for a specific company/phone number."""

    def __init__(self):
        """Start with zeroed counters and no recorded calls."""
        self.total_calls = 0
        self.successful_calls = 0
        self.failed_calls = 0
        self.total_hold_time = 0
        # Individual non-zero hold times; used for the average and maximum.
        self.hold_times: list[int] = []
        self.total_duration = 0
        self.last_called: Optional[datetime] = None
        # Intent label -> number of calls carrying that intent.
        self.intents: dict[str, int] = defaultdict(int)

    def update(self, record: CallRecord) -> None:
        """
        Update stats with a new call record.

        Calls with status COMPLETED, BRIDGED or HUMAN_DETECTED count as
        successful; FAILED counts as failed; any other status only
        contributes to the totals. Hold time is accumulated only when it is
        greater than zero.

        Args:
            record: The call record to fold into the aggregate.
        """
        self.total_calls += 1
        self.total_duration += record.duration_seconds

        # Records arrive when calls *end*, so overlapping calls can be
        # recorded out of start order. Keep the latest start time instead
        # of blindly overwriting, so last_called never moves backwards.
        if self.last_called is None or record.started_at > self.last_called:
            self.last_called = record.started_at

        if record.status in (
            CallStatus.COMPLETED,
            CallStatus.BRIDGED,
            CallStatus.HUMAN_DETECTED,
        ):
            self.successful_calls += 1
        elif record.status == CallStatus.FAILED:
            self.failed_calls += 1

        if record.hold_time_seconds > 0:
            self.total_hold_time += record.hold_time_seconds
            self.hold_times.append(record.hold_time_seconds)

        if record.intent:
            self.intents[record.intent] += 1

    def to_dict(self, number: str) -> dict[str, Any]:
        """
        Serialize the aggregate into a plain dictionary.

        Args:
            number: Value to report in the ``number`` field.

        Returns:
            Counts, success rate (3 decimals), average/max hold time,
            average duration, ISO-formatted ``last_called`` (or None), and
            the five most frequent intents in descending order of count.
        """
        return {
            "number": number,
            "total_calls": self.total_calls,
            "successful_calls": self.successful_calls,
            "failed_calls": self.failed_calls,
            "success_rate": round(
                self.successful_calls / self.total_calls, 3
            ) if self.total_calls else 0.0,
            "avg_hold_time": round(
                self.total_hold_time / len(self.hold_times), 1
            ) if self.hold_times else 0.0,
            "max_hold_time": max(self.hold_times) if self.hold_times else 0,
            "avg_duration": round(
                self.total_duration / self.total_calls, 1
            ) if self.total_calls else 0.0,
            "last_called": self.last_called.isoformat() if self.last_called else None,
            "top_intents": dict(
                sorted(self.intents.items(), key=lambda x: x[1], reverse=True)[:5]
            ),
        }