Files
mnemosyne/mnemosyne/library/services/search.py
Robert Helewka 5527cf6bdb feat(search,mcp): workspace-scope search and add get_health MCP tool
Workspace scoping is the integration's security-critical property: an
agent in workspace A must never see content from workspace B or from
any global library, regardless of what the calling LLM tries.

Adds `workspace_id` to SearchRequest with __post_init__ normalization
that converts empty strings to None — so "" cannot slip through as a
truthy filter at the Cypher boundary. Extracts the workspace scope
clause to a single string and appends it to all five search queries
(vector, fulltext-chunk, fulltext-concept, graph, image):

  ($workspace_id IS NULL AND lib.workspace_id IS NULL
   OR lib.workspace_id = $workspace_id)

Either workspace-only or global-only — never both — and the operator
precedence is bracketed so a refactor can't accidentally widen it. A
test verifies the literal clause string for that exact reason.

Adds `workspace_id` as a parameter to every MCP tool (`search`,
`get_chunk`, `list_libraries`, `list_collections`, `list_items`).
Deliberately undocumented in tool docstrings so the calling LLM is never
told the parameter exists — it is system-injected by Daedalus's chat
path and force-overwritten before reaching Mnemosyne. Mnemosyne also
validates the value but the security guarantee is enforced upstream.

Adds the `get_health` MCP tool per the Pallas health spec: returns
ok / degraded / error after probing Neo4j, S3, and the embedding
model registration. Used by Daedalus's existing health poller.
Updates the server INSTRUCTIONS string to advertise the new tool and
the two new library types (business, finance).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-29 06:27:32 -04:00

791 lines
28 KiB
Python

"""
Hybrid search service for the Mnemosyne knowledge graph.
Orchestrates vector search, full-text search, graph traversal, and
image search against Neo4j — then fuses results via Reciprocal Rank
Fusion and optionally re-ranks via Synesis.
"""
import logging
import time
from dataclasses import dataclass, field
from typing import Optional
from django.conf import settings
from neomodel import db
from library.metrics import (
SEARCH_CANDIDATES_TOTAL,
SEARCH_DURATION,
SEARCH_REQUESTS_TOTAL,
SEARCH_TOTAL_DURATION,
)
from .fusion import ImageSearchResult, SearchCandidate, reciprocal_rank_fusion
logger = logging.getLogger(__name__)
# Workspace scoping clause appended to every search Cypher query.
#
# A request with workspace_id set returns ONLY that workspace's content.
# A request with workspace_id null returns ONLY global content (libraries
# with no workspace_id). There is no third mode.
#
# How to read the predicate: AND binds tighter than OR, so inside the outer
# parentheses it is
#     ($workspace_id IS NULL AND lib.workspace_id IS NULL)
#     OR lib.workspace_id = $workspace_id
# When $workspace_id is null the equality arm evaluates to null, which WHERE
# treats as "no match", so only the first arm can select rows; when it is
# set, the first arm is false and only the equality arm can.
#
# Usage contract: the fragment starts with " AND ", so it must directly
# follow an existing WHERE predicate, and the query must bind the Library
# node as `lib` and pass a `workspace_id` parameter.
_WORKSPACE_SCOPE_CLAUSE = (
    " AND ($workspace_id IS NULL AND lib.workspace_id IS NULL OR "
    "lib.workspace_id = $workspace_id)"
)
@dataclass
class SearchRequest:
    """Input parameters for one hybrid search call.

    Scoping is single-mode: the request is either bound to one workspace
    (``workspace_id`` set) or global (``workspace_id`` is ``None``).  No
    combination of fields returns workspace and global content together.

    The four scope fields (``workspace_id``, ``library_uid``,
    ``library_type``, ``collection_uid``) are normalised after construction:
    an empty string is replaced by ``None``.  Any other value, including a
    whitespace-only string, is kept as given.
    """

    # Free-text query; also sent to the full-text indexes.
    query: str
    # Optional raw image bytes, forwarded to the reranker.
    query_image: Optional[bytes] = None
    # Optional scope filters; None means "do not filter on this".
    library_uid: Optional[str] = None
    library_type: Optional[str] = None
    collection_uid: Optional[str] = None
    # None selects global libraries only; a value selects that workspace only.
    workspace_id: Optional[str] = None
    # Which search strategies to run; a fresh list per instance.
    search_types: list[str] = field(
        default_factory=lambda: ["vector", "fulltext", "graph"]
    )
    # Maximum number of fused candidates returned to the caller.
    limit: int = 20
    # Per-strategy candidate budgets.
    vector_top_k: int = 50
    fulltext_top_k: int = 30
    graph_max_depth: int = 2
    # Post-processing switches.
    rerank: bool = True
    include_images: bool = True

    def __post_init__(self):
        # "" is not NULL at the Cypher boundary, so an empty string would be
        # applied as a real filter value.  Collapse it to None up front.
        for scope_field in (
            "workspace_id",
            "library_uid",
            "library_type",
            "collection_uid",
        ):
            if getattr(self, scope_field) == "":
                setattr(self, scope_field, None)
