Files
mnemosyne/mnemosyne/library/services/search.py
Robert Helewka a2c885cf34
All checks were successful
CVE Scan & Docker Build / security-scan (push) Successful in 52s
CVE Scan & Docker Build / build-and-push (push) Successful in 2m32s
feat(library): add workspace-scoped search and JWT auth for Daedalus
- Extend library list endpoint with `include_workspace` and
  `with_item_count` query params to support Daedalus registry mirroring
- Expand search scope clause to three modes: workspace-only, workspace
  plus allowed user libraries, and global
- Add `allowed_libraries` field to SearchRequest for Phase-2 JWT claims
- Introduce JWT-based actor resolution using a synthetic service user
  (`MCP_JWT_SERVICE_USERNAME`) for Daedalus-originated requests
2026-05-03 17:36:06 -04:00

817 lines
29 KiB
Python

"""
Hybrid search service for the Mnemosyne knowledge graph.
Orchestrates vector search, full-text search, graph traversal, and
image search against Neo4j — then fuses results via Reciprocal Rank
Fusion and optionally re-ranks via Synesis.
"""
import logging
import time
from dataclasses import dataclass, field
from typing import Optional
from django.conf import settings
from neomodel import db
from library.metrics import (
SEARCH_CANDIDATES_TOTAL,
SEARCH_DURATION,
SEARCH_REQUESTS_TOTAL,
SEARCH_TOTAL_DURATION,
)
from .fusion import ImageSearchResult, SearchCandidate, reciprocal_rank_fusion
logger = logging.getLogger(__name__)
# Search-scope clause appended to every search Cypher query.
#
# Three modes, picked structurally by which params are set:
#
# 1. ``workspace_id`` set, ``allowed_libraries`` empty → workspace-scoped.
# Returns ONLY content from libraries whose workspace_id matches.
# 2. ``workspace_id`` set + ``allowed_libraries`` non-empty → workspace
# PLUS the listed user-managed libraries (typical Phase-2 chat turn).
# 3. Both null → global. Returns ONLY libraries with no workspace_id
# (legacy opaque-token callers / dashboard).
#
# When ``allowed_libraries`` is non-empty alone (no workspace_id), it
# narrows results to those libraries.
_WORKSPACE_SCOPE_CLAUSE = (
" AND ("
"($workspace_id IS NOT NULL AND lib.workspace_id = $workspace_id) "
"OR ($allowed_libraries IS NOT NULL AND lib.uid IN $allowed_libraries) "
"OR ($workspace_id IS NULL AND $allowed_libraries IS NULL "
" AND lib.workspace_id IS NULL)"
")"
)
@dataclass
class SearchRequest:
"""Parameters for a search query.
Scope is single-mode: a request is either workspace-scoped (workspace_id
set) or global (workspace_id is None). There is no parameter combination
that returns both workspace and global content in one call.
"""
query: str
query_image: Optional[bytes] = None
library_uid: Optional[str] = None
library_type: Optional[str] = None
collection_uid: Optional[str] = None
workspace_id: Optional[str] = None
# Phase-2 token claim: user-managed libraries the caller may include
# alongside their workspace's auto-library. Cypher uses ``IS NULL`` vs
# non-empty list to gate the second branch of the scope clause.
allowed_libraries: Optional[list[str]] = None
search_types: list[str] = field(
default_factory=lambda: ["vector", "fulltext", "graph"]
)
limit: int = 20
vector_top_k: int = 50
fulltext_top_k: int = 30
graph_max_depth: int = 2
rerank: bool = True
include_images: bool = True
def __post_init__(self):
# Normalize empty strings to None so "" doesn't slip through as
# truthy at the Cypher boundary.
if self.workspace_id == "":
self.workspace_id = None
if self.library_uid == "":
self.library_uid = None
if self.library_type == "":
self.library_type = None
if self.collection_uid == "":
self.collection_uid = None
# Empty list collapses to None so the Cypher branch reads
# "$allowed_libraries IS NOT NULL" rather than "size > 0" — keeps
# the parameter binding straightforward and the predicate sargable.
if self.allowed_libraries is not None and len(self.allowed_libraries) == 0:
self.allowed_libraries = None
@dataclass
class SearchResponse:
"""Results from a search query."""
query: str
candidates: list[SearchCandidate]
images: list[ImageSearchResult]
total_candidates: int
search_time_ms: float
reranker_used: bool
reranker_model: Optional[str]
search_types_used: list[str]
class SearchService:
"""
Orchestrates hybrid search across the Mnemosyne knowledge graph.
Search pipeline:
1. Embed query text via system embedding model
2. Run enabled search types in parallel (vector, fulltext, graph)
3. Fuse results via Reciprocal Rank Fusion
4. Optionally re-rank via Synesis
5. Optionally search images
"""
def __init__(self, user=None):
"""
:param user: Optional Django user for usage tracking.
"""
self.user = user
def search(self, request: SearchRequest) -> SearchResponse:
"""
Execute a hybrid search query.
:param request: SearchRequest with query and parameters.
:returns: SearchResponse with ranked results.
:raises ValueError: If no embedding model configured.
"""
start_time = time.time()
library_type_label = request.library_type or "all"
# --- Embed the query ---
query_vector = self._embed_query(request)
# --- Run search types ---
result_lists = []
search_types_used = []
if "vector" in request.search_types and query_vector:
vector_results = self._vector_search(request, query_vector)
if vector_results:
result_lists.append(vector_results)
search_types_used.append("vector")
SEARCH_REQUESTS_TOTAL.labels(
search_type="vector", library_type=library_type_label
).inc()
if "fulltext" in request.search_types:
fulltext_results = self._fulltext_search(request)
if fulltext_results:
result_lists.append(fulltext_results)
search_types_used.append("fulltext")
SEARCH_REQUESTS_TOTAL.labels(
search_type="fulltext", library_type=library_type_label
).inc()
if "graph" in request.search_types:
graph_results = self._graph_search(request)
if graph_results:
result_lists.append(graph_results)
search_types_used.append("graph")
SEARCH_REQUESTS_TOTAL.labels(
search_type="graph", library_type=library_type_label
).inc()
total_candidates = sum(len(rl) for rl in result_lists)
# --- Fuse results ---
rrf_k = getattr(settings, "SEARCH_RRF_K", 60)
fused = reciprocal_rank_fusion(
result_lists, k=rrf_k, limit=request.limit * 2
)
# --- Re-rank ---
reranker_used = False
reranker_model_name = None
if request.rerank and fused:
reranked, model_name = self._rerank(request, fused)
if reranked is not None:
fused = reranked
reranker_used = True
reranker_model_name = model_name
# Trim to limit
fused = fused[: request.limit]
# --- Image search ---
images = []
if request.include_images and query_vector:
images = self._image_search(request, query_vector)
elapsed_ms = (time.time() - start_time) * 1000
SEARCH_TOTAL_DURATION.observe(elapsed_ms / 1000)
logger.info(
"Search completed query='%s' types=%s total_candidates=%d "
"fused=%d reranked=%s elapsed=%.1fms",
request.query[:80],
search_types_used,
total_candidates,
len(fused),
reranker_used,
elapsed_ms,
)
return SearchResponse(
query=request.query,
candidates=fused,
images=images,
total_candidates=total_candidates,
search_time_ms=round(elapsed_ms, 2),
reranker_used=reranker_used,
reranker_model=reranker_model_name,
search_types_used=search_types_used,
)
# ------------------------------------------------------------------
# Query embedding
# ------------------------------------------------------------------
def _embed_query(self, request: SearchRequest) -> Optional[list[float]]:
"""
Embed the query text using the system embedding model.
Prepends the library's embedding_instruction when scoped to
a specific library for vector space alignment.
:param request: SearchRequest.
:returns: Query embedding vector, or None if not available.
"""
from llm_manager.models import LLMModel
from .embedding_client import EmbeddingClient
embedding_model = LLMModel.get_system_embedding_model()
if not embedding_model:
logger.warning("No system embedding model configured — skipping vector search")
return None
# Get embedding instruction for library-scoped search
instruction = ""
if request.library_uid:
instruction = self._get_embedding_instruction(request.library_uid)
elif request.library_type:
instruction = self._get_type_embedding_instruction(request.library_type)
# Build query text with instruction prefix
query_text = request.query
if instruction:
query_text = f"{instruction}\n\n{query_text}"
try:
client = EmbeddingClient(embedding_model, user=self.user)
vector = client.embed_text(query_text)
logger.debug(
"Query embedded dimensions=%d instruction_len=%d",
len(vector),
len(instruction),
)
return vector
except Exception as exc:
logger.error("Query embedding failed: %s", exc)
return None
# ------------------------------------------------------------------
# Vector search
# ------------------------------------------------------------------
def _vector_search(
self, request: SearchRequest, query_vector: list[float]
) -> list[SearchCandidate]:
"""
Search Chunk embeddings via Neo4j vector index.
:param request: SearchRequest with scope parameters.
:param query_vector: Embedded query vector.
:returns: List of SearchCandidate sorted by cosine similarity.
"""
start = time.time()
top_k = request.vector_top_k
# Build Cypher with optional filtering
cypher = (
"""
CALL db.index.vector.queryNodes('chunk_embedding_index', $top_k, $query_vector)
YIELD node AS chunk, score
MATCH (item:Item)-[:HAS_CHUNK]->(chunk)
MATCH (lib:Library)-[:CONTAINS]->(col:Collection)-[:CONTAINS]->(item)
WHERE ($library_uid IS NULL OR lib.uid = $library_uid)
AND ($library_type IS NULL OR lib.library_type = $library_type)
AND ($collection_uid IS NULL OR col.uid = $collection_uid)
"""
+ _WORKSPACE_SCOPE_CLAUSE
+ """
RETURN chunk.uid AS chunk_uid, chunk.text_preview AS text_preview,
chunk.chunk_s3_key AS chunk_s3_key, chunk.chunk_index AS chunk_index,
item.uid AS item_uid, item.title AS item_title,
lib.library_type AS library_type, score
ORDER BY score DESC
LIMIT $top_k
"""
)
params = {
"top_k": top_k,
"query_vector": query_vector,
"library_uid": request.library_uid,
"library_type": request.library_type,
"collection_uid": request.collection_uid,
"workspace_id": request.workspace_id,
"allowed_libraries": request.allowed_libraries,
}
try:
results, _ = db.cypher_query(cypher, params)
except Exception as exc:
logger.error("Vector search failed: %s", exc)
return []
candidates = [
SearchCandidate(
chunk_uid=row[0] or "",
text_preview=row[1] or "",
chunk_s3_key=row[2] or "",
chunk_index=row[3] or 0,
item_uid=row[4] or "",
item_title=row[5] or "",
library_type=row[6] or "",
score=float(row[7]) if row[7] else 0.0,
source="vector",
)
for row in results
if row[0] # Skip if chunk_uid is None
]
elapsed = time.time() - start
SEARCH_DURATION.labels(search_type="vector").observe(elapsed)
SEARCH_CANDIDATES_TOTAL.labels(search_type="vector").observe(len(candidates))
logger.debug(
"Vector search results=%d top_k=%d elapsed=%.3fs",
len(candidates),
top_k,
elapsed,
)
return candidates
# ------------------------------------------------------------------
# Full-text search
# ------------------------------------------------------------------
def _fulltext_search(self, request: SearchRequest) -> list[SearchCandidate]:
"""
Search via Neo4j full-text indexes (BM25).
Queries both chunk_text_fulltext and concept_name_fulltext indexes,
then merges results.
:param request: SearchRequest with query and scope parameters.
:returns: List of SearchCandidate sorted by BM25 score.
"""
start = time.time()
top_k = request.fulltext_top_k
candidates: dict[str, SearchCandidate] = {}
# --- Chunk text search ---
self._fulltext_chunk_search(request, top_k, candidates)
# --- Concept-to-chunk traversal ---
self._fulltext_concept_search(request, top_k, candidates)
result = sorted(candidates.values(), key=lambda c: c.score, reverse=True)[
:top_k
]
elapsed = time.time() - start
SEARCH_DURATION.labels(search_type="fulltext").observe(elapsed)
SEARCH_CANDIDATES_TOTAL.labels(search_type="fulltext").observe(len(result))
logger.debug(
"Fulltext search results=%d elapsed=%.3fs", len(result), elapsed
)
return result
def _fulltext_chunk_search(
self,
request: SearchRequest,
top_k: int,
candidates: dict[str, SearchCandidate],
):
"""Search chunk_text_fulltext index and add to candidates dict."""
cypher = (
"""
CALL db.index.fulltext.queryNodes('chunk_text_fulltext', $query)
YIELD node AS chunk, score
MATCH (item:Item)-[:HAS_CHUNK]->(chunk)
MATCH (lib:Library)-[:CONTAINS]->(col:Collection)-[:CONTAINS]->(item)
WHERE ($library_uid IS NULL OR lib.uid = $library_uid)
AND ($library_type IS NULL OR lib.library_type = $library_type)
AND ($collection_uid IS NULL OR col.uid = $collection_uid)
"""
+ _WORKSPACE_SCOPE_CLAUSE
+ """
RETURN chunk.uid AS chunk_uid, chunk.text_preview AS text_preview,
chunk.chunk_s3_key AS chunk_s3_key, chunk.chunk_index AS chunk_index,
item.uid AS item_uid, item.title AS item_title,
lib.library_type AS library_type, score
ORDER BY score DESC
LIMIT $top_k
"""
)
params = {
"query": request.query,
"top_k": top_k,
"library_uid": request.library_uid,
"library_type": request.library_type,
"collection_uid": request.collection_uid,
"workspace_id": request.workspace_id,
"allowed_libraries": request.allowed_libraries,
}
try:
results, _ = db.cypher_query(cypher, params)
# Keep raw BM25 scores — RRF fuses by rank, not by score magnitude.
for row in results:
uid = row[0]
if not uid:
continue
raw_score = float(row[7]) if row[7] else 0.0
if uid not in candidates or raw_score > candidates[uid].score:
candidates[uid] = SearchCandidate(
chunk_uid=uid,
text_preview=row[1] or "",
chunk_s3_key=row[2] or "",
chunk_index=row[3] or 0,
item_uid=row[4] or "",
item_title=row[5] or "",
library_type=row[6] or "",
score=raw_score,
source="fulltext",
)
except Exception as exc:
logger.error("Fulltext chunk search failed: %s", exc)
def _fulltext_concept_search(
self,
request: SearchRequest,
top_k: int,
candidates: dict[str, SearchCandidate],
):
"""Search concept_name_fulltext and traverse to chunks."""
cypher = (
"""
CALL db.index.fulltext.queryNodes('concept_name_fulltext', $query)
YIELD node AS concept, score AS concept_score
MATCH (chunk:Chunk)-[:MENTIONS]->(concept)
MATCH (item:Item)-[:HAS_CHUNK]->(chunk)
MATCH (lib:Library)-[:CONTAINS]->(:Collection)-[:CONTAINS]->(item)
WHERE ($library_uid IS NULL OR lib.uid = $library_uid)
AND ($library_type IS NULL OR lib.library_type = $library_type)
"""
+ _WORKSPACE_SCOPE_CLAUSE
+ """
RETURN chunk.uid AS chunk_uid, chunk.text_preview AS text_preview,
chunk.chunk_s3_key AS chunk_s3_key, chunk.chunk_index AS chunk_index,
item.uid AS item_uid, item.title AS item_title,
lib.library_type AS library_type,
concept_score * 0.8 AS score
ORDER BY score DESC
LIMIT $top_k
"""
)
params = {
"query": request.query,
"top_k": top_k,
"library_uid": request.library_uid,
"library_type": request.library_type,
"workspace_id": request.workspace_id,
"allowed_libraries": request.allowed_libraries,
}
try:
results, _ = db.cypher_query(cypher, params)
# Raw scores already include the 0.8 concept downweight from Cypher.
for row in results:
uid = row[0]
if not uid:
continue
raw_score = float(row[7]) if row[7] else 0.0
if uid not in candidates or raw_score > candidates[uid].score:
candidates[uid] = SearchCandidate(
chunk_uid=uid,
text_preview=row[1] or "",
chunk_s3_key=row[2] or "",
chunk_index=row[3] or 0,
item_uid=row[4] or "",
item_title=row[5] or "",
library_type=row[6] or "",
score=raw_score,
source="fulltext",
)
except Exception as exc:
logger.error("Fulltext concept search failed: %s", exc)
# ------------------------------------------------------------------
# Graph search
# ------------------------------------------------------------------
def _graph_search(self, request: SearchRequest) -> list[SearchCandidate]:
"""
Knowledge graph traversal search.
Matches query terms against Concept names, then traverses
MENTIONS/REFERENCES relationships to discover related chunks.
:param request: SearchRequest with query and scope parameters.
:returns: List of SearchCandidate from graph traversal.
"""
start = time.time()
cypher = (
"""
CALL db.index.fulltext.queryNodes('concept_name_fulltext', $query)
YIELD node AS concept, score AS concept_score
WITH concept, concept_score
ORDER BY concept_score DESC
LIMIT 10
MATCH (chunk:Chunk)-[:MENTIONS]->(concept)
MATCH (item:Item)-[:HAS_CHUNK]->(chunk)
MATCH (lib:Library)-[:CONTAINS]->(:Collection)-[:CONTAINS]->(item)
WHERE ($library_uid IS NULL OR lib.uid = $library_uid)
AND ($library_type IS NULL OR lib.library_type = $library_type)
"""
+ _WORKSPACE_SCOPE_CLAUSE
+ """
WITH chunk, item, lib,
max(concept_score) AS score,
collect(DISTINCT concept.name)[..5] AS concept_names
RETURN chunk.uid AS chunk_uid, chunk.text_preview AS text_preview,
chunk.chunk_s3_key AS chunk_s3_key, chunk.chunk_index AS chunk_index,
item.uid AS item_uid, item.title AS item_title,
lib.library_type AS library_type,
score, concept_names
ORDER BY score DESC
LIMIT $limit
"""
)
params = {
"query": request.query,
"limit": request.fulltext_top_k,
"library_uid": request.library_uid,
"library_type": request.library_type,
"workspace_id": request.workspace_id,
"allowed_libraries": request.allowed_libraries,
}
try:
results, _ = db.cypher_query(cypher, params)
except Exception as exc:
logger.error("Graph search failed: %s", exc)
return []
candidates = []
for row in results:
uid = row[0]
if not uid:
continue
raw_score = float(row[7]) if row[7] else 0.0
concept_names = row[8] if len(row) > 8 else []
candidates.append(
SearchCandidate(
chunk_uid=uid,
text_preview=row[1] or "",
chunk_s3_key=row[2] or "",
chunk_index=row[3] or 0,
item_uid=row[4] or "",
item_title=row[5] or "",
library_type=row[6] or "",
score=raw_score,
source="graph",
metadata={"concepts": concept_names},
)
)
elapsed = time.time() - start
SEARCH_DURATION.labels(search_type="graph").observe(elapsed)
SEARCH_CANDIDATES_TOTAL.labels(search_type="graph").observe(len(candidates))
logger.debug(
"Graph search results=%d elapsed=%.3fs", len(candidates), elapsed
)
return candidates
# ------------------------------------------------------------------
# Image search
# ------------------------------------------------------------------
def _image_search(
self, request: SearchRequest, query_vector: list[float]
) -> list[ImageSearchResult]:
"""
Search images via multimodal vector index.
:param request: SearchRequest.
:param query_vector: Embedded query vector.
:returns: List of ImageSearchResult.
"""
start = time.time()
cypher = (
"""
CALL db.index.vector.queryNodes('image_embedding_index', $top_k, $query_vector)
YIELD node AS emb_node, score
MATCH (img:Image)-[:HAS_EMBEDDING]->(emb_node)
MATCH (item:Item)-[:HAS_IMAGE]->(img)
MATCH (lib:Library)-[:CONTAINS]->(:Collection)-[:CONTAINS]->(item)
WHERE ($library_uid IS NULL OR lib.uid = $library_uid)
AND ($library_type IS NULL OR lib.library_type = $library_type)
"""
+ _WORKSPACE_SCOPE_CLAUSE
+ """
RETURN img.uid AS image_uid, img.image_type AS image_type,
img.description AS description, img.s3_key AS s3_key,
item.uid AS item_uid, item.title AS item_title,
score
ORDER BY score DESC
LIMIT 10
"""
)
params = {
"top_k": 10,
"query_vector": query_vector,
"library_uid": request.library_uid,
"library_type": request.library_type,
"workspace_id": request.workspace_id,
"allowed_libraries": request.allowed_libraries,
}
try:
results, _ = db.cypher_query(cypher, params)
except Exception as exc:
logger.error("Image search failed: %s", exc)
return []
images = [
ImageSearchResult(
image_uid=row[0] or "",
image_type=row[1] or "",
description=row[2] or "",
s3_key=row[3] or "",
item_uid=row[4] or "",
item_title=row[5] or "",
score=float(row[6]) if row[6] else 0.0,
source="vector",
)
for row in results
if row[0]
]
elapsed = time.time() - start
SEARCH_DURATION.labels(search_type="image").observe(elapsed)
logger.debug(
"Image search results=%d elapsed=%.3fs", len(images), elapsed
)
return images
# ------------------------------------------------------------------
# Re-ranking
# ------------------------------------------------------------------
def _rerank(
self, request: SearchRequest, candidates: list[SearchCandidate]
) -> tuple[Optional[list[SearchCandidate]], Optional[str]]:
"""
Re-rank candidates via Synesis.
:param request: SearchRequest.
:param candidates: Fused candidates to re-rank.
:returns: Tuple of (reranked_candidates, model_name) or (None, None).
"""
from llm_manager.models import LLMModel
from .reranker import RerankerClient
reranker_model = LLMModel.get_system_reranker_model()
if not reranker_model:
logger.debug("No system reranker model — skipping re-ranking")
return None, None
# Get content-type reranker instruction
instruction = self._get_reranker_instruction(request, candidates)
# Cap candidates at configured maximum
max_candidates = getattr(settings, "RERANKER_MAX_CANDIDATES", 32)
candidates_to_rerank = candidates[:max_candidates]
try:
client = RerankerClient(reranker_model, user=self.user)
# Don't pass top_n — let the reranker score every candidate so
# cross-attention can promote items the RRF stage ranked low.
# Final trimming to request.limit happens in search().
reranked = client.rerank(
query=request.query,
candidates=candidates_to_rerank,
instruction=instruction,
query_image=request.query_image,
)
return reranked, reranker_model.name
except Exception as exc:
logger.warning(
"Re-ranking failed, returning fusion results: %s", exc
)
return None, None
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
GENERIC_RERANKER_INSTRUCTION = (
"Re-rank these passages by relevance to the query."
)
def _get_reranker_instruction(
self, request: SearchRequest, candidates: list[SearchCandidate]
) -> str:
"""
Get the content-type-aware reranker instruction.
Scoped queries (by library or library type) use that type's
instruction. Unscoped queries — even when results happen to
come mostly from one type — use a generic instruction so the
reranker is not biased toward the majority type.
:param request: SearchRequest.
:param candidates: Candidates (unused; kept for API stability).
:returns: Reranker instruction string.
"""
from library.content_types import get_library_type_config
if request.library_type:
try:
config = get_library_type_config(request.library_type)
return config.get("reranker_instruction", "")
except ValueError:
pass
if request.library_uid:
instruction = self._get_library_reranker_instruction(request.library_uid)
if instruction:
return instruction
return self.GENERIC_RERANKER_INSTRUCTION
def _get_library_reranker_instruction(self, library_uid: str) -> str:
"""Get reranker_instruction from a Library node."""
try:
from library.models import Library
lib = Library.nodes.get(uid=library_uid)
return lib.reranker_instruction or ""
except Exception as exc:
logger.warning(
"Failed to load reranker_instruction for library_uid=%s: %s",
library_uid,
exc,
)
return ""
def _get_embedding_instruction(self, library_uid: str) -> str:
"""Get embedding_instruction from a Library node."""
try:
from library.models import Library
lib = Library.nodes.get(uid=library_uid)
return lib.embedding_instruction or ""
except Exception as exc:
logger.warning(
"Failed to load embedding_instruction for library_uid=%s: %s",
library_uid,
exc,
)
return ""
def _get_type_embedding_instruction(self, library_type: str) -> str:
"""Get embedding_instruction for a library type."""
try:
from library.content_types import get_library_type_config
config = get_library_type_config(library_type)
return config.get("embedding_instruction", "")
except ValueError:
return ""