Set healthcheck to run every 60s

This commit is contained in:
2026-05-17 16:31:06 -04:00
parent d3ebe1509d
commit 30330f2234

View File

@@ -34,6 +34,7 @@ import json
import logging
import os
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
@@ -66,9 +67,16 @@ _ANTHROPIC_DEFAULT_API = "https://api.anthropic.com/v1"
_OPENAI_DEFAULT_API = "https://api.openai.com/v1"
_GENERIC_DEFAULT_API = "http://localhost:11434/v1"
# LLM provider health cache (module-level state).
#
# Populated by validate_llm_providers() at startup, refreshed on a TTL by
# _refresh_llm_status_if_stale() from inside the get_health MCP tool, and
# read by get_health(). The TTL re-probe is what makes the agent self-heal
# when an upstream LLM was briefly unreachable at boot — without it the
# cached error sticks until the process restarts.
# See docs/pallas.md § Health System.

# Provider name -> most recent preflight result dict for that provider.
_llm_status: dict[str, dict] = {}
# Set by validate_llm_providers(); "" means no provider has been resolved yet,
# in which case the TTL refresh is a no-op.
_active_provider: str = ""
# time.monotonic() at which the cache was last written. It is stamped after any
# completed probe, including one whose result has status "error"; 0 = never.
_llm_status_ts: float = 0.0
# Held by _refresh_llm_status_if_stale() while it re-probes, so callers that
# arrive together do not each issue their own probe.
_llm_refresh_lock = asyncio.Lock()
# Maximum cache age in seconds before a re-probe; overridable through the
# PALLAS_LLM_HEALTH_TTL environment variable (default 60). The value is parsed
# with float() at import time.
_LLM_HEALTH_TTL_S = float(os.environ.get("PALLAS_LLM_HEALTH_TTL", "60"))
# ── Config loading ───────────────────────────────────────────────────────────
@@ -342,6 +350,92 @@ def _preflight_bedrock(config: dict, secrets: dict, active_model: str) -> dict:
return {"status": "error", "model": active_model, "message": msg}
async def _probe_active_provider(
    client: httpx.AsyncClient,
    config: dict,
    secrets: dict,
    active_provider: str,
    active_model: str,
) -> dict:
    """Dispatch to the preflight check for ``active_provider``.

    Single home for the provider -> preflight mapping, used both by
    :func:`validate_llm_providers` at startup and by
    :func:`_refresh_llm_status_if_stale` at runtime.

    Args:
        client: HTTP client handed to the HTTP-based preflights.
        config: Loaded configuration mapping.
        secrets: Loaded secrets mapping.
        active_provider: Provider prefix taken from ``default_model``.
        active_model: Model name taken from ``default_model``.

    Returns:
        The preflight result dict. For providers without a dedicated
        preflight: ``status`` is ``"ok"`` (with an explanatory message) when
        fast-agent's ``Provider`` enum accepts the name, and ``"error"`` when
        it does not.
    """
    # Bedrock's preflight is a plain function and takes no HTTP client.
    if active_provider == "bedrock":
        return _preflight_bedrock(config, secrets, active_model)

    http_preflights = {
        "anthropic": _preflight_anthropic,
        "openai": _preflight_openai,
        "generic": _preflight_generic,
    }
    preflight = http_preflights.get(active_provider)
    if preflight is not None:
        return await preflight(client, config, secrets, active_model)

    # No dedicated preflight. Ask fast-agent whether it knows the provider so
    # the gap is surfaced explicitly instead of turning into a silent "error"
    # from an empty dict lookup later.
    try:
        from fast_agent.llm.provider_types import Provider

        Provider(active_provider)  # raises ValueError if unknown
    except ValueError:
        detail = f"unknown provider '{active_provider}' in default_model"
        logger.warning(detail)
        return {"status": "error", "message": detail}
    else:
        detail = (
            f"preflight for provider '{active_provider}' is not "
            "implemented in pallas.health; LLM health will be "
            "validated on first inference call"
        )
        logger.info("%s: %s", active_provider, detail)
        return {"status": "ok", "model": active_model, "message": detail}