@dataclass
class SearchResponse:
    """Outcome of one hybrid search call."""

    # The query text exactly as it was received.
    query: str
    # Final ranked chunk candidates, already trimmed to the request limit.
    candidates: list[SearchCandidate]
    # Image hits; empty when image search was skipped or found nothing.
    images: list[ImageSearchResult]
    # Sum of the per-strategy result counts before fusion.
    total_candidates: int
    # End-to-end duration in milliseconds, rounded to two decimals.
    search_time_ms: float
    # True only when the reranker produced the final ordering.
    reranker_used: bool
    # Name of the reranker model when it was used, otherwise None.
    reranker_model: Optional[str]
    # Strategies that returned at least one result.
    search_types_used: list[str]
class SearchService:
    """
    Hybrid search orchestrator for the Mnemosyne knowledge graph.

    Pipeline:
    1. Embed the query text with the system embedding model
    2. Run each enabled search type (vector, fulltext, graph) in turn
    3. Merge the per-type result lists via Reciprocal Rank Fusion
    4. Optionally re-rank the merged list via Synesis
    5. Optionally run the image search
    """

    def __init__(self, user=None):
        """
        :param user: Optional Django user; handed on to the embedding and
            reranker clients (presumably for usage tracking).
        """
        self.user = user
def search(self, request: SearchRequest) -> SearchResponse:
    """
    Execute a hybrid search query.

    The enabled search types run one after another in the fixed order
    vector, fulltext, graph.  Their result lists are merged with
    Reciprocal Rank Fusion, optionally re-ranked, and trimmed to
    ``request.limit``.  Image search runs last and is reported separately.

    No exception is raised when the query cannot be embedded: the
    embedding step then yields ``None`` and vector and image search are
    skipped, while fulltext and graph search still run.

    :param request: SearchRequest with query and parameters.
    :returns: SearchResponse with ranked results.
    """
    # perf_counter is monotonic, so the reported duration cannot be skewed
    # by wall-clock adjustments while the search is running.
    started = time.perf_counter()
    library_type_label = request.library_type or "all"

    # --- Embed the query ---
    # Only vector and image search consume the embedding, so skip the model
    # call entirely when neither was requested.
    wants_vector = "vector" in request.search_types
    query_vector = None
    if wants_vector or request.include_images:
        query_vector = self._embed_query(request)

    # --- Run search types ---
    runners = []
    if wants_vector and query_vector:
        runners.append(
            ("vector", lambda: self._vector_search(request, query_vector))
        )
    if "fulltext" in request.search_types:
        runners.append(("fulltext", lambda: self._fulltext_search(request)))
    if "graph" in request.search_types:
        runners.append(("graph", lambda: self._graph_search(request)))

    result_lists = []
    search_types_used = []
    for search_type, run in runners:
        hits = run()
        if not hits:
            # A type that found nothing is neither fused nor counted.
            continue
        result_lists.append(hits)
        search_types_used.append(search_type)
        SEARCH_REQUESTS_TOTAL.labels(
            search_type=search_type, library_type=library_type_label
        ).inc()

    total_candidates = sum(len(rl) for rl in result_lists)

    # --- Fuse results ---
    # Keep twice the requested limit so the reranker has room to promote
    # candidates that fusion ranked just below the cut.
    rrf_k = getattr(settings, "SEARCH_RRF_K", 60)
    fused = reciprocal_rank_fusion(
        result_lists, k=rrf_k, limit=request.limit * 2
    )

    # --- Re-rank ---
    reranker_used = False
    reranker_model_name = None
    if request.rerank and fused:
        reranked, model_name = self._rerank(request, fused)
        # None means "no reranker configured or it failed": keep fusion order.
        if reranked is not None:
            fused = reranked
            reranker_used = True
            reranker_model_name = model_name

    # Trim to limit
    fused = fused[: request.limit]

    # --- Image search ---
    images = []
    if request.include_images and query_vector:
        images = self._image_search(request, query_vector)

    elapsed_ms = (time.perf_counter() - started) * 1000
    SEARCH_TOTAL_DURATION.observe(elapsed_ms / 1000)
    logger.info(
        "Search completed query='%s' types=%s total_candidates=%d "
        "fused=%d reranked=%s elapsed=%.1fms",
        request.query[:80],
        search_types_used,
        total_candidates,
        len(fused),
        reranker_used,
        elapsed_ms,
    )
    return SearchResponse(
        query=request.query,
        candidates=fused,
        images=images,
        total_candidates=total_candidates,
        search_time_ms=round(elapsed_ms, 2),
        reranker_used=reranker_used,
        reranker_model=reranker_model_name,
        search_types_used=search_types_used,
    )
