feat(ingest): add Daedalus cross-bucket S3 fetch + ingest_from_daedalus task

Adds DAEDALUS_S3_* settings (read-only credentials for the Daedalus bucket)
and a small `daedalus_s3.py` helper that fetches a file from Daedalus's
bucket and writes it into Mnemosyne's bucket via default_storage.

Adds the Celery task `library.tasks.ingest_from_daedalus`. Given an
IngestJob row, it:
  1. Resolves the target Library (by library_uid).
  2. Supersedes a prior Item with the same source_ref but different
     content_hash by deleting the old Item + chunks first.
  3. Fetches from Daedalus S3, copies into items/{item_uid}/original.{ext}.
  4. Creates the Item node, links it to a default Collection.
  5. Runs the existing EmbeddingPipeline.process_item.
  6. Marks the job completed with chunks/concepts counts.

Failures retry up to 3× with exponential backoff; final failure marks
the job failed with the exception text. The task is routed to the
`embedding` queue, so single-worker setups must make sure their worker
consumes that queue.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-29 06:26:48 -04:00
parent 33658fbc8d
commit c485a8560c
4 changed files with 310 additions and 0 deletions

View File

@@ -40,6 +40,23 @@ AWS_S3_REGION_NAME=us-east-1
# Set to True to use local FileSystemStorage instead of S3 (dev/test) # Set to True to use local FileSystemStorage instead of S3 (dev/test)
USE_LOCAL_STORAGE=True USE_LOCAL_STORAGE=True
# --- Daedalus S3 (cross-bucket reads for ingest) ---
# Mnemosyne ingests files from the Daedalus S3 bucket. These vars
# configure read access; the file is copied into AWS_STORAGE_BUCKET_NAME
# (Mnemosyne's own bucket) by the ingest Celery task.
DAEDALUS_S3_ENDPOINT_URL=
DAEDALUS_S3_ACCESS_KEY_ID=
DAEDALUS_S3_SECRET_ACCESS_KEY=
DAEDALUS_S3_BUCKET_NAME=daedalus
DAEDALUS_S3_REGION_NAME=us-east-1
DAEDALUS_S3_USE_SSL=False
DAEDALUS_S3_VERIFY=True
# --- MCP Server ---
# Set to False for internal-only deployments where the MCP transport is
# already on a trusted network (10.10.0.0/24).
MCP_REQUIRE_AUTH=True
# --- Email (smtp4dev on Oberon) --- # --- Email (smtp4dev on Oberon) ---
EMAIL_HOST=oberon.incus EMAIL_HOST=oberon.incus
EMAIL_PORT=22025 EMAIL_PORT=22025

View File

@@ -0,0 +1,70 @@
"""
Cross-bucket S3 helper for ingesting files from the Daedalus S3 bucket
into Mnemosyne's own bucket.
Daedalus uploads files to its own bucket (configured per Daedalus deployment)
and posts an ingest request to Mnemosyne with the s3_key. This module fetches
that file using read-only Daedalus credentials and writes it to Mnemosyne's
bucket via the standard `default_storage` backend so the rest of the pipeline
(parsing, chunking, embedding) works unchanged.
"""
import logging
import boto3
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
logger = logging.getLogger(__name__)
def _daedalus_s3_client():
    """Return a boto3 S3 client configured from the ``DAEDALUS_S3_*`` settings."""
    # A blank endpoint URL is normalised to None before it reaches boto3.
    endpoint = settings.DAEDALUS_S3_ENDPOINT_URL or None
    client_kwargs = {
        "endpoint_url": endpoint,
        "aws_access_key_id": settings.DAEDALUS_S3_ACCESS_KEY_ID,
        "aws_secret_access_key": settings.DAEDALUS_S3_SECRET_ACCESS_KEY,
        "region_name": settings.DAEDALUS_S3_REGION_NAME,
        "use_ssl": settings.DAEDALUS_S3_USE_SSL,
        "verify": settings.DAEDALUS_S3_VERIFY,
    }
    return boto3.client("s3", **client_kwargs)
