feat: add Phase 3 hybrid search with Synesis reranking
Implement hybrid search pipeline combining vector, fulltext, and graph search across Neo4j, with cross-attention reranking via Synesis (Qwen3-VL-Reranker-2B) `/v1/rerank` endpoint. - Add SearchService with vector, fulltext, and graph search strategies - Add SynesisRerankerClient for multimodal reranking via HTTP API - Add search API endpoint (POST /search/) with filtering by library, collection, and library_type - Add SearchRequest/Response serializers and image search results - Add "nonfiction" to library_type choices - Consolidate reranker stack from two models to single Synesis service - Handle image analysis_status as "skipped" when analysis is unavailable - Add comprehensive tests for search pipeline and reranker client
This commit is contained in:
129
mnemosyne/library/services/fusion.py
Normal file
129
mnemosyne/library/services/fusion.py
Normal file
@@ -0,0 +1,129 @@
|
||||
"""
|
||||
Reciprocal Rank Fusion (RRF) for merging multiple ranked result lists.
|
||||
|
||||
Combines results from vector search, full-text search, and graph traversal
|
||||
into a single ranked list. Candidates appearing in multiple lists receive
|
||||
a natural score boost.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from library.metrics import FUSION_DURATION
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
class SearchCandidate:
    """
    A single search result candidate (text chunk).

    Carries enough context for re-ranking, display, and citation.
    Instances are produced by the individual search strategies and
    merged/re-scored by :func:`reciprocal_rank_fusion`.
    """

    # Unique identifier of this chunk.
    chunk_uid: str
    # Identifier of the parent library item the chunk belongs to.
    item_uid: str
    # Human-readable title of the parent item (for display/citation).
    item_title: str
    # Library type of the parent item (used for filtering/display).
    library_type: str
    # Short excerpt of the chunk text for result display.
    text_preview: str
    # S3 key where the full chunk content is stored.
    chunk_s3_key: str
    # Position of this chunk within its parent item.
    chunk_index: int
    # Relevance score; originally strategy-specific, replaced by the
    # fused RRF score after reciprocal_rank_fusion.
    score: float
    source: str  # "vector", "fulltext", "graph"
    # Free-form extras; fusion adds a "sources" list of the search
    # types that surfaced this candidate.
    metadata: dict = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass
class ImageSearchResult:
    """A search result for an image."""

    # Unique identifier of the image.
    image_uid: str
    # Identifier of the parent library item the image belongs to.
    item_uid: str
    # Human-readable title of the parent item (for display/citation).
    item_title: str
    # Category of the image (semantics defined by the ingestion side).
    image_type: str
    # Textual description of the image (used for display/re-ranking).
    description: str
    # S3 key where the image file is stored.
    s3_key: str
    # Relevance score from the search strategy that found the image.
    score: float
    source: str  # "vector", "graph"
|
||||
|
||||
|
||||
def reciprocal_rank_fusion(
    result_lists: list[list[SearchCandidate]],
    k: int = 60,
    limit: int = 50,
) -> list[SearchCandidate]:
    """
    Merge multiple ranked result lists using Reciprocal Rank Fusion.

    RRF score for candidate c = Σ 1 / (k + rank_i)
    where rank_i is the 1-based position in list i.

    Candidates appearing in multiple lists receive a natural boost
    because they accumulate scores from each list.

    :param result_lists: List of ranked candidate lists (one per search type).
    :param k: RRF constant (higher = less emphasis on top ranks). Default 60.
    :param limit: Maximum number of results to return.
    :returns: Merged, deduplicated, and re-scored candidates sorted by RRF score.
    """
    from dataclasses import replace

    # perf_counter is monotonic: unlike time.time(), the measured
    # duration cannot go negative (or jump) if the wall clock is
    # adjusted mid-call, so the observed histogram value is reliable.
    start = time.perf_counter()

    # Accumulated RRF score per chunk_uid.
    scores: dict[str, float] = {}
    # Best representative (highest original score) per chunk_uid, so the
    # preview/metadata shown comes from the strongest hit.
    candidates: dict[str, SearchCandidate] = {}
    # Which search types found each chunk, in first-seen order.
    sources: dict[str, list[str]] = {}

    for result_list in result_lists:
        for rank, candidate in enumerate(result_list, start=1):
            uid = candidate.chunk_uid
            scores[uid] = scores.get(uid, 0.0) + 1.0 / (k + rank)

            # Keep the candidate with the highest original score.
            if uid not in candidates or candidate.score > candidates[uid].score:
                candidates[uid] = candidate

            # Track which search types found this candidate.
            found_by = sources.setdefault(uid, [])
            if candidate.source not in found_by:
                found_by.append(candidate.source)

    # Re-score each surviving candidate with its fused RRF score and a
    # combined source label (e.g. "vector+graph"); dataclasses.replace
    # copies all other fields unchanged.
    fused = [
        replace(
            candidates[uid],
            score=rrf_score,
            source="+".join(sources[uid]),
            metadata={**candidates[uid].metadata, "sources": sources[uid]},
        )
        for uid, rrf_score in scores.items()
    ]

    # Sort by RRF score descending (stable sort keeps insertion order
    # for ties) and trim to the requested size.
    fused.sort(key=lambda c: c.score, reverse=True)
    fused = fused[:limit]

    elapsed = time.perf_counter() - start
    FUSION_DURATION.observe(elapsed)

    logger.debug(
        "RRF fusion input_lists=%d total_candidates=%d fused=%d elapsed=%.3fs",
        len(result_lists),
        sum(len(rl) for rl in result_lists),
        len(fused),
        elapsed,
    )

    return fused
|
||||
Reference in New Issue
Block a user