# ------------------------------------------------------------------
# Query embedding
# ------------------------------------------------------------------
def _embed_query(self, request: SearchRequest) -> Optional[list[float]]:
    """
    Turn the query text into a vector with the system embedding model.

    When the request is scoped to a library (or, failing that, to a
    library type), that scope's embedding instruction is placed in front
    of the query text, separated by a blank line.

    :param request: SearchRequest.
    :returns: The query embedding, or None when no embedding model is
        configured or the embedding call fails.
    """
    from llm_manager.models import LLMModel

    from .embedding_client import EmbeddingClient

    model = LLMModel.get_system_embedding_model()
    if not model:
        logger.warning("No system embedding model configured — skipping vector search")
        return None

    # A specific library takes precedence over a library type.
    if request.library_uid:
        prefix = self._get_embedding_instruction(request.library_uid)
    elif request.library_type:
        prefix = self._get_type_embedding_instruction(request.library_type)
    else:
        prefix = ""

    text_to_embed = f"{prefix}\n\n{request.query}" if prefix else request.query

    try:
        embedding = EmbeddingClient(model, user=self.user).embed_text(text_to_embed)
        logger.debug(
            "Query embedded dimensions=%d instruction_len=%d",
            len(embedding),
            len(prefix),
        )
        return embedding
    except Exception as exc:
        # Best effort: the caller treats None as "skip vector and image search".
        logger.error("Query embedding failed: %s", exc)
        return None
