Files
mnemosyne/docs/mnemosyne_integration.md
Robert Helewka 409da7d109
All checks were successful
CVE Scan & Docker Build / security-scan (push) Successful in 56s
CVE Scan & Docker Build / build-and-push (push) Successful in 3m30s
docs: replace daedalus-service basic auth with per-user DRF tokens
2026-05-22 22:59:59 -04:00

22 KiB

Mnemosyne Integration — Daedalus & Pallas Reference

This document describes Mnemosyne's role in the Daedalus + Pallas architecture and what's actually built today. The Daedalus-side spec lives in daedalus/docs/mnemosyne_integration.md.


Overview

Mnemosyne exposes two interfaces for the wider Ouranos ecosystem:

  1. REST API (/library/api/*) — consumed by the Daedalus backend authenticated as the owning Mnemosyne user via a per-user DRF token (Authorization: Token <key>, surfaced on /profile/settings/) for workspace lifecycle and asynchronous file ingestion. Phase 1, implemented.
  2. MCP Server (port 22091 internal, /mcp/ via nginx on 23090) — exposes search, browse, and retrieval tools. Phase 5 of Mnemosyne's own roadmap, implemented with workspace-scoped access control via long-lived team JWTs. Consumed by Pallas FastAgents in production (Daedalus integration Phase 2, implemented — see Phase 3 of this doc).

Phase status

Phase What Status
1. REST workspace + ingest API for Daedalus POST /workspaces/, DELETE /workspaces/{id}/, POST /ingest/, GET /jobs/{id}/ Implemented
2. MCP Server (Mnemosyne roadmap Phase 5) search, get_chunk, list_libraries, list_collections, list_items, get_health Implemented (workspace_id scoping enforced in Cypher)
3. Long-lived team JWT access control for Pallas instances Mnemosyne mints a 10-year HS256 JWT per Pallas instance (Team); Daedalus stores it encrypted and the operator pastes the plaintext into fastagent.secrets.yaml. Mnemosyne scopes search to the team's assigned workspaces via TeamWorkspaceAssignment. Implemented

1. MCP Server

Port & URL

Endpoint Internal (container) Public (via nginx on host port 23181)
Django REST API http://app:8000/ https://mnemosyne.ouranos.helu.ca/
MCP server http://mcp:8001/mcp/ https://mnemosyne.ouranos.helu.ca/mcp/
MCP health http://mcp:8001/mcp/health https://mnemosyne.ouranos.helu.ca/healthz
Django liveness http://app:8000/live/ internal only
Django readiness http://app:8000/ready/ internal only

Project structure (as built)

Follows the Django MCP Pattern:

mnemosyne/mnemosyne/mcp_server/
├── __init__.py
├── server.py              # FastMCP instance + tool registration
├── auth.py                # MCPAuthMiddleware
├── context.py             # get_mcp_user(), get_mcp_token()
└── tools/
    ├── __init__.py
    ├── search.py          # register_search_tools(mcp) → search, get_chunk
    ├── discovery.py       # register_discovery_tools(mcp) → list_libraries, list_collections, list_items
    └── health.py          # register_health_tools(mcp) → get_health

The ASGI mount lives at mnemosyne/mnemosyne/asgi.py (project-level) — it composes the FastMCP app at /mcp/ with a 307 redirect from bare /mcp so MCP clients that omit the trailing slash still land correctly.

Tools (as implemented)

Tool Module Description
search search.py Hybrid vector + full-text + concept-graph search → fusion → optional Synesis re-rank. Accepts library_uid, library_type, collection_uid, and (system-injected, undocumented to LLM) workspace_id for scoping.
get_chunk search.py Fetch full text of a chunk by uid (typically obtained from search). Honors workspace_id scoping.
list_libraries discovery.py List libraries with uid, name, library_type, description. Workspace_id-aware.
list_collections discovery.py List collections, optionally filtered by parent library. Workspace_id-aware.
list_items discovery.py List items with chunk_count, image_count, embedding_status. Workspace_id-aware.
get_health health.py Check Neo4j, S3, embedding model reachability. Used by Pallas health pollers.

The workspace_id parameter is present on every search/discovery tool but is deliberately undocumented in the LLM-facing tool description — it's a system-injected field the calling LLM should never know about. A workspace-scoped query returns ONLY that workspace's content; an unscoped query (workspace_id is NULL) returns ONLY global libraries. There is no mode that mixes the two — see library/services/search.py, _WORKSPACE_SCOPE_CLAUSE.

MCP Resources

Resource URI Source
mnemosyne://library-types library/content_types.pyLIBRARY_TYPE_DEFAULTS
mnemosyne://libraries Library.nodes.order_by("name") serialized to JSON

Deployment

Production runs as four containers from a single image via docker-compose.yaml. The nginx web container is the only publicly-exposed service, listening on host port 23181, which HAProxy on Titania reverse-proxies as https://mnemosyne.ouranos.helu.ca.

Container Internal port Role
app 8000 Django REST API + admin (gunicorn)
mcp 8001 FastMCP ASGI server (uvicorn)
worker Celery worker (embedding/ingest/batch)
web 80 → host 23181 nginx reverse proxy + static files

Auth is controlled by MCP_REQUIRE_AUTH in .env. Production sets it to True; the internal validator and ad-hoc testing may use False on an isolated network.

⚠️ DEBUG LOG Points — MCP Server

Location Log Event Level What to Log
Tool dispatch mcp_tool_called DEBUG Tool name, all input parameters
Vector search mcp_search_vector_query DEBUG Query text, embedding dims, library filter, limit
Vector search result mcp_search_vector_results DEBUG Candidate count, top/lowest scores
Full-text search mcp_search_fulltext_query DEBUG Query terms, index used
Re-ranking mcp_search_rerank DEBUG Candidates in/out, reranker model, duration_ms
Graph traversal mcp_graph_traverse DEBUG Starting node UID, relationships, depth, nodes visited
Neo4j query mcp_neo4j_query DEBUG Cypher query (parameterized), execution time_ms
Tool response mcp_tool_response DEBUG Tool name, result size (bytes/items), duration_ms
Health check mcp_health_check DEBUG Each dependency status, overall result

Important: All neomodel ORM calls inside async tool functions must be wrapped with sync_to_async(thread_sensitive=True).


2. REST API for Daedalus

All endpoints require an Authorization: Token <key> header carrying the DRF token of the Mnemosyne user the workspace belongs to (surfaced on /profile/settings/). Workspaces are scoped to their creating user via the Library.owner_username property; cross-user access returns 404. They are consumed by the Daedalus FastAPI backend only — not by any frontend.

Workspace lifecycle

Method Route Purpose
POST /library/api/workspaces/ Create workspace Library. Body: {workspace_id, name, library_type, description?}. Idempotent on workspace_id. library_type frozen at create.
GET /library/api/workspaces/{workspace_id}/ Workspace status (item_count, chunk_count, library_uid).
DELETE /library/api/workspaces/{workspace_id}/ Delete workspace Library + reachable content. Concept-safe: orphan-only Concept GC; concepts referenced by other libraries survive.

Ingest

Method Route Purpose
POST /library/api/ingest/ Accept a file (already in S3) for ingestion + embedding
GET /library/api/jobs/{job_id}/ Poll job status
POST /library/api/jobs/{job_id}/retry/ Retry a failed job
GET /library/api/jobs/?status=&library_uid= List recent jobs

Model: IngestJob

Lives in library/models.py (Django ORM on PostgreSQL, not Neo4j). Migration: library/migrations/0001_initial.py.

class IngestJob(models.Model):
    """Tracks the lifecycle of a content ingestion + embedding job."""

    id = models.CharField(max_length=64, primary_key=True)
    item_uid = models.CharField(max_length=64, db_index=True)
    celery_task_id = models.CharField(max_length=255, blank=True)

    status = models.CharField(
        max_length=20,
        choices=[
            ("pending", "Pending"),
            ("processing", "Processing"),
            ("completed", "Completed"),
            ("failed", "Failed"),
        ],
        default="pending",
        db_index=True,
    )
    progress = models.CharField(max_length=50, default="queued")
    error = models.TextField(blank=True, null=True)
    retry_count = models.PositiveIntegerField(default=0)

    chunks_created = models.PositiveIntegerField(default=0)
    concepts_extracted = models.PositiveIntegerField(default=0)
    embedding_model = models.CharField(max_length=100, blank=True)

    source = models.CharField(max_length=50, default="")
    source_ref = models.CharField(max_length=200, blank=True)
    s3_key = models.CharField(max_length=500)

    created_at = models.DateTimeField(auto_now_add=True)
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ["-created_at"]
        indexes = [
            models.Index(fields=["status", "-created_at"]),
            models.Index(fields=["source", "source_ref"]),
        ]

Ingest Request Schema

The target Library can be specified by either workspace_id (preferred for Daedalus) or library_uid. Idempotency key: (library, source_ref, content_hash). Same triple → existing job returned. New content_hash for the same source_ref → supersedes the prior Item.

{
  "s3_key": "workspaces/ws_abc/files/f_def/report.pdf",
  "title": "Q4 Technical Report",
  "workspace_id": "ws_abc",
  "file_type": "application/pdf",
  "file_size": 245000,
  "content_hash": "<sha256 hex, 64 chars>",
  "source": "daedalus",
  "source_ref": "ws_abc/f_def"
}

Job Status Response Schema

{
  "job_id": "job_789xyz",
  "item_uid": "item_abc123",
  "status": "processing",
  "progress": "embedding",
  "chunks_created": 0,
  "concepts_extracted": 0,
  "embedding_model": "qwen3-vl-embedding-8b",
  "started_at": "2026-03-12T15:42:01Z",
  "completed_at": null,
  "error": null
}

⚠️ DEBUG LOG Points — Ingest Endpoint

Location Log Event Level What to Log
Request received ingest_request_received INFO s3_key, title, library_uid, file_type, source, source_ref
S3 key validation ingest_s3_key_check DEBUG s3_key, exists (bool), bucket name
Library lookup ingest_library_lookup DEBUG library_uid, found (bool), library_type
Item node creation ingest_item_created INFO item_uid, title, library_uid, collection_uid
Celery task dispatch ingest_task_dispatched INFO job_id, item_uid, celery_task_id, queue name
Celery task dispatch failure ingest_task_dispatch_failed ERROR job_id, item_uid, exception details

3. Celery Embedding Pipeline

Task: ingest_from_daedalus

Defined in library/tasks.py. Routed to the embedding queue (per CELERY_TASK_ROUTES["library.tasks.ingest_*"]). Wraps the existing EmbeddingPipeline.process_item.

@shared_task(
    name="library.tasks.ingest_from_daedalus",
    bind=True,
    queue="embedding",
    max_retries=3,
    default_retry_delay=60,
    acks_late=True,
)
def ingest_from_daedalus(self, job_id: str): ...

Task flow (as built)

  1. Mark job processing, set started_at.
  2. Resolve target Library by library_uid.
  3. If a prior Item exists for this Library with the same source_ref but a different content_hash, delete it (chunks + images + embeddings) before continuing.
  4. Fetch file bytes from the Daedalus S3 bucket via library.services.daedalus_s3.fetch_from_daedalus.
  5. Create the Item neomodel node with s3_key=items/{item_uid}/original.{ext} and copy bytes into Mnemosyne's own bucket.
  6. Connect to a default Collection for the Library (auto-created on first ingest).
  7. Run EmbeddingPipeline.process_item(item.uid) — chunk per library_type, embed via the configured model, write Chunks + Concepts to Neo4j.
  8. Mark job completed with chunks_created, concepts_extracted, embedding_model, completed_at.

On any exception with retries remaining: re-raise via self.retry() (exponential backoff). On terminal failure: mark job failed with the exception text.

⚠️ DEBUG LOG Points — Celery Worker (Critical)

These are the most important log points in the entire integration. Without them, debugging async embedding failures is nearly impossible.

Location Log Event Level What to Log
Task pickup embed_task_started INFO job_id, item_uid, worker hostname, retry count
S3 fetch start embed_s3_fetch_start DEBUG s3_key, source bucket
S3 fetch complete embed_s3_fetch_complete DEBUG s3_key, file_size, duration_ms
S3 fetch failed embed_s3_fetch_failed ERROR s3_key, error, retry_count
S3 cross-bucket copy start s3_cross_bucket_copy_start DEBUG source_bucket, source_key, dest_bucket, dest_key
S3 cross-bucket copy complete s3_cross_bucket_copy_complete DEBUG source_key, dest_key, file_size, duration_ms
S3 cross-bucket copy failed s3_cross_bucket_copy_failed ERROR source_bucket, source_key, error
Chunking start embed_chunking_start DEBUG library_type, strategy, chunk_size, chunk_overlap
Chunking complete embed_chunking_complete INFO chunks_created, avg_chunk_size
Chunking failed embed_chunking_failed ERROR file_type, error
Embedding start embed_vectors_start DEBUG model_name, dimensions, batch_size, total_chunks
Embedding complete embed_vectors_complete INFO model_name, duration_ms, tokens_processed
Embedding failed embed_vectors_failed ERROR model_name, chunk_index, error
Neo4j write start embed_neo4j_write_start DEBUG chunks_to_write count
Neo4j write complete embed_neo4j_write_complete INFO chunks_written, duration_ms
Neo4j write failed embed_neo4j_write_failed ERROR chunk_index, neo4j_error
Concept extraction start embed_concepts_start DEBUG model_name
Concept extraction complete embed_concepts_complete INFO concepts_extracted, concept_names, duration_ms
Graph build start embed_graph_build_start DEBUG
Graph build complete embed_graph_build_complete INFO relationships_created, duration_ms
Job completed embed_job_completed INFO job_id, item_uid, total_duration_ms, chunks, concepts
Job failed embed_job_failed ERROR job_id, item_uid, exception_type, error, full traceback

4. S3 Bucket Strategy

Mnemosyne uses its own bucket (mnemosyne-content, Terraform-provisioned per Phase 1). On ingest, the Celery worker copies the file from the Daedalus bucket to Mnemosyne's bucket.

mnemosyne-content bucket
├── items/
│   └── {item_uid}/
│       └── original/{filename}     ← copied from Daedalus bucket
│       └── chunks/
│           └── chunk_000.txt
│           └── chunk_001.txt
├── images/
│   └── {image_uid}/{filename}

Configuration

# .env additions

# Mnemosyne's own bucket (existing)
AWS_STORAGE_BUCKET_NAME=mnemosyne-content

# Cross-bucket read access to Daedalus bucket
DAEDALUS_S3_BUCKET_NAME=daedalus
DAEDALUS_S3_ENDPOINT_URL=http://incus-s3.incus:9000
DAEDALUS_S3_ACCESS_KEY_ID=${VAULT_DAEDALUS_S3_READ_KEY}
DAEDALUS_S3_SECRET_ACCESS_KEY=${VAULT_DAEDALUS_S3_READ_SECRET}

# MCP server
MCP_SERVER_PORT=22091
MCP_REQUIRE_AUTH=False

5. Prometheus Metrics

# MCP tool calls
mnemosyne_mcp_tool_invocations_total{tool,status}              counter
mnemosyne_mcp_tool_duration_seconds{tool}                      histogram

# Ingest pipeline
mnemosyne_ingest_jobs_total{status}                            counter
mnemosyne_ingest_duration_seconds{library_type}                histogram
mnemosyne_chunks_created_total{library_type}                   counter
mnemosyne_concepts_extracted_total                             counter
mnemosyne_embeddings_generated_total{model}                    counter
mnemosyne_embedding_duration_seconds{model}                    histogram

# Search performance
mnemosyne_search_duration_seconds{search_type}                 histogram
mnemosyne_search_results_total{search_type}                    counter
mnemosyne_rerank_duration_seconds{model}                       histogram

# Infrastructure
mnemosyne_neo4j_query_duration_seconds{query_type}             histogram
mnemosyne_s3_operations_total{operation,status}                counter

6. Implementation Phases (Mnemosyne-specific)

Phase 1 — REST API for Daedalus (workspace + ingest) Implemented

  • Library.workspace_id + library_type enum (added business, finance)
  • IngestJob Django ORM model + migration 0001_initial.py
  • POST /library/api/workspaces/, GET /library/api/workspaces/{id}/, DELETE /library/api/workspaces/{id}/ (concept-safe)
  • POST /library/api/ingest/ with (library, source_ref, content_hash) idempotency
  • GET /library/api/jobs/{job_id}/, POST .../retry/, GET /library/api/jobs/
  • library.tasks.ingest_from_daedalus Celery task with content-hash-aware supersede logic
  • library.services.daedalus_s3 cross-bucket fetch + copy
  • Per-user DRF token auth (Authorization: Token <key>); workspaces scoped to the owning user via Library.owner_username

Phase 2 — MCP Server (Mnemosyne roadmap Phase 5) Implemented

  • mcp_server/ module following the Django MCP Pattern
  • search tool (hybrid vector + fulltext + concept-graph + Synesis re-rank)
  • get_chunk tool (full text by chunk_uid)
  • list_libraries, list_collections, list_items discovery tools
  • get_health tool (Neo4j + S3 + embedding model probes)
  • Workspace_id parameter on every search/discovery tool (undocumented to LLM, scoping enforced in Cypher)
  • Single-mode rule: workspace-scoped vs global, never both in one query
  • ASGI mount + uvicorn deployment on port 22091; nginx proxies via /mcp/ on 23090
  • Prometheus metrics (mnemosyne_mcp_*)

Phase 3 — Long-lived team JWT access control for Pallas instances Implemented

Each Pallas instance registered in Daedalus is mirrored as a Mnemosyne Team. Mnemosyne mints a long-lived (10-year) HS256 JWT for the team; the operator pastes the plaintext into the Pallas instance's fastagent.secrets.yaml. Every MCP call from that Pallas instance carries the team JWT as a static Authorization: Bearer header. Mnemosyne validates the JWT and scopes search to the workspaces assigned to that team.

Mnemosyne-side components:

  • MCPSigningKey model — stores active HS256 secrets keyed by kid. Managed via manage.py seed_signing_key --kid <kid>. The hex stays in Mnemosyne's DB; Daedalus never sees it.
  • Team model — one row per Pallas instance. id = PallasInstance.id on the Daedalus side (stable UUID). active_jti identifies the single currently-valid JWT; rotation changes this field, immediately invalidating the old token.
  • TeamWorkspaceAssignment model — maps a Team to a set of Daedalus workspace UUIDs. Updated by Daedalus via PUT /mcp_server/api/teams/{id}/workspaces/ whenever workspace attachments change.
  • resolve_mcp_jwt(token_string) in mcp_server/auth.py — validates signature, exp, iss. For team JWTs (iss=mnemosyne, typ=team): parses sub=team:<uuid>claims["team_id"]; bypasses the per-turn JTI replay cache (team tokens are intentionally reused).
  • _libraries_for_team(team_id, jti) — looks up the Team row, verifies active=True and active_jti == jti, then translates TeamWorkspaceAssignment rows into Library UIDs via a single Cypher query.
  • MCPAuthMiddleware.on_call_tool — routes team JWTs through _libraries_for_team; routes legacy per-turn JWTs through _scope_from_claims (backward-compatible).
  • REST control plane at /mcp_server/api/teams/:
    • POST / — create team by UUID; mints JWT, returns plaintext once.
    • GET /{id}/ — team state (workspace_ids, active status).
    • DELETE /{id}/ — soft-delete (active=False); all JWTs immediately invalid.
    • PUT /{id}/workspaces/ — replace workspace assignment list (idempotent).
    • POST /{id}/rotate/ — mint new JWT with new active_jti; returns plaintext once.

Team JWT format (HS256):

{
  "iss": "mnemosyne",
  "aud": "mnemosyne",
  "typ": "team",
  "sub": "team:<pallas-instance-uuid>",
  "iat": 1746000000,
  "exp": 2061360000,
  "jti": "<active_jti uuid>"
}

Provisioning (once per Pallas instance):

# 1. Seed the MCPSigningKey on Mnemosyne (once per deployment, not per instance):
docker compose exec app python manage.py seed_signing_key --kid daedalus-1 --retire-other
# The hex stays in Mnemosyne's DB — no operator action required.

# 2. Register the Pallas instance in Daedalus admin UI (/admin/pallas/).
#    Daedalus calls POST /mcp_server/api/teams/ automatically.
#    The team JWT is minted and stored encrypted in Daedalus.

# 3. Reveal the JWT via Daedalus admin UI (one-shot):
#    GET /api/v1/pallas/{id}/team-jwt
#    Copy the returned JWT string.

# 4. Paste into fastagent.secrets.yaml on the Pallas host:
#    mcp:
#      servers:
#        mnemosyne:
#          headers:
#            Authorization: "Bearer <JWT>"

# 5. Restart the Pallas agent processes.

# 6. Attach workspaces in Daedalus workspace settings UI.
#    Daedalus calls PUT /mcp_server/api/teams/{id}/workspaces/ automatically.

See the Daedalus-side spec §9 for the full operator walkthrough including JWT rotation and disaster recovery.