def fetch_from_daedalus(daedalus_s3_key: str) -> bytes:
    """
    Read a file from the Daedalus S3 bucket.

    The whole object is read into memory and returned; the response body
    is always closed afterwards, including when the read itself fails.

    :param daedalus_s3_key: Object key in the Daedalus bucket.
    :returns: File bytes.
    :raises: botocore exceptions on failure (caller decides retry).
    """
    client = _daedalus_s3_client()
    bucket = settings.DAEDALUS_S3_BUCKET_NAME
    logger.debug(
        "Fetching from Daedalus S3 bucket=%s key=%s", bucket, daedalus_s3_key
    )
    response = client.get_object(Bucket=bucket, Key=daedalus_s3_key)
    body = response["Body"]
    try:
        data = body.read()
    finally:
        # Release the underlying HTTP response even if the read fails
        # part-way, rather than leaving it open until garbage collection.
        body.close()
    logger.info(
        "Fetched from Daedalus S3 bucket=%s key=%s size=%d",
        bucket, daedalus_s3_key, len(data),
    )
    return data
def copy_into_mnemosyne(data: bytes, mnemosyne_s3_key: str) -> str:
    """
    Store in-memory bytes in Mnemosyne's bucket through ``default_storage``.

    :param data: File bytes (already in memory).
    :param mnemosyne_s3_key: Target object key in Mnemosyne's bucket.
    :returns: The key reported by ``default_storage.save`` (may differ from
        the requested one if `file_overwrite=False` and the key existed).
    """
    payload = ContentFile(data)
    saved_key = default_storage.save(mnemosyne_s3_key, payload)
    logger.info(
        "Wrote to Mnemosyne S3 key=%s size=%d", saved_key, len(data),
    )
    return saved_key

View File