# ------------------------------------------------------------------
# Vector search
# ------------------------------------------------------------------
def _vector_search(
    self, request: SearchRequest, query_vector: list[float]
) -> list[SearchCandidate]:
    """
    Nearest-neighbour lookup over chunk embeddings in Neo4j.

    The vector index is asked for ``request.vector_top_k`` chunks and the
    library, library-type, collection and workspace filters are applied
    to those hits afterwards, so a scoped request can come back with
    fewer than ``vector_top_k`` rows.  A failing query is logged and
    yields an empty list.

    :param request: SearchRequest with scope parameters.
    :param query_vector: Embedded query vector.
    :returns: SearchCandidate list tagged ``source="vector"``, in the
        order Neo4j returned it (score descending).
    """
    began = time.time()
    k = request.vector_top_k

    # Cypher text is kept flush-left so the statement sent to Neo4j stays
    # exactly as written.
    match_and_filter = """
CALL db.index.vector.queryNodes('chunk_embedding_index', $top_k, $query_vector)
YIELD node AS chunk, score
MATCH (item:Item)-[:HAS_CHUNK]->(chunk)
MATCH (lib:Library)-[:CONTAINS]->(col:Collection)-[:CONTAINS]->(item)
WHERE ($library_uid IS NULL OR lib.uid = $library_uid)
AND ($library_type IS NULL OR lib.library_type = $library_type)
AND ($collection_uid IS NULL OR col.uid = $collection_uid)
"""
    projection = """
RETURN chunk.uid AS chunk_uid, chunk.text_preview AS text_preview,
chunk.chunk_s3_key AS chunk_s3_key, chunk.chunk_index AS chunk_index,
item.uid AS item_uid, item.title AS item_title,
lib.library_type AS library_type, score
ORDER BY score DESC
LIMIT $top_k
"""
    cypher = match_and_filter + _WORKSPACE_SCOPE_CLAUSE + projection

    params = dict(
        top_k=k,
        query_vector=query_vector,
        library_uid=request.library_uid,
        library_type=request.library_type,
        collection_uid=request.collection_uid,
        workspace_id=request.workspace_id,
    )

    try:
        rows, _ = db.cypher_query(cypher, params)
    except Exception as exc:
        logger.error("Vector search failed: %s", exc)
        return []

    candidates = []
    for chunk_uid, preview, s3_key, position, item_uid, title, lib_type, similarity in rows:
        if not chunk_uid:
            # A hit without a chunk uid cannot be referenced later.
            continue
        candidates.append(
            SearchCandidate(
                chunk_uid=chunk_uid,
                text_preview=preview or "",
                chunk_s3_key=s3_key or "",
                chunk_index=position or 0,
                item_uid=item_uid or "",
                item_title=title or "",
                library_type=lib_type or "",
                score=float(similarity) if similarity else 0.0,
                source="vector",
            )
        )

    took = time.time() - began
    SEARCH_DURATION.labels(search_type="vector").observe(took)
    SEARCH_CANDIDATES_TOTAL.labels(search_type="vector").observe(len(candidates))
    logger.debug(
        "Vector search results=%d top_k=%d elapsed=%.3fs",
        len(candidates),
        k,
        took,
    )
    return candidates
# ------------------------------------------------------------------
# Full-text search
# ------------------------------------------------------------------
def _fulltext_search(self, request: SearchRequest) -> list[SearchCandidate]:
    """
    Full-text search over the Neo4j full-text indexes.

    Runs the chunk-text lookup and then the concept-name lookup; both
    write into one dict keyed by chunk uid, so a chunk found by both
    appears once.  The merged hits are ordered by score (highest first)
    and cut to ``request.fulltext_top_k``.

    :param request: SearchRequest with query and scope parameters.
    :returns: SearchCandidate list sorted by descending score.
    """
    began = time.time()
    budget = request.fulltext_top_k

    merged: dict[str, SearchCandidate] = {}
    # Chunk text first, then concept-to-chunk traversal.
    self._fulltext_chunk_search(request, budget, merged)
    self._fulltext_concept_search(request, budget, merged)

    ranked = list(merged.values())
    ranked.sort(key=lambda c: c.score, reverse=True)
    ranked = ranked[:budget]

    took = time.time() - began
    SEARCH_DURATION.labels(search_type="fulltext").observe(took)
    SEARCH_CANDIDATES_TOTAL.labels(search_type="fulltext").observe(len(ranked))
    logger.debug(
        "Fulltext search results=%d elapsed=%.3fs", len(ranked), took
    )
    return ranked
