feat: replace server-side RAG with MCP retrieval primitives

- Remove Phase 4 RAG pipeline in favor of retrieval-only architecture
- Add FastMCP server exposing search, get_chunk, list_libraries tools
- Mount MCP endpoints (streamable HTTP + SSE) via Starlette in ASGI config
- Update README to clarify Mnemosyne is a retrieval engine, not RAG
- Let calling LLMs drive synthesis and iterative retrieval themselves
This commit is contained in:
2026-04-26 15:34:26 -04:00
parent 388b37e471
commit 2df22941d2
30 changed files with 1180 additions and 126 deletions

View File

@@ -47,8 +47,8 @@ This **content-type awareness** flows through every layer: chunking strategy, em
``` ```
Query → Vector Search (Neo4j) + Graph Traversal (Cypher) + Full-Text Search Query → Vector Search (Neo4j) + Graph Traversal (Cypher) + Full-Text Search
→ Candidate Fusion → Qwen3-VL Re-ranking → Content-Type Context Injection → Candidate Fusion → Qwen3-VL Re-ranking → Ranked Chunks + Metadata
LLM Response with Citations MCP tool result (the calling LLM does its own synthesis)
``` ```
## Heritage ## Heritage
@@ -82,14 +82,21 @@ celery -A mnemosyne flower --port=5555 # Web monitoring UI
See [Phase 2: Celery Workers & Scheduler](docs/PHASE_2_EMBEDDING_PIPELINE.md#celery-workers--scheduler) for full details on queues, reliability settings, and task progress tracking. See [Phase 2: Celery Workers & Scheduler](docs/PHASE_2_EMBEDDING_PIPELINE.md#celery-workers--scheduler) for full details on queues, reliability settings, and task progress tracking.
## Architecture Note: Retrieval, Not Synthesis
Mnemosyne is a **retrieval engine**, not a RAG pipeline. It stores, embeds, and ranks — it does not synthesize answers.
The earlier roadmap had a server-side RAG layer that took a query and returned a written answer with citations. That layer has been removed. Calling LLMs (Claude via MCP, principally) are perfectly capable of driving iterative retrieval themselves when given the right primitives, and a server-side synthesis hop adds latency, cost, and a place where errors are harder to debug. Letting the calling LLM see chunks directly — and follow citations, pivot mid-search, or call `get_chunk` for full text — beats pre-digesting them.
If a "knowledge subagent" is ever wanted (a wrapper that takes a question and returns a written answer), it lives **outside** Mnemosyne as a thin client over the MCP tools, with its own system prompt. No coupling, no extra inference hop inside the server, and the subagent's behavior can iterate independently.
## Documentation ## Documentation
- **[Architecture Documentation](docs/mnemosyne.html)** — Full system architecture with diagrams - **[Architecture Documentation](docs/mnemosyne.html)** — Full system architecture with diagrams
- **[Phase 1: Foundation](docs/PHASE_1_FOUNDATION.md)** — Project skeleton, Neo4j data model, content-type system - **[Phase 1: Foundation](docs/PHASE_1_FOUNDATION.md)** — Project skeleton, Neo4j data model, content-type system
- **[Phase 2: Embedding Pipeline](docs/PHASE_2_EMBEDDING_PIPELINE.md)** — Qwen3-VL multimodal embedding - **[Phase 2: Embedding Pipeline](docs/PHASE_2_EMBEDDING_PIPELINE.md)** — Qwen3-VL multimodal embedding
- **[Phase 3: Search & Re-ranking](docs/PHASE_3_SEARCH_AND_RERANKING.md)** — Hybrid search + re-ranker - **[Phase 3: Search & Re-ranking](docs/PHASE_3_SEARCH_AND_RERANKING.md)** — Hybrid search + re-ranker
- **[Phase 4: RAG Pipeline](docs/PHASE_4_RAG_PIPELINE.md)** — Content-type-aware generation - **[Phase 5: MCP Server](docs/PHASE_5_MCP_SERVER.md)** — Retrieval primitives for LLMs (`search`, `get_chunk`, `list_libraries`, …)
- **[Phase 5: MCP Server](docs/PHASE_5_MCP_SERVER.md)** — LLM integration interface
- **[Phase 6: Backport to Spelunker](docs/PHASE_6_BACKPORT_TO_SPELUNKER.md)** — Proven patterns flowing back - **[Phase 6: Backport to Spelunker](docs/PHASE_6_BACKPORT_TO_SPELUNKER.md)** — Proven patterns flowing back

144
docs/PHASE_5_MCP_SERVER.md Normal file
View File

@@ -0,0 +1,144 @@
# Phase 5: MCP Server
The MCP (Model Context Protocol) server exposes Mnemosyne's retrieval primitives — search, chunk fetch, and library/collection/item discovery — to LLM clients like Claude Desktop, Cursor, or any MCP-compatible agent.
This is intentionally a **retrieval surface, not a RAG pipeline**. The server returns ranked evidence; the calling LLM is responsible for synthesis, citation, and follow-up. If a "knowledge subagent" wrapper is ever wanted, it lives outside Mnemosyne as a thin client over these tools.
## Architecture
```
┌──────────────────────────┐ ┌─────────────────────┐
│ Claude Desktop / Cursor │ Streamable HTTP │ uvicorn :8001 │
│ (MCP client) │ ─────────────────▶ │ mnemosyne.asgi:app │
└──────────────────────────┘ /mcp/ /mcp/sse └──────┬──────────────┘
┌────────────────┐
│ FastMCP server │
│ + middleware │
└──────┬─────────┘
┌──────────────────┼─────────────────┐
▼ ▼ ▼
┌────────────────┐ ┌──────────────┐ ┌──────────────┐
│ SearchService │ │ Neo4j Cypher │ │ S3 / MinIO │
│ (Phase 3) │ │ discovery │ │ chunk text │
└────────────────┘ └──────────────┘ └──────────────┘
```
The MCP server runs as a **separate Uvicorn ASGI process** alongside the existing Django/Gunicorn WSGI process. Both processes share the same Django settings, Postgres, Neo4j, and S3 — the MCP server is a thin protocol surface, not a duplicate stack.
## Tool surface
| Tool | Purpose | Returns |
|------|---------|---------|
| `search` | Hybrid retrieval: vector + full-text + concept-graph + Synesis re-ranking | Ranked candidates with `chunk_uid`, `text_preview`, score, source |
| `get_chunk` | Fetch the full text of a chunk by `chunk_uid` (preview is only ~500 chars) | Full chunk text + parent item context |
| `list_libraries` | Discover libraries and their `library_type` | uid, name, library_type, description |
| `list_collections` | Discover collections, optional `library_uid` filter | uid, name, description, parent library |
| `list_items` | Discover indexed documents, optional collection / library filter | uid, title, item_type, chunk_count, embedding_status |
`search` accepts these named arguments:
- `query` (required)
- `library_uid`, `library_type`, `collection_uid` — scoping filters (all optional, AND-combined)
- `limit` — default 20
- `rerank` — default `True` (Synesis cross-attention re-ranking when configured)
- `include_images` — default `True`
- `search_types` — default `["vector", "fulltext", "graph"]`
Concept-graph traversal tools (`list_concepts`, `get_concept_neighbors`) are intentionally deferred — ship the search + discovery surface first, observe how clients use it, then expand.
## Authentication
Tools calls require a Bearer token (`MCPToken`). Listing tools is unauthenticated so clients can discover the surface. Tokens are managed via Django admin or the management command:
```bash
python manage.py create_mcp_token --user r@helu.ca --name "Claude Desktop"
```
Optional flags:
- `--tools search,get_chunk` — restrict the token to a whitelist
- `--expires-days 30` — set an expiry
The token is printed once — there's no way to retrieve it later. Revoke or set expiry in the Django admin under **MCP Server → MCP tokens**.
For local development you can set `MCP_REQUIRE_AUTH=False` in your environment to skip auth entirely. **Never disable auth in production.**
## Running the server
```bash
# Development
uvicorn mnemosyne.asgi:app --host 127.0.0.1 --port 8001 --workers 1
# Health check
curl http://localhost:8001/mcp/health
# {"status":"ok"}
```
**Single worker required.** SSE transport keeps session state in worker memory; multi-worker deployments would route POSTs to the wrong worker.
In production, run alongside the WSGI Django process and route via a reverse proxy:
```nginx
location /mcp/ {
proxy_pass http://127.0.0.1:8001;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_buffering off; # required for SSE
proxy_cache off; # required for SSE
proxy_read_timeout 300s;
}
```
## Client configuration
Claude Desktop (`claude_desktop_config.json`):
```json
{
"mcpServers": {
"mnemosyne": {
"url": "http://localhost:8001/mcp/",
"headers": {
"Authorization": "Bearer YOUR_TOKEN_HERE"
}
}
}
}
```
For SSE transport, change the URL to `http://localhost:8001/mcp/sse/`.
## Observability
Prometheus metrics are exported on the WSGI Django side (`/metrics`):
| Metric | Labels | Purpose |
|--------|--------|---------|
| `mcp_tool_invocations_total` | tool, status | Per-tool call counter |
| `mcp_tool_duration_seconds` | tool | Per-tool duration histogram |
| `mcp_auth_failures_total` | reason | Auth-rejection counter (missing token, expired, tool not allowed) |
## Files
| Path | Purpose |
|------|---------|
| `mcp_server/models.py` | `MCPToken` Django ORM model |
| `mcp_server/auth.py` | `resolve_mcp_user`, `MCPAuthMiddleware` |
| `mcp_server/server.py` | FastMCP instance + tool registration |
| `mcp_server/tools/search.py` | `search`, `get_chunk` |
| `mcp_server/tools/discovery.py` | `list_libraries`, `list_collections`, `list_items` |
| `mcp_server/management/commands/create_mcp_token.py` | Token bootstrap command |
| `mnemosyne/asgi.py` | Mounts FastMCP at `/mcp` and `/mcp/sse` |
| `docs/Pattern_Django-MCP_V1-00.md` | Underlying integration pattern (FastMCP + Django ASGI + bearer auth) |
## Testing
```bash
TEST_NEO4J_ENABLED=0 python manage.py test mcp_server \
--testrunner=test_db_manager.django_integration.PostgreSQLTestRunner
```
The mcp_server test suite covers token model, auth resolution, tool registration, and the management command. It does not require Neo4j (set `TEST_NEO4J_ENABLED=0`) — only Postgres via the Docker-backed test runner.

View File

@@ -1,96 +0,0 @@
## Red Panda Approval™
This project follows Red Panda Approval standards - our gold standard for Django application quality. Code must be elegant, reliable, and maintainable to earn the approval of our adorable red panda judges.
### The 5 Sacred Django Criteria
1. **Fresh Migration Test** - Clean migrations from empty database
2. **Elegant Simplicity** - No unnecessary complexity
3. **Observable & Debuggable** - Proper logging and error handling
4. **Consistent Patterns** - Follow Django conventions
5. **Actually Works** - Passes all checks and serves real user needs
### Standards
# Environment
Virtual environment: ~/env/PROJECT/bin/activate
Python version: 3.12
# Code Organization
Maximum file length: 1000 lines
CSS: External .css files only (no inline/embedded)
JS: External .js files only (no inline/embedded)
# Required Packages
- Bootstrap 5.x (no custom CSS unless absolutely necessary)
- Bootstrap Icons (no emojis)
- django-crispy-forms + crispy-bootstrap5
- django-allauth
# Testing
Framework: Django TestCase (not pytest)
Minimum coverage: XX%? (optional)
### Database Conventions
# Development vs Production
- Development: SQLite
- Production: PostgreSQL
- Use dj-database-url for configuration
# Model Naming
- Model names: singular PascalCase (User, BlogPost, OrderItem)
- Related names: plural snake_case with proper English pluralization
- user.blog_posts, order.items
- category.industries (not industrys)
- person.children (not childs)
- analysis.analyses (not analysiss)
- Through tables: describe relationship (ProjectMembership, CourseEnrollment)
# Field Naming
- Foreign keys: singular without _id suffix (author, category, parent)
- Boolean fields: use prefixes (is_active, has_permission, can_edit)
- Date fields: use suffixes (created_at, updated_at, published_on)
- Avoid abbreviations (use description, not desc)
# Required Model Fields
All models should include:
- created_at = models.DateTimeField(auto_now_add=True)
- updated_at = models.DateTimeField(auto_now=True)
Consider adding:
- id = models.UUIDField(primary_key=True) for public-facing models
- is_active = models.BooleanField(default=True) for soft deletes
# Indexing
- Add db_index=True to frequently queried fields
- Use Meta.indexes for composite indexes
- Document why each index exists
# Migrations
- Never edit migrations that have been deployed
- Use meaningful migration names: --name add_email_to_profile
- One logical change per migration when possible
- Test migrations both forward and backward
# Queries
- Use select_related() for foreign keys
- Use prefetch_related() for reverse relations and M2M
- Avoid queries in loops (N+1 problem)
- Use .only() and .defer() for large models
- Add comments explaining complex querysets
## Monitoring & Health Check Endpoints
Follow standard Kubernetes health check endpoints for container orchestration:
### /ready/ - Readiness probe checks if the application is ready to serve traffic
Validates database connectivity
Validates cache connectivity
Returns 200 if ready, 503 if dependencies are unavailable
Used by load balancers to determine if pod should receive traffic
### /live/ - Liveness probe checks if the application process is alive
Simple health check with minimal logic
Returns 200 if Django is responding to requests
Used by Kubernetes to determine if pod should be restarted
Note: For detailed metrics and monitoring, use Prometheus and Alloy integration rather than custom health endpoints.

View File

@@ -0,0 +1 @@
default_app_config = "mcp_server.apps.McpServerConfig"

View File

@@ -0,0 +1,29 @@
from django.contrib import admin
from .models import MCPToken
@admin.register(MCPToken)
class MCPTokenAdmin(admin.ModelAdmin):
list_display = [
"name",
"user",
"is_active",
"masked_token",
"expires_at",
"last_used_at",
"created_at",
]
list_filter = ["is_active"]
search_fields = ["name", "user__email", "user__username"]
readonly_fields = ["token", "last_used_at", "created_at", "updated_at"]
fieldsets = (
(None, {"fields": ("user", "name", "is_active")}),
("Restrictions", {"fields": ("allowed_tools", "expires_at")}),
("Token (shown once at creation)", {"fields": ("token",)}),
("Audit", {"fields": ("last_used_at", "created_at", "updated_at")}),
)
@admin.display(description="Token")
def masked_token(self, obj):
return obj.get_masked_token()

View File

@@ -0,0 +1,7 @@
from django.apps import AppConfig
class McpServerConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "mcp_server"
verbose_name = "MCP Server"

View File

@@ -0,0 +1,105 @@
"""MCP token resolution and FastMCP middleware for bearer-token auth."""
from __future__ import annotations
import logging
from asgiref.sync import sync_to_async
from django.conf import settings
from django.contrib.auth import get_user_model
from django.utils import timezone
from fastmcp.server.dependencies import get_http_request
from fastmcp.server.middleware import Middleware, MiddlewareContext
from .metrics import mcp_auth_failures_total
from .models import MCPToken
logger = logging.getLogger(__name__)
STATE_KEY_USER = "mcp_user"
STATE_KEY_TOKEN = "mcp_token"
class MCPAuthError(Exception):
"""Raised when a bearer token cannot be resolved to a valid user."""
def resolve_mcp_user(token_string: str):
"""Resolve a bearer token to (user, MCPToken). Raises MCPAuthError on any failure."""
try:
token = MCPToken.objects.select_related("user").get(token=token_string)
except MCPToken.DoesNotExist:
raise MCPAuthError("Invalid MCP token.")
if not token.is_active:
raise MCPAuthError("Token has been deactivated.")
if token.expires_at and token.expires_at < timezone.now():
raise MCPAuthError("Token has expired.")
if not token.user.is_active:
raise MCPAuthError("User account is disabled.")
token.record_usage()
return token.user, token
class MCPAuthMiddleware(Middleware):
"""
FastMCP middleware that authenticates tool calls via Bearer tokens.
Listing tools/resources is permitted unauthenticated so clients can
discover the surface; calling a tool requires a valid token unless
MCP_REQUIRE_AUTH=False.
"""
async def on_call_tool(
self, context: MiddlewareContext, call_next
):
require_auth = getattr(settings, "MCP_REQUIRE_AUTH", True)
token_string = self._extract_token()
user = None
token = None
if token_string:
try:
user, token = await sync_to_async(
resolve_mcp_user, thread_sensitive=True
)(token_string)
except MCPAuthError as exc:
mcp_auth_failures_total.labels(reason=str(exc)).inc()
if require_auth:
raise PermissionError(str(exc))
elif require_auth:
mcp_auth_failures_total.labels(reason="missing_token").inc()
raise PermissionError("Authentication required. Provide a Bearer token.")
tool_name = self._extract_tool_name(context)
if token and tool_name and not token.can_use_tool(tool_name):
mcp_auth_failures_total.labels(reason="tool_not_allowed").inc()
raise PermissionError(
f"Token does not have permission to call '{tool_name}'."
)
fastmcp_ctx = getattr(context, "fastmcp_context", None)
if fastmcp_ctx and user is not None:
await fastmcp_ctx.set_state(STATE_KEY_USER, user)
await fastmcp_ctx.set_state(STATE_KEY_TOKEN, token)
return await call_next(context)
@staticmethod
def _extract_token() -> str | None:
try:
request = get_http_request()
except RuntimeError:
return None
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
return auth_header[7:].strip() or None
return None
@staticmethod
def _extract_tool_name(context: MiddlewareContext) -> str | None:
msg = getattr(context, "message", None)
params = getattr(msg, "params", None) if msg else None
return getattr(params, "name", None)

View File

@@ -0,0 +1,19 @@
"""Helpers for accessing the request-scoped MCP user/token from inside tools."""
from __future__ import annotations
from fastmcp.server.context import Context
from .auth import STATE_KEY_TOKEN, STATE_KEY_USER
async def get_mcp_user(ctx: Context | None):
if ctx is None:
return None
return await ctx.get_state(STATE_KEY_USER)
async def get_mcp_token(ctx: Context | None):
if ctx is None:
return None
return await ctx.get_state(STATE_KEY_TOKEN)

View File

@@ -0,0 +1,77 @@
"""Create an MCP bearer token for a user. Token is printed once and not retrievable later."""
from datetime import timedelta
from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
from mcp_server.models import MCPToken
class Command(BaseCommand):
help = "Create an MCP token for a user and print the full token (shown once)."
def add_arguments(self, parser):
parser.add_argument(
"--user",
required=True,
help="Username or email of the owner.",
)
parser.add_argument(
"--name",
required=True,
help="Friendly token name (e.g. 'Claude Desktop').",
)
parser.add_argument(
"--tools",
default="",
help="Comma-separated tool whitelist. Empty = all tools allowed.",
)
parser.add_argument(
"--expires-days",
type=int,
default=None,
help="Days until expiry. Omit for no expiry.",
)
def handle(self, *args, **options):
User = get_user_model()
identifier = options["user"]
try:
user = User.objects.get(email=identifier)
except User.DoesNotExist:
try:
user = User.objects.get(username=identifier)
except User.DoesNotExist:
raise CommandError(f'User "{identifier}" not found.')
if not user.is_active:
raise CommandError(f'User "{identifier}" is inactive.')
allowed_tools = [t.strip() for t in options["tools"].split(",") if t.strip()]
expires_at = None
if options["expires_days"] is not None:
if options["expires_days"] < 1:
raise CommandError("--expires-days must be at least 1.")
expires_at = timezone.now() + timedelta(days=options["expires_days"])
token = MCPToken.objects.create(
user=user,
name=options["name"],
allowed_tools=allowed_tools,
expires_at=expires_at,
)
self.stdout.write(self.style.SUCCESS("MCP token created"))
self.stdout.write(f" Name: {token.name}")
self.stdout.write(f" User: {user}")
if allowed_tools:
self.stdout.write(f" Tools: {', '.join(allowed_tools)}")
else:
self.stdout.write(" Tools: (all)")
if expires_at:
self.stdout.write(f" Expires: {expires_at.isoformat()}")
self.stdout.write(self.style.WARNING(" Token (shown once):"))
self.stdout.write(f" {token.token}")

View File

@@ -0,0 +1,43 @@
"""Prometheus metrics for the MCP server. Scraped via /metrics on the WSGI side."""
import time
from prometheus_client import Counter, Histogram
mcp_tool_invocations_total = Counter(
"mcp_tool_invocations_total",
"Total MCP tool invocations.",
["tool", "status"],
)
mcp_tool_duration_seconds = Histogram(
"mcp_tool_duration_seconds",
"MCP tool execution duration in seconds.",
["tool"],
buckets=(0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0),
)
mcp_auth_failures_total = Counter(
"mcp_auth_failures_total",
"Total MCP authentication failures.",
["reason"],
)
class record_tool_call:
"""Context manager that records invocation count + duration for a tool."""
def __init__(self, tool_name: str):
self.tool_name = tool_name
self.started_at = 0.0
def __enter__(self):
self.started_at = time.monotonic()
return self
def __exit__(self, exc_type, exc, tb):
duration = time.monotonic() - self.started_at
status = "error" if exc_type else "success"
mcp_tool_invocations_total.labels(tool=self.tool_name, status=status).inc()
mcp_tool_duration_seconds.labels(tool=self.tool_name).observe(duration)
return False # never suppress exceptions

View File

@@ -0,0 +1,35 @@
# Generated by Django 5.2.13 on 2026-04-26 18:59
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='MCPToken',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('token', models.CharField(db_index=True, max_length=64, unique=True)),
('name', models.CharField(max_length=100)),
('is_active', models.BooleanField(default=True)),
('expires_at', models.DateTimeField(blank=True, null=True)),
('last_used_at', models.DateTimeField(blank=True, null=True)),
('allowed_tools', models.JSONField(blank=True, default=list)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='mcp_tokens', to=settings.AUTH_USER_MODEL)),
],
options={
'ordering': ['-created_at'],
},
),
]

