Files
mnemosyne/docs/mnemosyne_integration.md
Robert Helewka e5618973fc docs(integration): mark Phases 1+2 as implemented; add Phase 3 stub
The integration doc was forward-looking spec but most of it now ships:

  Phase 1 (REST workspace + ingest API for Daedalus)         implemented
  Phase 2 (MCP server: search/get_chunk/list_*/get_health)   implemented
  Phase 3 (per-turn signed-token access control)            📋 deferred

Updated:
- Tool table reflects actual implementation (search, get_chunk,
  list_libraries, list_collections, list_items, get_health) instead
  of the speculative names (search_knowledge, search_by_category, etc.)
- Project structure matches the as-built layout (tools/discovery.py
  exists; no separate browse.py).
- REST API table covers both workspace lifecycle endpoints and ingest
  endpoints, with correct routes (/library/api/...).
- Ingest request schema includes content_hash and workspace_id
  (the actual idempotency key on the Mnemosyne side).
- Celery task description matches library.tasks.ingest_from_daedalus
  rather than the placeholder embed_item.
- Phase 6 checklist marks Phases 1+2 done; adds Phase 3 (per-turn
  token access control) with a per-Mnemosyne-side TODO list pointing
  at the matching Daedalus-side §9 design.

Internal MCP port stays 22091; public access via nginx on 23090.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-02 21:54:05 -04:00

19 KiB

Mnemosyne Integration — Daedalus & Pallas Reference

This document describes Mnemosyne's role in the Daedalus + Pallas architecture and what's actually built today. The Daedalus-side spec lives in daedalus/docs/mnemosyne_integration.md.


Overview

Mnemosyne exposes two interfaces for the wider Ouranos ecosystem:

  1. REST API (/library/api/*) — consumed by the Daedalus backend (HTTP Basic auth, service account daedalus-service) for workspace lifecycle and asynchronous file ingestion. Phase 1, implemented.
  2. MCP Server (port 22091 internal, /mcp/ via nginx on 23090) — exposes search, browse, and retrieval tools. Phase 5 of Mnemosyne's own roadmap, implemented with workspace_id scoping. Currently consumed by an internal validator (validator/) and ad-hoc clients; planned production consumer is Pallas FastAgents in Daedalus integration Phase 2 (deferred — see Phase 3 of this doc).

Phase status

Phase What Status
1. REST workspace + ingest API for Daedalus POST /workspaces/, DELETE /workspaces/{id}/, POST /ingest/, GET /jobs/{id}/ Implemented
2. MCP Server (Mnemosyne roadmap Phase 5) search, get_chunk, list_libraries, list_collections, list_items, get_health Implemented (workspace_id scoping in place; access-control to follow in Phase 3)
3. Per-turn signed-token access control for Daedalus integration Daedalus mints tokens carrying {workspace_id, allowed_libraries} claims; Mnemosyne validates and scopes search server-side Deferred

1. MCP Server

Port & URL

Endpoint Internal Public (via nginx)
MCP server http://mcp:22091/mcp/ http://puck.incus:23090/mcp/
Health check http://mcp:22091/mcp/health http://puck.incus:23090/healthz

Project structure (as built)

Follows the Django MCP Pattern:

mnemosyne/mnemosyne/mcp_server/
├── __init__.py
├── server.py              # FastMCP instance + tool registration
├── auth.py                # MCPAuthMiddleware
├── context.py             # get_mcp_user(), get_mcp_token()
└── tools/
    ├── __init__.py
    ├── search.py          # register_search_tools(mcp) → search, get_chunk
    ├── discovery.py       # register_discovery_tools(mcp) → list_libraries, list_collections, list_items
    └── health.py          # register_health_tools(mcp) → get_health

The ASGI mount lives at mnemosyne/mnemosyne/asgi.py (project-level) — it composes the FastMCP app at /mcp/ with a 307 redirect from bare /mcp so MCP clients that omit the trailing slash still land correctly.

Tools (as implemented)

Tool Module Description
search search.py Hybrid vector + full-text + concept-graph search → fusion → optional Synesis re-rank. Accepts library_uid, library_type, collection_uid, and (system-injected, undocumented to LLM) workspace_id for scoping.
get_chunk search.py Fetch full text of a chunk by uid (typically obtained from search). Honors workspace_id scoping.
list_libraries discovery.py List libraries with uid, name, library_type, description. Workspace_id-aware.
list_collections discovery.py List collections, optionally filtered by parent library. Workspace_id-aware.
list_items discovery.py List items with chunk_count, image_count, embedding_status. Workspace_id-aware.
get_health health.py Check Neo4j, S3, embedding model reachability. Used by Pallas health pollers.

The workspace_id parameter is present on every search/discovery tool but is deliberately undocumented in the LLM-facing tool description — it's a system-injected field the calling LLM should never know about. A workspace-scoped query returns ONLY that workspace's content; an unscoped query (workspace_id is NULL) returns ONLY global libraries. There is no mode that mixes the two — see library/services/search.py, _WORKSPACE_SCOPE_CLAUSE.

MCP Resources

Resource URI Source
mnemosyne://library-types library/content_types.pyLIBRARY_TYPE_DEFAULTS
mnemosyne://libraries Library.nodes.order_by("name") serialized to JSON

Deployment

Separate Uvicorn process alongside Django's Gunicorn:

# Django WSGI (existing)
gunicorn --bind :22090 --workers 3 mnemosyne.wsgi

# MCP ASGI (new)
uvicorn mcp_server.asgi:app --host 0.0.0.0 --port 22091 --workers 1

Auth is disabled (MCP_REQUIRE_AUTH=False) since all traffic is internal (10.10.0.0/24).

⚠️ DEBUG LOG Points — MCP Server

Location Log Event Level What to Log
Tool dispatch mcp_tool_called DEBUG Tool name, all input parameters
Vector search mcp_search_vector_query DEBUG Query text, embedding dims, library filter, limit
Vector search result mcp_search_vector_results DEBUG Candidate count, top/lowest scores
Full-text search mcp_search_fulltext_query DEBUG Query terms, index used
Re-ranking mcp_search_rerank DEBUG Candidates in/out, reranker model, duration_ms
Graph traversal mcp_graph_traverse DEBUG Starting node UID, relationships, depth, nodes visited
Neo4j query mcp_neo4j_query DEBUG Cypher query (parameterized), execution time_ms
Tool response mcp_tool_response DEBUG Tool name, result size (bytes/items), duration_ms
Health check mcp_health_check DEBUG Each dependency status, overall result

Important: All neomodel ORM calls inside async tool functions must be wrapped with sync_to_async(thread_sensitive=True).


2. REST API for Daedalus

All endpoints require HTTP Basic auth as daedalus-service. They are consumed by the Daedalus FastAPI backend only — not by any frontend.

Workspace lifecycle

Method Route Purpose
POST /library/api/workspaces/ Create workspace Library. Body: {workspace_id, name, library_type, description?}. Idempotent on workspace_id. library_type frozen at create.
GET /library/api/workspaces/{workspace_id}/ Workspace status (item_count, chunk_count, library_uid).
DELETE /library/api/workspaces/{workspace_id}/ Delete workspace Library + reachable content. Concept-safe: orphan-only Concept GC; concepts referenced by other libraries survive.

Ingest

Method Route Purpose
POST /library/api/ingest/ Accept a file (already in S3) for ingestion + embedding
GET /library/api/jobs/{job_id}/ Poll job status
POST /library/api/jobs/{job_id}/retry/ Retry a failed job
GET /library/api/jobs/?status=&library_uid= List recent jobs

Model: IngestJob

Lives in library/models.py (Django ORM on PostgreSQL, not Neo4j). Migration: library/migrations/0001_initial.py.

class IngestJob(models.Model):
    """Tracks the lifecycle of a content ingestion + embedding job."""

    id = models.CharField(max_length=64, primary_key=True)
    item_uid = models.CharField(max_length=64, db_index=True)
    celery_task_id = models.CharField(max_length=255, blank=True)

    status = models.CharField(
        max_length=20,
        choices=[
            ("pending", "Pending"),
            ("processing", "Processing"),
            ("completed", "Completed"),
            ("failed", "Failed"),
        ],
        default="pending",
        db_index=True,
    )
    progress = models.CharField(max_length=50, default="queued")
    error = models.TextField(blank=True, null=True)
    retry_count = models.PositiveIntegerField(default=0)

    chunks_created = models.PositiveIntegerField(default=0)
    concepts_extracted = models.PositiveIntegerField(default=0)
    embedding_model = models.CharField(max_length=100, blank=True)

    source = models.CharField(max_length=50, default="")
    source_ref = models.CharField(max_length=200, blank=True)
    s3_key = models.CharField(max_length=500)

    created_at = models.DateTimeField(auto_now_add=True)
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ["-created_at"]
        indexes = [
            models.Index(fields=["status", "-created_at"]),
            models.Index(fields=["source", "source_ref"]),
        ]

Ingest Request Schema

The target Library can be specified by either workspace_id (preferred for Daedalus) or library_uid. Idempotency key: (library, source_ref, content_hash). Same triple → existing job returned. New content_hash for the same source_ref → supersedes the prior Item.

{
  "s3_key": "workspaces/ws_abc/files/f_def/report.pdf",
  "title": "Q4 Technical Report",
  "workspace_id": "ws_abc",
  "file_type": "application/pdf",
  "file_size": 245000,
  "content_hash": "<sha256 hex, 64 chars>",
  "source": "daedalus",
  "source_ref": "ws_abc/f_def"
}

Job Status Response Schema

{
  "job_id": "job_789xyz",
  "item_uid": "item_abc123",
  "status": "processing",
  "progress": "embedding",
  "chunks_created": 0,
  "concepts_extracted": 0,
  "embedding_model": "qwen3-vl-embedding-8b",
  "started_at": "2026-03-12T15:42:01Z",
  "completed_at": null,
  "error": null
}

⚠️ DEBUG LOG Points — Ingest Endpoint

Location Log Event Level What to Log
Request received ingest_request_received INFO s3_key, title, library_uid, file_type, source, source_ref
S3 key validation ingest_s3_key_check DEBUG s3_key, exists (bool), bucket name
Library lookup ingest_library_lookup DEBUG library_uid, found (bool), library_type
Item node creation ingest_item_created INFO item_uid, title, library_uid, collection_uid
Celery task dispatch ingest_task_dispatched INFO job_id, item_uid, celery_task_id, queue name
Celery task dispatch failure ingest_task_dispatch_failed ERROR job_id, item_uid, exception details

3. Celery Embedding Pipeline

Task: ingest_from_daedalus

Defined in library/tasks.py. Routed to the embedding queue (per CELERY_TASK_ROUTES["library.tasks.ingest_*"]). Wraps the existing EmbeddingPipeline.process_item.

@shared_task(
    name="library.tasks.ingest_from_daedalus",
    bind=True,
    queue="embedding",
    max_retries=3,
    default_retry_delay=60,
    acks_late=True,
)
def ingest_from_daedalus(self, job_id: str): ...

Task flow (as built)

  1. Mark job processing, set started_at.
  2. Resolve target Library by library_uid.
  3. If a prior Item exists for this Library with the same source_ref but a different content_hash, delete it (chunks + images + embeddings) before continuing.
  4. Fetch file bytes from the Daedalus S3 bucket via library.services.daedalus_s3.fetch_from_daedalus.
  5. Create the Item neomodel node with s3_key=items/{item_uid}/original.{ext} and copy bytes into Mnemosyne's own bucket.
  6. Connect to a default Collection for the Library (auto-created on first ingest).
  7. Run EmbeddingPipeline.process_item(item.uid) — chunk per library_type, embed via the configured model, write Chunks + Concepts to Neo4j.
  8. Mark job completed with chunks_created, concepts_extracted, embedding_model, completed_at.

On any exception with retries remaining: re-raise via self.retry() (exponential backoff). On terminal failure: mark job failed with the exception text.

⚠️ DEBUG LOG Points — Celery Worker (Critical)

These are the most important log points in the entire integration. Without them, debugging async embedding failures is nearly impossible.

Location Log Event Level What to Log
Task pickup embed_task_started INFO job_id, item_uid, worker hostname, retry count
S3 fetch start embed_s3_fetch_start DEBUG s3_key, source bucket
S3 fetch complete embed_s3_fetch_complete DEBUG s3_key, file_size, duration_ms
S3 fetch failed embed_s3_fetch_failed ERROR s3_key, error, retry_count
S3 cross-bucket copy start s3_cross_bucket_copy_start DEBUG source_bucket, source_key, dest_bucket, dest_key
S3 cross-bucket copy complete s3_cross_bucket_copy_complete DEBUG source_key, dest_key, file_size, duration_ms
S3 cross-bucket copy failed s3_cross_bucket_copy_failed ERROR source_bucket, source_key, error
Chunking start embed_chunking_start DEBUG library_type, strategy, chunk_size, chunk_overlap
Chunking complete embed_chunking_complete INFO chunks_created, avg_chunk_size
Chunking failed embed_chunking_failed ERROR file_type, error
Embedding start embed_vectors_start DEBUG model_name, dimensions, batch_size, total_chunks
Embedding complete embed_vectors_complete INFO model_name, duration_ms, tokens_processed
Embedding failed embed_vectors_failed ERROR model_name, chunk_index, error
Neo4j write start embed_neo4j_write_start DEBUG chunks_to_write count
Neo4j write complete embed_neo4j_write_complete INFO chunks_written, duration_ms
Neo4j write failed embed_neo4j_write_failed ERROR chunk_index, neo4j_error
Concept extraction start embed_concepts_start DEBUG model_name
Concept extraction complete embed_concepts_complete INFO concepts_extracted, concept_names, duration_ms
Graph build start embed_graph_build_start DEBUG
Graph build complete embed_graph_build_complete INFO relationships_created, duration_ms
Job completed embed_job_completed INFO job_id, item_uid, total_duration_ms, chunks, concepts
Job failed embed_job_failed ERROR job_id, item_uid, exception_type, error, full traceback

4. S3 Bucket Strategy

Mnemosyne uses its own bucket (mnemosyne-content, Terraform-provisioned per Phase 1). On ingest, the Celery worker copies the file from the Daedalus bucket to Mnemosyne's bucket.

mnemosyne-content bucket
├── items/
│   └── {item_uid}/
│       └── original/{filename}     ← copied from Daedalus bucket
│       └── chunks/
│           └── chunk_000.txt
│           └── chunk_001.txt
├── images/
│   └── {image_uid}/{filename}

Configuration

# .env additions

# Mnemosyne's own bucket (existing)
AWS_STORAGE_BUCKET_NAME=mnemosyne-content

# Cross-bucket read access to Daedalus bucket
DAEDALUS_S3_BUCKET_NAME=daedalus
DAEDALUS_S3_ENDPOINT_URL=http://incus-s3.incus:9000
DAEDALUS_S3_ACCESS_KEY_ID=${VAULT_DAEDALUS_S3_READ_KEY}
DAEDALUS_S3_SECRET_ACCESS_KEY=${VAULT_DAEDALUS_S3_READ_SECRET}

# MCP server
MCP_SERVER_PORT=22091
MCP_REQUIRE_AUTH=False

5. Prometheus Metrics

# MCP tool calls
mnemosyne_mcp_tool_invocations_total{tool,status}              counter
mnemosyne_mcp_tool_duration_seconds{tool}                      histogram

# Ingest pipeline
mnemosyne_ingest_jobs_total{status}                            counter
mnemosyne_ingest_duration_seconds{library_type}                histogram
mnemosyne_chunks_created_total{library_type}                   counter
mnemosyne_concepts_extracted_total                             counter
mnemosyne_embeddings_generated_total{model}                    counter
mnemosyne_embedding_duration_seconds{model}                    histogram

# Search performance
mnemosyne_search_duration_seconds{search_type}                 histogram
mnemosyne_search_results_total{search_type}                    counter
mnemosyne_rerank_duration_seconds{model}                       histogram

# Infrastructure
mnemosyne_neo4j_query_duration_seconds{query_type}             histogram
mnemosyne_s3_operations_total{operation,status}                counter

6. Implementation Phases (Mnemosyne-specific)

Phase 1 — REST API for Daedalus (workspace + ingest) Implemented

  • Library.workspace_id + library_type enum (added business, finance)
  • IngestJob Django ORM model + migration 0001_initial.py
  • POST /library/api/workspaces/, GET /library/api/workspaces/{id}/, DELETE /library/api/workspaces/{id}/ (concept-safe)
  • POST /library/api/ingest/ with (library, source_ref, content_hash) idempotency
  • GET /library/api/jobs/{job_id}/, POST .../retry/, GET /library/api/jobs/
  • library.tasks.ingest_from_daedalus Celery task with content-hash-aware supersede logic
  • library.services.daedalus_s3 cross-bucket fetch + copy
  • HTTP Basic auth via daedalus-service user

Phase 2 — MCP Server (Mnemosyne roadmap Phase 5) Implemented

  • mcp_server/ module following the Django MCP Pattern
  • search tool (hybrid vector + fulltext + concept-graph + Synesis re-rank)
  • get_chunk tool (full text by chunk_uid)
  • list_libraries, list_collections, list_items discovery tools
  • get_health tool (Neo4j + S3 + embedding model probes)
  • Workspace_id parameter on every search/discovery tool (undocumented to LLM, scoping enforced in Cypher)
  • Single-mode rule: workspace-scoped vs global, never both in one query
  • ASGI mount + uvicorn deployment on port 22091; nginx proxies via /mcp/ on 23090
  • Prometheus metrics (mnemosyne_mcp_*)

Phase 3 — Per-turn token access control for Daedalus integration 📋 Deferred

The Phase 2 MCP server is search-capable but currently has no token-based library-access scoping beyond workspace_id (which is parameter-level, not auth-level). The intended production access-control layer for the Daedalus integration is a per-turn signed token model:

  • Daedalus mints short-lived tokens carrying {sub: agent_id, workspace_id, allowed_libraries, exp}.
  • Pallas forwards the inbound bearer to its outgoing Mnemosyne MCP calls (requires a small upstream patch — see Daedalus-side §9.4).
  • Mnemosyne's MCP token validator extracts the claims; search Cypher additionally filters WHERE lib.uid IN $allowed_libraries.
  • Workspace libraries are auto-included in the per-turn token's allowed list when the agent is being invoked from that workspace.

Mnemosyne-side work for Phase 3:

  • Extend MCPToken (or sibling) to carry signed claims {workspace_id, allowed_libraries, exp}
  • Token validator reads claims, attaches them to the FastMCP request context
  • search / list_* tools consult claim-derived allowed-library set in addition to existing parameter filters
  • Document the JWT/signing format Daedalus mints to (likely HS256 with a shared secret in Vault, or RS256 against Daedalus's JWKS — TBD)

See the Daedalus-side spec §9 for the full integration architecture.