""" Database connection and session management. PostgreSQL via asyncpg + SQLAlchemy async. """ from datetime import datetime from sqlalchemy import ( JSON, Column, DateTime, Float, Integer, String, Text, func, ) from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.orm import DeclarativeBase from config import get_settings class Base(DeclarativeBase): """SQLAlchemy declarative base for all ORM models.""" pass # ============================================================ # ORM Models # ============================================================ class CallRecord(Base): __tablename__ = "call_records" id = Column(String, primary_key=True) direction = Column(String, nullable=False) # inbound / outbound remote_number = Column(String, index=True, nullable=False) status = Column(String, nullable=False) # completed / missed / failed / active / on_hold mode = Column(String, nullable=False) # direct / hold_slayer / ai_assisted intent = Column(Text) # What the user wanted (for hold_slayer) started_at = Column(DateTime, default=func.now()) ended_at = Column(DateTime, nullable=True) duration = Column(Integer, default=0) # seconds hold_time = Column(Integer, default=0) # seconds spent on hold device_used = Column(String) recording_path = Column(String, nullable=True) transcript = Column(Text, nullable=True) summary = Column(Text, nullable=True) action_items = Column(JSON, nullable=True) sentiment = Column(String, nullable=True) call_flow_id = Column(String, nullable=True) # which flow was used classification_timeline = Column(JSON, nullable=True) # [{time, type, confidence}, ...] metadata_ = Column("metadata", JSON, nullable=True) def __repr__(self) -> str: return f"" class StoredCallFlow(Base): __tablename__ = "call_flows" id = Column(String, primary_key=True) name = Column(String, nullable=False) phone_number = Column(String, index=True, nullable=False) description = Column(Text) steps = Column(JSON, nullable=False) # Serialized list[CallFlowStep] last_verified = Column(DateTime, nullable=True) avg_hold_time = Column(Integer, nullable=True) success_rate = Column(Float, nullable=True) times_used = Column(Integer, default=0) last_used = Column(DateTime, nullable=True) notes = Column(Text, nullable=True) tags = Column(JSON, default=list) created_at = Column(DateTime, default=func.now()) updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) def __repr__(self) -> str: return f"" class Contact(Base): __tablename__ = "contacts" id = Column(String, primary_key=True) name = Column(String, nullable=False) phone_numbers = Column(JSON, nullable=False) # [{number, label, primary}, ...] category = Column(String) # personal / business / service routing_preference = Column(String, nullable=True) # how to handle their calls notes = Column(Text, nullable=True) call_count = Column(Integer, default=0) last_call = Column(DateTime, nullable=True) created_at = Column(DateTime, default=func.now()) updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) def __repr__(self) -> str: return f"" class Device(Base): __tablename__ = "devices" id = Column(String, primary_key=True) name = Column(String, nullable=False) # "Office SIP Phone" type = Column(String, nullable=False) # sip_phone / cell / tablet / softphone sip_uri = Column(String, nullable=True) # sip:robert@gateway.helu.ca phone_number = Column(String, nullable=True) # For PSTN devices priority = Column(Integer, default=10) # Routing priority (lower = higher priority) is_online = Column(String, default="false") capabilities = Column(JSON, default=list) # ["voice", "video", "sms"] last_seen = Column(DateTime, nullable=True) created_at = Column(DateTime, default=func.now()) updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) def __repr__(self) -> str: return f"" # ============================================================ # Engine & Session # ============================================================ _engine = None _session_factory = None def get_engine(): """Get or create the async engine.""" global _engine if _engine is None: settings = get_settings() _engine = create_async_engine( settings.database_url, echo=settings.debug, pool_size=10, max_overflow=20, ) return _engine def get_session_factory() -> async_sessionmaker[AsyncSession]: """Get or create the session factory.""" global _session_factory if _session_factory is None: _session_factory = async_sessionmaker( get_engine(), class_=AsyncSession, expire_on_commit=False, ) return _session_factory async def get_db() -> AsyncSession: """Dependency: yield an async database session.""" factory = get_session_factory() async with factory() as session: try: yield session await session.commit() except Exception: await session.rollback() raise async def init_db(): """Create all tables. For development; use Alembic migrations in production.""" engine = get_engine() async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) async def close_db(): """Close the database engine.""" global _engine, _session_factory if _engine is not None: await _engine.dispose() _engine = None _session_factory = None