# Mnemosyne Integration — Daedalus & Pallas Reference

This document describes Mnemosyne's role in the Daedalus + Pallas architecture and what's actually built today. The Daedalus-side spec lives in [`daedalus/docs/mnemosyne_integration.md`](../../daedalus/docs/mnemosyne_integration.md).

---

## Overview

Mnemosyne exposes two interfaces for the wider Ouranos ecosystem:

1. **REST API** (`/library/api/*`): used by the Daedalus backend for workspace lifecycle and asynchronous file ingestion. Daedalus authenticates as the owning Mnemosyne user with a per-user `UserToken` (`Authorization: Bearer <plaintext>`, minted at `/profile/tokens/`). Phase 1, **implemented**.
2. **MCP Server** (port 22091 internal, `/mcp/` via nginx on 23090): exposes search, browse, and retrieval tools. Phase 5 of Mnemosyne's own roadmap, **implemented** with workspace-scoped access control via long-lived team JWTs. Consumed by Pallas FastAgents in production (Daedalus integration Phase 2, **implemented**; see [Phase 3 of this doc](#phase-3--long-lived-team-jwt-access-control-for-pallas-instances--implemented)).

### Phase status

| Phase | What | Status |
|-------|------|--------|
| 1. REST workspace + ingest API for Daedalus | `POST /workspaces/`, `DELETE /workspaces/{id}/`, `POST /ingest/`, `GET /jobs/{id}/` | **Implemented** |
| 2. MCP Server (Mnemosyne roadmap Phase 5) | `search`, `get_chunk`, `list_libraries`, `list_collections`, `list_items`, `get_health` | **Implemented** (workspace_id scoping enforced in Cypher) |
| 3. Long-lived team JWT access control for Pallas instances | Mnemosyne mints a 10-year HS256 JWT per Pallas instance (Team); Daedalus stores it encrypted and the operator pastes the plaintext into `fastagent.secrets.yaml`. Mnemosyne scopes search to the team's assigned workspaces via `TeamWorkspaceAssignment`. | **Implemented** |

---

## 1. MCP Server

### Port & URL

| Endpoint | Internal (container) | Public (via nginx on host port 23181) |
|---|---|---|
| Django REST API | `http://app:8000/` | `https://mnemosyne.ouranos.helu.ca/` |
| MCP server | `http://mcp:8001/mcp/` | `https://mnemosyne.ouranos.helu.ca/mcp/` |
| MCP health | `http://mcp:8001/mcp/health` | `https://mnemosyne.ouranos.helu.ca/healthz` |
| Django liveness | `http://app:8000/live/` | internal only |
| Django readiness | `http://app:8000/ready/` | internal only |

### Project structure (as built)

Follows the [Django MCP Pattern](Pattern_Django-MCP_V1-00.md):

```
mnemosyne/mnemosyne/mcp_server/
├── __init__.py
├── server.py          # FastMCP instance + tool registration
├── auth.py            # MCPAuthMiddleware
├── context.py         # get_mcp_user(), get_mcp_token()
└── tools/
    ├── __init__.py
    ├── search.py      # register_search_tools(mcp) → search, get_chunk
    ├── discovery.py   # register_discovery_tools(mcp) → list_libraries, list_collections, list_items
    └── health.py      # register_health_tools(mcp) → get_health
```

The ASGI mount lives at `mnemosyne/mnemosyne/asgi.py` (project-level). It composes the FastMCP app at `/mcp/` with a 307 redirect from bare `/mcp`, so MCP clients that omit the trailing slash still land correctly.

### Tools (as implemented)

| Tool | Module | Description |
|------|--------|-------------|
| `search` | `search.py` | Hybrid vector + full-text + concept-graph search → fusion → optional Synesis re-rank. Accepts `library_uid`, `library_type`, `collection_uid`, and (system-injected, not documented to the LLM) `workspace_id` for scoping. |
| `get_chunk` | `search.py` | Fetch the full text of a chunk by uid (typically obtained from `search`). Honors workspace_id scoping. |
| `list_libraries` | `discovery.py` | List libraries with uid, name, library_type, description. Workspace_id-aware. |
| `list_collections` | `discovery.py` | List collections, optionally filtered by parent library. Workspace_id-aware. |
| `list_items` | `discovery.py` | List items with chunk_count, image_count, embedding_status. Workspace_id-aware. |
| `get_health` | `health.py` | Check Neo4j, S3, and embedding model reachability. Used by Pallas health pollers. |

The `workspace_id` parameter is present on every search/discovery tool but is **deliberately undocumented in the LLM-facing tool description**: it is a system-injected field that the calling LLM should never know about. A workspace-scoped query returns ONLY that workspace's content; an unscoped query (`workspace_id` is NULL) returns ONLY global libraries. No mode mixes the two (see `_WORKSPACE_SCOPE_CLAUSE` in `library/services/search.py`).

### MCP Resources

| Resource URI | Source |
|---|---|
| `mnemosyne://library-types` | `library/content_types.py` → `LIBRARY_TYPE_DEFAULTS` |
| `mnemosyne://libraries` | `Library.nodes.order_by("name")` serialized to JSON |

### Deployment

Production runs as four containers from a single image via `docker-compose.yaml`. The nginx `web` container is the only publicly exposed service, listening on **host port 23181**, which HAProxy on Titania reverse-proxies as `https://mnemosyne.ouranos.helu.ca`.

| Container | Internal port | Role |
|-----------|--------------|------|
| `app` | 8000 | Django REST API + admin (gunicorn) |
| `mcp` | 8001 | FastMCP ASGI server (uvicorn) |
| `worker` | — | Celery worker (embedding/ingest/batch) |
| `web` | 80 → host **23181** | nginx reverse proxy + static files |

Auth is controlled by `MCP_REQUIRE_AUTH` in `.env`. Production sets it to `True`; the internal validator and ad-hoc testing may use `False` on an isolated network.
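The single-mode scoping rule described under Tools can be pictured as a filter that picks exactly one branch per call. The sketch below is an in-memory illustration only, not the Cypher in `_WORKSPACE_SCOPE_CLAUSE`; `LibraryRef` and `visible_libraries` are hypothetical names, and it assumes a "global" library is one whose `workspace_id` is unset.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class LibraryRef:
    """Hypothetical stand-in for a Library node, reduced to the fields the rule needs."""

    uid: str
    name: str
    workspace_id: Optional[str]  # assumption: None marks a global library


def visible_libraries(
    libraries: Iterable[LibraryRef], workspace_id: Optional[str]
) -> List[LibraryRef]:
    """Single-mode scoping: a scoped call sees only its workspace's libraries,
    an unscoped call sees only global libraries, and the two never mix."""
    if workspace_id is None:
        return [lib for lib in libraries if lib.workspace_id is None]
    return [lib for lib in libraries if lib.workspace_id == workspace_id]


libs = [
    LibraryRef("lib_global", "Handbook", None),
    LibraryRef("lib_ws1", "Q4 Reports", "ws_abc"),
    LibraryRef("lib_ws2", "Other Team", "ws_xyz"),
]
print([lib.uid for lib in visible_libraries(libs, "ws_abc")])  # ['lib_ws1']
print([lib.uid for lib in visible_libraries(libs, None)])  # ['lib_global']
```

Because the branch is chosen once per call, a scoped query never falls through to global content, and an unknown `workspace_id` yields an empty result rather than a global one.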
### ⚠️ DEBUG LOG Points — MCP Server

| Location | Log Event | Level | What to Log |
|----------|-----------|-------|-------------|
| Tool dispatch | `mcp_tool_called` | DEBUG | Tool name, all input parameters |
| Vector search | `mcp_search_vector_query` | DEBUG | Query text, embedding dims, library filter, limit |
| Vector search result | `mcp_search_vector_results` | DEBUG | Candidate count, top/lowest scores |
| Full-text search | `mcp_search_fulltext_query` | DEBUG | Query terms, index used |
| Re-ranking | `mcp_search_rerank` | DEBUG | Candidates in/out, reranker model, duration_ms |
| Graph traversal | `mcp_graph_traverse` | DEBUG | Starting node UID, relationships, depth, nodes visited |
| Neo4j query | `mcp_neo4j_query` | DEBUG | Cypher query (parameterized), execution time_ms |
| Tool response | `mcp_tool_response` | DEBUG | Tool name, result size (bytes/items), duration_ms |
| Health check | `mcp_health_check` | DEBUG | Each dependency status, overall result |

**Important:** All neomodel ORM calls inside async tool functions **must** be wrapped with `sync_to_async(thread_sensitive=True)`.

---

## 2. REST API for Daedalus

All endpoints require an `Authorization: Bearer <plaintext>` header carrying a `UserToken` (minted at `/profile/tokens/`) that belongs to the Mnemosyne user who owns the workspace. Workspaces are scoped to their creating user via the `Library.owner_username` property; cross-user access returns 404. Anonymous requests get 401 with `WWW-Authenticate: Bearer`. These endpoints are consumed only by the Daedalus FastAPI backend, not by any frontend.

### Workspace lifecycle

| Method | Route | Purpose |
|--------|-------|---------|
| `POST` | `/library/api/workspaces/` | Create workspace Library. Body: `{workspace_id, name, library_type, description?}`. Idempotent on `workspace_id`. `library_type` is frozen at creation. |
| `GET` | `/library/api/workspaces/{workspace_id}/` | Workspace status (item_count, chunk_count, library_uid). |
| `DELETE` | `/library/api/workspaces/{workspace_id}/` | Delete workspace Library + reachable content. Concept-safe: orphan-only Concept GC; concepts referenced by other libraries survive. |

### Ingest

| Method | Route | Purpose |
|--------|-------|---------|
| `POST` | `/library/api/ingest/` | Accept a file (already in S3) for ingestion + embedding |
| `GET` | `/library/api/jobs/{job_id}/` | Poll job status |
| `POST` | `/library/api/jobs/{job_id}/retry/` | Retry a failed job |
| `GET` | `/library/api/jobs/?status=&library_uid=` | List recent jobs |

### Model: `IngestJob`

Lives in `library/models.py` (Django ORM on PostgreSQL, not Neo4j). Migration: `library/migrations/0001_initial.py`.

```python
from django.db import models


class IngestJob(models.Model):
    """Tracks the lifecycle of a content ingestion + embedding job."""

    id = models.CharField(max_length=64, primary_key=True)
    item_uid = models.CharField(max_length=64, db_index=True)
    celery_task_id = models.CharField(max_length=255, blank=True)
    status = models.CharField(
        max_length=20,
        choices=[
            ("pending", "Pending"),
            ("processing", "Processing"),
            ("completed", "Completed"),
            ("failed", "Failed"),
        ],
        default="pending",
        db_index=True,
    )
    progress = models.CharField(max_length=50, default="queued")
    error = models.TextField(blank=True, null=True)
    retry_count = models.PositiveIntegerField(default=0)
    chunks_created = models.PositiveIntegerField(default=0)
    concepts_extracted = models.PositiveIntegerField(default=0)
    embedding_model = models.CharField(max_length=100, blank=True)
    source = models.CharField(max_length=50, default="")
    source_ref = models.CharField(max_length=200, blank=True)
    s3_key = models.CharField(max_length=500)
    created_at = models.DateTimeField(auto_now_add=True)
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ["-created_at"]
        indexes = [
            models.Index(fields=["status", "-created_at"]),
            models.Index(fields=["source", "source_ref"]),
        ]
```

### Ingest Request Schema

The target Library can be specified by either `workspace_id` (preferred for Daedalus) or `library_uid`. The idempotency key is `(library, source_ref, content_hash)`: a request with the same triple returns the existing job, while a new `content_hash` for the same `source_ref` supersedes the prior Item.

```json
{
  "s3_key": "workspaces/ws_abc/files/f_def/report.pdf",
  "title": "Q4 Technical Report",
  "workspace_id": "ws_abc",
  "file_type": "application/pdf",
  "file_size": 245000,
  "content_hash": "<sha256 hex, 64 chars>",
  "source": "daedalus",
  "source_ref": "ws_abc/f_def"
}
```

### Job Status Response Schema

```json
{
  "job_id": "job_789xyz",
  "item_uid": "item_abc123",
  "status": "processing",
  "progress": "embedding",
  "chunks_created": 0,
  "concepts_extracted": 0,
  "embedding_model": "qwen3-vl-embedding-8b",
  "started_at": "2026-03-12T15:42:01Z",
  "completed_at": null,
  "error": null
}
```

### ⚠️ DEBUG LOG Points — Ingest Endpoint

| Location | Log Event | Level | What to Log |
|----------|-----------|-------|-------------|
| Request received | `ingest_request_received` | INFO | s3_key, title, library_uid, file_type, source, source_ref |
| S3 key validation | `ingest_s3_key_check` | DEBUG | s3_key, exists (bool), bucket name |
| Library lookup | `ingest_library_lookup` | DEBUG | library_uid, found (bool), library_type |
| Item node creation | `ingest_item_created` | INFO | item_uid, title, library_uid, collection_uid |
| Celery task dispatch | `ingest_task_dispatched` | INFO | job_id, item_uid, celery_task_id, queue name |
| Celery task dispatch failure | `ingest_task_dispatch_failed` | ERROR | job_id, item_uid, exception details |

---

## 3. Celery Embedding Pipeline

### Task: `ingest_from_daedalus`

Defined in `library/tasks.py`. Routed to the `embedding` queue (per `CELERY_TASK_ROUTES["library.tasks.ingest_*"]`). Wraps the existing `EmbeddingPipeline.process_item`.
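The ingest endpoint's `(library, source_ref, content_hash)` idempotency key and the supersede step in the task flow both come down to comparing an incoming triple with what is already stored. A minimal sketch of that decision, assuming at most one live Item per `(library, source_ref)`; `PriorItem`, `IngestDecision`, and `decide_ingest` are hypothetical names, not Mnemosyne code:

```python
import hashlib
from dataclasses import dataclass
from enum import Enum
from typing import Optional


def content_hash(data: bytes) -> str:
    """SHA-256 hex digest (64 chars), the format the ingest schema expects."""
    return hashlib.sha256(data).hexdigest()


class IngestDecision(Enum):
    CREATE = "create"  # nothing stored yet for this (library, source_ref)
    RETURN_EXISTING = "existing"  # same triple: hand back the existing job
    SUPERSEDE = "supersede"  # same source_ref, new content: delete the prior Item first


@dataclass(frozen=True)
class PriorItem:
    """Hypothetical record of the Item already stored for a source_ref."""

    library_uid: str
    source_ref: str
    content_hash: str


def decide_ingest(
    prior: Optional[PriorItem], library_uid: str, source_ref: str, new_hash: str
) -> IngestDecision:
    if prior is None or (prior.library_uid, prior.source_ref) != (library_uid, source_ref):
        return IngestDecision.CREATE
    if prior.content_hash == new_hash:
        return IngestDecision.RETURN_EXISTING
    return IngestDecision.SUPERSEDE


v1 = content_hash(b"report v1")
prior = PriorItem("lib_ws1", "ws_abc/f_def", v1)
print(decide_ingest(prior, "lib_ws1", "ws_abc/f_def", v1).name)  # RETURN_EXISTING
print(decide_ingest(prior, "lib_ws1", "ws_abc/f_def", content_hash(b"report v2")).name)  # SUPERSEDE
```

Keeping the decision pure makes it testable without Neo4j or Celery; the endpoint or task then acts on the result (return the existing job, delete the prior Item, or create a new one).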
```python
from celery import shared_task


@shared_task(
    name="library.tasks.ingest_from_daedalus",
    bind=True,
    queue="embedding",
    max_retries=3,
    default_retry_delay=60,
    acks_late=True,
)
def ingest_from_daedalus(self, job_id: str):
    ...
```

### Task flow (as built)

1. Mark job `processing`, set `started_at`.
2. Resolve target Library by `library_uid`.
3. If a prior Item exists for this Library with the same `source_ref` but a *different* `content_hash`, delete it (chunks + images + embeddings) before continuing.
4. Fetch file bytes from the Daedalus S3 bucket via `library.services.daedalus_s3.fetch_from_daedalus`.
5. Create the `Item` neomodel node with `s3_key=items/{item_uid}/original.{ext}` and copy bytes into Mnemosyne's own bucket.
6. Connect to a default Collection for the Library (auto-created on first ingest).
7. Run `EmbeddingPipeline.process_item(item.uid)`: chunk per `library_type`, embed via the configured model, write Chunks + Concepts to Neo4j.
8. Mark job `completed` with `chunks_created`, `concepts_extracted`, `embedding_model`, `completed_at`.

On any exception with retries remaining, the task re-raises via `self.retry()` (exponential backoff). On terminal failure, it marks the job `failed` with the exception text.

### ⚠️ DEBUG LOG Points — Celery Worker (Critical)

These are the most important log points in the entire integration. Without them, debugging async embedding failures is nearly impossible.
| Location | Log Event | Level | What to Log |
|----------|-----------|-------|-------------|
| Task pickup | `embed_task_started` | INFO | job_id, item_uid, worker hostname, retry count |
| S3 fetch start | `embed_s3_fetch_start` | DEBUG | s3_key, source bucket |
| S3 fetch complete | `embed_s3_fetch_complete` | DEBUG | s3_key, file_size, duration_ms |
| S3 fetch failed | `embed_s3_fetch_failed` | ERROR | s3_key, error, retry_count |
| S3 cross-bucket copy start | `s3_cross_bucket_copy_start` | DEBUG | source_bucket, source_key, dest_bucket, dest_key |
| S3 cross-bucket copy complete | `s3_cross_bucket_copy_complete` | DEBUG | source_key, dest_key, file_size, duration_ms |
| S3 cross-bucket copy failed | `s3_cross_bucket_copy_failed` | ERROR | source_bucket, source_key, error |
| Chunking start | `embed_chunking_start` | DEBUG | library_type, strategy, chunk_size, chunk_overlap |
| Chunking complete | `embed_chunking_complete` | INFO | chunks_created, avg_chunk_size |
| Chunking failed | `embed_chunking_failed` | ERROR | file_type, error |
| Embedding start | `embed_vectors_start` | DEBUG | model_name, dimensions, batch_size, total_chunks |
| Embedding complete | `embed_vectors_complete` | INFO | model_name, duration_ms, tokens_processed |
| Embedding failed | `embed_vectors_failed` | ERROR | model_name, chunk_index, error |
| Neo4j write start | `embed_neo4j_write_start` | DEBUG | chunks_to_write count |
| Neo4j write complete | `embed_neo4j_write_complete` | INFO | chunks_written, duration_ms |
| Neo4j write failed | `embed_neo4j_write_failed` | ERROR | chunk_index, neo4j_error |
| Concept extraction start | `embed_concepts_start` | DEBUG | model_name |
| Concept extraction complete | `embed_concepts_complete` | INFO | concepts_extracted, concept_names, duration_ms |
| Graph build start | `embed_graph_build_start` | DEBUG | — |
| Graph build complete | `embed_graph_build_complete` | INFO | relationships_created, duration_ms |
| Job completed | `embed_job_completed` | INFO | job_id, item_uid, total_duration_ms, chunks, concepts |
| Job failed | `embed_job_failed` | ERROR | job_id, item_uid, exception_type, error, full traceback |

---

## 4. S3 Bucket Strategy

Mnemosyne uses its own bucket (`mnemosyne-content`, Terraform-provisioned per Phase 1). On ingest, the Celery worker copies the file from the Daedalus bucket to Mnemosyne's bucket.

```
mnemosyne-content bucket
├── items/
│   └── {item_uid}/
│       ├── original/{filename}   ← copied from Daedalus bucket
│       └── chunks/
│           ├── chunk_000.txt
│           └── chunk_001.txt
└── images/
    └── {image_uid}/{filename}
```

### Configuration

```bash
# .env additions

# Mnemosyne's own bucket (existing)
AWS_STORAGE_BUCKET_NAME=mnemosyne-content

# Cross-bucket read access to Daedalus bucket
DAEDALUS_S3_BUCKET_NAME=daedalus
DAEDALUS_S3_ENDPOINT_URL=http://incus-s3.incus:9000
DAEDALUS_S3_ACCESS_KEY_ID=${VAULT_DAEDALUS_S3_READ_KEY}
DAEDALUS_S3_SECRET_ACCESS_KEY=${VAULT_DAEDALUS_S3_READ_SECRET}

# MCP server
MCP_SERVER_PORT=22091
# Production sets MCP_REQUIRE_AUTH=True; False is only for testing on an isolated network
MCP_REQUIRE_AUTH=False
```

---

## 5. Prometheus Metrics

```
# MCP tool calls
mnemosyne_mcp_tool_invocations_total{tool,status}      counter
mnemosyne_mcp_tool_duration_seconds{tool}              histogram

# Ingest pipeline
mnemosyne_ingest_jobs_total{status}                    counter
mnemosyne_ingest_duration_seconds{library_type}        histogram
mnemosyne_chunks_created_total{library_type}           counter
mnemosyne_concepts_extracted_total                     counter
mnemosyne_embeddings_generated_total{model}            counter
mnemosyne_embedding_duration_seconds{model}            histogram

# Search performance
mnemosyne_search_duration_seconds{search_type}         histogram
mnemosyne_search_results_total{search_type}            counter
mnemosyne_rerank_duration_seconds{model}               histogram

# Infrastructure
mnemosyne_neo4j_query_duration_seconds{query_type}     histogram
mnemosyne_s3_operations_total{operation,status}        counter
```

---

## 6. Implementation Phases (Mnemosyne-specific)

### Phase 1 — REST API for Daedalus (workspace + ingest) ✅ Implemented

- [x] `Library.workspace_id` + `library_type` enum (added `business`, `finance`)
- [x] `IngestJob` Django ORM model + migration `0001_initial.py`
- [x] `POST /library/api/workspaces/`, `GET /library/api/workspaces/{id}/`, `DELETE /library/api/workspaces/{id}/` (concept-safe)
- [x] `POST /library/api/ingest/` with `(library, source_ref, content_hash)` idempotency
- [x] `GET /library/api/jobs/{job_id}/`, `POST .../retry/`, `GET /library/api/jobs/`
- [x] `library.tasks.ingest_from_daedalus` Celery task with content-hash-aware supersede logic
- [x] `library.services.daedalus_s3` cross-bucket fetch + copy
- [x] Per-user `UserToken` auth (`Authorization: Bearer <plaintext>`, minted at `/profile/tokens/`); workspaces scoped to the owning user via `Library.owner_username`

### Phase 2 — MCP Server (Mnemosyne roadmap Phase 5) ✅ Implemented

- [x] `mcp_server/` module following the [Django MCP Pattern](Pattern_Django-MCP_V1-00.md)
- [x] `search` tool (hybrid vector + fulltext + concept-graph + Synesis re-rank)
- [x] `get_chunk` tool (full text by chunk_uid)
- [x] `list_libraries`, `list_collections`, `list_items` discovery tools
- [x] `get_health` tool (Neo4j + S3 + embedding model probes)
- [x] `workspace_id` parameter on every search/discovery tool (undocumented to the LLM, scoping enforced in Cypher)
- [x] Single-mode rule: workspace-scoped vs global, never both in one query
- [x] ASGI mount + uvicorn deployment on port 22091; nginx proxies via `/mcp/` on 23090
- [x] Prometheus metrics (`mnemosyne_mcp_*`)

### Phase 3 — Long-lived team JWT access control for Pallas instances ✅ Implemented

Each Pallas instance registered in Daedalus is mirrored as a Mnemosyne **Team**. Mnemosyne mints a long-lived (10-year) HS256 JWT for the team; the operator pastes the plaintext into the Pallas instance's `fastagent.secrets.yaml`.
Every MCP call from that Pallas instance carries the team JWT as a static `Authorization: Bearer` header. Mnemosyne validates the JWT and scopes search to the workspaces assigned to that team.

**Mnemosyne-side components:**

- [x] `MCPSigningKey` model: stores active HS256 secrets keyed by `kid`. Managed via `manage.py seed_signing_key --kid <kid>`. The hex-encoded secret stays in Mnemosyne's DB; Daedalus never sees it.
- [x] `Team` model: one row per Pallas instance. `id` = `PallasInstance.id` on the Daedalus side (stable UUID). `active_jti` identifies the single currently valid JWT; rotation changes this field, immediately invalidating the old token.
- [x] `TeamWorkspaceAssignment` model: maps a `Team` to a set of Daedalus workspace UUIDs. Updated by Daedalus via `PUT /mcp_server/api/teams/{id}/workspaces/` whenever workspace attachments change.
- [x] `resolve_mcp_jwt(token_string)` in `mcp_server/auth.py`: validates signature, `exp`, and `iss`. For team JWTs (`iss=mnemosyne`, `typ=team`), it parses `sub=team:<uuid>` into `claims["team_id"]` and bypasses the per-turn JTI replay cache (team tokens are intentionally reused).
- [x] `_libraries_for_team(team_id, jti)`: looks up the `Team` row, verifies `active=True` and `active_jti == jti`, then translates `TeamWorkspaceAssignment` rows into Library UIDs via a single Cypher query.
- [x] `MCPAuthMiddleware.on_call_tool`: routes team JWTs through `_libraries_for_team` and legacy per-turn JWTs through `_scope_from_claims` (backward-compatible).
- [x] REST control plane at `/mcp_server/api/teams/`:
  - `POST /`: create team by UUID; mints JWT, returns plaintext once.
  - `GET /{id}/`: team state (workspace_ids, active status).
  - `DELETE /{id}/`: soft-delete (`active=False`); all JWTs immediately invalid.
  - `PUT /{id}/workspaces/`: replace workspace assignment list (idempotent).
  - `POST /{id}/rotate/`: mint new JWT with new `active_jti`; returns plaintext once.

**Team JWT format (HS256):**

```json
{
  "iss": "mnemosyne",
  "aud": "mnemosyne",
  "typ": "team",
  "sub": "team:<pallas-instance-uuid>",
  "iat": 1746000000,
  "exp": 2061360000,
  "jti": "<active_jti uuid>"
}
```

**Provisioning (once per Pallas instance):**

```bash
# 1. Seed the MCPSigningKey on Mnemosyne (once per deployment, not per instance):
docker compose exec app python manage.py seed_signing_key --kid daedalus-1 --retire-other
#    The hex-encoded secret stays in Mnemosyne's DB; no operator action required.

# 2. Register the Pallas instance in Daedalus admin UI (/admin/pallas/).
#    Daedalus calls POST /mcp_server/api/teams/ automatically.
#    The team JWT is minted and stored encrypted in Daedalus.

# 3. Reveal the JWT via Daedalus admin UI (one-shot):
#    GET /api/v1/pallas/{id}/team-jwt
#    Copy the returned JWT string.

# 4. Paste into fastagent.secrets.yaml on the Pallas host:
#    mcp:
#      servers:
#        mnemosyne:
#          headers:
#            Authorization: "Bearer <JWT>"

# 5. Restart the Pallas agent processes.

# 6. Attach workspaces in Daedalus workspace settings UI.
#    Daedalus calls PUT /mcp_server/api/teams/{id}/workspaces/ automatically.
```

See the Daedalus-side spec [§9](../../daedalus/docs/mnemosyne_integration.md#9-phase-2--workspace-scoped-mcp-search-implemented) for the full operator walkthrough, including JWT rotation and disaster recovery.
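To make the team-token checks concrete, here is a stdlib-only sketch of minting and validating an HS256 JWT with the claim set shown above. It is not Mnemosyne's `resolve_mcp_jwt` or `_libraries_for_team`: the `mint_team_jwt`/`verify_team_jwt` names are hypothetical, placing `kid` in the JOSE header is an assumption, the secret is treated as raw bytes (how the stored hex is decoded is not specified here), and `active_jti` is passed in as an argument instead of being read from the `Team` row.

```python
import base64
import hashlib
import hmac
import json
import secrets
import time
import uuid

TEN_YEARS = 315_360_000  # 3650 days in seconds; equals exp - iat in the example above


def _b64url(data: bytes) -> str:
    """Base64url without padding, as JWS compact serialization requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(segment: str) -> bytes:
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _sign(secret: bytes, signing_input: str) -> bytes:
    return hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()


def mint_team_jwt(secret: bytes, kid: str, team_id: str, jti: str, now: int) -> str:
    """Mint a team token with the claim set from the Team JWT format."""
    header = {"alg": "HS256", "typ": "JWT", "kid": kid}  # assumption: kid in header
    claims = {
        "iss": "mnemosyne",
        "aud": "mnemosyne",
        "typ": "team",
        "sub": f"team:{team_id}",
        "iat": now,
        "exp": now + TEN_YEARS,
        "jti": jti,
    }
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    return f"{signing_input}.{_b64url(_sign(secret, signing_input))}"


def verify_team_jwt(token: str, secret: bytes, active_jti: str, now: int) -> str:
    """Return the team UUID from `sub`, or raise ValueError if any check fails."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, claims_b64, sig_b64 = parts
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected alg")
    # Constant-time comparison of the HMAC-SHA256 signature.
    expected = _sign(secret, f"{header_b64}.{claims_b64}")
    if not hmac.compare_digest(expected, _b64url_decode(sig_b64)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(claims_b64))
    if claims.get("iss") != "mnemosyne" or claims.get("aud") != "mnemosyne":
        raise ValueError("wrong issuer or audience")
    if claims.get("exp", 0) <= now:
        raise ValueError("expired")
    sub = str(claims.get("sub", ""))
    if claims.get("typ") != "team" or not sub.startswith("team:"):
        raise ValueError("not a team token")
    # Rotation: only the team's current active_jti is accepted.
    if claims.get("jti") != active_jti:
        raise ValueError("token rotated or revoked")
    return sub[len("team:"):]


secret = secrets.token_bytes(32)  # stand-in for an MCPSigningKey secret
team_id, jti = str(uuid.uuid4()), str(uuid.uuid4())
now = int(time.time())
token = mint_team_jwt(secret, "daedalus-1", team_id, jti, now)
print(verify_team_jwt(token, secret, active_jti=jti, now=now) == team_id)  # True
```

The `active_jti` comparison is what makes rotation and soft-delete take effect immediately even though `exp` is ten years out. Outside of illustration, a maintained library such as PyJWT (`jwt.decode(token, key, algorithms=["HS256"], audience=..., issuer=...)`) is a safer choice than hand-rolled JWS handling.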