diff --git a/mnemosyne/library/apps.py b/mnemosyne/library/apps.py index 2a39b5b..a4acf17 100644 --- a/mnemosyne/library/apps.py +++ b/mnemosyne/library/apps.py @@ -88,6 +88,29 @@ def _should_skip_probe() -> bool: return False +def _is_web_process() -> bool: + """ + True when running inside the web (gunicorn / runserver) process. + + The reachability collector must only register here: ``/metrics`` is served + by the web process, and registering in the Celery worker would both probe + the GPU endpoints from a process whose metrics nobody scrapes and risk + duplicate registration. Celery launches via ``celery`` argv; management + commands are excluded above. + """ + argv0 = sys.argv[0] + if "celery" in argv0 or (len(sys.argv) >= 2 and sys.argv[1] == "celery"): + return False + if "pytest" in argv0 or "PYTEST_CURRENT_TEST" in os.environ: + return False + # gunicorn (prod) or runserver (dev). + if "gunicorn" in argv0: + return True + if len(sys.argv) >= 2 and sys.argv[1] == "runserver": + return True + return False + + def _run_startup_probe(): """ Emit ERROR/WARNING logs if the stack is misconfigured for search. @@ -199,4 +222,7 @@ class LibraryConfig(AppConfig): verbose_name = "Library" def ready(self): - pass + if _is_web_process(): + from library.health_collector import register + + register() diff --git a/mnemosyne/library/health_collector.py b/mnemosyne/library/health_collector.py new file mode 100644 index 0000000..41df7ce --- /dev/null +++ b/mnemosyne/library/health_collector.py @@ -0,0 +1,99 @@ +""" +Scrape-time Prometheus collector for system-default model reachability. + +The ingest-pipeline counters in ``library/metrics.py`` live in the Celery +worker process and only move during an active ingest, so they cannot signal +"models down" on an idle queue. This collector runs in the **web** process +(where ``/metrics`` is served by ``django_prometheus``) and probes the four +system-default models at scrape time, emitting an up/down gauge that is +present regardless of queue activity. + +Probe results are cached for a short TTL so rapid scrapes — or multiple +gunicorn workers each scraped in turn — cannot hammer the GPU endpoints. +""" + +import logging +import threading +import time + +from prometheus_client.core import GaugeMetricFamily + +from library.services.model_health import probe_system_models + +logger = logging.getLogger(__name__) + +# Cache probe results so repeated scrapes don't re-probe the router. The +# value is comfortably above a 15s scrape_interval but bounded so a recovered +# model shows green within a minute. +_CACHE_TTL_SECONDS = 55 + +_lock = threading.Lock() +_cache: dict = {"ts": 0.0, "results": None} + + +def _cached_probe() -> list[dict]: + """Return probe results, re-probing only when the cache has expired.""" + now = time.monotonic() + with _lock: + if _cache["results"] is not None and (now - _cache["ts"]) < _CACHE_TTL_SECONDS: + return _cache["results"] + try: + results = probe_system_models() + except Exception as exc: # never let a probe failure break /metrics + logger.warning("Model health probe failed during scrape: %s", exc) + # Serve the stale cache if we have one; otherwise report nothing. + return _cache["results"] or [] + _cache["ts"] = now + _cache["results"] = results + return results + + +class SystemModelHealthCollector: + """prometheus_client custom collector for system-default model health.""" + + def collect(self): + results = _cached_probe() + + up = GaugeMetricFamily( + "mnemosyne_system_default_model_up", + "System-default model endpoint reachable (1) or not (0)", + labels=["role", "model", "api"], + ) + configured = GaugeMetricFamily( + "mnemosyne_system_default_model_configured", + "A system-default model is configured for this role (1) or not (0)", + labels=["role"], + ) + latency = GaugeMetricFamily( + "mnemosyne_system_default_model_probe_latency_seconds", + "Latency of the last reachability probe for this role", + labels=["role"], + ) + + for r in results: + role = r["role"] + configured.add_metric([role], 1 if r["configured"] else 0) + if not r["configured"]: + continue + up.add_metric( + [role, r["model_name"] or "", r["api_name"] or ""], + 1 if r["ok"] else 0, + ) + if r["latency_ms"] is not None: + latency.add_metric([role], r["latency_ms"] / 1000.0) + + yield configured + yield up + yield latency + + +def register(): + """Register the collector against the default registry (idempotent).""" + from prometheus_client import REGISTRY + + # Guard against duplicate registration (autoreload, repeated ready()). + for collector in list(getattr(REGISTRY, "_collector_to_names", {})): + if isinstance(collector, SystemModelHealthCollector): + return + REGISTRY.register(SystemModelHealthCollector()) + logger.info("Registered SystemModelHealthCollector on Prometheus default registry") diff --git a/mnemosyne/library/services/model_health.py b/mnemosyne/library/services/model_health.py new file mode 100644 index 0000000..7e013f4 --- /dev/null +++ b/mnemosyne/library/services/model_health.py @@ -0,0 +1,119 @@ +""" +System-default model reachability probes. + +Provides a cheap, bounded liveness check for the four system-default models +(embedding, chat, vision, reranker) so the embedding dashboard and the +scrape-time Prometheus collector can surface "model not responding" without +running an ingest. + +The probe deliberately hits ``GET {base_url}/models`` as its primary check: +on an OpenAI-compatible router (e.g. the llama-router) this answers instantly +without loading a model, so repeated probes never burn GPU time. This mirrors +the GPU-avoidance principle in ``mcp_server/tools/health.py``. +""" + +import logging +import time +from typing import Optional + +import requests + +logger = logging.getLogger(__name__) + +# api_type values whose endpoints expose an OpenAI-compatible ``/models`` list. +_OPENAI_COMPATIBLE = {"openai", "azure", "ollama", "llama-cpp", "vllm"} + +# (role, getter method name) pairs — order is the dashboard/metrics order. +ROLE_GETTERS = [ + ("embedding", "get_system_embedding_model"), + ("chat", "get_system_chat_model"), + ("vision", "get_system_vision_model"), + ("reranker", "get_system_reranker_model"), +] + + +def probe_api(api, timeout: int = 5) -> tuple[bool, str]: + """Check whether an ``LLMApi`` endpoint is responding. + + Args: + api: ``LLMApi`` instance (provides base_url, api_key, api_type). + timeout: Per-request timeout in seconds. + + Returns: + ``(ok, detail)`` — ok is True if the endpoint answered acceptably; + detail is a short human-readable status (HTTP code, error, or "ok"). + """ + base_url = api.base_url.rstrip("/") + headers = {} + if api.api_key: + headers["Authorization"] = f"Bearer {api.api_key}" + + if api.api_type not in _OPENAI_COMPATIBLE: + # bedrock / anthropic have no equivalent cheap unauthenticated list; + # treat a reachable host as the liveness signal via a HEAD on base_url. + try: + resp = requests.head(base_url, headers=headers, timeout=timeout) + return True, f"reachable (HTTP {resp.status_code})" + except requests.RequestException as exc: + return False, type(exc).__name__ + + url = f"{base_url}/models" + try: + resp = requests.get(url, headers=headers, timeout=timeout) + except requests.Timeout: + return False, f"timeout after {timeout}s" + except requests.RequestException as exc: + return False, type(exc).__name__ + + if resp.status_code == 200: + return True, "ok" + return False, f"HTTP {resp.status_code}" + + +def probe_system_models(timeout: int = 5) -> list[dict]: + """Probe all four system-default models for reachability. + + Returns: + One dict per role with keys: ``role``, ``configured``, ``model_name``, + ``api_name``, ``base_url``, ``ok``, ``detail``, ``latency_ms``. + For an unconfigured role, ``configured`` is False and the probe is + skipped (``ok`` is None). + """ + from llm_manager.models import LLMModel + + results: list[dict] = [] + for role, getter_name in ROLE_GETTERS: + model = getattr(LLMModel, getter_name)() + if model is None: + results.append( + { + "role": role, + "configured": False, + "model_name": None, + "api_name": None, + "base_url": None, + "ok": None, + "detail": "not configured", + "latency_ms": None, + } + ) + continue + + api = model.api + start = time.monotonic() + ok, detail = probe_api(api, timeout=timeout) + latency_ms = round((time.monotonic() - start) * 1000, 1) + results.append( + { + "role": role, + "configured": True, + "model_name": model.name, + "api_name": api.name, + "base_url": api.base_url, + "ok": ok, + "detail": detail, + "latency_ms": latency_ms, + } + ) + + return results diff --git a/mnemosyne/library/templates/library/_model_health_badge.html b/mnemosyne/library/templates/library/_model_health_badge.html new file mode 100644 index 0000000..4f968f1 --- /dev/null +++ b/mnemosyne/library/templates/library/_model_health_badge.html @@ -0,0 +1,18 @@ +{% comment %} +Reachability badge for a system-default model. Expects `h` = one entry from +the `model_health` dict (keys: configured, ok, detail, latency_ms). Renders +nothing when the role is absent from model_health (probe failed entirely). +Text-only badges to match the existing dashboard palette (no emoji per house +HTML rule). +{% endcomment %} +{% if h %} + {% if not h.configured %} + NOT CONFIGURED + {% elif h.ok %} + REACHABLE + {% if h.latency_ms is not None %}{{ h.latency_ms }} ms{% endif %} + {% else %} + NOT RESPONDING + {{ h.detail }} + {% endif %} +{% endif %} diff --git a/mnemosyne/library/templates/library/embedding_dashboard.html b/mnemosyne/library/templates/library/embedding_dashboard.html index 78334ff..7b609f8 100644 --- a/mnemosyne/library/templates/library/embedding_dashboard.html +++ b/mnemosyne/library/templates/library/embedding_dashboard.html @@ -28,6 +28,7 @@ {% if system_embedding_model.supports_multimodal %} Multimodal {% endif %} + {% include "library/_model_health_badge.html" with h=model_health.embedding %} {% else %}
NOT CONFIGURED @@ -41,6 +42,7 @@ {% if system_chat_model %} {{ system_chat_model.api.name }}: {{ system_chat_model.name }} + {% include "library/_model_health_badge.html" with h=model_health.chat %} {% else %} Not configured — concept extraction disabled {% endif %} @@ -51,6 +53,7 @@ {% if system_reranker_model %} {{ system_reranker_model.api.name }}: {{ system_reranker_model.name }} + {% include "library/_model_health_badge.html" with h=model_health.reranker %} {% else %} Not configured — Phase 3 {% endif %} @@ -64,6 +67,7 @@ {% if system_vision_model.supports_vision %} Vision {% endif %} + {% include "library/_model_health_badge.html" with h=model_health.vision %} {% else %} Not configured — image analysis disabled {% endif %} diff --git a/mnemosyne/library/views.py b/mnemosyne/library/views.py index 35dce82..da7c9bf 100644 --- a/mnemosyne/library/views.py +++ b/mnemosyne/library/views.py @@ -729,6 +729,16 @@ def embedding_dashboard(request): except Exception as exc: logger.warning("Could not load system models: %s", exc) + # Reachability of the system-default models (keyed by role for the + # template). A probe failure must never 500 the dashboard. + context["model_health"] = {} + try: + from library.services.model_health import probe_system_models + + context["model_health"] = {r["role"]: r for r in probe_system_models()} + except Exception as exc: + logger.warning("Could not probe system model health: %s", exc) + # Get item status counts and node counts from Neo4j if neo4j_available(): context["neo4j_available"] = True