From ec4f12d601173c6c534cd806b6ecad59874b249b Mon Sep 17 00:00:00 2001 From: Robert Helewka Date: Thu, 11 Jun 2026 22:19:25 -0400 Subject: [PATCH] feat(ingest): source-bucket registry keyed on ingest source Generalises the Daedalus-only cross-bucket fetch into a registry (SOURCE_S3_BUCKETS) keyed on the IngestJob `source` field, so new upstream sources (Spelunker) can ingest from their own buckets. The ingest task now calls fetch_from_source(job.source, job.s3_key) and falls back to "daedalus" for blank/unknown sources (backwards compatible). Adds SPELUNKER_S3_* env vars and worker env scoping. Replaces daedalus_s3.py with source_s3.py. Co-Authored-By: Claude Fable 5 --- .env.example | 13 ++++ docker-compose.yaml | 7 ++ mnemosyne/library/services/daedalus_s3.py | 70 ------------------ mnemosyne/library/services/source_s3.py | 88 +++++++++++++++++++++++ mnemosyne/library/tasks.py | 8 +-- mnemosyne/library/tests/test_source_s3.py | 61 ++++++++++++++++ mnemosyne/mnemosyne/settings.py | 45 ++++++++++-- 7 files changed, 214 insertions(+), 78 deletions(-) delete mode 100644 mnemosyne/library/services/daedalus_s3.py create mode 100644 mnemosyne/library/services/source_s3.py create mode 100644 mnemosyne/library/tests/test_source_s3.py diff --git a/.env.example b/.env.example index ee25cbf..07f4e30 100644 --- a/.env.example +++ b/.env.example @@ -78,6 +78,19 @@ DAEDALUS_S3_REGION_NAME=us-east-1 DAEDALUS_S3_USE_SSL=True DAEDALUS_S3_VERIFY=True +# --- Spelunker S3 (cross-bucket reads for ingest, source="spelunker") --- +# Consumed by: worker only +# Spelunker scrapes web/git documents into its own bucket and posts ingest +# requests with source="spelunker". These creds should be scoped read-only +# to the Spelunker bucket in your secret manager. +SPELUNKER_S3_ENDPOINT_URL=https://nyx.helu.ca:8555 +SPELUNKER_S3_ACCESS_KEY_ID= +SPELUNKER_S3_SECRET_ACCESS_KEY= +SPELUNKER_S3_BUCKET_NAME=spelunker +SPELUNKER_S3_REGION_NAME=us-east-1 +SPELUNKER_S3_USE_SSL=True +SPELUNKER_S3_VERIFY=True + # --- Celery / RabbitMQ (Oberon) --------------------------------------------- # Consumed by: app (producer), worker (consumer). NOT mcp. # Remember to percent-encode any password characters that have meaning in a diff --git a/docker-compose.yaml b/docker-compose.yaml index e865179..00b5f93 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -339,6 +339,13 @@ services: - DAEDALUS_S3_REGION_NAME=${DAEDALUS_S3_REGION_NAME} - DAEDALUS_S3_USE_SSL=${DAEDALUS_S3_USE_SSL} - DAEDALUS_S3_VERIFY=${DAEDALUS_S3_VERIFY} + - SPELUNKER_S3_ENDPOINT_URL=${SPELUNKER_S3_ENDPOINT_URL} + - SPELUNKER_S3_ACCESS_KEY_ID=${SPELUNKER_S3_ACCESS_KEY_ID} + - SPELUNKER_S3_SECRET_ACCESS_KEY=${SPELUNKER_S3_SECRET_ACCESS_KEY} + - SPELUNKER_S3_BUCKET_NAME=${SPELUNKER_S3_BUCKET_NAME} + - SPELUNKER_S3_REGION_NAME=${SPELUNKER_S3_REGION_NAME} + - SPELUNKER_S3_USE_SSL=${SPELUNKER_S3_USE_SSL} + - SPELUNKER_S3_VERIFY=${SPELUNKER_S3_VERIFY} # Celery / RabbitMQ - CELERY_BROKER_URL=${CELERY_BROKER_URL} - CELERY_RESULT_BACKEND=${CELERY_RESULT_BACKEND} diff --git a/mnemosyne/library/services/daedalus_s3.py b/mnemosyne/library/services/daedalus_s3.py deleted file mode 100644 index d75636c..0000000 --- a/mnemosyne/library/services/daedalus_s3.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -Cross-bucket S3 helper for ingesting files from the Daedalus S3 bucket -into Mnemosyne's own bucket. - -Daedalus uploads files to its own bucket (configured per Daedalus deployment) -and posts an ingest request to Mnemosyne with the s3_key. This module fetches -that file using read-only Daedalus credentials and writes it to Mnemosyne's -bucket via the standard `default_storage` backend so the rest of the pipeline -(parsing, chunking, embedding) works unchanged. -""" - -import logging - -import boto3 -from django.conf import settings -from django.core.files.base import ContentFile -from django.core.files.storage import default_storage - -logger = logging.getLogger(__name__) - - -def _daedalus_s3_client(): - """Build a boto3 S3 client pointed at the Daedalus bucket.""" - return boto3.client( - "s3", - endpoint_url=settings.DAEDALUS_S3_ENDPOINT_URL or None, - aws_access_key_id=settings.DAEDALUS_S3_ACCESS_KEY_ID, - aws_secret_access_key=settings.DAEDALUS_S3_SECRET_ACCESS_KEY, - region_name=settings.DAEDALUS_S3_REGION_NAME, - use_ssl=settings.DAEDALUS_S3_USE_SSL, - verify=settings.DAEDALUS_S3_VERIFY, - ) - - -def fetch_from_daedalus(daedalus_s3_key: str) -> bytes: - """ - Read a file from the Daedalus S3 bucket. - - :param daedalus_s3_key: Object key in the Daedalus bucket. - :returns: File bytes. - :raises: botocore exceptions on failure (caller decides retry). - """ - client = _daedalus_s3_client() - bucket = settings.DAEDALUS_S3_BUCKET_NAME - logger.debug( - "Fetching from Daedalus S3 bucket=%s key=%s", bucket, daedalus_s3_key - ) - response = client.get_object(Bucket=bucket, Key=daedalus_s3_key) - data = response["Body"].read() - logger.info( - "Fetched from Daedalus S3 bucket=%s key=%s size=%d", - bucket, daedalus_s3_key, len(data), - ) - return data - - -def copy_into_mnemosyne(data: bytes, mnemosyne_s3_key: str) -> str: - """ - Write bytes into Mnemosyne's S3 bucket via the default_storage backend. - - :param data: File bytes (already in memory). - :param mnemosyne_s3_key: Target object key in Mnemosyne's bucket. - :returns: The actual key written (may differ from requested if - `file_overwrite=False` and the key existed). - """ - saved_key = default_storage.save(mnemosyne_s3_key, ContentFile(data)) - logger.info( - "Wrote to Mnemosyne S3 key=%s size=%d", saved_key, len(data), - ) - return saved_key diff --git a/mnemosyne/library/services/source_s3.py b/mnemosyne/library/services/source_s3.py new file mode 100644 index 0000000..f59c1e1 --- /dev/null +++ b/mnemosyne/library/services/source_s3.py @@ -0,0 +1,88 @@ +""" +Cross-bucket S3 helper for ingesting files from upstream source buckets +into Mnemosyne's own bucket. + +Each upstream system (Daedalus, Spelunker, ...) writes files to its own S3 +bucket and posts an ingest request to Mnemosyne carrying a `source` +identifier and the object key. This module looks up that source's read +credentials in settings.SOURCE_S3_BUCKETS, fetches the file, and writes it +into Mnemosyne's bucket via the standard `default_storage` backend so the +rest of the pipeline (parsing, chunking, embedding) works unchanged. +""" + +import logging + +import boto3 +from django.conf import settings +from django.core.files.base import ContentFile +from django.core.files.storage import default_storage + +logger = logging.getLogger(__name__) + +DEFAULT_SOURCE = "daedalus" + + +def _source_config(source: str) -> dict: + """ + Resolve the S3 read config for an ingest source. + + Unknown or blank sources fall back to DEFAULT_SOURCE for backwards + compatibility with pre-registry ingest requests. + """ + registry = settings.SOURCE_S3_BUCKETS + key = source if source in registry else DEFAULT_SOURCE + if key not in registry: + raise RuntimeError( + f"No S3 bucket configured for source '{source}' " + f"(and no '{DEFAULT_SOURCE}' fallback)." + ) + return registry[key] + + +def _source_s3_client(config: dict): + """Build a boto3 S3 client pointed at a source bucket.""" + return boto3.client( + "s3", + endpoint_url=config["endpoint_url"] or None, + aws_access_key_id=config["access_key_id"], + aws_secret_access_key=config["secret_access_key"], + region_name=config["region_name"], + use_ssl=config["use_ssl"], + verify=config["verify"], + ) + + +def fetch_from_source(source: str, s3_key: str) -> bytes: + """ + Read a file from an upstream source's S3 bucket. + + :param source: Ingest source identifier (e.g. "daedalus", "spelunker"). + :param s3_key: Object key in the source bucket. + :returns: File bytes. + :raises: botocore exceptions on failure (caller decides retry). + """ + config = _source_config(source) + client = _source_s3_client(config) + bucket = config["bucket_name"] + logger.debug("Fetching from source=%s bucket=%s key=%s", source, bucket, s3_key) + response = client.get_object(Bucket=bucket, Key=s3_key) + data = response["Body"].read() + logger.info( + "Fetched from source=%s bucket=%s key=%s size=%d", + source, bucket, s3_key, len(data), + ) + return data + + +def copy_into_mnemosyne(data: bytes, mnemosyne_s3_key: str) -> str: + """ + Write bytes into Mnemosyne's S3 bucket via the default_storage backend. + + :param data: File bytes (already in memory). + :param mnemosyne_s3_key: Target object key in Mnemosyne's bucket. + :returns: The actual key written (may differ from requested if + `file_overwrite=False` and the key existed). + """ + saved_key = default_storage.save(mnemosyne_s3_key, ContentFile(data)) + logger.info("Wrote to Mnemosyne S3 key=%s size=%d", saved_key, len(data)) + return saved_key diff --git a/mnemosyne/library/tasks.py b/mnemosyne/library/tasks.py index 1290211..bf55e03 100644 --- a/mnemosyne/library/tasks.py +++ b/mnemosyne/library/tasks.py @@ -347,9 +347,9 @@ def ingest_from_daedalus(self, job_id: str): from datetime import datetime, timezone from library.models import IngestJob, Item, Library - from library.services.daedalus_s3 import ( + from library.services.source_s3 import ( copy_into_mnemosyne, - fetch_from_daedalus, + fetch_from_source, ) from library.services.pipeline import EmbeddingPipeline @@ -401,11 +401,11 @@ def ingest_from_daedalus(self, job_id: str): ) _delete_item_and_chunks(prior_item_uid) - # --- 3. Fetch from Daedalus, copy into Mnemosyne bucket --- + # --- 3. Fetch from the source bucket, copy into Mnemosyne bucket --- job.progress = "copying" job.save(update_fields=["progress"]) - data = fetch_from_daedalus(job.s3_key) + data = fetch_from_source(job.source, job.s3_key) # --- 4. Create Item node --- ext = _normalize_file_type(job.file_type) diff --git a/mnemosyne/library/tests/test_source_s3.py b/mnemosyne/library/tests/test_source_s3.py new file mode 100644 index 0000000..d319e51 --- /dev/null +++ b/mnemosyne/library/tests/test_source_s3.py @@ -0,0 +1,61 @@ +"""Tests for the source-bucket registry resolution.""" +from unittest.mock import MagicMock, patch + +from django.test import SimpleTestCase, override_settings + +from library.services import source_s3 + +_REGISTRY = { + "daedalus": { + "endpoint_url": "https://nyx.test:8555", + "access_key_id": "dae_key", + "secret_access_key": "dae_secret", + "bucket_name": "daedalus", + "region_name": "us-east-1", + "use_ssl": True, + "verify": True, + }, + "spelunker": { + "endpoint_url": "https://nyx.test:8555", + "access_key_id": "spel_key", + "secret_access_key": "spel_secret", + "bucket_name": "spelunker", + "region_name": "us-east-1", + "use_ssl": True, + "verify": True, + }, +} + + +@override_settings(SOURCE_S3_BUCKETS=_REGISTRY) +class SourceConfigTest(SimpleTestCase): + def test_known_source(self): + cfg = source_s3._source_config("spelunker") + self.assertEqual(cfg["bucket_name"], "spelunker") + self.assertEqual(cfg["access_key_id"], "spel_key") + + def test_unknown_source_falls_back_to_daedalus(self): + cfg = source_s3._source_config("totally-unknown") + self.assertEqual(cfg["bucket_name"], "daedalus") + + def test_blank_source_falls_back_to_daedalus(self): + cfg = source_s3._source_config("") + self.assertEqual(cfg["bucket_name"], "daedalus") + + @patch("library.services.source_s3.boto3.client") + def test_fetch_uses_source_bucket(self, mock_boto_client): + client = MagicMock() + body = MagicMock() + body.read.return_value = b"file-bytes" + client.get_object.return_value = {"Body": body} + mock_boto_client.return_value = client + + data = source_s3.fetch_from_source("spelunker", "3/readme.md") + + self.assertEqual(data, b"file-bytes") + client.get_object.assert_called_once_with( + Bucket="spelunker", Key="3/readme.md" + ) + # boto client built with the spelunker source credentials + _, kwargs = mock_boto_client.call_args + self.assertEqual(kwargs["aws_access_key_id"], "spel_key") diff --git a/mnemosyne/mnemosyne/settings.py b/mnemosyne/mnemosyne/settings.py index 83d0f92..1847725 100644 --- a/mnemosyne/mnemosyne/settings.py +++ b/mnemosyne/mnemosyne/settings.py @@ -241,10 +241,15 @@ else: }, } -# --- Daedalus S3 (cross-bucket reads for ingest) --- -# Mnemosyne ingests files written to Daedalus's S3 bucket. These vars -# configure read access; the file is copied into AWS_STORAGE_BUCKET_NAME -# (Mnemosyne's own bucket) by the Celery ingest task before processing. +# --- Source S3 buckets (cross-bucket reads for ingest) --- +# Mnemosyne ingests files that upstream systems write to their own S3 +# buckets. Each ingest request carries a `source` identifier; the Celery +# ingest task looks up that source's read credentials here, fetches the +# file, and copies it into AWS_STORAGE_BUCKET_NAME (Mnemosyne's own bucket) +# before processing. Add a new entry per upstream source. +# +# Backwards compatible: the "daedalus" entry is built from the existing +# DAEDALUS_S3_* env vars. DAEDALUS_S3_ENDPOINT_URL = env("DAEDALUS_S3_ENDPOINT_URL", default="") DAEDALUS_S3_ACCESS_KEY_ID = env("DAEDALUS_S3_ACCESS_KEY_ID", default="") DAEDALUS_S3_SECRET_ACCESS_KEY = env("DAEDALUS_S3_SECRET_ACCESS_KEY", default="") @@ -253,6 +258,38 @@ DAEDALUS_S3_REGION_NAME = env("DAEDALUS_S3_REGION_NAME", default="us-east-1") DAEDALUS_S3_USE_SSL = env.bool("DAEDALUS_S3_USE_SSL", default=False) DAEDALUS_S3_VERIFY = env.bool("DAEDALUS_S3_VERIFY", default=True) +# Spelunker writes scraped/extracted documents to its own bucket. +SPELUNKER_S3_ENDPOINT_URL = env("SPELUNKER_S3_ENDPOINT_URL", default="") +SPELUNKER_S3_ACCESS_KEY_ID = env("SPELUNKER_S3_ACCESS_KEY_ID", default="") +SPELUNKER_S3_SECRET_ACCESS_KEY = env("SPELUNKER_S3_SECRET_ACCESS_KEY", default="") +SPELUNKER_S3_BUCKET_NAME = env("SPELUNKER_S3_BUCKET_NAME", default="spelunker") +SPELUNKER_S3_REGION_NAME = env("SPELUNKER_S3_REGION_NAME", default="us-east-1") +SPELUNKER_S3_USE_SSL = env.bool("SPELUNKER_S3_USE_SSL", default=False) +SPELUNKER_S3_VERIFY = env.bool("SPELUNKER_S3_VERIFY", default=True) + +# Registry keyed by the ingest `source` field. Unknown/blank sources fall +# back to "daedalus" for backwards compatibility. +SOURCE_S3_BUCKETS = { + "daedalus": { + "endpoint_url": DAEDALUS_S3_ENDPOINT_URL, + "access_key_id": DAEDALUS_S3_ACCESS_KEY_ID, + "secret_access_key": DAEDALUS_S3_SECRET_ACCESS_KEY, + "bucket_name": DAEDALUS_S3_BUCKET_NAME, + "region_name": DAEDALUS_S3_REGION_NAME, + "use_ssl": DAEDALUS_S3_USE_SSL, + "verify": DAEDALUS_S3_VERIFY, + }, + "spelunker": { + "endpoint_url": SPELUNKER_S3_ENDPOINT_URL, + "access_key_id": SPELUNKER_S3_ACCESS_KEY_ID, + "secret_access_key": SPELUNKER_S3_SECRET_ACCESS_KEY, + "bucket_name": SPELUNKER_S3_BUCKET_NAME, + "region_name": SPELUNKER_S3_REGION_NAME, + "use_ssl": SPELUNKER_S3_USE_SSL, + "verify": SPELUNKER_S3_VERIFY, + }, +} + # --- Celery / RabbitMQ --- CELERY_BROKER_URL = env( "CELERY_BROKER_URL",