@@ -10,6 +10,7 @@ import logging
from celery import shared_task from celery import shared_task
from django.core.cache import cache from django.core.cache import cache
from neomodel import db
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -280,3 +281,212 @@ def _resolve_user(user_id: int = None):
return User.objects.get(pk=user_id) return User.objects.get(pk=user_id)
except Exception: except Exception:
return None return None
# ---------------------------------------------------------------------------
# Ingest task (Daedalus integration)
# ---------------------------------------------------------------------------
@shared_task(
    name="library.tasks.ingest_from_daedalus",
    bind=True,
    queue="embedding",
    max_retries=3,
    default_retry_delay=60,
    acks_late=True,
)
def ingest_from_daedalus(self, job_id: str):
    """
    Process a single IngestJob: fetch from Daedalus S3 → create Item →
    run embedding pipeline → mark complete.

    Idempotent on (library_uid, source_ref, content_hash) — handled in the
    REST view that creates the IngestJob, so by the time this task runs the
    job either represents new content or a content_hash-changed re-ingest.
    For a content_hash-changed re-ingest, the prior Item with the same
    source_ref is deleted before the new one is created (ensures no
    stale chunks linger). The source file is fetched *before* that delete,
    so an unreachable source never costs the existing Item.

    Failures are retried up to ``max_retries`` times with exponential
    backoff (``default_retry_delay`` doubled on every attempt); once the
    retries are exhausted the job is marked ``failed`` with the exception
    text and a failure dict is returned instead of raising.

    :param job_id: Primary key of the IngestJob row to process.
    :returns: Dict with ``success``; on success also ``job_id``,
        ``item_uid`` and the pipeline result keys, on failure ``error``.
    """
    import json
    from datetime import datetime, timezone

    from library.models import IngestJob, Item, Library
    from library.services.daedalus_s3 import (
        copy_into_mnemosyne,
        fetch_from_daedalus,
    )
    from library.services.pipeline import EmbeddingPipeline

    logger.info(
        "Task ingest_from_daedalus starting job_id=%s task_id=%s",
        job_id, self.request.id,
    )

    try:
        job = IngestJob.objects.get(pk=job_id)
    except IngestJob.DoesNotExist:
        logger.error("IngestJob not found job_id=%s", job_id)
        return {"success": False, "error": "job_not_found"}

    job.status = "processing"
    job.progress = "fetching"
    job.started_at = datetime.now(timezone.utc)
    job.celery_task_id = self.request.id
    job.save(update_fields=["status", "progress", "started_at", "celery_task_id"])

    try:
        # --- 1. Resolve target Library ---
        try:
            lib = Library.nodes.get(uid=job.library_uid)
        except Library.DoesNotExist:
            raise RuntimeError(f"Library not found: {job.library_uid}")

        # --- 2. Fetch from Daedalus ---
        # Done before anything destructive: if the source object cannot be
        # read, the previously ingested Item is left untouched.
        data = fetch_from_daedalus(job.s3_key)

        # --- 3. Supersede prior Item with same source_ref ---
        prior_item_uid = None
        if job.source_ref:
            # The metadata is matched as serialized JSON text, so the value
            # is JSON-encoded here too; a raw interpolation would miss any
            # source_ref containing quotes, backslashes or non-ASCII.
            # NOTE(review): assumes metadata is stored via json.dumps with
            # default separators — confirm against the Item model.
            source_ref_marker = f'"source_ref": {json.dumps(job.source_ref)}'
            rows, _ = db.cypher_query(
                """
                MATCH (l:Library {uid: $library_uid})-[:CONTAINS]->(:Collection)
                      -[:CONTAINS]->(i:Item)
                WHERE i.metadata IS NOT NULL
                  AND i.metadata CONTAINS $source_ref_marker
                RETURN i.uid LIMIT 1
                """,
                {
                    "library_uid": lib.uid,
                    "source_ref_marker": source_ref_marker,
                },
            )
            if rows:
                prior_item_uid = rows[0][0]
                logger.info(
                    "Superseding prior Item job_id=%s prior_item_uid=%s",
                    job_id, prior_item_uid,
                )
                _delete_item_and_chunks(prior_item_uid)

        # --- 4. Create Item node, copy the file into Mnemosyne's bucket ---
        job.progress = "copying"
        job.save(update_fields=["progress"])

        ext = (job.file_type or "bin").lstrip(".").lower() or "bin"
        item = Item(
            title=job.title,
            file_type=ext,
            file_size=len(data),
            content_hash=job.content_hash,
            embedding_status="pending",
            metadata={
                "source": job.source,
                "source_ref": job.source_ref,
            },
        )
        # Saved first because the storage key is derived from item.uid.
        # NOTE(review): if the copy below fails, this Item is left without a
        # Collection link and step 3 will not find it on retry — consider
        # cleaning it up.
        item.save()
        mnemosyne_s3_key = f"items/{item.uid}/original.{ext}"
        copy_into_mnemosyne(data, mnemosyne_s3_key)
        item.s3_key = mnemosyne_s3_key
        item.save()

        # --- 5. Connect to library/collection ---
        col = _resolve_or_create_default_collection(lib, job.collection_uid)
        col.items.connect(item)
        job.item_uid = item.uid
        job.save(update_fields=["item_uid"])

        # --- 6. Run the embedding pipeline ---
        job.progress = "embedding"
        job.save(update_fields=["progress"])

        def progress_cb(percent, message):
            _update_progress(self, percent, message)

        pipeline = EmbeddingPipeline(user=None)
        result = pipeline.process_item(item.uid, progress_callback=progress_cb)

        # --- 7. Mark complete ---
        job.status = "completed"
        job.progress = "done"
        job.chunks_created = result.get("chunks_created", 0)
        job.concepts_extracted = result.get("concepts_extracted", 0)
        job.embedding_model = result.get("embedding_model", "")
        job.completed_at = datetime.now(timezone.utc)
        job.save()
        logger.info(
            "Task ingest_from_daedalus completed job_id=%s item_uid=%s "
            "chunks=%d concepts=%d",
            job_id, item.uid, job.chunks_created, job.concepts_extracted,
        )
        return {
            "success": True,
            "job_id": job_id,
            "item_uid": item.uid,
            **result,
        }
    except Exception as exc:
        logger.exception(
            "Task ingest_from_daedalus failed job_id=%s: %s",
            job_id, exc,
        )
        if self.request.retries < self.max_retries:
            job.retry_count = self.request.retries + 1
            job.save(update_fields=["retry_count"])
            # Exponential backoff: base delay doubled per attempt already
            # made (60s, 120s, 240s with the decorator defaults above).
            countdown = self.default_retry_delay * 2 ** self.request.retries
            raise self.retry(exc=exc, countdown=countdown)

        # Retries exhausted: record the failure on the job row and report it
        # in the return value rather than raising.
        job.status = "failed"
        job.error = str(exc)
        job.completed_at = datetime.now(timezone.utc)
        job.save(update_fields=["status", "error", "completed_at"])
        return {"success": False, "job_id": job_id, "error": str(exc)}