def _fulltext_chunk_search(
    self,
    request: SearchRequest,
    top_k: int,
    candidates: dict[str, SearchCandidate],
):
    """
    Query the ``chunk_text_fulltext`` index and merge hits into *candidates*.

    *candidates* is updated in place, keyed by chunk uid.  An existing
    entry is replaced only by a strictly higher score.  Any failure is
    logged and swallowed; entries merged before the failure are kept.

    :param request: SearchRequest with query and scope parameters.
    :param top_k: Maximum number of rows requested from Neo4j.
    :param candidates: Accumulator shared with the concept search.
    """
    match_and_filter = """
CALL db.index.fulltext.queryNodes('chunk_text_fulltext', $query)
YIELD node AS chunk, score
MATCH (item:Item)-[:HAS_CHUNK]->(chunk)
MATCH (lib:Library)-[:CONTAINS]->(col:Collection)-[:CONTAINS]->(item)
WHERE ($library_uid IS NULL OR lib.uid = $library_uid)
AND ($library_type IS NULL OR lib.library_type = $library_type)
AND ($collection_uid IS NULL OR col.uid = $collection_uid)
"""
    projection = """
RETURN chunk.uid AS chunk_uid, chunk.text_preview AS text_preview,
chunk.chunk_s3_key AS chunk_s3_key, chunk.chunk_index AS chunk_index,
item.uid AS item_uid, item.title AS item_title,
lib.library_type AS library_type, score
ORDER BY score DESC
LIMIT $top_k
"""
    cypher = match_and_filter + _WORKSPACE_SCOPE_CLAUSE + projection

    params = dict(
        query=request.query,
        top_k=top_k,
        library_uid=request.library_uid,
        library_type=request.library_type,
        collection_uid=request.collection_uid,
        workspace_id=request.workspace_id,
    )

    try:
        rows, _ = db.cypher_query(cypher, params)
        # Scores stay as raw BM25 values: fusion works on rank, not magnitude.
        for uid, preview, s3_key, position, item_uid, title, lib_type, bm25 in rows:
            if not uid:
                continue
            score = float(bm25) if bm25 else 0.0
            if uid in candidates and not score > candidates[uid].score:
                # The entry already held is at least as good.
                continue
            candidates[uid] = SearchCandidate(
                chunk_uid=uid,
                text_preview=preview or "",
                chunk_s3_key=s3_key or "",
                chunk_index=position or 0,
                item_uid=item_uid or "",
                item_title=title or "",
                library_type=lib_type or "",
                score=score,
                source="fulltext",
            )
    except Exception as exc:
        logger.error("Fulltext chunk search failed: %s", exc)
def _fulltext_concept_search(
    self,
    request: SearchRequest,
    top_k: int,
    candidates: dict[str, SearchCandidate],
):
    """
    Query ``concept_name_fulltext`` and merge the mentioning chunks.

    Concepts matching the query are followed back along MENTIONS to their
    chunks.  Each chunk is scored at 0.8 x the concept score and merged
    into *candidates* in place, keyed by chunk uid; an existing entry is
    replaced only by a strictly higher score.  Any failure is logged and
    swallowed.

    The query honours every scope filter on the request, including
    ``collection_uid``, so a collection-scoped search cannot pick up
    chunks from other collections through a shared concept.

    :param request: SearchRequest with query and scope parameters.
    :param top_k: Maximum number of rows requested from Neo4j.
    :param candidates: Accumulator shared with the chunk-text search.
    """
    cypher = (
        """
CALL db.index.fulltext.queryNodes('concept_name_fulltext', $query)
YIELD node AS concept, score AS concept_score
MATCH (chunk:Chunk)-[:MENTIONS]->(concept)
MATCH (item:Item)-[:HAS_CHUNK]->(chunk)
MATCH (lib:Library)-[:CONTAINS]->(col:Collection)-[:CONTAINS]->(item)
WHERE ($library_uid IS NULL OR lib.uid = $library_uid)
AND ($library_type IS NULL OR lib.library_type = $library_type)
AND ($collection_uid IS NULL OR col.uid = $collection_uid)
"""
        + _WORKSPACE_SCOPE_CLAUSE
        + """
RETURN chunk.uid AS chunk_uid, chunk.text_preview AS text_preview,
chunk.chunk_s3_key AS chunk_s3_key, chunk.chunk_index AS chunk_index,
item.uid AS item_uid, item.title AS item_title,
lib.library_type AS library_type,
concept_score * 0.8 AS score
ORDER BY score DESC
LIMIT $top_k
"""
    )
    params = {
        "query": request.query,
        "top_k": top_k,
        "library_uid": request.library_uid,
        "library_type": request.library_type,
        # Same collection filter the chunk-text and vector queries apply.
        "collection_uid": request.collection_uid,
        "workspace_id": request.workspace_id,
    }
    try:
        results, _ = db.cypher_query(cypher, params)
        # Raw scores already include the 0.8 concept downweight from Cypher.
        for row in results:
            uid = row[0]
            if not uid:
                continue
            raw_score = float(row[7]) if row[7] else 0.0
            if uid not in candidates or raw_score > candidates[uid].score:
                candidates[uid] = SearchCandidate(
                    chunk_uid=uid,
                    text_preview=row[1] or "",
                    chunk_s3_key=row[2] or "",
                    chunk_index=row[3] or 0,
                    item_uid=row[4] or "",
                    item_title=row[5] or "",
                    library_type=row[6] or "",
                    score=raw_score,
                    source="fulltext",
                )
    except Exception as exc:
        logger.error("Fulltext concept search failed: %s", exc)
# ------------------------------------------------------------------
# Graph search
# ------------------------------------------------------------------
def _graph_search(self, request: SearchRequest) -> list[SearchCandidate]:
    """
    Knowledge graph traversal search.

    Matches the query against Concept names through the
    ``concept_name_fulltext`` index, keeps the 10 best-scoring concepts,
    and returns the chunks that MENTION them.  Each chunk carries its
    best concept score and up to five of the matching concept names in
    ``metadata["concepts"]``.

    Only MENTIONS edges are followed; ``request.graph_max_depth`` is not
    consulted here.  The row limit is ``request.fulltext_top_k``.  All
    scope filters on the request, including ``collection_uid``, are
    applied.  A failing query is logged and yields an empty list.

    :param request: SearchRequest with query and scope parameters.
    :returns: List of SearchCandidate from graph traversal.
    """
    # Monotonic clock: the measured duration is immune to wall-clock jumps.
    start = time.perf_counter()
    cypher = (
        """
CALL db.index.fulltext.queryNodes('concept_name_fulltext', $query)
YIELD node AS concept, score AS concept_score
WITH concept, concept_score
ORDER BY concept_score DESC
LIMIT 10
MATCH (chunk:Chunk)-[:MENTIONS]->(concept)
MATCH (item:Item)-[:HAS_CHUNK]->(chunk)
MATCH (lib:Library)-[:CONTAINS]->(col:Collection)-[:CONTAINS]->(item)
WHERE ($library_uid IS NULL OR lib.uid = $library_uid)
AND ($library_type IS NULL OR lib.library_type = $library_type)
AND ($collection_uid IS NULL OR col.uid = $collection_uid)
"""
        + _WORKSPACE_SCOPE_CLAUSE
        + """
WITH chunk, item, lib,
max(concept_score) AS score,
collect(DISTINCT concept.name)[..5] AS concept_names
RETURN chunk.uid AS chunk_uid, chunk.text_preview AS text_preview,
chunk.chunk_s3_key AS chunk_s3_key, chunk.chunk_index AS chunk_index,
item.uid AS item_uid, item.title AS item_title,
lib.library_type AS library_type,
score, concept_names
ORDER BY score DESC
LIMIT $limit
"""
    )
    params = {
        "query": request.query,
        "limit": request.fulltext_top_k,
        "library_uid": request.library_uid,
        "library_type": request.library_type,
        # Same collection filter the chunk-text and vector queries apply.
        "collection_uid": request.collection_uid,
        "workspace_id": request.workspace_id,
    }
    try:
        results, _ = db.cypher_query(cypher, params)
    except Exception as exc:
        logger.error("Graph search failed: %s", exc)
        return []

    candidates = []
    for row in results:
        uid = row[0]
        if not uid:
            continue
        raw_score = float(row[7]) if row[7] else 0.0
        # Tolerate rows that lack the concept_names column.
        concept_names = row[8] if len(row) > 8 else []
        candidates.append(
            SearchCandidate(
                chunk_uid=uid,
                text_preview=row[1] or "",
                chunk_s3_key=row[2] or "",
                chunk_index=row[3] or 0,
                item_uid=row[4] or "",
                item_title=row[5] or "",
                library_type=row[6] or "",
                score=raw_score,
                source="graph",
                metadata={"concepts": concept_names},
            )
        )

    elapsed = time.perf_counter() - start
    SEARCH_DURATION.labels(search_type="graph").observe(elapsed)
    SEARCH_CANDIDATES_TOTAL.labels(search_type="graph").observe(len(candidates))
    logger.debug(
        "Graph search results=%d elapsed=%.3fs", len(candidates), elapsed
    )
    return candidates
# ------------------------------------------------------------------
# Image search
# ------------------------------------------------------------------
def _image_search(
    self, request: SearchRequest, query_vector: list[float]
) -> list[ImageSearchResult]:
    """
    Search images via the ``image_embedding_index`` vector index.

    The index is asked for a fixed number of nearest embeddings (10) and
    the scope filters are applied to those hits afterwards, so a scoped
    request can return fewer.  All scope filters on the request,
    including ``collection_uid``, are applied.  A failing query is logged
    and yields an empty list.

    :param request: SearchRequest.
    :param query_vector: Embedded query vector.
    :returns: List of ImageSearchResult tagged ``source="vector"``.
    """
    # Monotonic clock: the measured duration is immune to wall-clock jumps.
    start = time.perf_counter()
    # Single source for the index fetch size and the final row limit.
    image_top_k = 10
    cypher = (
        """
CALL db.index.vector.queryNodes('image_embedding_index', $top_k, $query_vector)
YIELD node AS emb_node, score
MATCH (img:Image)-[:HAS_EMBEDDING]->(emb_node)
MATCH (item:Item)-[:HAS_IMAGE]->(img)
MATCH (lib:Library)-[:CONTAINS]->(col:Collection)-[:CONTAINS]->(item)
WHERE ($library_uid IS NULL OR lib.uid = $library_uid)
AND ($library_type IS NULL OR lib.library_type = $library_type)
AND ($collection_uid IS NULL OR col.uid = $collection_uid)
"""
        + _WORKSPACE_SCOPE_CLAUSE
        + """
RETURN img.uid AS image_uid, img.image_type AS image_type,
img.description AS description, img.s3_key AS s3_key,
item.uid AS item_uid, item.title AS item_title,
score
ORDER BY score DESC
LIMIT $top_k
"""
    )
    params = {
        "top_k": image_top_k,
        "query_vector": query_vector,
        "library_uid": request.library_uid,
        "library_type": request.library_type,
        # Same collection filter the chunk-text and vector queries apply.
        "collection_uid": request.collection_uid,
        "workspace_id": request.workspace_id,
    }
    try:
        results, _ = db.cypher_query(cypher, params)
    except Exception as exc:
        logger.error("Image search failed: %s", exc)
        return []

    images = [
        ImageSearchResult(
            image_uid=row[0] or "",
            image_type=row[1] or "",
            description=row[2] or "",
            s3_key=row[3] or "",
            item_uid=row[4] or "",
            item_title=row[5] or "",
            score=float(row[6]) if row[6] else 0.0,
            source="vector",
        )
        for row in results
        if row[0]  # Skip rows without an image uid
    ]

    elapsed = time.perf_counter() - start
    SEARCH_DURATION.labels(search_type="image").observe(elapsed)
    logger.debug(
        "Image search results=%d elapsed=%.3fs", len(images), elapsed
    )
    return images