async def _refresh_llm_status_if_stale(timeout: float = 5.0) -> None:
    """Refresh the cached LLM provider status once it has outlived its TTL.

    Returns without doing anything when no active provider is set or when the
    cache is younger than ``PALLAS_LLM_HEALTH_TTL`` seconds. Otherwise the
    configuration is reloaded and the active provider is probed again; the
    result replaces that provider's cache entry and the cache timestamp is
    reset.

    The cache is also left untouched (and the timestamp not advanced) when
    ``default_model`` has no ``provider.model`` form, when its provider no
    longer matches the active one, or when anything raises — a real probe
    result is better signal than our own internal failure.

    Args:
        timeout: Timeout in seconds for the HTTP client used by the probe.
    """
    global _llm_status_ts

    if not _active_provider or time.monotonic() - _llm_status_ts < _LLM_HEALTH_TTL_S:
        return

    async with _llm_refresh_lock:
        # Another coroutine may have completed a refresh while this one was
        # waiting for the lock, so test the age again before probing.
        if time.monotonic() - _llm_status_ts < _LLM_HEALTH_TTL_S:
            return

        try:
            _load_dotenv()
            config, secrets = _load_config()

            configured = config.get("default_model", "") or ""
            if "." not in configured:
                return

            provider, model = configured.split(".", 1)
            if provider != _active_provider:
                # default_model was edited under us — defer to the next
                # full validate_llm_providers() rather than guessing.
                return

            async with httpx.AsyncClient(timeout=timeout) as client:
                probe = await _probe_active_provider(
                    client, config, secrets, provider, model
                )

            _llm_status[_active_provider] = probe
            _llm_status_ts = time.monotonic()
        except Exception as exc:
            logger.warning(
                "llm health re-probe failed (%s); keeping previous cache: %s",
                _active_provider,
                exc,
            )
async def validate_llm_providers(timeout: float = 5.0) -> dict[str, dict]:
"""
Validate the configured LLM provider and populate the module-level cache
@@ -361,7 +455,7 @@ async def validate_llm_providers(timeout: float = 5.0) -> dict[str, dict]:
{"openai": {"status": "error", "message": "API request failed (401)"}}
{"unknown": {"status": "error", "message": "unknown provider 'foo'"}}
"""
global _active_provider
global _active_provider, _llm_status_ts
_load_dotenv()
config, secrets = _load_config()
@@ -377,52 +471,21 @@ async def validate_llm_providers(timeout: float = 5.0) -> dict[str, dict]:
_llm_status.clear()
_llm_status.update(results)
_active_provider = "unknown"
_llm_status_ts = time.monotonic()
return results
active_provider, active_model = default_model.split(".", 1)
results: dict[str, dict] = {}
async with httpx.AsyncClient(timeout=timeout) as client:
if active_provider == "anthropic":
results["anthropic"] = await _preflight_anthropic(
client, config, secrets, active_model
)
elif active_provider == "openai":
results["openai"] = await _preflight_openai(
client, config, secrets, active_model
)
elif active_provider == "generic":
results["generic"] = await _preflight_generic(
client, config, secrets, active_model
)
elif active_provider == "bedrock":
results["bedrock"] = _preflight_bedrock(config, secrets, active_model)
else:
# Known to fast-agent? Surface that gap explicitly rather than
# silently reporting "error" from an empty dict lookup later.
try:
from fast_agent.llm.provider_types import Provider
Provider(active_provider) # raises ValueError if unknown
msg = (
f"preflight for provider '{active_provider}' is not "
"implemented in pallas.health; LLM health will be "
"validated on first inference call"
)
logger.info("%s: %s", active_provider, msg)
results[active_provider] = {
"status": "ok",
"model": active_model,
"message": msg,
}
except ValueError:
msg = f"unknown provider '{active_provider}' in default_model"
logger.warning(msg)
results[active_provider] = {"status": "error", "message": msg}
result = await _probe_active_provider(
client, config, secrets, active_provider, active_model
)
results: dict[str, dict] = {active_provider: result}
_llm_status.clear()
_llm_status.update(results)
_active_provider = active_provider
_llm_status_ts = time.monotonic()
return results
@@ -521,6 +584,7 @@ def register_health_tool(mcp_server, servers: dict[str, dict]) -> None:
description="Returns the health status of this agent and its downstream dependencies.",
)
async def get_health() -> str:
await _refresh_llm_status_if_stale()
result = await check_downstream_health(servers)
# Include the cached LLM provider status, kept current by the TTL refresh above (active provider only)
if _active_provider: