Files
mnemosyne/docs/mnemosyne_integration.md
Robert Helewka e5618973fc docs(integration): mark Phases 1+2 as implemented; add Phase 3 stub
The integration doc was forward-looking spec but most of it now ships:

  Phase 1 (REST workspace + ingest API for Daedalus)         implemented
  Phase 2 (MCP server: search/get_chunk/list_*/get_health)   implemented
  Phase 3 (per-turn signed-token access control)            📋 deferred

Updated:
- Tool table reflects actual implementation (search, get_chunk,
  list_libraries, list_collections, list_items, get_health) instead
  of the speculative names (search_knowledge, search_by_category, etc.)
- Project structure matches the as-built layout (tools/discovery.py
  exists; no separate browse.py).
- REST API table covers both workspace lifecycle endpoints and ingest
  endpoints, with correct routes (/library/api/...).
- Ingest request schema includes content_hash and workspace_id
  (the actual idempotency key on the Mnemosyne side).
- Celery task description matches library.tasks.ingest_from_daedalus
  rather than the placeholder embed_item.
- Phase 6 checklist marks Phases 1+2 done; adds Phase 3 (per-turn
  token access control) with a per-Mnemosyne-side TODO list pointing
  at the matching Daedalus-side §9 design.

Internal MCP port stays 22091; public access via nginx on 23090.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-02 21:54:05 -04:00

384 lines
19 KiB
Markdown

# Mnemosyne Integration — Daedalus & Pallas Reference
This document describes Mnemosyne's role in the Daedalus + Pallas architecture and what's actually built today. The Daedalus-side spec lives in [`daedalus/docs/mnemosyne_integration.md`](../../daedalus/docs/mnemosyne_integration.md).
---
## Overview
Mnemosyne exposes two interfaces for the wider Ouranos ecosystem:
1. **REST API** (`/library/api/*`) — consumed by the Daedalus backend (HTTP Basic auth, service account `daedalus-service`) for workspace lifecycle and asynchronous file ingestion. Phase 1, **implemented**.
2. **MCP Server** (port 22091 internal, `/mcp/` via nginx on 23090) — exposes search, browse, and retrieval tools. Phase 5 of Mnemosyne's own roadmap, **implemented** with workspace_id scoping. Currently consumed by an internal validator (`validator/`) and ad-hoc clients; planned production consumer is Pallas FastAgents in Daedalus integration Phase 2 (deferred — see [Phase 3 of this doc](#3-phase-3-deferred-per-turn-token-access-control)).
### Phase status
| Phase | What | Status |
|-------|------|--------|
| 1. REST workspace + ingest API for Daedalus | `POST /workspaces/`, `DELETE /workspaces/{id}/`, `POST /ingest/`, `GET /jobs/{id}/` | **Implemented** |
| 2. MCP Server (Mnemosyne roadmap Phase 5) | `search`, `get_chunk`, `list_libraries`, `list_collections`, `list_items`, `get_health` | **Implemented** (workspace_id scoping in place; access-control to follow in Phase 3) |
| 3. Per-turn signed-token access control for Daedalus integration | Daedalus mints tokens carrying `{workspace_id, allowed_libraries}` claims; Mnemosyne validates and scopes search server-side | **Deferred** |
---
## 1. MCP Server
### Port & URL
| Endpoint | Internal | Public (via nginx) |
|---|---|---|
| MCP server | `http://mcp:22091/mcp/` | `http://puck.incus:23090/mcp/` |
| Health check | `http://mcp:22091/mcp/health` | `http://puck.incus:23090/healthz` |
### Project structure (as built)
Follows the [Django MCP Pattern](Pattern_Django-MCP_V1-00.md):
```
mnemosyne/mnemosyne/mcp_server/
├── __init__.py
├── server.py # FastMCP instance + tool registration
├── auth.py # MCPAuthMiddleware
├── context.py # get_mcp_user(), get_mcp_token()
└── tools/
├── __init__.py
├── search.py # register_search_tools(mcp) → search, get_chunk
├── discovery.py # register_discovery_tools(mcp) → list_libraries, list_collections, list_items
└── health.py # register_health_tools(mcp) → get_health
```
The ASGI mount lives at `mnemosyne/mnemosyne/asgi.py` (project-level) — it composes the FastMCP app at `/mcp/` with a 307 redirect from bare `/mcp` so MCP clients that omit the trailing slash still land correctly.
### Tools (as implemented)
| Tool | Module | Description |
|------|--------|-------------|
| `search` | `search.py` | Hybrid vector + full-text + concept-graph search → fusion → optional Synesis re-rank. Accepts `library_uid`, `library_type`, `collection_uid`, and (system-injected, undocumented to LLM) `workspace_id` for scoping. |
| `get_chunk` | `search.py` | Fetch full text of a chunk by uid (typically obtained from `search`). Honors workspace_id scoping. |
| `list_libraries` | `discovery.py` | List libraries with uid, name, library_type, description. Workspace_id-aware. |
| `list_collections` | `discovery.py` | List collections, optionally filtered by parent library. Workspace_id-aware. |
| `list_items` | `discovery.py` | List items with chunk_count, image_count, embedding_status. Workspace_id-aware. |
| `get_health` | `health.py` | Check Neo4j, S3, embedding model reachability. Used by Pallas health pollers. |
The `workspace_id` parameter is present on every search/discovery tool but is **deliberately undocumented in the LLM-facing tool description** — it's a system-injected field the calling LLM should never know about. A workspace-scoped query returns ONLY that workspace's content; an unscoped query (workspace_id is NULL) returns ONLY global libraries. There is no mode that mixes the two — see `library/services/search.py`, `_WORKSPACE_SCOPE_CLAUSE`.
### MCP Resources
| Resource URI | Source |
|---|---|
| `mnemosyne://library-types` | `library/content_types.py``LIBRARY_TYPE_DEFAULTS` |
| `mnemosyne://libraries` | `Library.nodes.order_by("name")` serialized to JSON |
### Deployment
Separate Uvicorn process alongside Django's Gunicorn:
```bash
# Django WSGI (existing)
gunicorn --bind :22090 --workers 3 mnemosyne.wsgi
# MCP ASGI (new)
uvicorn mcp_server.asgi:app --host 0.0.0.0 --port 22091 --workers 1
```
Auth is disabled (`MCP_REQUIRE_AUTH=False`) since all traffic is internal (10.10.0.0/24).
### ⚠️ DEBUG LOG Points — MCP Server
| Location | Log Event | Level | What to Log |
|----------|-----------|-------|-------------|
| Tool dispatch | `mcp_tool_called` | DEBUG | Tool name, all input parameters |
| Vector search | `mcp_search_vector_query` | DEBUG | Query text, embedding dims, library filter, limit |
| Vector search result | `mcp_search_vector_results` | DEBUG | Candidate count, top/lowest scores |
| Full-text search | `mcp_search_fulltext_query` | DEBUG | Query terms, index used |
| Re-ranking | `mcp_search_rerank` | DEBUG | Candidates in/out, reranker model, duration_ms |
| Graph traversal | `mcp_graph_traverse` | DEBUG | Starting node UID, relationships, depth, nodes visited |
| Neo4j query | `mcp_neo4j_query` | DEBUG | Cypher query (parameterized), execution time_ms |
| Tool response | `mcp_tool_response` | DEBUG | Tool name, result size (bytes/items), duration_ms |
| Health check | `mcp_health_check` | DEBUG | Each dependency status, overall result |
**Important:** All neomodel ORM calls inside async tool functions **must** be wrapped with `sync_to_async(thread_sensitive=True)`.
---
## 2. REST API for Daedalus
All endpoints require HTTP Basic auth as `daedalus-service`. They are consumed by the Daedalus FastAPI backend only — not by any frontend.
### Workspace lifecycle
| Method | Route | Purpose |
|--------|-------|---------|
| `POST` | `/library/api/workspaces/` | Create workspace Library. Body: `{workspace_id, name, library_type, description?}`. Idempotent on `workspace_id`. `library_type` frozen at create. |
| `GET` | `/library/api/workspaces/{workspace_id}/` | Workspace status (item_count, chunk_count, library_uid). |
| `DELETE` | `/library/api/workspaces/{workspace_id}/` | Delete workspace Library + reachable content. Concept-safe: orphan-only Concept GC; concepts referenced by other libraries survive. |
### Ingest
| Method | Route | Purpose |
|--------|-------|---------|
| `POST` | `/library/api/ingest/` | Accept a file (already in S3) for ingestion + embedding |
| `GET` | `/library/api/jobs/{job_id}/` | Poll job status |
| `POST` | `/library/api/jobs/{job_id}/retry/` | Retry a failed job |
| `GET` | `/library/api/jobs/?status=&library_uid=` | List recent jobs |
### Model: `IngestJob`
Lives in `library/models.py` (Django ORM on PostgreSQL, not Neo4j). Migration: `library/migrations/0001_initial.py`.
```python
class IngestJob(models.Model):
"""Tracks the lifecycle of a content ingestion + embedding job."""
id = models.CharField(max_length=64, primary_key=True)
item_uid = models.CharField(max_length=64, db_index=True)
celery_task_id = models.CharField(max_length=255, blank=True)
status = models.CharField(
max_length=20,
choices=[
("pending", "Pending"),
("processing", "Processing"),
("completed", "Completed"),
("failed", "Failed"),
],
default="pending",
db_index=True,
)
progress = models.CharField(max_length=50, default="queued")
error = models.TextField(blank=True, null=True)
retry_count = models.PositiveIntegerField(default=0)
chunks_created = models.PositiveIntegerField(default=0)
concepts_extracted = models.PositiveIntegerField(default=0)
embedding_model = models.CharField(max_length=100, blank=True)
source = models.CharField(max_length=50, default="")
source_ref = models.CharField(max_length=200, blank=True)
s3_key = models.CharField(max_length=500)
created_at = models.DateTimeField(auto_now_add=True)
started_at = models.DateTimeField(null=True, blank=True)
completed_at = models.DateTimeField(null=True, blank=True)
class Meta:
ordering = ["-created_at"]
indexes = [
models.Index(fields=["status", "-created_at"]),
models.Index(fields=["source", "source_ref"]),
]
```
### Ingest Request Schema
The target Library can be specified by either `workspace_id` (preferred for Daedalus) or `library_uid`. Idempotency key: `(library, source_ref, content_hash)`. Same triple → existing job returned. New `content_hash` for the same `source_ref` → supersedes the prior Item.
```json
{
"s3_key": "workspaces/ws_abc/files/f_def/report.pdf",
"title": "Q4 Technical Report",
"workspace_id": "ws_abc",
"file_type": "application/pdf",
"file_size": 245000,
"content_hash": "<sha256 hex, 64 chars>",
"source": "daedalus",
"source_ref": "ws_abc/f_def"
}
```
### Job Status Response Schema
```json
{
"job_id": "job_789xyz",
"item_uid": "item_abc123",
"status": "processing",
"progress": "embedding",
"chunks_created": 0,
"concepts_extracted": 0,
"embedding_model": "qwen3-vl-embedding-8b",
"started_at": "2026-03-12T15:42:01Z",
"completed_at": null,
"error": null
}
```
### ⚠️ DEBUG LOG Points — Ingest Endpoint
| Location | Log Event | Level | What to Log |
|----------|-----------|-------|-------------|
| Request received | `ingest_request_received` | INFO | s3_key, title, library_uid, file_type, source, source_ref |
| S3 key validation | `ingest_s3_key_check` | DEBUG | s3_key, exists (bool), bucket name |
| Library lookup | `ingest_library_lookup` | DEBUG | library_uid, found (bool), library_type |
| Item node creation | `ingest_item_created` | INFO | item_uid, title, library_uid, collection_uid |
| Celery task dispatch | `ingest_task_dispatched` | INFO | job_id, item_uid, celery_task_id, queue name |
| Celery task dispatch failure | `ingest_task_dispatch_failed` | ERROR | job_id, item_uid, exception details |
---
## 3. Celery Embedding Pipeline
### Task: `ingest_from_daedalus`
Defined in `library/tasks.py`. Routed to the `embedding` queue (per `CELERY_TASK_ROUTES["library.tasks.ingest_*"]`). Wraps the existing `EmbeddingPipeline.process_item`.
```python
@shared_task(
name="library.tasks.ingest_from_daedalus",
bind=True,
queue="embedding",
max_retries=3,
default_retry_delay=60,
acks_late=True,
)
def ingest_from_daedalus(self, job_id: str): ...
```
### Task flow (as built)
1. Mark job `processing`, set `started_at`.
2. Resolve target Library by `library_uid`.
3. If a prior Item exists for this Library with the same `source_ref` but a *different* `content_hash`, delete it (chunks + images + embeddings) before continuing.
4. Fetch file bytes from the Daedalus S3 bucket via `library.services.daedalus_s3.fetch_from_daedalus`.
5. Create the `Item` neomodel node with `s3_key=items/{item_uid}/original.{ext}` and copy bytes into Mnemosyne's own bucket.
6. Connect to a default Collection for the Library (auto-created on first ingest).
7. Run `EmbeddingPipeline.process_item(item.uid)` — chunk per `library_type`, embed via the configured model, write Chunks + Concepts to Neo4j.
8. Mark job `completed` with `chunks_created`, `concepts_extracted`, `embedding_model`, `completed_at`.
On any exception with retries remaining: re-raise via `self.retry()` (exponential backoff). On terminal failure: mark job `failed` with the exception text.
### ⚠️ DEBUG LOG Points — Celery Worker (Critical)
These are the most important log points in the entire integration. Without them, debugging async embedding failures is nearly impossible.
| Location | Log Event | Level | What to Log |
|----------|-----------|-------|-------------|
| Task pickup | `embed_task_started` | INFO | job_id, item_uid, worker hostname, retry count |
| S3 fetch start | `embed_s3_fetch_start` | DEBUG | s3_key, source bucket |
| S3 fetch complete | `embed_s3_fetch_complete` | DEBUG | s3_key, file_size, duration_ms |
| S3 fetch failed | `embed_s3_fetch_failed` | ERROR | s3_key, error, retry_count |
| S3 cross-bucket copy start | `s3_cross_bucket_copy_start` | DEBUG | source_bucket, source_key, dest_bucket, dest_key |
| S3 cross-bucket copy complete | `s3_cross_bucket_copy_complete` | DEBUG | source_key, dest_key, file_size, duration_ms |
| S3 cross-bucket copy failed | `s3_cross_bucket_copy_failed` | ERROR | source_bucket, source_key, error |
| Chunking start | `embed_chunking_start` | DEBUG | library_type, strategy, chunk_size, chunk_overlap |
| Chunking complete | `embed_chunking_complete` | INFO | chunks_created, avg_chunk_size |
| Chunking failed | `embed_chunking_failed` | ERROR | file_type, error |
| Embedding start | `embed_vectors_start` | DEBUG | model_name, dimensions, batch_size, total_chunks |
| Embedding complete | `embed_vectors_complete` | INFO | model_name, duration_ms, tokens_processed |
| Embedding failed | `embed_vectors_failed` | ERROR | model_name, chunk_index, error |
| Neo4j write start | `embed_neo4j_write_start` | DEBUG | chunks_to_write count |
| Neo4j write complete | `embed_neo4j_write_complete` | INFO | chunks_written, duration_ms |
| Neo4j write failed | `embed_neo4j_write_failed` | ERROR | chunk_index, neo4j_error |
| Concept extraction start | `embed_concepts_start` | DEBUG | model_name |
| Concept extraction complete | `embed_concepts_complete` | INFO | concepts_extracted, concept_names, duration_ms |
| Graph build start | `embed_graph_build_start` | DEBUG | — |
| Graph build complete | `embed_graph_build_complete` | INFO | relationships_created, duration_ms |
| Job completed | `embed_job_completed` | INFO | job_id, item_uid, total_duration_ms, chunks, concepts |
| Job failed | `embed_job_failed` | ERROR | job_id, item_uid, exception_type, error, full traceback |
---
## 4. S3 Bucket Strategy
Mnemosyne uses its own bucket (`mnemosyne-content`, Terraform-provisioned per Phase 1). On ingest, the Celery worker copies the file from the Daedalus bucket to Mnemosyne's bucket.
```
mnemosyne-content bucket
├── items/
│ └── {item_uid}/
│ └── original/{filename} ← copied from Daedalus bucket
│ └── chunks/
│ └── chunk_000.txt
│ └── chunk_001.txt
├── images/
│ └── {image_uid}/{filename}
```
### Configuration
```bash
# .env additions
# Mnemosyne's own bucket (existing)
AWS_STORAGE_BUCKET_NAME=mnemosyne-content
# Cross-bucket read access to Daedalus bucket
DAEDALUS_S3_BUCKET_NAME=daedalus
DAEDALUS_S3_ENDPOINT_URL=http://incus-s3.incus:9000
DAEDALUS_S3_ACCESS_KEY_ID=${VAULT_DAEDALUS_S3_READ_KEY}
DAEDALUS_S3_SECRET_ACCESS_KEY=${VAULT_DAEDALUS_S3_READ_SECRET}
# MCP server
MCP_SERVER_PORT=22091
MCP_REQUIRE_AUTH=False
```
---
## 5. Prometheus Metrics
```
# MCP tool calls
mnemosyne_mcp_tool_invocations_total{tool,status} counter
mnemosyne_mcp_tool_duration_seconds{tool} histogram
# Ingest pipeline
mnemosyne_ingest_jobs_total{status} counter
mnemosyne_ingest_duration_seconds{library_type} histogram
mnemosyne_chunks_created_total{library_type} counter
mnemosyne_concepts_extracted_total counter
mnemosyne_embeddings_generated_total{model} counter
mnemosyne_embedding_duration_seconds{model} histogram
# Search performance
mnemosyne_search_duration_seconds{search_type} histogram
mnemosyne_search_results_total{search_type} counter
mnemosyne_rerank_duration_seconds{model} histogram
# Infrastructure
mnemosyne_neo4j_query_duration_seconds{query_type} histogram
mnemosyne_s3_operations_total{operation,status} counter
```
---
## 6. Implementation Phases (Mnemosyne-specific)
### Phase 1 — REST API for Daedalus (workspace + ingest) ✅ Implemented
- [x] `Library.workspace_id` + `library_type` enum (added `business`, `finance`)
- [x] `IngestJob` Django ORM model + migration `0001_initial.py`
- [x] `POST /library/api/workspaces/`, `GET /library/api/workspaces/{id}/`, `DELETE /library/api/workspaces/{id}/` (concept-safe)
- [x] `POST /library/api/ingest/` with `(library, source_ref, content_hash)` idempotency
- [x] `GET /library/api/jobs/{job_id}/`, `POST .../retry/`, `GET /library/api/jobs/`
- [x] `library.tasks.ingest_from_daedalus` Celery task with content-hash-aware supersede logic
- [x] `library.services.daedalus_s3` cross-bucket fetch + copy
- [x] HTTP Basic auth via `daedalus-service` user
### Phase 2 — MCP Server (Mnemosyne roadmap Phase 5) ✅ Implemented
- [x] `mcp_server/` module following the [Django MCP Pattern](Pattern_Django-MCP_V1-00.md)
- [x] `search` tool (hybrid vector + fulltext + concept-graph + Synesis re-rank)
- [x] `get_chunk` tool (full text by chunk_uid)
- [x] `list_libraries`, `list_collections`, `list_items` discovery tools
- [x] `get_health` tool (Neo4j + S3 + embedding model probes)
- [x] Workspace_id parameter on every search/discovery tool (undocumented to LLM, scoping enforced in Cypher)
- [x] Single-mode rule: workspace-scoped vs global, never both in one query
- [x] ASGI mount + uvicorn deployment on port 22091; nginx proxies via `/mcp/` on 23090
- [x] Prometheus metrics (`mnemosyne_mcp_*`)
### Phase 3 — Per-turn token access control for Daedalus integration 📋 Deferred
The Phase 2 MCP server is search-capable but currently has no token-based library-access scoping beyond `workspace_id` (which is parameter-level, not auth-level). The intended production access-control layer for the Daedalus integration is a per-turn signed token model:
- Daedalus mints short-lived tokens carrying `{sub: agent_id, workspace_id, allowed_libraries, exp}`.
- Pallas forwards the inbound bearer to its outgoing Mnemosyne MCP calls (requires a small upstream patch — see Daedalus-side §9.4).
- Mnemosyne's MCP token validator extracts the claims; search Cypher additionally filters `WHERE lib.uid IN $allowed_libraries`.
- Workspace libraries are auto-included in the per-turn token's allowed list when the agent is being invoked from that workspace.
Mnemosyne-side work for Phase 3:
- [ ] Extend `MCPToken` (or sibling) to carry signed claims `{workspace_id, allowed_libraries, exp}`
- [ ] Token validator reads claims, attaches them to the FastMCP request context
- [ ] `search` / `list_*` tools consult claim-derived allowed-library set in addition to existing parameter filters
- [ ] Document the JWT/signing format Daedalus mints to (likely HS256 with a shared secret in Vault, or RS256 against Daedalus's JWKS — TBD)
See the Daedalus-side spec [§9](../../daedalus/docs/mnemosyne_integration.md#9-phase-2--knowledge-library-access-control-deferred) for the full integration architecture.