From c485a8560ca3efccaeaa9ea7922f5a4aec8bb78f Mon Sep 17 00:00:00 2001 From: Robert Helewka Date: Wed, 29 Apr 2026 06:26:48 -0400 Subject: [PATCH] feat(ingest): add Daedalus cross-bucket S3 fetch + ingest_from_daedalus task MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds DAEDALUS_S3_* settings (read-only credentials for the Daedalus bucket) and a small `daedalus_s3.py` helper that fetches a file from Daedalus's bucket and writes it into Mnemosyne's bucket via default_storage. Adds the Celery task `library.tasks.ingest_from_daedalus`. Given an IngestJob row, it: 1. Resolves the target Library (by library_uid). 2. Supersedes a prior Item with the same source_ref but different content_hash by deleting the old Item + chunks first. 3. Fetches from Daedalus S3, copies into items/{item_uid}/original.{ext}. 4. Creates the Item node, links it to a default Collection. 5. Runs the existing EmbeddingPipeline.process_item. 6. Marks the job completed with chunks/concepts counts. Failures retry up to 3× with exponential backoff; final failure marks the job failed with the exception text. Routed to the embedding queue so single-worker setups must consume it. Co-Authored-By: Claude Opus 4.7 --- mnemosyne/.env example | 17 ++ mnemosyne/library/services/daedalus_s3.py | 70 ++++++++ mnemosyne/library/tasks.py | 210 ++++++++++++++++++++++ mnemosyne/mnemosyne/settings.py | 13 ++ 4 files changed, 310 insertions(+) create mode 100644 mnemosyne/library/services/daedalus_s3.py diff --git a/mnemosyne/.env example b/mnemosyne/.env example index 21a62f6..44c69d5 100644 --- a/mnemosyne/.env example +++ b/mnemosyne/.env example @@ -40,6 +40,23 @@ AWS_S3_REGION_NAME=us-east-1 # Set to True to use local FileSystemStorage instead of S3 (dev/test) USE_LOCAL_STORAGE=True +# --- Daedalus S3 (cross-bucket reads for ingest) --- +# Mnemosyne ingests files from the Daedalus S3 bucket. These vars +# configure read access; the file is copied into AWS_STORAGE_BUCKET_NAME +# (Mnemosyne's own bucket) by the ingest Celery task. +DAEDALUS_S3_ENDPOINT_URL= +DAEDALUS_S3_ACCESS_KEY_ID= +DAEDALUS_S3_SECRET_ACCESS_KEY= +DAEDALUS_S3_BUCKET_NAME=daedalus +DAEDALUS_S3_REGION_NAME=us-east-1 +DAEDALUS_S3_USE_SSL=False +DAEDALUS_S3_VERIFY=True + +# --- MCP Server --- +# Set to False for internal-only deployments where the MCP transport is +# already on a trusted network (10.10.0.0/24). +MCP_REQUIRE_AUTH=True + # --- Email (smtp4dev on Oberon) --- EMAIL_HOST=oberon.incus EMAIL_PORT=22025 diff --git a/mnemosyne/library/services/daedalus_s3.py b/mnemosyne/library/services/daedalus_s3.py new file mode 100644 index 0000000..d75636c --- /dev/null +++ b/mnemosyne/library/services/daedalus_s3.py @@ -0,0 +1,70 @@ +""" +Cross-bucket S3 helper for ingesting files from the Daedalus S3 bucket +into Mnemosyne's own bucket. + +Daedalus uploads files to its own bucket (configured per Daedalus deployment) +and posts an ingest request to Mnemosyne with the s3_key. This module fetches +that file using read-only Daedalus credentials and writes it to Mnemosyne's +bucket via the standard `default_storage` backend so the rest of the pipeline +(parsing, chunking, embedding) works unchanged. +""" + +import logging + +import boto3 +from django.conf import settings +from django.core.files.base import ContentFile +from django.core.files.storage import default_storage + +logger = logging.getLogger(__name__) + + +def _daedalus_s3_client(): + """Build a boto3 S3 client pointed at the Daedalus bucket.""" + return boto3.client( + "s3", + endpoint_url=settings.DAEDALUS_S3_ENDPOINT_URL or None, + aws_access_key_id=settings.DAEDALUS_S3_ACCESS_KEY_ID, + aws_secret_access_key=settings.DAEDALUS_S3_SECRET_ACCESS_KEY, + region_name=settings.DAEDALUS_S3_REGION_NAME, + use_ssl=settings.DAEDALUS_S3_USE_SSL, + verify=settings.DAEDALUS_S3_VERIFY, + ) + + +def fetch_from_daedalus(daedalus_s3_key: str) -> bytes: + """ + Read a file from the Daedalus S3 bucket. + + :param daedalus_s3_key: Object key in the Daedalus bucket. + :returns: File bytes. + :raises: botocore exceptions on failure (caller decides retry). + """ + client = _daedalus_s3_client() + bucket = settings.DAEDALUS_S3_BUCKET_NAME + logger.debug( + "Fetching from Daedalus S3 bucket=%s key=%s", bucket, daedalus_s3_key + ) + response = client.get_object(Bucket=bucket, Key=daedalus_s3_key) + data = response["Body"].read() + logger.info( + "Fetched from Daedalus S3 bucket=%s key=%s size=%d", + bucket, daedalus_s3_key, len(data), + ) + return data + + +def copy_into_mnemosyne(data: bytes, mnemosyne_s3_key: str) -> str: + """ + Write bytes into Mnemosyne's S3 bucket via the default_storage backend. + + :param data: File bytes (already in memory). + :param mnemosyne_s3_key: Target object key in Mnemosyne's bucket. + :returns: The actual key written (may differ from requested if + `file_overwrite=False` and the key existed). + """ + saved_key = default_storage.save(mnemosyne_s3_key, ContentFile(data)) + logger.info( + "Wrote to Mnemosyne S3 key=%s size=%d", saved_key, len(data), + ) + return saved_key diff --git a/mnemosyne/library/tasks.py b/mnemosyne/library/tasks.py index 9582d59..ddd6b05 100644 --- a/mnemosyne/library/tasks.py +++ b/mnemosyne/library/tasks.py @@ -10,6 +10,7 @@ import logging from celery import shared_task from django.core.cache import cache +from neomodel import db logger = logging.getLogger(__name__) @@ -280,3 +281,212 @@ def _resolve_user(user_id: int = None): return User.objects.get(pk=user_id) except Exception: return None + + +# --------------------------------------------------------------------------- +# Ingest task (Daedalus integration) +# --------------------------------------------------------------------------- + + +@shared_task( + name="library.tasks.ingest_from_daedalus", + bind=True, + queue="embedding", + max_retries=3, + default_retry_delay=60, + acks_late=True, +) +def ingest_from_daedalus(self, job_id: str): + """ + Process a single IngestJob: fetch from Daedalus S3 → create Item → + run embedding pipeline → mark complete. + + Idempotent on (library_uid, source_ref, content_hash) — handled in the + REST view that creates the IngestJob, so by the time this task runs the + job either represents new content or a content_hash-changed re-ingest. + + For a content_hash-changed re-ingest, the prior Item with the same + source_ref is deleted before the new one is processed (ensures no + stale chunks linger). + """ + from datetime import datetime, timezone + + from library.models import IngestJob, Item, Library + from library.services.daedalus_s3 import ( + copy_into_mnemosyne, + fetch_from_daedalus, + ) + from library.services.pipeline import EmbeddingPipeline + + logger.info( + "Task ingest_from_daedalus starting job_id=%s task_id=%s", + job_id, self.request.id, + ) + + try: + job = IngestJob.objects.get(pk=job_id) + except IngestJob.DoesNotExist: + logger.error("IngestJob not found job_id=%s", job_id) + return {"success": False, "error": "job_not_found"} + + job.status = "processing" + job.progress = "fetching" + job.started_at = datetime.now(timezone.utc) + job.celery_task_id = self.request.id + job.save(update_fields=["status", "progress", "started_at", "celery_task_id"]) + + try: + # --- 1. Resolve target Library --- + try: + lib = Library.nodes.get(uid=job.library_uid) + except Library.DoesNotExist: + raise RuntimeError(f"Library not found: {job.library_uid}") + + # --- 2. Supersede prior Item with same source_ref but different hash --- + prior_item_uid = None + if job.source_ref: + rows, _ = db.cypher_query( + """ + MATCH (l:Library {uid: $library_uid})-[:CONTAINS]->(:Collection) + -[:CONTAINS]->(i:Item) + WHERE i.metadata IS NOT NULL + AND i.metadata CONTAINS $source_ref_marker + RETURN i.uid LIMIT 1 + """, + { + "library_uid": lib.uid, + "source_ref_marker": f'"source_ref": "{job.source_ref}"', + }, + ) + if rows: + prior_item_uid = rows[0][0] + logger.info( + "Superseding prior Item job_id=%s prior_item_uid=%s", + job_id, prior_item_uid, + ) + _delete_item_and_chunks(prior_item_uid) + + # --- 3. Fetch from Daedalus, copy into Mnemosyne bucket --- + job.progress = "copying" + job.save(update_fields=["progress"]) + + data = fetch_from_daedalus(job.s3_key) + + # --- 4. Create Item node --- + ext = (job.file_type or "bin").lstrip(".").lower() or "bin" + item = Item( + title=job.title, + file_type=ext, + file_size=len(data), + content_hash=job.content_hash, + embedding_status="pending", + metadata={ + "source": job.source, + "source_ref": job.source_ref, + }, + ) + item.save() + + mnemosyne_s3_key = f"items/{item.uid}/original.{ext}" + copy_into_mnemosyne(data, mnemosyne_s3_key) + item.s3_key = mnemosyne_s3_key + item.save() + + # --- 5. Connect to library/collection --- + col = _resolve_or_create_default_collection(lib, job.collection_uid) + col.items.connect(item) + + job.item_uid = item.uid + job.save(update_fields=["item_uid"]) + + # --- 6. Run the embedding pipeline --- + job.progress = "embedding" + job.save(update_fields=["progress"]) + + def progress_cb(percent, message): + _update_progress(self, percent, message) + + pipeline = EmbeddingPipeline(user=None) + result = pipeline.process_item(item.uid, progress_callback=progress_cb) + + # --- 7. Mark complete --- + job.status = "completed" + job.progress = "done" + job.chunks_created = result.get("chunks_created", 0) + job.concepts_extracted = result.get("concepts_extracted", 0) + job.embedding_model = result.get("embedding_model", "") + job.completed_at = datetime.now(timezone.utc) + job.save() + + logger.info( + "Task ingest_from_daedalus completed job_id=%s item_uid=%s " + "chunks=%d concepts=%d", + job_id, item.uid, job.chunks_created, job.concepts_extracted, + ) + return { + "success": True, + "job_id": job_id, + "item_uid": item.uid, + **result, + } + + except Exception as exc: + logger.error( + "Task ingest_from_daedalus failed job_id=%s: %s", + job_id, exc, exc_info=True, + ) + if self.request.retries < self.max_retries: + job.retry_count = self.request.retries + 1 + job.save(update_fields=["retry_count"]) + raise self.retry(exc=exc) + + job.status = "failed" + job.error = str(exc) + job.completed_at = datetime.now(timezone.utc) + job.save(update_fields=["status", "error", "completed_at"]) + return {"success": False, "job_id": job_id, "error": str(exc)} + + +def _delete_item_and_chunks(item_uid: str): + """Delete an Item, its chunks, and its images. Concept GC is workspace-delete only.""" + db.cypher_query( + """ + MATCH (i:Item {uid: $uid}) + OPTIONAL MATCH (i)-[:HAS_CHUNK]->(c:Chunk) + OPTIONAL MATCH (i)-[:HAS_IMAGE]->(img:Image) + OPTIONAL MATCH (img)-[:HAS_EMBEDDING]->(emb:ImageEmbedding) + DETACH DELETE c, img, emb, i + """, + {"uid": item_uid}, + ) + + +def _resolve_or_create_default_collection(lib, collection_uid: str = ""): + """ + Find or create the default Collection for a Library. + + Daedalus integration creates one Collection per Library, named "default". + Explicit collection_uid is honored if provided. + """ + from library.models import Collection + + if collection_uid: + try: + return Collection.nodes.get(uid=collection_uid) + except Collection.DoesNotExist: + pass + + # Look for an existing "default" collection in this library + rows, _ = db.cypher_query( + "MATCH (l:Library {uid: $library_uid})-[:CONTAINS]->(c:Collection {name: 'default'}) " + "RETURN c.uid LIMIT 1", + {"library_uid": lib.uid}, + ) + if rows: + return Collection.nodes.get(uid=rows[0][0]) + + col = Collection(name="default", description="Default collection") + col.save() + lib.collections.connect(col) + col.library.connect(lib) + return col diff --git a/mnemosyne/mnemosyne/settings.py b/mnemosyne/mnemosyne/settings.py index 695a84d..cb52f33 100644 --- a/mnemosyne/mnemosyne/settings.py +++ b/mnemosyne/mnemosyne/settings.py @@ -181,6 +181,18 @@ else: }, } +# --- Daedalus S3 (cross-bucket reads for ingest) --- +# Mnemosyne ingests files written to Daedalus's S3 bucket. These vars +# configure read access; the file is copied into AWS_STORAGE_BUCKET_NAME +# (Mnemosyne's own bucket) by the Celery ingest task before processing. +DAEDALUS_S3_ENDPOINT_URL = env("DAEDALUS_S3_ENDPOINT_URL", default="") +DAEDALUS_S3_ACCESS_KEY_ID = env("DAEDALUS_S3_ACCESS_KEY_ID", default="") +DAEDALUS_S3_SECRET_ACCESS_KEY = env("DAEDALUS_S3_SECRET_ACCESS_KEY", default="") +DAEDALUS_S3_BUCKET_NAME = env("DAEDALUS_S3_BUCKET_NAME", default="daedalus") +DAEDALUS_S3_REGION_NAME = env("DAEDALUS_S3_REGION_NAME", default="us-east-1") +DAEDALUS_S3_USE_SSL = env.bool("DAEDALUS_S3_USE_SSL", default=False) +DAEDALUS_S3_VERIFY = env.bool("DAEDALUS_S3_VERIFY", default=True) + # --- Celery / RabbitMQ --- CELERY_BROKER_URL = env( "CELERY_BROKER_URL", @@ -196,6 +208,7 @@ CELERY_TASK_ACKS_LATE = True CELERY_WORKER_PREFETCH_MULTIPLIER = 1 CELERY_TASK_ROUTES = { "library.tasks.embed_*": {"queue": "embedding"}, + "library.tasks.ingest_*": {"queue": "embedding"}, "library.tasks.batch_*": {"queue": "batch"}, }