Add a Prometheus custom collector that probes the four system-default models (chat, vision, embedding, reranker) at /metrics scrape time and emits up/down, configured, and probe-latency gauges. This complements the ingest-pipeline counters in the Celery worker, which only move during active ingests and cannot signal model outages on an idle queue. - New `library/health_collector.py` registers a custom collector with a 55s in-process cache to avoid hammering GPU endpoints on rapid scrapes or across multiple gunicorn workers. - New `library/services/model_health.py` centralises the probe logic, resolving system-default models via SystemSettings and dispatching to chat/embedding/rerank endpoints with a short timeout. - Register the collector only in the web process (gunicorn/runserver) via `LibraryConfig.ready`, excluding Celery, pytest, and management commands to prevent duplicate registration and stray probes. - Add unit tests covering the collector cache, metric shape, and per-role probe dispatch.
100 lines
3.6 KiB
Python
100 lines
3.6 KiB
Python
"""
|
|
Scrape-time Prometheus collector for system-default model reachability.
|
|
|
|
The ingest-pipeline counters in ``library/metrics.py`` live in the Celery
|
|
worker process and only move during an active ingest, so they cannot signal
|
|
"models down" on an idle queue. This collector runs in the **web** process
|
|
(where ``/metrics`` is served by ``django_prometheus``) and probes the four
|
|
system-default models at scrape time, emitting an up/down gauge that is
|
|
present regardless of queue activity.
|
|
|
|
Probe results are cached for a short TTL so rapid scrapes — or multiple
|
|
gunicorn workers each scraped in turn — cannot hammer the GPU endpoints.
|
|
"""
|
|
|
|
import logging
|
|
import threading
|
|
import time
|
|
|
|
from prometheus_client.core import GaugeMetricFamily
|
|
|
|
from library.services.model_health import probe_system_models
|
|
|
|
logger = logging.getLogger(__name__)

# Lifetime, in seconds, of one set of probe results. Chosen to sit well above
# a 15s scrape_interval (so most scrapes are answered from the cache and do
# not reach the router) while staying under a minute, so a model that has
# recovered is reported as up again within roughly that time.
_CACHE_TTL_SECONDS = 55

# Guards every read and write of ``_cache``.
_lock = threading.Lock()

# "ts" is the ``time.monotonic()`` reading associated with the stored results;
# "results" stays ``None`` until a probe has completed successfully.
_cache: dict = dict(ts=0.0, results=None)
|
|
|
|
|
|
def _cached_probe() -> list[dict]:
    """Fetch system-model probe results through the module-level TTL cache.

    While the stored results are younger than ``_CACHE_TTL_SECONDS`` they are
    returned as-is. Otherwise ``probe_system_models()`` is invoked and its
    return value replaces the cache entry. The whole check-probe-store
    sequence runs under ``_lock``.

    Returns:
        The cached or freshly probed results. If the probe raises, the
        previously cached results are returned when there are any, and an
        empty list otherwise; the cache entry is left untouched in that case.
    """
    started = time.monotonic()
    with _lock:
        cached = _cache["results"]
        age = started - _cache["ts"]
        if cached is not None and age < _CACHE_TTL_SECONDS:
            return cached

        try:
            fresh = probe_system_models()
        except Exception as exc:  # a failing probe must never break /metrics
            logger.warning("Model health probe failed during scrape: %s", exc)
            # Fall back to whatever was cached last; with nothing cached
            # (or an empty result) report no samples at all.
            return cached if cached else []

        # The entry is stamped with the clock reading taken on entry, i.e.
        # before the probe ran.
        _cache.update(ts=started, results=fresh)
        return fresh
|
|
|
|
|
|
class SystemModelHealthCollector:
    """prometheus_client custom collector for system-default model health.

    Emits three gauge families, in this order:

    * ``mnemosyne_system_default_model_configured`` (label ``role``): 1 when
      the probe result marks the role as configured, else 0.
    * ``mnemosyne_system_default_model_up`` (labels ``role``, ``model``,
      ``api``): 1 when the probe result is ``ok``, else 0. Only emitted for
      configured roles.
    * ``mnemosyne_system_default_model_probe_latency_seconds`` (label
      ``role``): the result's ``latency_ms`` converted to seconds. Only
      emitted for configured roles whose result carries a latency.
    """

    @staticmethod
    def _new_families():
        """Build the three gauge families, without samples, in yield order."""
        configured = GaugeMetricFamily(
            "mnemosyne_system_default_model_configured",
            "A system-default model is configured for this role (1) or not (0)",
            labels=["role"],
        )
        up = GaugeMetricFamily(
            "mnemosyne_system_default_model_up",
            "System-default model endpoint reachable (1) or not (0)",
            labels=["role", "model", "api"],
        )
        latency = GaugeMetricFamily(
            "mnemosyne_system_default_model_probe_latency_seconds",
            "Latency of the last reachability probe for this role",
            labels=["role"],
        )
        return configured, up, latency

    def describe(self):
        """Yield the metric families without samples and without probing.

        A prometheus_client registry asks a collector for its metric names
        when the collector is registered. Without ``describe`` a registry
        created with ``auto_describe=True`` (the default registry) calls
        ``collect`` instead, which would run a model probe at registration
        time. Describing the families statically keeps registration free of
        probe side effects; the first real probe then happens on the first
        scrape.
        """
        yield from self._new_families()

    def collect(self):
        """Yield the gauge families populated from the cached probe results.

        Each result is read through the keys ``role``, ``configured``,
        ``model_name``, ``api_name``, ``ok`` and ``latency_ms``. The three
        families are always yielded, even when there are no results, so the
        metric names stay present in the exposition.
        """
        results = _cached_probe()
        configured, up, latency = self._new_families()

        for r in results:
            role = r["role"]
            configured.add_metric([role], 1 if r["configured"] else 0)
            if not r["configured"]:
                # Nothing to probe for this role: no up/latency samples.
                continue
            up.add_metric(
                # Label values must be strings, so missing names become "".
                [role, r["model_name"] or "", r["api_name"] or ""],
                1 if r["ok"] else 0,
            )
            if r["latency_ms"] is not None:
                latency.add_metric([role], r["latency_ms"] / 1000.0)

        yield configured
        yield up
        yield latency
|
|
|
|
|
|
def register():
    """Attach a ``SystemModelHealthCollector`` to the default registry once.

    Safe to call repeatedly (autoreload, repeated ``ready()``): when the
    default registry already holds an instance of the collector, the call
    does nothing. A successful registration is logged at INFO level.
    """
    from prometheus_client import REGISTRY

    # The registry exposes no public listing of its collectors, so inspect a
    # snapshot of its private collector map; if the attribute is absent the
    # snapshot is empty and registration proceeds.
    known = list(getattr(REGISTRY, "_collector_to_names", {}))
    already_registered = any(
        isinstance(existing, SystemModelHealthCollector) for existing in known
    )
    if not already_registered:
        REGISTRY.register(SystemModelHealthCollector())
        logger.info("Registered SystemModelHealthCollector on Prometheus default registry")
|