# ------------------------------------------------------------------
# Re-ranking
# ------------------------------------------------------------------
def _rerank(
    self, request: SearchRequest, candidates: list[SearchCandidate]
) -> tuple[Optional[list[SearchCandidate]], Optional[str]]:
    """
    Re-order fused candidates with the system reranker (Synesis).

    Only the first ``RERANKER_MAX_CANDIDATES`` candidates (default 32)
    are sent to the reranker.  When no reranker model is configured, or
    the call fails, ``(None, None)`` is returned and the caller keeps the
    fusion order.

    :param request: SearchRequest.
    :param candidates: Fused candidates to re-rank.
    :returns: Tuple of (reranked_candidates, model_name) or (None, None).
    """
    from llm_manager.models import LLMModel

    from .reranker import RerankerClient

    model = LLMModel.get_system_reranker_model()
    if not model:
        logger.debug("No system reranker model — skipping re-ranking")
        return None, None

    # Content-type aware instruction for the reranker prompt.
    instruction = self._get_reranker_instruction(request, candidates)

    cap = getattr(settings, "RERANKER_MAX_CANDIDATES", 32)
    shortlist = candidates[:cap]

    try:
        # top_n is deliberately not passed: every shortlisted candidate is
        # scored so the reranker can promote items fusion ranked low.  The
        # final cut to request.limit is made by search().
        reranked = RerankerClient(model, user=self.user).rerank(
            query=request.query,
            candidates=shortlist,
            instruction=instruction,
            query_image=request.query_image,
        )
        return reranked, model.name
    except Exception as exc:
        logger.warning(
            "Re-ranking failed, returning fusion results: %s", exc
        )
        return None, None
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
# Fallback instruction for queries that are not tied to one content type.
GENERIC_RERANKER_INSTRUCTION = (
    "Re-rank these passages by relevance to the query."
)

def _get_reranker_instruction(
    self, request: SearchRequest, candidates: list[SearchCandidate]
) -> str:
    """
    Pick the reranker instruction that fits the request's scope.

    Resolution order:
    1. ``request.library_type`` is set and known: that type's
       ``reranker_instruction`` is returned as-is, even when it is empty.
    2. ``request.library_uid`` is set and the library has a non-empty
       instruction: that instruction.
    3. Otherwise the generic instruction, so an unscoped query is not
       biased toward whichever type dominates the results.

    :param request: SearchRequest.
    :param candidates: Candidates (unused; kept for API stability).
    :returns: Reranker instruction string.
    """
    from library.content_types import get_library_type_config

    library_type = request.library_type
    if library_type:
        try:
            return get_library_type_config(library_type).get(
                "reranker_instruction", ""
            )
        except ValueError:
            # Unknown library type: fall through to the next source.
            pass

    per_library = (
        self._get_library_reranker_instruction(request.library_uid)
        if request.library_uid
        else ""
    )
    return per_library or self.GENERIC_RERANKER_INSTRUCTION
def _get_library_reranker_instruction(self, library_uid: str) -> str:
    """
    Read ``reranker_instruction`` from the Library node with this uid.

    :param library_uid: uid of the Library node to load.
    :returns: The stored instruction, or "" when it is unset or the
        lookup fails for any reason (the failure is logged as a warning).
    """
    try:
        from library.models import Library

        node = Library.nodes.get(uid=library_uid)
        stored = node.reranker_instruction
        return stored if stored else ""
    except Exception as exc:
        # Best effort: a missing or unreadable library must not break search.
        logger.warning(
            "Failed to load reranker_instruction for library_uid=%s: %s",
            library_uid,
            exc,
        )
        return ""
def _get_embedding_instruction(self, library_uid: str) -> str:
    """
    Read ``embedding_instruction`` from the Library node with this uid.

    :param library_uid: uid of the Library node to load.
    :returns: The stored instruction, or "" when it is unset or the
        lookup fails for any reason (the failure is logged as a warning).
    """
    try:
        from library.models import Library

        node = Library.nodes.get(uid=library_uid)
        stored = node.embedding_instruction
        return stored if stored else ""
    except Exception as exc:
        # Best effort: the query is then embedded without a prefix.
        logger.warning(
            "Failed to load embedding_instruction for library_uid=%s: %s",
            library_uid,
            exc,
        )
        return ""
def _get_type_embedding_instruction(self, library_type: str) -> str:
    """
    Look up the ``embedding_instruction`` configured for a library type.

    :param library_type: Library type key to look up.
    :returns: The configured instruction, or "" when the type has none
        or the lookup raises ValueError.
    """
    try:
        from library.content_types import get_library_type_config

        return get_library_type_config(library_type).get(
            "embedding_instruction", ""
        )
    except ValueError:
        return ""