View File

@@ -0,0 +1,56 @@
import secrets
from django.conf import settings
from django.db import models
from django.utils import timezone
class MCPToken(models.Model):
"""Bearer token for authenticating MCP tool calls. See docs/Pattern_Django-MCP_V1-00.md."""
user = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
related_name="mcp_tokens",
)
token = models.CharField(max_length=64, unique=True, db_index=True)
name = models.CharField(max_length=100)
is_active = models.BooleanField(default=True)
expires_at = models.DateTimeField(null=True, blank=True)
last_used_at = models.DateTimeField(null=True, blank=True)
allowed_tools = models.JSONField(default=list, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
ordering = ["-created_at"]
def __str__(self):
return f"{self.name} ({self.user})"
def save(self, **kwargs):
if not self.token:
self.token = secrets.token_urlsafe(48)
super().save(**kwargs)
@property
def is_valid(self) -> bool:
if not self.is_active:
return False
if self.expires_at and self.expires_at < timezone.now():
return False
return True
def can_use_tool(self, tool_name: str) -> bool:
if not self.allowed_tools:
return True
return tool_name in self.allowed_tools
def record_usage(self):
self.last_used_at = timezone.now()
self.save(update_fields=["last_used_at"])
def get_masked_token(self) -> str:
if len(self.token) > 8:
return f"{'*' * (len(self.token) - 8)}{self.token[-8:]}"
return "*" * len(self.token)

View File

@@ -0,0 +1,53 @@
"""FastMCP server instance for Mnemosyne. Imported by asgi.py at startup."""
from __future__ import annotations
from fastmcp import FastMCP
from .auth import MCPAuthMiddleware
from .tools import register_discovery_tools, register_search_tools
INSTRUCTIONS = """\
Mnemosyne is a content-type-aware, multimodal knowledge base. It indexes
documents (text, images, audio metadata) into a Neo4j knowledge graph and
exposes them through hybrid search (vector + full-text + concept-graph) with
optional Synesis re-ranking.
Content is organized into Libraries; each library has a `library_type` that
shapes how content is chunked, embedded, and re-ranked:
- fiction — Novels, short stories. Cover art available.
- nonfiction — General non-fiction prose.
- technical — Manuals, textbooks, docs. Diagrams and code-like content.
- music — Lyrics, liner notes, album artwork.
- film — Scripts, synopses, stills.
- art — Catalogs, descriptions, artwork itself.
- journal — Personal entries; temporal/reflective.
Tools:
- search Hybrid retrieval. Filter by library_uid, library_type,
or collection_uid. Returns ranked chunks with
text_preview (~500 chars) and chunk_uid.
- get_chunk Fetch the full text of a chunk by its chunk_uid.
- list_libraries Discover libraries (and their library_type).
- list_collections Discover collections, optionally per library.
- list_items Discover indexed items (documents).
Workflow: list_libraries → search(query, library_type=...) → get_chunk(chunk_uid)
when the preview isn't enough. The calling LLM is responsible for synthesis
and citation — Mnemosyne returns evidence, not answers.
"""
def build_server() -> FastMCP:
mcp = FastMCP(
"mnemosyne",
instructions=INSTRUCTIONS,
)
mcp.add_middleware(MCPAuthMiddleware())
register_search_tools(mcp)
register_discovery_tools(mcp)
return mcp
mcp = build_server()

View File

View File

@@ -0,0 +1,53 @@
"""Tests for resolve_mcp_user."""
from datetime import timedelta
from django.contrib.auth import get_user_model
from django.test import TestCase
from django.utils import timezone
from mcp_server.auth import MCPAuthError, resolve_mcp_user
from mcp_server.models import MCPToken
User = get_user_model()
class ResolveMCPUserTest(TestCase):
def setUp(self):
self.user = User.objects.create_user(
username="bob", email="bob@example.com", password="pw"
)
self.token = MCPToken.objects.create(user=self.user, name="t")
def test_resolves_valid_token(self):
user, token = resolve_mcp_user(self.token.token)
self.assertEqual(user.pk, self.user.pk)
self.assertEqual(token.pk, self.token.pk)
def test_records_usage(self):
self.assertIsNone(self.token.last_used_at)
resolve_mcp_user(self.token.token)
self.token.refresh_from_db()
self.assertIsNotNone(self.token.last_used_at)
def test_invalid_token_raises(self):
with self.assertRaises(MCPAuthError):
resolve_mcp_user("not-a-real-token")
def test_inactive_token_raises(self):
self.token.is_active = False
self.token.save()
with self.assertRaises(MCPAuthError):
resolve_mcp_user(self.token.token)
def test_expired_token_raises(self):
self.token.expires_at = timezone.now() - timedelta(hours=1)
self.token.save()
with self.assertRaises(MCPAuthError):
resolve_mcp_user(self.token.token)
def test_disabled_user_raises(self):
self.user.is_active = False
self.user.save()
with self.assertRaises(MCPAuthError):
resolve_mcp_user(self.token.token)

View File

@@ -0,0 +1,52 @@
"""Tests for the create_mcp_token management command."""
from io import StringIO
from django.contrib.auth import get_user_model
from django.core.management import call_command
from django.core.management.base import CommandError
from django.test import TestCase
from mcp_server.models import MCPToken
User = get_user_model()
class CreateMCPTokenCommandTest(TestCase):
def setUp(self):
self.user = User.objects.create_user(
username="carol", email="carol@example.com", password="pw"
)
def test_create_basic_token(self):
out = StringIO()
call_command("create_mcp_token", user="carol@example.com", name="CLI", stdout=out)
self.assertEqual(MCPToken.objects.count(), 1)
self.assertIn("CLI", out.getvalue())
def test_lookup_by_username(self):
out = StringIO()
call_command("create_mcp_token", user="carol", name="CLI2", stdout=out)
self.assertEqual(MCPToken.objects.count(), 1)
def test_unknown_user_raises(self):
with self.assertRaises(CommandError):
call_command("create_mcp_token", user="nobody@x.com", name="x")
def test_inactive_user_raises(self):
self.user.is_active = False
self.user.save()
with self.assertRaises(CommandError):
call_command("create_mcp_token", user="carol", name="x")
def test_tool_whitelist_parsed(self):
out = StringIO()
call_command(
"create_mcp_token",
user="carol",
name="Restricted",
tools="search,get_chunk",
stdout=out,
)
token = MCPToken.objects.get(name="Restricted")
self.assertEqual(token.allowed_tools, ["search", "get_chunk"])

View File

@@ -0,0 +1,25 @@
"""Tests that the FastMCP server registers the expected tools."""
import asyncio
from django.test import TestCase
from mcp_server.server import mcp
EXPECTED_TOOLS = {"search", "get_chunk", "list_libraries", "list_collections", "list_items"}
class ServerRegistrationTest(TestCase):
def test_expected_tools_registered(self):
tools = asyncio.run(mcp.get_tools())
self.assertEqual(EXPECTED_TOOLS, set(tools.keys()))
def test_tool_descriptions_within_limit(self):
tools = asyncio.run(mcp.get_tools())
for name, tool in tools.items():
description = tool.description or ""
self.assertLessEqual(
len(description), 1024,
f"Tool '{name}' description exceeds 1024 chars (MCP spec limit).",
)

View File

@@ -0,0 +1,63 @@
"""Tests for the MCPToken model."""
from datetime import timedelta
from django.contrib.auth import get_user_model
from django.test import TestCase
from django.utils import timezone
from mcp_server.models import MCPToken
User = get_user_model()
class MCPTokenModelTest(TestCase):
def setUp(self):
self.user = User.objects.create_user(
username="alice", email="alice@example.com", password="pw"
)
def test_token_auto_generated(self):
token = MCPToken.objects.create(user=self.user, name="t")
self.assertTrue(token.token)
self.assertGreater(len(token.token), 20)
def test_active_token_is_valid(self):
token = MCPToken.objects.create(user=self.user, name="t")
self.assertTrue(token.is_valid)
def test_inactive_token_not_valid(self):
token = MCPToken.objects.create(user=self.user, name="t", is_active=False)
self.assertFalse(token.is_valid)
def test_expired_token_not_valid(self):
token = MCPToken.objects.create(
user=self.user,
name="t",
expires_at=timezone.now() - timedelta(hours=1),
)
self.assertFalse(token.is_valid)
def test_unrestricted_permits_all(self):
token = MCPToken.objects.create(user=self.user, name="t")
self.assertTrue(token.can_use_tool("anything"))
def test_tool_whitelist(self):
token = MCPToken.objects.create(
user=self.user, name="t", allowed_tools=["search"]
)
self.assertTrue(token.can_use_tool("search"))
self.assertFalse(token.can_use_tool("get_chunk"))
def test_record_usage(self):
token = MCPToken.objects.create(user=self.user, name="t")
self.assertIsNone(token.last_used_at)
token.record_usage()
token.refresh_from_db()
self.assertIsNotNone(token.last_used_at)
def test_masked_token(self):
token = MCPToken.objects.create(user=self.user, name="t")
masked = token.get_masked_token()
self.assertTrue(masked.endswith(token.token[-8:]))
self.assertIn("*", masked)

View File

@@ -0,0 +1,4 @@
from .discovery import register_discovery_tools
from .search import register_search_tools
__all__ = ["register_search_tools", "register_discovery_tools"]

View File

@@ -0,0 +1,180 @@
"""Discovery MCP tools: list libraries, collections, and items."""
from __future__ import annotations
from typing import Any
from asgiref.sync import sync_to_async
from ..metrics import record_tool_call
DEFAULT_LIMIT = 50
MAX_LIMIT = 200
def _clamp(limit: int) -> int:
if limit < 1:
return 1
return min(limit, MAX_LIMIT)
def register_discovery_tools(mcp):
@mcp.tool
async def list_libraries(limit: int = DEFAULT_LIMIT, offset: int = 0) -> dict[str, Any]:
"""List Mnemosyne libraries. Each library has a content-aware library_type
(fiction, nonfiction, technical, music, film, art, journal) that drives
chunking, embedding, and re-ranking. Returns uid, name, library_type,
description for each library — use the uid or library_type to scope a
subsequent search.
"""
with record_tool_call("list_libraries"):
return await sync_to_async(_query_libraries, thread_sensitive=True)(
_clamp(limit), max(offset, 0)
)
@mcp.tool
async def list_collections(
library_uid: str | None = None,
limit: int = DEFAULT_LIMIT,
offset: int = 0,
) -> dict[str, Any]:
"""List collections, optionally filtered by parent library_uid.
Collections group related items inside a library (e.g. a series of novels,
a multi-volume manual). Returns uid, name, description, library_uid,
library_name. Use the uid to scope a subsequent search to one collection.
"""
with record_tool_call("list_collections"):
return await sync_to_async(_query_collections, thread_sensitive=True)(
library_uid, _clamp(limit), max(offset, 0)
)
@mcp.tool
async def list_items(
collection_uid: str | None = None,
library_uid: str | None = None,
limit: int = DEFAULT_LIMIT,
offset: int = 0,
) -> dict[str, Any]:
"""List items (the indexed documents/files), optionally filtered by
collection_uid or library_uid. Returns uid, title, item_type, file_type,
chunk_count, image_count, embedding_status. Use chunk_count to gauge
document size; use embedding_status to skip items that are not yet
searchable (only 'completed' items appear in search results).
"""
with record_tool_call("list_items"):
return await sync_to_async(_query_items, thread_sensitive=True)(
collection_uid, library_uid, _clamp(limit), max(offset, 0)
)
def _query_libraries(limit: int, offset: int) -> dict[str, Any]:
from neomodel import db
rows, _ = db.cypher_query(
"MATCH (l:Library) RETURN l.uid, l.name, l.library_type, l.description "
"ORDER BY l.name SKIP $offset LIMIT $limit",
{"offset": offset, "limit": limit},
)
return {
"libraries": [
{
"uid": uid,
"name": name,
"library_type": library_type,
"description": description,
}
for (uid, name, library_type, description) in rows
],
"limit": limit,
"offset": offset,
}
def _query_collections(
library_uid: str | None, limit: int, offset: int
) -> dict[str, Any]:
from neomodel import db
if library_uid:
cypher = (
"MATCH (l:Library {uid: $library_uid})-[:CONTAINS]->(c:Collection) "
"RETURN c.uid, c.name, c.description, l.uid, l.name "
"ORDER BY c.name SKIP $offset LIMIT $limit"
)
params = {"library_uid": library_uid, "offset": offset, "limit": limit}
else:
cypher = (
"MATCH (l:Library)-[:CONTAINS]->(c:Collection) "
"RETURN c.uid, c.name, c.description, l.uid, l.name "
"ORDER BY l.name, c.name SKIP $offset LIMIT $limit"
)
params = {"offset": offset, "limit": limit}
rows, _ = db.cypher_query(cypher, params)
return {
"collections": [
{
"uid": c_uid,
"name": c_name,
"description": c_desc,
"library_uid": l_uid,
"library_name": l_name,
}
for (c_uid, c_name, c_desc, l_uid, l_name) in rows
],
"limit": limit,
"offset": offset,
}
def _query_items(
collection_uid: str | None,
library_uid: str | None,
limit: int,
offset: int,
) -> dict[str, Any]:
from neomodel import db
where = []
params: dict[str, Any] = {"offset": offset, "limit": limit}
if collection_uid:
where.append("c.uid = $collection_uid")
params["collection_uid"] = collection_uid
if library_uid:
where.append("l.uid = $library_uid")
params["library_uid"] = library_uid
where_clause = ("WHERE " + " AND ".join(where)) if where else ""
cypher = (
"MATCH (l:Library)-[:CONTAINS]->(c:Collection)-[:CONTAINS]->(i:Item) "
f"{where_clause} "
"RETURN i.uid, i.title, i.item_type, i.file_type, "
"i.chunk_count, i.image_count, i.embedding_status "
"ORDER BY i.title SKIP $offset LIMIT $limit"
)
rows, _ = db.cypher_query(cypher, params)
return {
"items": [
{
"uid": uid,
"title": title,
"item_type": item_type,
"file_type": file_type,
"chunk_count": chunk_count,
"image_count": image_count,
"embedding_status": embedding_status,
}
for (
uid,
title,
item_type,
file_type,
chunk_count,
image_count,
embedding_status,
) in rows
],
"limit": limit,
"offset": offset,
}

View File

@@ -0,0 +1,127 @@
"""Search-related MCP tools: hybrid `search` and `get_chunk` for full text."""
from __future__ import annotations
from dataclasses import asdict
from typing import Any
from asgiref.sync import sync_to_async
from django.conf import settings
from django.core.files.storage import default_storage
from fastmcp.server.context import Context
from ..context import get_mcp_user
from ..metrics import record_tool_call
DEFAULT_SEARCH_TYPES = ["vector", "fulltext", "graph"]
def register_search_tools(mcp):
@mcp.tool
async def search(
query: str,
library_uid: str | None = None,
library_type: str | None = None,
collection_uid: str | None = None,
limit: int = 20,
rerank: bool = True,
include_images: bool = True,
search_types: list[str] | None = None,
ctx: Context | None = None,
) -> dict[str, Any]:
"""Hybrid retrieval over Mnemosyne: vector + full-text + concept-graph
candidates fused by RRF and optionally re-ranked by Synesis.
Filters: library_uid (exact library), library_type (one of fiction,
nonfiction, technical, music, film, art, journal), or collection_uid.
Set rerank=False to skip re-ranking. search_types defaults to all three.
Returns ranked candidates with chunk_uid (use get_chunk for full text),
item_uid/item_title for citation, library_type, text_preview (~500 chars),
score, and source. Also returns matching images when include_images=True.
"""
types = search_types or DEFAULT_SEARCH_TYPES
with record_tool_call("search"):
user = await get_mcp_user(ctx)
return await sync_to_async(_run_search, thread_sensitive=True)(
user=user,
query=query,
library_uid=library_uid,
library_type=library_type,
collection_uid=collection_uid,
limit=limit,
rerank=rerank,
include_images=include_images,
search_types=types,
)
@mcp.tool
async def get_chunk(chunk_uid: str, ctx: Context | None = None) -> dict[str, Any]:
"""Fetch the full text of a chunk by its uid (typically obtained from `search`).
Returns the chunk text plus parent item context: chunk_uid, chunk_index,
item_uid, item_title, library_type, text. Use this when the 500-character
text_preview from `search` isn't enough.
"""
with record_tool_call("get_chunk"):
return await sync_to_async(_load_chunk, thread_sensitive=True)(chunk_uid)
def _run_search(*, user, query, library_uid, library_type, collection_uid, limit,
rerank, include_images, search_types) -> dict[str, Any]:
from library.services.search import SearchRequest, SearchService
req = SearchRequest(
query=query,
library_uid=library_uid,
library_type=library_type,
collection_uid=collection_uid,
search_types=search_types,
limit=limit,
vector_top_k=getattr(settings, "SEARCH_VECTOR_TOP_K", 50),
fulltext_top_k=getattr(settings, "SEARCH_FULLTEXT_TOP_K", 30),
rerank=rerank,
include_images=include_images,
)
service = SearchService(user=user)
response = service.search(req)
return {
"query": response.query,
"candidates": [asdict(c) for c in response.candidates],
"images": [asdict(i) for i in response.images],
"total_candidates": response.total_candidates,
"search_time_ms": response.search_time_ms,
"reranker_used": response.reranker_used,
"reranker_model": response.reranker_model,
"search_types_used": response.search_types_used,
}
def _load_chunk(chunk_uid: str) -> dict[str, Any]:
from neomodel import db
rows, _ = db.cypher_query(
"MATCH (l:Library)-[:CONTAINS]->(:Collection)-[:CONTAINS]->"
"(i:Item)-[:HAS_CHUNK]->(c:Chunk {uid: $uid}) "
"RETURN c.uid, c.chunk_index, c.chunk_s3_key, "
"i.uid, i.title, l.library_type LIMIT 1",
{"uid": chunk_uid},
)
if not rows:
raise ValueError(f"Chunk not found: {chunk_uid}")
c_uid, chunk_index, chunk_s3_key, item_uid, item_title, library_type = rows[0]
text = ""
if chunk_s3_key:
with default_storage.open(chunk_s3_key, "rb") as fh:
text = fh.read().decode("utf-8", errors="replace")
return {
"chunk_uid": c_uid,
"chunk_index": chunk_index,
"item_uid": item_uid,
"item_title": item_title,
"library_type": library_type,
"text": text,
}

View File

@@ -1,16 +1,52 @@
""" """ASGI config for Mnemosyne.
ASGI config for mnemosyne project.
It exposes the ASGI callable as a module-level variable named ``application``. Serves the FastMCP server at /mcp (streamable HTTP) and /mcp/sse (SSE) plus a
/mcp/health endpoint. The Django ASGI app is also mounted at the root for any
other ASGI consumers — but the WSGI server (Gunicorn) is still the primary
serving path for Django views.
For more information on this file, see Run separately from the Django WSGI server:
https://docs.djangoproject.com/en/5.2/howto/deployment/asgi/ uvicorn mnemosyne.asgi:app --host 0.0.0.0 --port 8001 --workers 1
""" """
import os import os
from contextlib import asynccontextmanager
from django.core.asgi import get_asgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mnemosyne.settings") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mnemosyne.settings")
application = get_asgi_application() import django # noqa: E402
django.setup()
from django.core.asgi import get_asgi_application # noqa: E402
from starlette.applications import Starlette # noqa: E402
from starlette.responses import JSONResponse # noqa: E402
from starlette.routing import Mount, Route # noqa: E402
from mcp_server.server import mcp # noqa: E402
application = get_asgi_application() # Django ASGI app (kept for compatibility)
mcp_http_app = mcp.http_app(path="/", transport="streamable-http")
mcp_sse_app = mcp.http_app(path="/", transport="sse")
async def health(request):
return JSONResponse({"status": "ok"})
@asynccontextmanager
async def lifespan(app):
async with mcp_http_app.lifespan(app), mcp_sse_app.lifespan(app):
yield
app = Starlette(
routes=[
Route("/mcp/health", health),
Mount("/mcp/sse", app=mcp_sse_app),
Mount("/mcp", app=mcp_http_app),
Mount("/", app=application),
],
lifespan=lifespan,
)

View File

@@ -57,8 +57,12 @@ INSTALLED_APPS = [
"themis", "themis",
"library", "library",
"llm_manager", "llm_manager",
"mcp_server",
] ]
# --- MCP Server ---
MCP_REQUIRE_AUTH = env.bool("MCP_REQUIRE_AUTH", default=True)
MIDDLEWARE = [ MIDDLEWARE = [
"django_prometheus.middleware.PrometheusBeforeMiddleware", "django_prometheus.middleware.PrometheusBeforeMiddleware",
"django.middleware.security.SecurityMiddleware", "django.middleware.security.SecurityMiddleware",

View File

@@ -53,22 +53,29 @@ class TestDatabaseConfig:
return hashlib.sha256(data.encode()).hexdigest()[:16] return hashlib.sha256(data.encode()).hexdigest()[:16]
def get_postgres_command_args(self) -> List[str]: def get_postgres_command_args(self) -> List[str]:
"""Return PostgreSQL command args optimised for testing (fsync=off etc.).""" """Return PostgreSQL command args optimised for testing (fsync=off etc.).
return [
"postgres", Each `-c key=value` must be passed as two separate exec args; the
f"-c max_connections={self.max_connections}", Postgres image's entrypoint passes argv straight through, so a glued
f"-c shared_buffers={self.shared_buffers}", ``"-c key=value"`` arg is parsed by postgres as the parameter name
"-c fsync=off", ``" key"`` (with a leading space) and rejected.
"-c synchronous_commit=off", """
"-c full_page_writes=off", args = ["postgres"]
] + ( params = [
[ f"max_connections={self.max_connections}",
"-c log_min_duration_statement=0", f"shared_buffers={self.shared_buffers}",
"-c log_statement=all", "fsync=off",
"synchronous_commit=off",
"full_page_writes=off",
]
if self.enable_query_logging:
params += [
"log_min_duration_statement=0",
"log_statement=all",
] ]
if self.enable_query_logging for p in params:
else [] args += ["-c", p]
) return args
@dataclass @dataclass

View File

@@ -49,7 +49,18 @@ class PostgreSQLTestRunner(DiscoverRunner):
self.pg_manager.cleanup() self.pg_manager.cleanup()
raise RuntimeError("PostgreSQL test container failed to start") raise RuntimeError("PostgreSQL test container failed to start")
settings.DATABASES["default"] = self.pg_manager.get_django_database_config() from django.db import connections
db_cfg = self.pg_manager.get_django_database_config()
# Preserve Django's defaulted TEST sub-dict (CHARSET/MIRROR/MIGRATE…).
existing_test = connections["default"].settings_dict.get("TEST", {})
merged_test = {**existing_test, **db_cfg.get("TEST", {})}
db_cfg["TEST"] = merged_test
settings.DATABASES["default"] = db_cfg
# The default connection was instantiated at Django bootstrap; its
# settings_dict is independent of settings.DATABASES. Sync it
# manually so test code talks to the container, not the dev DB.
connections["default"].settings_dict.update(db_cfg)
logger.info("PostgreSQL test DB ready on port %s", self.pg_manager.assigned_port) logger.info("PostgreSQL test DB ready on port %s", self.pg_manager.assigned_port)
# ── Neo4j ────────────────────────────────────────────────────── # ── Neo4j ──────────────────────────────────────────────────────
@@ -83,7 +94,10 @@ class PostgreSQLTestRunner(DiscoverRunner):
logger.info("Neo4j test DB ready on bolt port %s", self.neo4j_manager.assigned_bolt_port) logger.info("Neo4j test DB ready on bolt port %s", self.neo4j_manager.assigned_bolt_port)
# Run Django's standard setup (creates tables via migrations) # Containers were just created — DB already exists, so flip keepdb to
# skip "CREATE DATABASE" (which would fail; test_user is not superuser).
# Django still runs migrations to populate the schema.
self.keepdb = True
return super().setup_databases(**kwargs) return super().setup_databases(**kwargs)
def teardown_databases(self, old_config, **kwargs) -> None: def teardown_databases(self, old_config, **kwargs) -> None:

View File

@@ -109,13 +109,19 @@ class DockerPostgreSQLManager:
} }
def get_django_database_config(self) -> Dict[str, Any]: def get_django_database_config(self) -> Dict[str, Any]:
# The container is launched with POSTGRES_DB=<get_database_name()>, so
# the test DB already exists. Tell Django to reuse it as the TEST name
# — otherwise DiscoverRunner prepends another "test_" prefix and tries
# to CREATE DATABASE, which fails as the test_user lacks superuser.
db_name = self.config.get_database_name()
return { return {
"ENGINE": "django.db.backends.postgresql", "ENGINE": "django.db.backends.postgresql",
"NAME": self.config.get_database_name(), "NAME": db_name,
"USER": "test_user", "USER": "test_user",
"PASSWORD": "test_password", "PASSWORD": "test_password",
"HOST": "127.0.0.1", "HOST": "127.0.0.1",
"PORT": str(self._assigned_port), "PORT": str(self._assigned_port),
"TEST": {"NAME": db_name},
} }
# ── Helpers ──────────────────────────────────────────────────────── # ── Helpers ────────────────────────────────────────────────────────

View File

@@ -30,6 +30,9 @@ dependencies = [
"tokenizers>=0.20,<1.0", "tokenizers>=0.20,<1.0",
"Pillow>=10.0,<12.0", "Pillow>=10.0,<12.0",
"requests>=2.31,<3.0", "requests>=2.31,<3.0",
# Phase 5: MCP Server
"fastmcp>=2.0,<3.0",
"uvicorn[standard]>=0.30,<1.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]