def _delete_item_and_chunks(item_uid: str):
    """
    Remove one Item along with its chunks, images and image embeddings.

    Concept nodes are not touched here (concept GC is workspace-delete only).

    :param item_uid: ``uid`` of the Item node to delete.
    """
    delete_query = """
        MATCH (i:Item {uid: $uid})
        OPTIONAL MATCH (i)-[:HAS_CHUNK]->(c:Chunk)
        OPTIONAL MATCH (i)-[:HAS_IMAGE]->(img:Image)
        OPTIONAL MATCH (img)-[:HAS_EMBEDDING]->(emb:ImageEmbedding)
        DETACH DELETE c, img, emb, i
        """
    params = {"uid": item_uid}
    db.cypher_query(delete_query, params)
def _resolve_or_create_default_collection(lib, collection_uid: str = ""):
    """
    Find or create the default Collection for a Library.

    Daedalus integration creates one Collection per Library, named "default".
    Explicit collection_uid is honored if provided; when it does not match
    any Collection a warning is logged and the default Collection is used.

    :param lib: Library node the Collection should belong to.
    :param collection_uid: Optional uid of an existing Collection to use.
    :returns: The resolved (or newly created) Collection node.
    """
    from library.models import Collection

    if collection_uid:
        try:
            return Collection.nodes.get(uid=collection_uid)
        except Collection.DoesNotExist:
            # Best-effort fallback, but leave a trace: a wrong uid would
            # otherwise route the item into "default" without any signal.
            logger.warning(
                "Collection not found collection_uid=%s library_uid=%s; "
                "falling back to default collection",
                collection_uid, lib.uid,
            )

    # Look for an existing "default" collection in this library
    rows, _ = db.cypher_query(
        "MATCH (l:Library {uid: $library_uid})-[:CONTAINS]->(c:Collection {name: 'default'}) "
        "RETURN c.uid LIMIT 1",
        {"library_uid": lib.uid},
    )
    if rows:
        return Collection.nodes.get(uid=rows[0][0])

    # None yet: create it and link it to the library in both directions.
    col = Collection(name="default", description="Default collection")
    col.save()
    lib.collections.connect(col)
    col.library.connect(lib)
    return col

View File

@@ -181,6 +181,18 @@ else:
}, },
} }
# --- Daedalus S3 (cross-bucket reads for ingest) ---
# Mnemosyne ingests files written to Daedalus's S3 bucket. These vars
# configure read access; the file is copied into AWS_STORAGE_BUCKET_NAME
# (Mnemosyne's own bucket) by the Celery ingest task before processing.
# Every value is read from the environment and falls back to the default
# given here when the variable is unset.
# NOTE(review): DAEDALUS_S3_USE_SSL defaults to False — confirm this is
# intended for deployments that reach the bucket over an untrusted network.
DAEDALUS_S3_ENDPOINT_URL = env("DAEDALUS_S3_ENDPOINT_URL", default="")
DAEDALUS_S3_ACCESS_KEY_ID = env("DAEDALUS_S3_ACCESS_KEY_ID", default="")
DAEDALUS_S3_SECRET_ACCESS_KEY = env("DAEDALUS_S3_SECRET_ACCESS_KEY", default="")
DAEDALUS_S3_BUCKET_NAME = env("DAEDALUS_S3_BUCKET_NAME", default="daedalus")
DAEDALUS_S3_REGION_NAME = env("DAEDALUS_S3_REGION_NAME", default="us-east-1")
DAEDALUS_S3_USE_SSL = env.bool("DAEDALUS_S3_USE_SSL", default=False)
DAEDALUS_S3_VERIFY = env.bool("DAEDALUS_S3_VERIFY", default=True)
# --- Celery / RabbitMQ --- # --- Celery / RabbitMQ ---
CELERY_BROKER_URL = env( CELERY_BROKER_URL = env(
"CELERY_BROKER_URL", "CELERY_BROKER_URL",
@@ -196,6 +208,7 @@ CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_TASK_ROUTES = { CELERY_TASK_ROUTES = {
"library.tasks.embed_*": {"queue": "embedding"}, "library.tasks.embed_*": {"queue": "embedding"},
"library.tasks.ingest_*": {"queue": "embedding"},
"library.tasks.batch_*": {"queue": "batch"}, "library.tasks.batch_*": {"queue": "batch"},
} }