Workspace scoping is the integration's security-critical property: an agent in workspace A must never see content from workspace B or from any global library, regardless of what the calling LLM tries. Adds `workspace_id` to SearchRequest with __post_init__ normalization that converts empty strings to None — so "" cannot slip through as a truthy filter at the Cypher boundary. Extracts the workspace scope clause to a single string and appends it to all five search queries (vector, fulltext-chunk, fulltext-concept, graph, image): ($workspace_id IS NULL AND lib.workspace_id IS NULL OR lib.workspace_id = $workspace_id) Either workspace-only or global-only — never both — and the operator precedence is bracketed so a refactor can't accidentally widen it. A test verifies the literal clause string for that exact reason. Adds `workspace_id` as a parameter to every MCP tool (`search`, `get_chunk`, `list_libraries`, `list_collections`, `list_items`). Deliberately undocumented in tool docstrings so the calling LLM is never told the parameter exists — it is system-injected by Daedalus's chat path and force-overwritten before reaching Mnemosyne. Mnemosyne also validates the value but the security guarantee is enforced upstream. Adds the `get_health` MCP tool per the Pallas health spec: returns ok / degraded / error after probing Neo4j, S3, and the embedding model registration. Used by Daedalus's existing health poller. Updates the server INSTRUCTIONS string to advertise the new tool and the two new library types (business, finance). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
124 lines
3.7 KiB
Python
124 lines
3.7 KiB
Python
"""Health-check MCP tool — used by Pallas/Daedalus health pollers.
|
|
|
|
Per the Pallas health spec, returns one of:
|
|
- ok — all dependencies reachable
|
|
- degraded — non-critical dependency unhealthy (chat allowed)
|
|
- error — critical dependency unhealthy (chat blocked)
|
|
|
|
The tool is intercepted by the FastMCP server and never invokes an LLM —
|
|
it executes synchronously against Neo4j, S3, and the embedding model
|
|
registration (the model endpoint itself is not called), and returns within the poller's timeout.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import time
|
|
from typing import Any
|
|
|
|
from asgiref.sync import sync_to_async
|
|
|
|
from ..metrics import record_tool_call
|
|
|
|
|
|
def register_health_tools(mcp):
    """Register the ``get_health`` tool on the given MCP server object.

    ``mcp`` must expose a ``tool`` decorator; the coroutine defined below is
    registered through it. Nothing is returned.
    """

    @mcp.tool
    async def get_health() -> dict[str, Any]:
        """Health check for Mnemosyne.

        Returns a status object compatible with the Pallas health spec:
        {status: "ok"|"degraded"|"error", checks: {neo4j, s3, embedding}}.
        """
        with record_tool_call("get_health"):
            # The probes are blocking calls, so run them through
            # sync_to_async instead of directly in the coroutine.
            run_probes = sync_to_async(_run_health_check, thread_sensitive=True)
            return await run_probes()
|
|
|
|
|
|
def _run_health_check() -> dict[str, Any]:
    """Run the Neo4j, S3, and embedding-model probes and aggregate them.

    Returns:
        ``{"status": <overall>, "checks": {"neo4j": ..., "s3": ...,
        "embedding": ...}}`` where ``<overall>`` is ``"error"`` if the Neo4j
        or S3 probe reported ``"error"``, ``"degraded"`` if any probe
        reported something other than ``"ok"``, and ``"ok"`` otherwise.
    """
    # Probes run in this fixed order; each returns its own result dict.
    checks: dict[str, dict[str, Any]] = {
        "neo4j": _check_neo4j(),
        "s3": _check_s3(),
        "embedding": _check_embedding_model(),
    }

    # Neo4j and S3 are the critical dependencies: an error in either one
    # makes the overall status "error", whatever the other probes report.
    critical_statuses = (checks["neo4j"]["status"], checks["s3"]["status"])
    if "error" in critical_statuses:
        status = "error"
    elif all(result["status"] == "ok" for result in checks.values()):
        status = "ok"
    else:
        # Some probe is not "ok", but no critical dependency is in error.
        status = "degraded"

    return {"status": status, "checks": checks}
|
|
|
|
|
|
def _check_neo4j() -> dict[str, Any]:
|
|
start = time.time()
|
|
try:
|
|
from neomodel import db
|
|
|
|
db.cypher_query("RETURN 1")
|
|
return {
|
|
"status": "ok",
|
|
"duration_ms": round((time.time() - start) * 1000, 1),
|
|
}
|
|
except Exception as exc:
|
|
return {
|
|
"status": "error",
|
|
"error": str(exc),
|
|
"duration_ms": round((time.time() - start) * 1000, 1),
|
|
}
|
|
|
|
|
|
def _check_s3() -> dict[str, Any]:
|
|
start = time.time()
|
|
try:
|
|
from django.core.files.storage import default_storage
|
|
|
|
# `exists` on a path that won't exist is the cheapest round-trip
|
|
# we have. It returns False rather than raising on most backends.
|
|
default_storage.exists("__healthcheck__")
|
|
return {
|
|
"status": "ok",
|
|
"duration_ms": round((time.time() - start) * 1000, 1),
|
|
}
|
|
except Exception as exc:
|
|
return {
|
|
"status": "error",
|
|
"error": str(exc),
|
|
"duration_ms": round((time.time() - start) * 1000, 1),
|
|
}
|
|
|
|
|
|
def _check_embedding_model() -> dict[str, Any]:
|
|
"""Soft check: confirm a system embedding model is configured.
|
|
|
|
We don't hit the model endpoint here — that would burn GPU time on
|
|
every poll. The poller-level check is "is a model registered."
|
|
"""
|
|
start = time.time()
|
|
try:
|
|
from llm_manager.models import LLMModel
|
|
|
|
model = LLMModel.get_system_embedding_model()
|
|
if model is None:
|
|
return {
|
|
"status": "degraded",
|
|
"error": "no system embedding model configured",
|
|
"duration_ms": round((time.time() - start) * 1000, 1),
|
|
}
|
|
return {
|
|
"status": "ok",
|
|
"model": model.name,
|
|
"duration_ms": round((time.time() - start) * 1000, 1),
|
|
}
|
|
except Exception as exc:
|
|
return {
|
|
"status": "degraded",
|
|
"error": str(exc),
|
|
"duration_ms": round((time.time() - start) * 1000, 1),
|
|
}
|