Files
mnemosyne/mnemosyne/library/services/source_s3.py
Robert Helewka ec4f12d601 feat(ingest): source-bucket registry keyed on ingest source
Generalises the Daedalus-only cross-bucket fetch into a registry
(SOURCE_S3_BUCKETS) keyed on the IngestJob `source` field, so new
upstream sources (Spelunker) can ingest from their own buckets. The
ingest task now calls fetch_from_source(job.source, job.s3_key) and
falls back to "daedalus" for blank/unknown sources (backwards compatible).

Adds SPELUNKER_S3_* env vars and worker env scoping. Replaces
daedalus_s3.py with source_s3.py.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 22:30:08 -04:00

89 lines
3.0 KiB
Python

"""
Cross-bucket S3 helper for ingesting files from upstream source buckets
into Mnemosyne's own bucket.
Each upstream system (Daedalus, Spelunker, ...) writes files to its own S3
bucket and posts an ingest request to Mnemosyne carrying a `source`
identifier and the object key. This module looks up that source's read
credentials in settings.SOURCE_S3_BUCKETS, fetches the file, and writes it
into Mnemosyne's bucket via the standard `default_storage` backend so the
rest of the pipeline (parsing, chunking, embedding) works unchanged.
"""
import logging
import boto3
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Registry key used when an ingest request's `source` is not present in
# settings.SOURCE_S3_BUCKETS (see _source_config).
DEFAULT_SOURCE = "daedalus"
def _source_config(source: str) -> dict:
    """
    Look up the S3 read configuration for an ingest source.

    The lookup is against settings.SOURCE_S3_BUCKETS. A source that is not
    a key of that registry (including a blank one) resolves to the
    DEFAULT_SOURCE entry, which keeps pre-registry ingest requests working.

    :param source: Ingest source identifier carried by the ingest request.
    :returns: The registry entry for the source, or for DEFAULT_SOURCE.
    :raises RuntimeError: If neither the source nor DEFAULT_SOURCE is
        present in the registry.
    """
    buckets = settings.SOURCE_S3_BUCKETS

    # Exact match wins.
    if source in buckets:
        return buckets[source]

    # Otherwise fall back to the default source, if one is configured.
    if DEFAULT_SOURCE in buckets:
        return buckets[DEFAULT_SOURCE]

    raise RuntimeError(
        f"No S3 bucket configured for source '{source}' "
        f"(and no '{DEFAULT_SOURCE}' fallback)."
    )
def _source_s3_client(config: dict):
    """
    Create a boto3 S3 client from a source-bucket configuration entry.

    :param config: Mapping providing the keys "endpoint_url",
        "access_key_id", "secret_access_key", "region_name", "use_ssl"
        and "verify".
    :returns: The client object returned by ``boto3.client("s3", ...)``.
    :raises KeyError: If any of the keys listed above is missing.
    """
    client_kwargs = {
        # A falsy endpoint (e.g. an empty string) is passed on as None.
        "endpoint_url": config["endpoint_url"] or None,
        "aws_access_key_id": config["access_key_id"],
        "aws_secret_access_key": config["secret_access_key"],
        "region_name": config["region_name"],
        "use_ssl": config["use_ssl"],
        "verify": config["verify"],
    }
    return boto3.client("s3", **client_kwargs)
def fetch_from_source(source: str, s3_key: str) -> bytes:
    """
    Read a file from an upstream source's S3 bucket.

    The source is resolved to a bucket configuration via _source_config
    (which falls back to DEFAULT_SOURCE for sources not in the registry),
    a client is built for that configuration, and the whole object is read
    into memory.

    :param source: Ingest source identifier (e.g. "daedalus", "spelunker").
    :param s3_key: Object key in the source bucket.
    :returns: File bytes.
    :raises RuntimeError: If no bucket configuration can be resolved for
        the source.
    :raises: botocore exceptions on failure (caller decides retry).
    """
    config = _source_config(source)
    client = _source_s3_client(config)
    bucket = config["bucket_name"]

    logger.debug("Fetching from source=%s bucket=%s key=%s", source, bucket, s3_key)
    response = client.get_object(Bucket=bucket, Key=s3_key)

    body = response["Body"]
    try:
        data = body.read()
    finally:
        # Always close the response stream, including when the read fails
        # part-way, so the underlying HTTP response is not left open.
        body.close()

    logger.info(
        "Fetched from source=%s bucket=%s key=%s size=%d",
        source, bucket, s3_key, len(data),
    )
    return data
def copy_into_mnemosyne(data: bytes, mnemosyne_s3_key: str) -> str:
    """
    Store in-memory bytes in Mnemosyne's bucket through `default_storage`.

    :param data: File bytes (already in memory).
    :param mnemosyne_s3_key: Requested object key in Mnemosyne's bucket.
    :returns: The key reported by the storage backend for the saved object
        (may differ from the requested key if `file_overwrite=False` and
        the key already existed).
    """
    payload = ContentFile(data)
    written_key = default_storage.save(mnemosyne_s3_key, payload)
    logger.info("Wrote to Mnemosyne S3 key=%s size=%d", written_key, len(data))
    return written_key