From 30330f22342cba0f7aa581b11e6fdaa1c2138d6e Mon Sep 17 00:00:00 2001 From: Robert Helewka Date: Sun, 17 May 2026 16:31:06 -0400 Subject: [PATCH] Set healthcheck to run every 60s --- pallas/health.py | 142 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 103 insertions(+), 39 deletions(-) diff --git a/pallas/health.py b/pallas/health.py index 293b54c..82cd1df 100644 --- a/pallas/health.py +++ b/pallas/health.py @@ -34,6 +34,7 @@ import json import logging import os import re +import time from datetime import datetime, timezone from pathlib import Path from typing import Any @@ -66,9 +67,16 @@ _ANTHROPIC_DEFAULT_API = "https://api.anthropic.com/v1" _OPENAI_DEFAULT_API = "https://api.openai.com/v1" _GENERIC_DEFAULT_API = "http://localhost:11434/v1" -# Populated by validate_llm_providers() at startup, read by get_health() +# Populated by validate_llm_providers() at startup, refreshed on a TTL by +# _refresh_llm_status_if_stale() from inside the get_health MCP tool. The +# TTL re-probe is what makes the agent self-heal when an upstream LLM was +# briefly unreachable at boot — without it the cached error sticks until +# the process restarts. See docs/pallas.md § Health System. _llm_status: dict[str, dict] = {} _active_provider: str = "" +_llm_status_ts: float = 0.0 # time.monotonic() of last completed probe (ok or error); 0 = never +_llm_refresh_lock = asyncio.Lock() +_LLM_HEALTH_TTL_S = float(os.environ.get("PALLAS_LLM_HEALTH_TTL", "60")) # ── Config loading ─────────────────────────────────────────────────────────── @@ -342,6 +350,92 @@ def _preflight_bedrock(config: dict, secrets: dict, active_model: str) -> dict: return {"status": "error", "model": active_model, "message": msg} +async def _probe_active_provider( + client: httpx.AsyncClient, + config: dict, + secrets: dict, + active_provider: str, + active_model: str, +) -> dict: + """Run the preflight probe for a single provider and return its result dict.
+ + Shared by the startup path (:func:`validate_llm_providers`) and the + runtime TTL refresh path (:func:`_refresh_llm_status_if_stale`). Keeps + the dispatch matrix from being duplicated. + """ + if active_provider == "anthropic": + return await _preflight_anthropic(client, config, secrets, active_model) + if active_provider == "openai": + return await _preflight_openai(client, config, secrets, active_model) + if active_provider == "generic": + return await _preflight_generic(client, config, secrets, active_model) + if active_provider == "bedrock": + return _preflight_bedrock(config, secrets, active_model) + + # Known to fast-agent? Surface that gap explicitly rather than silently + # reporting "error" from an empty dict lookup later. + try: + from fast_agent.llm.provider_types import Provider + + Provider(active_provider) # raises ValueError if unknown + msg = ( + f"preflight for provider '{active_provider}' is not " + "implemented in pallas.health; LLM health will be " + "validated on first inference call" + ) + logger.info("%s: %s", active_provider, msg) + return {"status": "ok", "model": active_model, "message": msg} + except ValueError: + msg = f"unknown provider '{active_provider}' in default_model" + logger.warning(msg) + return {"status": "error", "message": msg} + + +async def _refresh_llm_status_if_stale(timeout: float = 5.0) -> None: + """Re-probe the active LLM provider when the cached status is older than + ``PALLAS_LLM_HEALTH_TTL`` seconds. No-op on a fresh cache or when no + active provider has been resolved yet. + + On any unexpected exception, the previous cache entry is left intact — + a real probe result is better signal than our own internal failure. + """ + global _llm_status_ts + + if not _active_provider: + return + if time.monotonic() - _llm_status_ts < _LLM_HEALTH_TTL_S: + return + + async with _llm_refresh_lock: + # Re-check under the lock — another coroutine may have refreshed + # while we were waiting. 
+ if time.monotonic() - _llm_status_ts < _LLM_HEALTH_TTL_S: + return + try: + _load_dotenv() + config, secrets = _load_config() + default_model = config.get("default_model", "") or "" + if "." not in default_model: + return + active_provider, active_model = default_model.split(".", 1) + if active_provider != _active_provider: + # default_model was edited under us — defer to the next + # full validate_llm_providers() rather than guessing. + return + async with httpx.AsyncClient(timeout=timeout) as client: + result = await _probe_active_provider( + client, config, secrets, active_provider, active_model + ) + _llm_status[_active_provider] = result + _llm_status_ts = time.monotonic() + except Exception as exc: + logger.warning( + "llm health re-probe failed (%s); keeping previous cache: %s", + _active_provider, + exc, + ) + + async def validate_llm_providers(timeout: float = 5.0) -> dict[str, dict]: """ Validate the configured LLM provider and populate the module-level cache @@ -361,7 +455,7 @@ async def validate_llm_providers(timeout: float = 5.0) -> dict[str, dict]: {"openai": {"status": "error", "message": "API request failed (401)"}} {"unknown": {"status": "error", "message": "unknown provider 'foo'"}} """ - global _active_provider + global _active_provider, _llm_status_ts _load_dotenv() config, secrets = _load_config() @@ -377,52 +471,21 @@ async def validate_llm_providers(timeout: float = 5.0) -> dict[str, dict]: _llm_status.clear() _llm_status.update(results) _active_provider = "unknown" + _llm_status_ts = time.monotonic() return results active_provider, active_model = default_model.split(".", 1) - results: dict[str, dict] = {} async with httpx.AsyncClient(timeout=timeout) as client: - if active_provider == "anthropic": - results["anthropic"] = await _preflight_anthropic( - client, config, secrets, active_model - ) - elif active_provider == "openai": - results["openai"] = await _preflight_openai( - client, config, secrets, active_model - ) - elif active_provider 
== "generic": - results["generic"] = await _preflight_generic( - client, config, secrets, active_model - ) - elif active_provider == "bedrock": - results["bedrock"] = _preflight_bedrock(config, secrets, active_model) - else: - # Known to fast-agent? Surface that gap explicitly rather than - # silently reporting "error" from an empty dict lookup later. - try: - from fast_agent.llm.provider_types import Provider - - Provider(active_provider) # raises ValueError if unknown - msg = ( - f"preflight for provider '{active_provider}' is not " - "implemented in pallas.health; LLM health will be " - "validated on first inference call" - ) - logger.info("%s: %s", active_provider, msg) - results[active_provider] = { - "status": "ok", - "model": active_model, - "message": msg, - } - except ValueError: - msg = f"unknown provider '{active_provider}' in default_model" - logger.warning(msg) - results[active_provider] = {"status": "error", "message": msg} + result = await _probe_active_provider( + client, config, secrets, active_provider, active_model + ) + results: dict[str, dict] = {active_provider: result} _llm_status.clear() _llm_status.update(results) _active_provider = active_provider + _llm_status_ts = time.monotonic() return results @@ -521,6 +584,7 @@ def register_health_tool(mcp_server, servers: dict[str, dict]) -> None: description="Returns the health status of this agent and its downstream dependencies.", ) async def get_health() -> str: + await _refresh_llm_status_if_stale() result = await check_downstream_health(servers) # Include LLM provider status from startup preflight (active provider only) if _active_provider: