# Mnemosyne Integration — Daedalus & Pallas Reference

This document summarises the Mnemosyne-specific implementation required for integration with the Daedalus & Pallas architecture. The full specification lives in [`daedalus/docs/mnemosyne_integration.md`](../../daedalus/docs/mnemosyne_integration.md).

---

## Overview

Mnemosyne exposes two interfaces for the wider Ouranos ecosystem:

1. **MCP Server** (port 22091) — consumed by Pallas agents for synchronous search, browse, and retrieval operations
2. **REST Ingest API** — consumed by the Daedalus backend for asynchronous file ingestion and embedding job lifecycle management

---

## 1. MCP Server (Phase 5)

### Port & URL

| Service | Port | URL |
|---------|------|-----|
| Mnemosyne MCP | 22091 | `http://puck.incus:22091/mcp` |
| Health check | 22091 | `http://puck.incus:22091/mcp/health` |

### Project Structure

Following the [Django MCP Pattern](Pattern_Django-MCP_V1-00.md):

```
mnemosyne/mnemosyne/mcp_server/
├── __init__.py
├── server.py        # FastMCP instance + tool registration
├── asgi.py          # Starlette ASGI mount at /mcp
├── middleware.py    # MCPAuthMiddleware (disabled for internal use)
├── context.py       # get_mcp_user(), get_mcp_token()
└── tools/
    ├── __init__.py
    ├── search.py    # register_search_tools(mcp) → search_knowledge, search_by_category
    ├── browse.py    # register_browse_tools(mcp) → list_libraries, list_collections, get_item, get_concepts
    └── health.py    # register_health_tools(mcp) → get_health
```

### Tools to Implement

| Tool | Module | Description |
|------|--------|-------------|
| `search_knowledge` | `search.py` | Hybrid vector + full-text + graph search → re-rank → return chunks with citations |
| `search_by_category` | `search.py` | Same as above, scoped to a specific `library_type` |
| `list_libraries` | `browse.py` | List all libraries with type, description, counts |
| `list_collections` | `browse.py` | List collections within a library |
| `get_item` | `browse.py` | Retrieve item detail with chunk previews and concept links |
| `get_concepts` | `browse.py` | Traverse concept graph from a starting concept or item |
| `get_health` | `health.py` | Check Neo4j, S3, embedding model reachability |

### MCP Resources

| Resource URI | Source |
|---|---|
| `mnemosyne://library-types` | `library/content_types.py` → `LIBRARY_TYPE_DEFAULTS` |
| `mnemosyne://libraries` | `Library.nodes.order_by("name")` serialized to JSON |

### Deployment

Separate Uvicorn process alongside Django's Gunicorn:

```bash
# Django WSGI (existing)
gunicorn --bind :22090 --workers 3 mnemosyne.wsgi

# MCP ASGI (new)
uvicorn mcp_server.asgi:app --host 0.0.0.0 --port 22091 --workers 1
```

Auth is disabled (`MCP_REQUIRE_AUTH=False`) since all traffic is internal (10.10.0.0/24).

### ⚠️ DEBUG LOG Points — MCP Server

| Location | Log Event | Level | What to Log |
|----------|-----------|-------|-------------|
| Tool dispatch | `mcp_tool_called` | DEBUG | Tool name, all input parameters |
| Vector search | `mcp_search_vector_query` | DEBUG | Query text, embedding dims, library filter, limit |
| Vector search result | `mcp_search_vector_results` | DEBUG | Candidate count, top/lowest scores |
| Full-text search | `mcp_search_fulltext_query` | DEBUG | Query terms, index used |
| Re-ranking | `mcp_search_rerank` | DEBUG | Candidates in/out, reranker model, duration_ms |
| Graph traversal | `mcp_graph_traverse` | DEBUG | Starting node UID, relationships, depth, nodes visited |
| Neo4j query | `mcp_neo4j_query` | DEBUG | Cypher query (parameterized), execution time_ms |
| Tool response | `mcp_tool_response` | DEBUG | Tool name, result size (bytes/items), duration_ms |
| Health check | `mcp_health_check` | DEBUG | Each dependency status, overall result |

**Important:** All neomodel ORM calls inside async tool functions **must** be wrapped with `sync_to_async(thread_sensitive=True)`.
---

## 2. REST Ingest API

### New Endpoints

| Method | Route | Purpose |
|--------|-------|---------|
| `POST` | `/api/v1/library/ingest` | Accept a file for ingestion + embedding |
| `GET` | `/api/v1/library/jobs/{job_id}` | Poll job status |
| `POST` | `/api/v1/library/jobs/{job_id}/retry` | Retry a failed job |
| `GET` | `/api/v1/library/jobs` | List recent jobs (optional `?status=` filter) |

These endpoints are consumed by the **Daedalus FastAPI backend** only, not by the frontend.

### New Model: `IngestJob`

Add to the `library/` app (Django ORM on PostgreSQL, not Neo4j):

```python
class IngestJob(models.Model):
    """Tracks the lifecycle of a content ingestion + embedding job."""

    id = models.CharField(max_length=64, primary_key=True)
    item_uid = models.CharField(max_length=64, db_index=True)
    celery_task_id = models.CharField(max_length=255, blank=True)
    status = models.CharField(
        max_length=20,
        choices=[
            ("pending", "Pending"),
            ("processing", "Processing"),
            ("completed", "Completed"),
            ("failed", "Failed"),
        ],
        default="pending",
        db_index=True,
    )
    progress = models.CharField(max_length=50, default="queued")
    error = models.TextField(blank=True, null=True)
    retry_count = models.PositiveIntegerField(default=0)
    chunks_created = models.PositiveIntegerField(default=0)
    concepts_extracted = models.PositiveIntegerField(default=0)
    embedding_model = models.CharField(max_length=100, blank=True)
    source = models.CharField(max_length=50, default="")
    source_ref = models.CharField(max_length=200, blank=True)
    s3_key = models.CharField(max_length=500)
    created_at = models.DateTimeField(auto_now_add=True)
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ["-created_at"]
        indexes = [
            models.Index(fields=["status", "-created_at"]),
            models.Index(fields=["source", "source_ref"]),
        ]
```
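The ingest flow — create a pending job, then hand off to Celery, with the dispatch-failure path from the log-point table — can be sketched framework-agnostically. `save_job` and `dispatch` are injected stand-ins (assumptions) for `IngestJob.save()` and `embed_item.delay(...)`:

```python
import logging
import uuid

log = logging.getLogger("mnemosyne.ingest")


def handle_ingest(payload: dict, save_job, dispatch) -> dict:
    """Create a pending IngestJob record, then dispatch the Celery task.

    save_job(job)              — stand-in for IngestJob(...).save()
    dispatch(job_id, item_uid) — stand-in for embed_item.delay(...);
                                 returns the Celery task id.
    """
    log.info("ingest_request_received", extra={"s3_key": payload["s3_key"]})

    # Stand-in for creating the Item node in Neo4j (ingest_item_created).
    item_uid = f"item_{uuid.uuid4().hex[:12]}"

    job = {
        "id": f"job_{uuid.uuid4().hex[:12]}",
        "item_uid": item_uid,
        "s3_key": payload["s3_key"],
        "status": "pending",
        "progress": "queued",
        "retry_count": 0,
        "error": None,
    }
    save_job(job)

    try:
        job["celery_task_id"] = dispatch(job["id"], item_uid)
        log.info("ingest_task_dispatched", extra={"job_id": job["id"]})
    except Exception as exc:  # e.g. broker unreachable
        # Matches the ingest_task_dispatch_failed ERROR log point.
        job["status"] = "failed"
        job["error"] = str(exc)
        save_job(job)
        log.error("ingest_task_dispatch_failed", extra={"job_id": job["id"]})

    return job
```

The real endpoint would additionally perform the S3-key and library lookups logged as `ingest_s3_key_check` and `ingest_library_lookup` before creating the job.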
### Ingest Request Schema

```json
{
  "s3_key": "workspaces/ws_abc/files/f_def/report.pdf",
  "title": "Q4 Technical Report",
  "library_uid": "lib_technical_001",
  "collection_uid": "col_reports_2026",
  "file_type": "application/pdf",
  "file_size": 245000,
  "source": "daedalus",
  "source_ref": "ws_abc/f_def"
}
```

### Job Status Response Schema

```json
{
  "job_id": "job_789xyz",
  "item_uid": "item_abc123",
  "status": "processing",
  "progress": "embedding",
  "chunks_created": 0,
  "concepts_extracted": 0,
  "embedding_model": "qwen3-vl-embedding-8b",
  "started_at": "2026-03-12T15:42:01Z",
  "completed_at": null,
  "error": null
}
```

### ⚠️ DEBUG LOG Points — Ingest Endpoint

| Location | Log Event | Level | What to Log |
|----------|-----------|-------|-------------|
| Request received | `ingest_request_received` | INFO | s3_key, title, library_uid, file_type, source, source_ref |
| S3 key validation | `ingest_s3_key_check` | DEBUG | s3_key, exists (bool), bucket name |
| Library lookup | `ingest_library_lookup` | DEBUG | library_uid, found (bool), library_type |
| Item node creation | `ingest_item_created` | INFO | item_uid, title, library_uid, collection_uid |
| Celery task dispatch | `ingest_task_dispatched` | INFO | job_id, item_uid, celery_task_id, queue name |
| Celery task dispatch failure | `ingest_task_dispatch_failed` | ERROR | job_id, item_uid, exception details |

---

## 3. Celery Embedding Pipeline

### New Task: `embed_item`

```python
@shared_task(
    name="library.embed_item",
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(S3ConnectionError, EmbeddingModelError),
    retry_backoff=True,
    retry_backoff_max=600,
    acks_late=True,
    queue="embedding",
)
def embed_item(self, job_id, item_uid):
    ...
```

### Task Flow

1. Update job → `processing` / `fetching`
2. Fetch file from Daedalus S3 bucket (cross-bucket read)
3. Copy to Mnemosyne's own S3 bucket
4. Load library type → chunking config
5. Chunk content per strategy
6. Store chunk text in S3
7. Generate embeddings (Arke/vLLM batch call)
8. Write Chunk nodes + vectors to Neo4j
9. Extract concepts (LLM call)
10. Build graph relationships
11. Update job → `completed`
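The numbered flow above can be sketched as a thin driver that threads the job through a list of stage callables and marks the job failed in one place. The `stages` pairs, `ctx` dict, and `update_job` callable are illustrative assumptions, not Mnemosyne's actual task body:

```python
def run_embed_item(job_id: str, stages, update_job):
    """Drive an ingestion job through the pipeline stages in order.

    stages:     list of (progress_label, callable(ctx) -> ctx) pairs,
                e.g. fetch, copy, chunk, embed, write, concepts, graph
    update_job: callable(job_id, **fields) persisting status/progress
                (stand-in for updating the IngestJob row)
    """
    update_job(job_id, status="processing", progress="fetching")
    ctx = {"job_id": job_id}
    try:
        for label, stage in stages:
            update_job(job_id, progress=label)
            ctx = stage(ctx)
        update_job(job_id, status="completed", progress="done")
    except Exception as exc:
        # Any stage failure marks the job failed with the error message;
        # re-raising lets Celery's autoretry_for/backoff take over.
        update_job(job_id, status="failed", error=str(exc))
        raise
    return ctx
```

Keeping each stage as a pure `ctx -> ctx` function means the `embed_*_start`/`_complete`/`_failed` log points can live inside the stages while status bookkeeping stays in the driver.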
On failure at any step: update job → `failed` with error message.

### ⚠️ DEBUG LOG Points — Celery Worker (Critical)

These are the most important log points in the entire integration. Without them, debugging async embedding failures is nearly impossible.

| Location | Log Event | Level | What to Log |
|----------|-----------|-------|-------------|
| Task pickup | `embed_task_started` | INFO | job_id, item_uid, worker hostname, retry count |
| S3 fetch start | `embed_s3_fetch_start` | DEBUG | s3_key, source bucket |
| S3 fetch complete | `embed_s3_fetch_complete` | DEBUG | s3_key, file_size, duration_ms |
| S3 fetch failed | `embed_s3_fetch_failed` | ERROR | s3_key, error, retry_count |
| S3 cross-bucket copy start | `s3_cross_bucket_copy_start` | DEBUG | source_bucket, source_key, dest_bucket, dest_key |
| S3 cross-bucket copy complete | `s3_cross_bucket_copy_complete` | DEBUG | source_key, dest_key, file_size, duration_ms |
| S3 cross-bucket copy failed | `s3_cross_bucket_copy_failed` | ERROR | source_bucket, source_key, error |
| Chunking start | `embed_chunking_start` | DEBUG | library_type, strategy, chunk_size, chunk_overlap |
| Chunking complete | `embed_chunking_complete` | INFO | chunks_created, avg_chunk_size |
| Chunking failed | `embed_chunking_failed` | ERROR | file_type, error |
| Embedding start | `embed_vectors_start` | DEBUG | model_name, dimensions, batch_size, total_chunks |
| Embedding complete | `embed_vectors_complete` | INFO | model_name, duration_ms, tokens_processed |
| Embedding failed | `embed_vectors_failed` | ERROR | model_name, chunk_index, error |
| Neo4j write start | `embed_neo4j_write_start` | DEBUG | chunks_to_write count |
| Neo4j write complete | `embed_neo4j_write_complete` | INFO | chunks_written, duration_ms |
| Neo4j write failed | `embed_neo4j_write_failed` | ERROR | chunk_index, neo4j_error |
| Concept extraction start | `embed_concepts_start` | DEBUG | model_name |
| Concept extraction complete | `embed_concepts_complete` | INFO | concepts_extracted, concept_names, duration_ms |
| Graph build start | `embed_graph_build_start` | DEBUG | — |
| Graph build complete | `embed_graph_build_complete` | INFO | relationships_created, duration_ms |
| Job completed | `embed_job_completed` | INFO | job_id, item_uid, total_duration_ms, chunks, concepts |
| Job failed | `embed_job_failed` | ERROR | job_id, item_uid, exception_type, error, full traceback |

---

## 4. S3 Bucket Strategy

Mnemosyne uses its own bucket (`mnemosyne-content`, Terraform-provisioned per Phase 1). On ingest, the Celery worker copies the file from the Daedalus bucket to Mnemosyne's bucket.

```
mnemosyne-content bucket
├── items/
│   └── {item_uid}/
│       ├── original/(unknown)    ← copied from Daedalus bucket
│       └── chunks/
│           ├── chunk_000.txt
│           └── chunk_001.txt
└── images/
    └── {image_uid}/(unknown)
```

### Configuration

```bash
# .env additions

# Mnemosyne's own bucket (existing)
AWS_STORAGE_BUCKET_NAME=mnemosyne-content

# Cross-bucket read access to Daedalus bucket
DAEDALUS_S3_BUCKET_NAME=daedalus
DAEDALUS_S3_ENDPOINT_URL=http://incus-s3.incus:9000
DAEDALUS_S3_ACCESS_KEY_ID=${VAULT_DAEDALUS_S3_READ_KEY}
DAEDALUS_S3_SECRET_ACCESS_KEY=${VAULT_DAEDALUS_S3_READ_SECRET}

# MCP server
MCP_SERVER_PORT=22091
MCP_REQUIRE_AUTH=False
```
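A sketch of the cross-bucket copy (task-flow steps 2–3), assuming two boto3-style clients — one built from the `DAEDALUS_S3_*` read credentials, one from Mnemosyne's own `AWS_*` settings. Because the two buckets are accessed with different credentials, a plain GET + PUT is used rather than server-side CopyObject; the helper only relies on `get_object`/`put_object`, and the client wiring is an assumption:

```python
import logging
import time

log = logging.getLogger("mnemosyne.s3")


def copy_cross_bucket(src_client, src_bucket, src_key,
                      dest_client, dest_bucket, dest_key):
    """Read an object from the Daedalus bucket, write it into Mnemosyne's.

    src_client / dest_client: anything with boto3-style get_object/put_object
    (in production, boto3 clients from DAEDALUS_S3_* and AWS_* settings).
    Returns the number of bytes copied.
    """
    started = time.monotonic()
    log.debug("s3_cross_bucket_copy_start", extra={"source_key": src_key})
    try:
        body = src_client.get_object(Bucket=src_bucket, Key=src_key)["Body"].read()
        dest_client.put_object(Bucket=dest_bucket, Key=dest_key, Body=body)
    except Exception:
        # Matches the s3_cross_bucket_copy_failed ERROR log point.
        log.error("s3_cross_bucket_copy_failed", extra={"source_key": src_key})
        raise
    duration_ms = int((time.monotonic() - started) * 1000)
    log.debug("s3_cross_bucket_copy_complete", extra={"duration_ms": duration_ms})
    return len(body)
```

Buffering the whole object in memory is fine for typical documents; very large files would want a streaming or multipart variant.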
---

## 5. Prometheus Metrics

```
# MCP tool calls
mnemosyne_mcp_tool_invocations_total{tool,status}      counter
mnemosyne_mcp_tool_duration_seconds{tool}              histogram

# Ingest pipeline
mnemosyne_ingest_jobs_total{status}                    counter
mnemosyne_ingest_duration_seconds{library_type}        histogram
mnemosyne_chunks_created_total{library_type}           counter
mnemosyne_concepts_extracted_total                     counter
mnemosyne_embeddings_generated_total{model}            counter
mnemosyne_embedding_duration_seconds{model}            histogram

# Search performance
mnemosyne_search_duration_seconds{search_type}         histogram
mnemosyne_search_results_total{search_type}            counter
mnemosyne_rerank_duration_seconds{model}               histogram

# Infrastructure
mnemosyne_neo4j_query_duration_seconds{query_type}     histogram
mnemosyne_s3_operations_total{operation,status}        counter
```

---

## 6. Implementation Phases (Mnemosyne-specific)

### Phase 1 — REST Ingest API

- [ ] Create `IngestJob` model + Django migration
- [ ] Implement `POST /api/v1/library/ingest` endpoint
- [ ] Implement `GET /api/v1/library/jobs/{job_id}` endpoint
- [ ] Implement `POST /api/v1/library/jobs/{job_id}/retry` endpoint
- [ ] Implement `GET /api/v1/library/jobs` list endpoint
- [ ] Implement `embed_item` Celery task with full debug logging
- [ ] Add S3 cross-bucket copy logic
- [ ] Add ingest API serializers and URL routing

### Phase 2 — MCP Server (Phase 5 of Mnemosyne roadmap)

- [ ] Create `mcp_server/` module following Django MCP Pattern
- [ ] Implement `search_knowledge` tool (hybrid search + re-rank)
- [ ] Implement `search_by_category` tool
- [ ] Implement `list_libraries`, `list_collections`, `get_item`, `get_concepts` tools
- [ ] Implement `get_health` tool per Pallas health spec
- [ ] Register MCP resources (`mnemosyne://library-types`, `mnemosyne://libraries`)
- [ ] ASGI mount + Uvicorn deployment on port 22091
- [ ] Systemd service for MCP Uvicorn process
- [ ] Add Prometheus metrics
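For the "Systemd service for MCP Uvicorn process" checklist item, a minimal unit sketch could look like the following; the file path, user, working directory, and virtualenv location are all assumptions, not the deployed configuration:

```ini
# /etc/systemd/system/mnemosyne-mcp.service  (illustrative path)
[Unit]
Description=Mnemosyne MCP server (Uvicorn, port 22091)
After=network.target

[Service]
# User, WorkingDirectory, EnvironmentFile, and venv path are assumptions.
User=mnemosyne
WorkingDirectory=/opt/mnemosyne/mnemosyne
EnvironmentFile=/opt/mnemosyne/.env
ExecStart=/opt/mnemosyne/.venv/bin/uvicorn mcp_server.asgi:app --host 0.0.0.0 --port 22091 --workers 1
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

This mirrors the Uvicorn invocation from the Deployment section, with `EnvironmentFile` supplying `MCP_SERVER_PORT` and `MCP_REQUIRE_AUTH`.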