mnemosyne/docs/mnemosyne_integration.md
Robert Helewka 8d650c0570
docs(mnemosyne): update Phase 3 status to implemented
Mark per-turn JWT access control as implemented in the Mnemosyne
integration docs. Update Phase 2/3 status tables, replace deferred
language with concrete implementation details, and document the
`MCPSigningKey` model, `resolve_mcp_jwt`, and `_scope_from_claims`
components now live in the MCP server.
2026-05-04 15:06:34 -04:00


Mnemosyne Integration — Daedalus & Pallas Reference

This document describes Mnemosyne's role in the Daedalus + Pallas architecture and what's actually built today. The Daedalus-side spec lives in daedalus/docs/mnemosyne_integration.md.


Overview

Mnemosyne exposes two interfaces for the wider Ouranos ecosystem:

  1. REST API (/library/api/*) — consumed by the Daedalus backend (HTTP Basic auth, service account daedalus-service) for workspace lifecycle and asynchronous file ingestion. Phase 1, implemented.
  2. MCP Server (port 22091 internal, /mcp/ via nginx on 23090) — exposes search, browse, and retrieval tools. Phase 5 of Mnemosyne's own roadmap, implemented with workspace_id scoping and per-turn JWT access control. Consumed by Pallas FastAgents in production (Daedalus integration Phase 2, implemented — see Phase 3 of this doc).

Phase status

| Phase | What | Status |
|---|---|---|
| 1. REST workspace + ingest API for Daedalus | POST /workspaces/, DELETE /workspaces/{id}/, POST /ingest/, GET /jobs/{id}/ | Implemented |
| 2. MCP Server (Mnemosyne roadmap Phase 5) | search, get_chunk, list_libraries, list_collections, list_items, get_health | Implemented (workspace_id scoping enforced in Cypher) |
| 3. Per-turn signed-token access control for Daedalus integration | Daedalus mints HS256 JWTs carrying {ws, libs} claims; Mnemosyne validates via MCPSigningKey and scopes search via _scope_from_claims | Implemented |

1. MCP Server

Port & URL

| Endpoint | Internal (container) | Public (via nginx on host port 23181) |
|---|---|---|
| Django REST API | http://app:8000/ | https://mnemosyne.ouranos.helu.ca/ |
| MCP server | http://mcp:8001/mcp/ | https://mnemosyne.ouranos.helu.ca/mcp/ |
| MCP health | http://mcp:8001/mcp/health | https://mnemosyne.ouranos.helu.ca/healthz |
| Django liveness | http://app:8000/live/ | internal only |
| Django readiness | http://app:8000/ready/ | internal only |

Project structure (as built)

Follows the Django MCP Pattern:

mnemosyne/mnemosyne/mcp_server/
├── __init__.py
├── server.py              # FastMCP instance + tool registration
├── auth.py                # MCPAuthMiddleware
├── context.py             # get_mcp_user(), get_mcp_token()
└── tools/
    ├── __init__.py
    ├── search.py          # register_search_tools(mcp) → search, get_chunk
    ├── discovery.py       # register_discovery_tools(mcp) → list_libraries, list_collections, list_items
    └── health.py          # register_health_tools(mcp) → get_health

The ASGI mount lives at mnemosyne/mnemosyne/asgi.py (project-level) — it composes the FastMCP app at /mcp/ with a 307 redirect from bare /mcp so MCP clients that omit the trailing slash still land correctly.
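
The redirect behaviour described above can be illustrated with a small ASGI wrapper. This is a minimal sketch using only the standard library, not the contents of asgi.py: `with_trailing_slash_redirect`, `demo_app`, and `call` are hypothetical names, and `demo_app` merely stands in for the mounted FastMCP app. A 307 is used because it preserves the request method and body, which matters for MCP clients that POST.

```python
import asyncio


def with_trailing_slash_redirect(app, prefix="/mcp"):
    """Wrap an ASGI app so a request to the bare prefix gets a 307 to prefix + "/"."""

    async def wrapper(scope, receive, send):
        if scope["type"] == "http" and scope["path"] == prefix:
            location = prefix + "/"
            query = scope.get("query_string", b"")
            if query:
                # Keep the query string so the redirected request is equivalent.
                location += "?" + query.decode("latin-1")
            await send({
                "type": "http.response.start",
                "status": 307,
                "headers": [(b"location", location.encode("latin-1"))],
            })
            await send({"type": "http.response.body", "body": b""})
            return
        # Everything else is passed through untouched.
        await app(scope, receive, send)

    return wrapper


async def demo_app(scope, receive, send):
    """Hypothetical stand-in for the mounted FastMCP application."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})


def call(app, path):
    """Drive the ASGI app once and return (status, headers) of the response."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    scope = {"type": "http", "method": "POST", "path": path, "query_string": b""}
    asyncio.run(app(scope, receive, send))
    return sent[0]["status"], dict(sent[0]["headers"])
```

Calling `call(with_trailing_slash_redirect(demo_app), "/mcp")` yields the redirect, while `"/mcp/"` reaches the wrapped app directly.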

Tools (as implemented)

| Tool | Module | Description |
|---|---|---|
| search | search.py | Hybrid vector + full-text + concept-graph search → fusion → optional Synesis re-rank. Accepts library_uid, library_type, collection_uid, and (system-injected, undocumented to LLM) workspace_id for scoping. |
| get_chunk | search.py | Fetch full text of a chunk by uid (typically obtained from search). Honors workspace_id scoping. |
| list_libraries | discovery.py | List libraries with uid, name, library_type, description. workspace_id-aware. |
| list_collections | discovery.py | List collections, optionally filtered by parent library. workspace_id-aware. |
| list_items | discovery.py | List items with chunk_count, image_count, embedding_status. workspace_id-aware. |
| get_health | health.py | Check Neo4j, S3, embedding model reachability. Used by Pallas health pollers. |

The workspace_id parameter is present on every search/discovery tool but is deliberately undocumented in the LLM-facing tool description — it's a system-injected field the calling LLM should never know about. A workspace-scoped query returns ONLY that workspace's content; an unscoped query (workspace_id is NULL) returns ONLY global libraries. There is no mode that mixes the two — see library/services/search.py, _WORKSPACE_SCOPE_CLAUSE.
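
The single-mode rule can be sketched as a function that picks exactly one predicate per query. This is an illustration under assumptions, not the actual _WORKSPACE_SCOPE_CLAUSE: the Cypher variable `lib`, the function names, and the dictionary shape of a library are all hypothetical.

```python
def scope_filter(workspace_id):
    """Return a (Cypher predicate, parameters) pair for the single-mode rule.

    Assumption: libraries are bound to a Cypher variable named `lib`.
    """
    if workspace_id is None:
        # Unscoped query: global libraries only.
        return "lib.workspace_id IS NULL", {}
    # Scoped query: that workspace's libraries only.
    return "lib.workspace_id = $workspace_id", {"workspace_id": workspace_id}


def visible_libraries(libraries, workspace_id):
    """In-memory equivalent of the same rule, useful for unit tests."""
    return [lib["uid"] for lib in libraries if lib.get("workspace_id") == workspace_id]
```

Because both branches return a single equality or NULL test, there is no parameter combination that yields workspace and global content together.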

MCP Resources

| Resource URI | Source |
|---|---|
| mnemosyne://library-types | library/content_types.py → LIBRARY_TYPE_DEFAULTS |
| mnemosyne://libraries | Library.nodes.order_by("name") serialized to JSON |

Deployment

Production runs as four containers from a single image via docker-compose.yaml. The nginx web container is the only publicly-exposed service, listening on host port 23181, which HAProxy on Titania reverse-proxies as https://mnemosyne.ouranos.helu.ca.

| Container | Internal port | Role |
|---|---|---|
| app | 8000 | Django REST API + admin (gunicorn) |
| mcp | 8001 | FastMCP ASGI server (uvicorn) |
| worker | | Celery worker (embedding/ingest/batch) |
| web | 80 → host 23181 | nginx reverse proxy + static files |

Auth is controlled by MCP_REQUIRE_AUTH in .env. Production sets it to True; the internal validator and ad-hoc testing may use False on an isolated network.

⚠️ DEBUG LOG Points — MCP Server

| Location | Log Event | Level | What to Log |
|---|---|---|---|
| Tool dispatch | mcp_tool_called | DEBUG | Tool name, all input parameters |
| Vector search | mcp_search_vector_query | DEBUG | Query text, embedding dims, library filter, limit |
| Vector search result | mcp_search_vector_results | DEBUG | Candidate count, top/lowest scores |
| Full-text search | mcp_search_fulltext_query | DEBUG | Query terms, index used |
| Re-ranking | mcp_search_rerank | DEBUG | Candidates in/out, reranker model, duration_ms |
| Graph traversal | mcp_graph_traverse | DEBUG | Starting node UID, relationships, depth, nodes visited |
| Neo4j query | mcp_neo4j_query | DEBUG | Cypher query (parameterized), execution time_ms |
| Tool response | mcp_tool_response | DEBUG | Tool name, result size (bytes/items), duration_ms |
| Health check | mcp_health_check | DEBUG | Each dependency status, overall result |

Important: All neomodel ORM calls inside async tool functions must be wrapped with sync_to_async(thread_sensitive=True).


2. REST API for Daedalus

All endpoints require HTTP Basic auth as daedalus-service. They are consumed by the Daedalus FastAPI backend only — not by any frontend.

Workspace lifecycle

| Method | Route | Purpose |
|---|---|---|
| POST | /library/api/workspaces/ | Create workspace Library. Body: {workspace_id, name, library_type, description?}. Idempotent on workspace_id. library_type frozen at create. |
| GET | /library/api/workspaces/{workspace_id}/ | Workspace status (item_count, chunk_count, library_uid). |
| DELETE | /library/api/workspaces/{workspace_id}/ | Delete workspace Library + reachable content. Concept-safe: orphan-only Concept GC; concepts referenced by other libraries survive. |
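
As an illustration of the create call, the sketch below builds (but does not send) the request with the standard library. The helper name, the example host, and the JSON content type are assumptions; only the route, the body fields, and the daedalus-service Basic auth user come from this document.

```python
import base64
import json
import urllib.request


def build_create_workspace_request(base_url, password, workspace_id, name, library_type):
    """Build a POST /library/api/workspaces/ request authenticated as daedalus-service."""
    credentials = f"daedalus-service:{password}".encode("utf-8")
    body = json.dumps({
        "workspace_id": workspace_id,
        "name": name,
        "library_type": library_type,  # frozen at create, so choose carefully
    }).encode("utf-8")
    request = urllib.request.Request(
        url=base_url.rstrip("/") + "/library/api/workspaces/",
        data=body,
        method="POST",
    )
    request.add_header("Authorization", "Basic " + base64.b64encode(credentials).decode("ascii"))
    # Assumption: the endpoint accepts a JSON body.
    request.add_header("Content-Type", "application/json")
    return request
```

Since the endpoint is idempotent on workspace_id, a caller can safely re-send the same request after a timeout instead of first checking whether the workspace exists.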

Ingest

| Method | Route | Purpose |
|---|---|---|
| POST | /library/api/ingest/ | Accept a file (already in S3) for ingestion + embedding |
| GET | /library/api/jobs/{job_id}/ | Poll job status |
| POST | /library/api/jobs/{job_id}/retry/ | Retry a failed job |
| GET | /library/api/jobs/?status=&library_uid= | List recent jobs |

Model: IngestJob

Lives in library/models.py (Django ORM on PostgreSQL, not Neo4j). Migration: library/migrations/0001_initial.py.

from django.db import models


class IngestJob(models.Model):
    """Tracks the lifecycle of a content ingestion + embedding job."""

    id = models.CharField(max_length=64, primary_key=True)
    item_uid = models.CharField(max_length=64, db_index=True)
    celery_task_id = models.CharField(max_length=255, blank=True)

    status = models.CharField(
        max_length=20,
        choices=[
            ("pending", "Pending"),
            ("processing", "Processing"),
            ("completed", "Completed"),
            ("failed", "Failed"),
        ],
        default="pending",
        db_index=True,
    )
    progress = models.CharField(max_length=50, default="queued")
    error = models.TextField(blank=True, null=True)
    retry_count = models.PositiveIntegerField(default=0)

    chunks_created = models.PositiveIntegerField(default=0)
    concepts_extracted = models.PositiveIntegerField(default=0)
    embedding_model = models.CharField(max_length=100, blank=True)

    source = models.CharField(max_length=50, default="")
    source_ref = models.CharField(max_length=200, blank=True)
    s3_key = models.CharField(max_length=500)

    created_at = models.DateTimeField(auto_now_add=True)
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ["-created_at"]
        indexes = [
            models.Index(fields=["status", "-created_at"]),
            models.Index(fields=["source", "source_ref"]),
        ]

Ingest Request Schema

The target Library can be specified by either workspace_id (preferred for Daedalus) or library_uid. Idempotency key: (library, source_ref, content_hash). Same triple → existing job returned. New content_hash for the same source_ref → supersedes the prior Item.

{
  "s3_key": "workspaces/ws_abc/files/f_def/report.pdf",
  "title": "Q4 Technical Report",
  "workspace_id": "ws_abc",
  "file_type": "application/pdf",
  "file_size": 245000,
  "content_hash": "<sha256 hex, 64 chars>",
  "source": "daedalus",
  "source_ref": "ws_abc/f_def"
}
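
The idempotency rule for this request can be sketched as a three-way decision over the (library, source_ref, content_hash) triple. This is a simplified in-memory model, assuming at most one live Item per (library, source_ref); `ExistingItem` and `decide_ingest` are hypothetical names, not part of the codebase.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ExistingItem:
    library: str
    source_ref: str
    content_hash: str
    job_id: str


def decide_ingest(existing, library, source_ref, content_hash):
    """Return ("existing", job_id), ("supersede", job_id) or ("new", None)."""
    for item in existing:
        if item.library == library and item.source_ref == source_ref:
            if item.content_hash == content_hash:
                # Same triple: hand back the job that already exists.
                return ("existing", item.job_id)
            # Same source_ref, new content: the prior Item is superseded.
            return ("supersede", item.job_id)
    return ("new", None)
```

For example, re-posting ws_abc/f_def with an unchanged hash maps to "existing", while a changed hash maps to "supersede".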

Job Status Response Schema

{
  "job_id": "job_789xyz",
  "item_uid": "item_abc123",
  "status": "processing",
  "progress": "embedding",
  "chunks_created": 0,
  "concepts_extracted": 0,
  "embedding_model": "qwen3-vl-embedding-8b",
  "started_at": "2026-03-12T15:42:01Z",
  "completed_at": null,
  "error": null
}
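
A caller would typically poll this response until status reaches a terminal value. The loop below is a minimal sketch: `wait_for_job` and its parameters are hypothetical, and `fetch_status` is any callable that performs GET /library/api/jobs/{job_id}/ and returns the decoded JSON. The terminal statuses are taken from the IngestJob choices.

```python
import time

TERMINAL_STATUSES = {"completed", "failed"}


def wait_for_job(fetch_status, job_id, interval=2.0, timeout=300.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll until the job is completed or failed, or raise TimeoutError."""
    deadline = clock() + timeout
    while True:
        job = fetch_status(job_id)
        if job["status"] in TERMINAL_STATUSES:
            return job
        if clock() >= deadline:
            raise TimeoutError(f"job {job_id} still {job['status']} after {timeout}s")
        sleep(interval)
```

Injecting `sleep` and `clock` keeps the loop testable without real delays; the interval and timeout defaults are placeholders, not recommended values.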

⚠️ DEBUG LOG Points — Ingest Endpoint

| Location | Log Event | Level | What to Log |
|---|---|---|---|
| Request received | ingest_request_received | INFO | s3_key, title, library_uid, file_type, source, source_ref |
| S3 key validation | ingest_s3_key_check | DEBUG | s3_key, exists (bool), bucket name |
| Library lookup | ingest_library_lookup | DEBUG | library_uid, found (bool), library_type |
| Item node creation | ingest_item_created | INFO | item_uid, title, library_uid, collection_uid |
| Celery task dispatch | ingest_task_dispatched | INFO | job_id, item_uid, celery_task_id, queue name |
| Celery task dispatch failure | ingest_task_dispatch_failed | ERROR | job_id, item_uid, exception details |

3. Celery Embedding Pipeline

Task: ingest_from_daedalus

Defined in library/tasks.py. Routed to the embedding queue (per CELERY_TASK_ROUTES["library.tasks.ingest_*"]). Wraps the existing EmbeddingPipeline.process_item.

from celery import shared_task


@shared_task(
    name="library.tasks.ingest_from_daedalus",
    bind=True,
    queue="embedding",
    max_retries=3,
    default_retry_delay=60,
    acks_late=True,
)
def ingest_from_daedalus(self, job_id: str): ...

Task flow (as built)

  1. Mark job processing, set started_at.
  2. Resolve target Library by library_uid.
  3. If a prior Item exists for this Library with the same source_ref but a different content_hash, delete it (chunks + images + embeddings) before continuing.
  4. Fetch file bytes from the Daedalus S3 bucket via library.services.daedalus_s3.fetch_from_daedalus.
  5. Create the Item neomodel node with s3_key=items/{item_uid}/original.{ext} and copy bytes into Mnemosyne's own bucket.
  6. Connect to a default Collection for the Library (auto-created on first ingest).
  7. Run EmbeddingPipeline.process_item(item.uid) — chunk per library_type, embed via the configured model, write Chunks + Concepts to Neo4j.
  8. Mark job completed with chunks_created, concepts_extracted, embedding_model, completed_at.

On any exception with retries remaining: re-raise via self.retry() (exponential backoff). On terminal failure: mark job failed with the exception text.

⚠️ DEBUG LOG Points — Celery Worker (Critical)

These are the most important log points in the entire integration. Without them, debugging async embedding failures is nearly impossible.

| Location | Log Event | Level | What to Log |
|---|---|---|---|
| Task pickup | embed_task_started | INFO | job_id, item_uid, worker hostname, retry count |
| S3 fetch start | embed_s3_fetch_start | DEBUG | s3_key, source bucket |
| S3 fetch complete | embed_s3_fetch_complete | DEBUG | s3_key, file_size, duration_ms |
| S3 fetch failed | embed_s3_fetch_failed | ERROR | s3_key, error, retry_count |
| S3 cross-bucket copy start | s3_cross_bucket_copy_start | DEBUG | source_bucket, source_key, dest_bucket, dest_key |
| S3 cross-bucket copy complete | s3_cross_bucket_copy_complete | DEBUG | source_key, dest_key, file_size, duration_ms |
| S3 cross-bucket copy failed | s3_cross_bucket_copy_failed | ERROR | source_bucket, source_key, error |
| Chunking start | embed_chunking_start | DEBUG | library_type, strategy, chunk_size, chunk_overlap |
| Chunking complete | embed_chunking_complete | INFO | chunks_created, avg_chunk_size |
| Chunking failed | embed_chunking_failed | ERROR | file_type, error |
| Embedding start | embed_vectors_start | DEBUG | model_name, dimensions, batch_size, total_chunks |
| Embedding complete | embed_vectors_complete | INFO | model_name, duration_ms, tokens_processed |
| Embedding failed | embed_vectors_failed | ERROR | model_name, chunk_index, error |
| Neo4j write start | embed_neo4j_write_start | DEBUG | chunks_to_write count |
| Neo4j write complete | embed_neo4j_write_complete | INFO | chunks_written, duration_ms |
| Neo4j write failed | embed_neo4j_write_failed | ERROR | chunk_index, neo4j_error |
| Concept extraction start | embed_concepts_start | DEBUG | model_name |
| Concept extraction complete | embed_concepts_complete | INFO | concepts_extracted, concept_names, duration_ms |
| Graph build start | embed_graph_build_start | DEBUG | |
| Graph build complete | embed_graph_build_complete | INFO | relationships_created, duration_ms |
| Job completed | embed_job_completed | INFO | job_id, item_uid, total_duration_ms, chunks, concepts |
| Job failed | embed_job_failed | ERROR | job_id, item_uid, exception_type, error, full traceback |
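
Most rows above come in start / complete / failed triplets with a duration_ms field, which suggests a shared helper. The context manager below is one possible sketch using only the standard logging module; `logged_step`, the logger name, and the `event` / `fields` record attributes are assumptions, not the project's actual logging setup.

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("mnemosyne.ingest")  # hypothetical logger name


@contextmanager
def logged_step(prefix, complete_level=logging.INFO, **fields):
    """Emit <prefix>_start, then <prefix>_complete or <prefix>_failed with duration_ms."""
    logger.debug("%s_start", prefix, extra={"event": f"{prefix}_start", "fields": fields})
    started = time.monotonic()
    try:
        # The body may add result fields (e.g. file_size) to the yielded dict.
        yield fields
    except Exception as exc:
        duration_ms = int((time.monotonic() - started) * 1000)
        logger.error(
            "%s_failed", prefix,
            extra={"event": f"{prefix}_failed",
                   "fields": {**fields, "error": str(exc), "duration_ms": duration_ms}},
            exc_info=True,  # attaches the full traceback
        )
        raise
    else:
        duration_ms = int((time.monotonic() - started) * 1000)
        logger.log(
            complete_level, "%s_complete", prefix,
            extra={"event": f"{prefix}_complete",
                   "fields": {**fields, "duration_ms": duration_ms}},
        )
```

Usage would look like `with logged_step("embed_s3_fetch", complete_level=logging.DEBUG, s3_key=key) as step: ...`, with the exception re-raised so the Celery retry logic still sees it.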

4. S3 Bucket Strategy

Mnemosyne uses its own bucket (mnemosyne-content, Terraform-provisioned per Phase 1). On ingest, the Celery worker copies the file from the Daedalus bucket to Mnemosyne's bucket.

mnemosyne-content bucket
├── items/
│   └── {item_uid}/
│       ├── original/{filename}     ← copied from Daedalus bucket
│       └── chunks/
│           ├── chunk_000.txt
│           └── chunk_001.txt
└── images/
    └── {image_uid}/{filename}

Configuration

# .env additions

# Mnemosyne's own bucket (existing)
AWS_STORAGE_BUCKET_NAME=mnemosyne-content

# Cross-bucket read access to Daedalus bucket
DAEDALUS_S3_BUCKET_NAME=daedalus
DAEDALUS_S3_ENDPOINT_URL=http://incus-s3.incus:9000
DAEDALUS_S3_ACCESS_KEY_ID=${VAULT_DAEDALUS_S3_READ_KEY}
DAEDALUS_S3_SECRET_ACCESS_KEY=${VAULT_DAEDALUS_S3_READ_SECRET}

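# MCP server (production sets MCP_REQUIRE_AUTH=True; False is only for the internal validator and ad-hoc testing on an isolated network)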
MCP_SERVER_PORT=22091
MCP_REQUIRE_AUTH=False

5. Prometheus Metrics

# MCP tool calls
mnemosyne_mcp_tool_invocations_total{tool,status}              counter
mnemosyne_mcp_tool_duration_seconds{tool}                      histogram

# Ingest pipeline
mnemosyne_ingest_jobs_total{status}                            counter
mnemosyne_ingest_duration_seconds{library_type}                histogram
mnemosyne_chunks_created_total{library_type}                   counter
mnemosyne_concepts_extracted_total                             counter
mnemosyne_embeddings_generated_total{model}                    counter
mnemosyne_embedding_duration_seconds{model}                    histogram

# Search performance
mnemosyne_search_duration_seconds{search_type}                 histogram
mnemosyne_search_results_total{search_type}                    counter
mnemosyne_rerank_duration_seconds{model}                       histogram

# Infrastructure
mnemosyne_neo4j_query_duration_seconds{query_type}             histogram
mnemosyne_s3_operations_total{operation,status}                counter

6. Implementation Phases (Mnemosyne-specific)

Phase 1 — REST API for Daedalus (workspace + ingest) Implemented

  • Library.workspace_id + library_type enum (added business, finance)
  • IngestJob Django ORM model + migration 0001_initial.py
  • POST /library/api/workspaces/, GET /library/api/workspaces/{id}/, DELETE /library/api/workspaces/{id}/ (concept-safe)
  • POST /library/api/ingest/ with (library, source_ref, content_hash) idempotency
  • GET /library/api/jobs/{job_id}/, POST .../retry/, GET /library/api/jobs/
  • library.tasks.ingest_from_daedalus Celery task with content-hash-aware supersede logic
  • library.services.daedalus_s3 cross-bucket fetch + copy
  • HTTP Basic auth via daedalus-service user

Phase 2 — MCP Server (Mnemosyne roadmap Phase 5) Implemented

  • mcp_server/ module following the Django MCP Pattern
  • search tool (hybrid vector + fulltext + concept-graph + Synesis re-rank)
  • get_chunk tool (full text by chunk_uid)
  • list_libraries, list_collections, list_items discovery tools
  • get_health tool (Neo4j + S3 + embedding model probes)
  • workspace_id parameter on every search/discovery tool (undocumented to LLM, scoping enforced in Cypher)
  • Single-mode rule: workspace-scoped vs global, never both in one query
  • ASGI mount + uvicorn deployment on port 22091; nginx proxies via /mcp/ on 23090
  • Prometheus metrics (mnemosyne_mcp_*)

Phase 3 — Per-turn token access control for Daedalus integration Implemented

Daedalus mints a short-lived HS256 JWT per chat turn and sends it as Authorization: Bearer to Pallas. Pallas forwards the token to outgoing Mnemosyne MCP calls (via pallas/_fastagent_patch). Mnemosyne validates the JWT and scopes every search to the workspace indicated by the ws claim.

Mnemosyne-side components:

  • MCPSigningKey model — stores active HS256 secrets keyed by kid. Managed via manage.py seed_signing_key --kid <kid>.
  • resolve_mcp_jwt(token_string) in mcp_server/auth.py — validates signature, exp, iss, jti replay; returns claims dict.
  • MCPAuthMiddleware.on_call_tool — detects JWT shape (three dot-separated segments), routes to resolve_mcp_jwt, stores claims in FastMCP context state via STATE_KEY_CLAIMS.
  • _scope_from_claims(claims, arg_workspace_id) — claims trump tool args; returns (ws, allowed_libraries).
  • allowed_libraries on SearchRequest — extends _WORKSPACE_SCOPE_CLAUSE to include user-managed libraries in addition to the workspace's own.
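
Two of the components above are simple enough to sketch: the JWT shape check and the "claims trump tool args" rule. The functions below are illustrative stand-ins with hypothetical names, not the code in mcp_server/auth.py; in particular, falling back to the tool argument when no claims are present is an assumption.

```python
def looks_like_jwt(token):
    """True when the token has three non-empty dot-separated segments."""
    parts = token.split(".")
    return len(parts) == 3 and all(parts)


def scope_from_claims(claims, arg_workspace_id):
    """Return (workspace_id, allowed_libraries); validated claims win over tool args."""
    if claims is None:
        # Assumption: without JWT claims, the tool argument is used as-is.
        return arg_workspace_id, []
    # With claims present, the tool argument is ignored entirely.
    return claims.get("ws"), list(claims.get("libs", []))
```

Ignoring the tool argument whenever claims exist means a caller cannot widen its scope by passing a different workspace_id than the one signed into the token.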

Token format (HS256):

{
  "iss": "daedalus",
  "sub": "chat",
  "ws":  "<workspace_uuid>",
  "libs": [],
  "iat": 1746000000,
  "exp": 1746000600,
  "jti": "<uuid4>"
}

The libs claim is reserved for future user-managed library assignment (deferred). Currently always []; the workspace's own library is always included via the ws claim.
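
To make the token format concrete, the sketch below mints and verifies an HS256 token with only the standard library. It is not the Daedalus or Mnemosyne implementation, and a real service would more likely rely on a maintained JWT library. Assumptions: the kid travels in the JWT header, the secret is the raw bytes of the seeded hex string, and the replay policy rejects any second use of a jti. `mint_token` and `verify_token` are hypothetical names.

```python
import base64
import hashlib
import hmac
import json
import time
import uuid


def _b64url(data):
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text):
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def mint_token(secret, kid, workspace_id, ttl=600, now=None):
    """Build a signed token carrying the claims shown above."""
    issued = int(time.time() if now is None else now)
    header = {"alg": "HS256", "typ": "JWT", "kid": kid}  # kid placement is an assumption
    claims = {"iss": "daedalus", "sub": "chat", "ws": workspace_id, "libs": [],
              "iat": issued, "exp": issued + ttl, "jti": str(uuid.uuid4())}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return signing_input + "." + _b64url(signature)


def verify_token(token, secret, seen_jti, now=None):
    """Check alg, signature, exp, iss and jti replay; return the claims dict."""
    header_b64, claims_b64, signature_b64 = token.split(".")
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected alg")
    expected = hmac.new(secret, f"{header_b64}.{claims_b64}".encode("ascii"),
                        hashlib.sha256).digest()
    # Constant-time comparison avoids leaking signature bytes through timing.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(claims_b64))
    current = time.time() if now is None else now
    if claims["exp"] <= current:
        raise ValueError("token expired")
    if claims["iss"] != "daedalus":
        raise ValueError("unexpected issuer")
    if claims["jti"] in seen_jti:
        raise ValueError("replayed jti")
    seen_jti.add(claims["jti"])
    return claims
```

The signature is checked before the claims are parsed or trusted, so an attacker cannot influence expiry or scope handling with an unsigned payload.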

Provisioning:

# On Mnemosyne host, once:
docker compose exec app python manage.py seed_signing_key --kid daedalus-1
# Copy the printed hex → DAEDALUS_MNEMOSYNE_SIGNING_SECRET in Daedalus .env

See the Daedalus-side spec §9 for the full integration architecture.