2 Commits

Author SHA1 Message Date
4dde063299 fix(web): trust XFF for real client IP and correct port to 23081
All checks were successful
CVE Scan & Docker Build / security-scan (push) Successful in 3m41s
Build & Deploy Docs / build-and-deploy (push) Successful in 1m9s
CVE Scan & Docker Build / build-and-push (push) Successful in 3m29s
- Configure nginx `set_real_ip_from` for RFC1918 ranges and enable
  `real_ip_recursive` so allowlists evaluate the true client IP
  instead of Docker's NAT gateway, preventing public exposure of
  `/metrics` and `/nginx_status`
- Update published port from 23181 to 23081 in docker-compose
2026-06-17 06:58:36 -04:00
ec4f12d601 feat(ingest): source-bucket registry keyed on ingest source
Generalises the Daedalus-only cross-bucket fetch into a registry
(SOURCE_S3_BUCKETS) keyed on the IngestJob `source` field, so new
upstream sources (Spelunker) can ingest from their own buckets. The
ingest task now calls fetch_from_source(job.source, job.s3_key) and
falls back to "daedalus" for blank/unknown sources (backwards compatible).

Adds SPELUNKER_S3_* env vars and worker env scoping. Replaces
daedalus_s3.py with source_s3.py.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 22:30:08 -04:00
8 changed files with 229 additions and 80 deletions

View File

@@ -78,6 +78,19 @@ DAEDALUS_S3_REGION_NAME=us-east-1
DAEDALUS_S3_USE_SSL=True
DAEDALUS_S3_VERIFY=True
# --- Spelunker S3 (cross-bucket reads for ingest, source="spelunker") ---
# Consumed by: worker only
# Spelunker scrapes web/git documents into its own bucket and posts ingest
# requests with source="spelunker". These creds should be scoped read-only
# to the Spelunker bucket in your secret manager.
SPELUNKER_S3_ENDPOINT_URL=https://nyx.helu.ca:8555
SPELUNKER_S3_ACCESS_KEY_ID=
SPELUNKER_S3_SECRET_ACCESS_KEY=
SPELUNKER_S3_BUCKET_NAME=spelunker
SPELUNKER_S3_REGION_NAME=us-east-1
SPELUNKER_S3_USE_SSL=True
SPELUNKER_S3_VERIFY=True
# --- Celery / RabbitMQ (Oberon) ---------------------------------------------
# Consumed by: app (producer), worker (consumer). NOT mcp.
# Remember to percent-encode any password characters that have meaning in a

View File

@@ -339,6 +339,13 @@ services:
- DAEDALUS_S3_REGION_NAME=${DAEDALUS_S3_REGION_NAME}
- DAEDALUS_S3_USE_SSL=${DAEDALUS_S3_USE_SSL}
- DAEDALUS_S3_VERIFY=${DAEDALUS_S3_VERIFY}
- SPELUNKER_S3_ENDPOINT_URL=${SPELUNKER_S3_ENDPOINT_URL}
- SPELUNKER_S3_ACCESS_KEY_ID=${SPELUNKER_S3_ACCESS_KEY_ID}
- SPELUNKER_S3_SECRET_ACCESS_KEY=${SPELUNKER_S3_SECRET_ACCESS_KEY}
- SPELUNKER_S3_BUCKET_NAME=${SPELUNKER_S3_BUCKET_NAME}
- SPELUNKER_S3_REGION_NAME=${SPELUNKER_S3_REGION_NAME}
- SPELUNKER_S3_USE_SSL=${SPELUNKER_S3_USE_SSL}
- SPELUNKER_S3_VERIFY=${SPELUNKER_S3_VERIFY}
# Celery / RabbitMQ
- CELERY_BROKER_URL=${CELERY_BROKER_URL}
- CELERY_RESULT_BACKEND=${CELERY_RESULT_BACKEND}
@@ -370,7 +377,7 @@ services:
retries: 3
start_period: 60s
# ── Web: nginx reverse proxy, public port 23181 ────────────────────────────
# ── Web: nginx reverse proxy, public port 23081 ────────────────────────────
# No Django env — nginx only knows how to route. Public listener is
# templated into the conf file by Ansible if the port ever needs to change.
web:
@@ -383,7 +390,7 @@ services:
mcp:
condition: service_healthy
ports:
- "23181:80"
- "23081:80"
volumes:
- ./nginx/mnemosyne.conf:/etc/nginx/conf.d/default.conf:ro
- static:/var/www/static:ro

View File

@@ -1,70 +0,0 @@
"""
Cross-bucket S3 helper for ingesting files from the Daedalus S3 bucket
into Mnemosyne's own bucket.
Daedalus uploads files to its own bucket (configured per Daedalus deployment)
and posts an ingest request to Mnemosyne with the s3_key. This module fetches
that file using read-only Daedalus credentials and writes it to Mnemosyne's
bucket via the standard `default_storage` backend so the rest of the pipeline
(parsing, chunking, embedding) works unchanged.
"""
import logging
import boto3
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
logger = logging.getLogger(__name__)
def _daedalus_s3_client():
"""Build a boto3 S3 client pointed at the Daedalus bucket."""
return boto3.client(
"s3",
endpoint_url=settings.DAEDALUS_S3_ENDPOINT_URL or None,
aws_access_key_id=settings.DAEDALUS_S3_ACCESS_KEY_ID,
aws_secret_access_key=settings.DAEDALUS_S3_SECRET_ACCESS_KEY,
region_name=settings.DAEDALUS_S3_REGION_NAME,
use_ssl=settings.DAEDALUS_S3_USE_SSL,
verify=settings.DAEDALUS_S3_VERIFY,
)
def fetch_from_daedalus(daedalus_s3_key: str) -> bytes:
"""
Read a file from the Daedalus S3 bucket.
:param daedalus_s3_key: Object key in the Daedalus bucket.
:returns: File bytes.
:raises: botocore exceptions on failure (caller decides retry).
"""
client = _daedalus_s3_client()
bucket = settings.DAEDALUS_S3_BUCKET_NAME
logger.debug(
"Fetching from Daedalus S3 bucket=%s key=%s", bucket, daedalus_s3_key
)
response = client.get_object(Bucket=bucket, Key=daedalus_s3_key)
data = response["Body"].read()
logger.info(
"Fetched from Daedalus S3 bucket=%s key=%s size=%d",
bucket, daedalus_s3_key, len(data),
)
return data
def copy_into_mnemosyne(data: bytes, mnemosyne_s3_key: str) -> str:
"""
Write bytes into Mnemosyne's S3 bucket via the default_storage backend.
:param data: File bytes (already in memory).
:param mnemosyne_s3_key: Target object key in Mnemosyne's bucket.
:returns: The actual key written (may differ from requested if
`file_overwrite=False` and the key existed).
"""
saved_key = default_storage.save(mnemosyne_s3_key, ContentFile(data))
logger.info(
"Wrote to Mnemosyne S3 key=%s size=%d", saved_key, len(data),
)
return saved_key

View File

@@ -0,0 +1,88 @@
"""
Cross-bucket S3 helper for ingesting files from upstream source buckets
into Mnemosyne's own bucket.
Each upstream system (Daedalus, Spelunker, ...) writes files to its own S3
bucket and posts an ingest request to Mnemosyne carrying a `source`
identifier and the object key. This module looks up that source's read
credentials in settings.SOURCE_S3_BUCKETS, fetches the file, and writes it
into Mnemosyne's bucket via the standard `default_storage` backend so the
rest of the pipeline (parsing, chunking, embedding) works unchanged.
"""
import logging
import boto3
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
logger = logging.getLogger(__name__)
DEFAULT_SOURCE = "daedalus"
def _source_config(source: str) -> dict:
"""
Resolve the S3 read config for an ingest source.
Unknown or blank sources fall back to DEFAULT_SOURCE for backwards
compatibility with pre-registry ingest requests.
"""
registry = settings.SOURCE_S3_BUCKETS
key = source if source in registry else DEFAULT_SOURCE
if key not in registry:
raise RuntimeError(
f"No S3 bucket configured for source '{source}' "
f"(and no '{DEFAULT_SOURCE}' fallback)."
)
return registry[key]
def _source_s3_client(config: dict):
"""Build a boto3 S3 client pointed at a source bucket."""
return boto3.client(
"s3",
endpoint_url=config["endpoint_url"] or None,
aws_access_key_id=config["access_key_id"],
aws_secret_access_key=config["secret_access_key"],
region_name=config["region_name"],
use_ssl=config["use_ssl"],
verify=config["verify"],
)
def fetch_from_source(source: str, s3_key: str) -> bytes:
"""
Read a file from an upstream source's S3 bucket.
:param source: Ingest source identifier (e.g. "daedalus", "spelunker").
:param s3_key: Object key in the source bucket.
:returns: File bytes.
:raises: botocore exceptions on failure (caller decides retry).
"""
config = _source_config(source)
client = _source_s3_client(config)
bucket = config["bucket_name"]
logger.debug("Fetching from source=%s bucket=%s key=%s", source, bucket, s3_key)
response = client.get_object(Bucket=bucket, Key=s3_key)
data = response["Body"].read()
logger.info(
"Fetched from source=%s bucket=%s key=%s size=%d",
source, bucket, s3_key, len(data),
)
return data
def copy_into_mnemosyne(data: bytes, mnemosyne_s3_key: str) -> str:
"""
Write bytes into Mnemosyne's S3 bucket via the default_storage backend.
:param data: File bytes (already in memory).
:param mnemosyne_s3_key: Target object key in Mnemosyne's bucket.
:returns: The actual key written (may differ from requested if
`file_overwrite=False` and the key existed).
"""
saved_key = default_storage.save(mnemosyne_s3_key, ContentFile(data))
logger.info("Wrote to Mnemosyne S3 key=%s size=%d", saved_key, len(data))
return saved_key

View File

@@ -347,9 +347,9 @@ def ingest_from_daedalus(self, job_id: str):
from datetime import datetime, timezone
from library.models import IngestJob, Item, Library
from library.services.daedalus_s3 import (
from library.services.source_s3 import (
copy_into_mnemosyne,
fetch_from_daedalus,
fetch_from_source,
)
from library.services.pipeline import EmbeddingPipeline
@@ -401,11 +401,11 @@ def ingest_from_daedalus(self, job_id: str):
)
_delete_item_and_chunks(prior_item_uid)
# --- 3. Fetch from Daedalus, copy into Mnemosyne bucket ---
# --- 3. Fetch from the source bucket, copy into Mnemosyne bucket ---
job.progress = "copying"
job.save(update_fields=["progress"])
data = fetch_from_daedalus(job.s3_key)
data = fetch_from_source(job.source, job.s3_key)
# --- 4. Create Item node ---
ext = _normalize_file_type(job.file_type)

View File

@@ -0,0 +1,61 @@
"""Tests for the source-bucket registry resolution."""
from unittest.mock import MagicMock, patch
from django.test import SimpleTestCase, override_settings
from library.services import source_s3
_REGISTRY = {
"daedalus": {
"endpoint_url": "https://nyx.test:8555",
"access_key_id": "dae_key",
"secret_access_key": "dae_secret",
"bucket_name": "daedalus",
"region_name": "us-east-1",
"use_ssl": True,
"verify": True,
},
"spelunker": {
"endpoint_url": "https://nyx.test:8555",
"access_key_id": "spel_key",
"secret_access_key": "spel_secret",
"bucket_name": "spelunker",
"region_name": "us-east-1",
"use_ssl": True,
"verify": True,
},
}
@override_settings(SOURCE_S3_BUCKETS=_REGISTRY)
class SourceConfigTest(SimpleTestCase):
def test_known_source(self):
cfg = source_s3._source_config("spelunker")
self.assertEqual(cfg["bucket_name"], "spelunker")
self.assertEqual(cfg["access_key_id"], "spel_key")
def test_unknown_source_falls_back_to_daedalus(self):
cfg = source_s3._source_config("totally-unknown")
self.assertEqual(cfg["bucket_name"], "daedalus")
def test_blank_source_falls_back_to_daedalus(self):
cfg = source_s3._source_config("")
self.assertEqual(cfg["bucket_name"], "daedalus")
@patch("library.services.source_s3.boto3.client")
def test_fetch_uses_source_bucket(self, mock_boto_client):
client = MagicMock()
body = MagicMock()
body.read.return_value = b"file-bytes"
client.get_object.return_value = {"Body": body}
mock_boto_client.return_value = client
data = source_s3.fetch_from_source("spelunker", "3/readme.md")
self.assertEqual(data, b"file-bytes")
client.get_object.assert_called_once_with(
Bucket="spelunker", Key="3/readme.md"
)
# boto client built with the spelunker source credentials
_, kwargs = mock_boto_client.call_args
self.assertEqual(kwargs["aws_access_key_id"], "spel_key")

View File

@@ -241,10 +241,15 @@ else:
},
}
# --- Daedalus S3 (cross-bucket reads for ingest) ---
# Mnemosyne ingests files written to Daedalus's S3 bucket. These vars
# configure read access; the file is copied into AWS_STORAGE_BUCKET_NAME
# (Mnemosyne's own bucket) by the Celery ingest task before processing.
# --- Source S3 buckets (cross-bucket reads for ingest) ---
# Mnemosyne ingests files that upstream systems write to their own S3
# buckets. Each ingest request carries a `source` identifier; the Celery
# ingest task looks up that source's read credentials here, fetches the
# file, and copies it into AWS_STORAGE_BUCKET_NAME (Mnemosyne's own bucket)
# before processing. Add a new entry per upstream source.
#
# Backwards compatible: the "daedalus" entry is built from the existing
# DAEDALUS_S3_* env vars.
DAEDALUS_S3_ENDPOINT_URL = env("DAEDALUS_S3_ENDPOINT_URL", default="")
DAEDALUS_S3_ACCESS_KEY_ID = env("DAEDALUS_S3_ACCESS_KEY_ID", default="")
DAEDALUS_S3_SECRET_ACCESS_KEY = env("DAEDALUS_S3_SECRET_ACCESS_KEY", default="")
@@ -253,6 +258,38 @@ DAEDALUS_S3_REGION_NAME = env("DAEDALUS_S3_REGION_NAME", default="us-east-1")
DAEDALUS_S3_USE_SSL = env.bool("DAEDALUS_S3_USE_SSL", default=False)
DAEDALUS_S3_VERIFY = env.bool("DAEDALUS_S3_VERIFY", default=True)
# Spelunker writes scraped/extracted documents to its own bucket.
SPELUNKER_S3_ENDPOINT_URL = env("SPELUNKER_S3_ENDPOINT_URL", default="")
SPELUNKER_S3_ACCESS_KEY_ID = env("SPELUNKER_S3_ACCESS_KEY_ID", default="")
SPELUNKER_S3_SECRET_ACCESS_KEY = env("SPELUNKER_S3_SECRET_ACCESS_KEY", default="")
SPELUNKER_S3_BUCKET_NAME = env("SPELUNKER_S3_BUCKET_NAME", default="spelunker")
SPELUNKER_S3_REGION_NAME = env("SPELUNKER_S3_REGION_NAME", default="us-east-1")
SPELUNKER_S3_USE_SSL = env.bool("SPELUNKER_S3_USE_SSL", default=False)
SPELUNKER_S3_VERIFY = env.bool("SPELUNKER_S3_VERIFY", default=True)
# Registry keyed by the ingest `source` field. Unknown/blank sources fall
# back to "daedalus" for backwards compatibility.
SOURCE_S3_BUCKETS = {
"daedalus": {
"endpoint_url": DAEDALUS_S3_ENDPOINT_URL,
"access_key_id": DAEDALUS_S3_ACCESS_KEY_ID,
"secret_access_key": DAEDALUS_S3_SECRET_ACCESS_KEY,
"bucket_name": DAEDALUS_S3_BUCKET_NAME,
"region_name": DAEDALUS_S3_REGION_NAME,
"use_ssl": DAEDALUS_S3_USE_SSL,
"verify": DAEDALUS_S3_VERIFY,
},
"spelunker": {
"endpoint_url": SPELUNKER_S3_ENDPOINT_URL,
"access_key_id": SPELUNKER_S3_ACCESS_KEY_ID,
"secret_access_key": SPELUNKER_S3_SECRET_ACCESS_KEY,
"bucket_name": SPELUNKER_S3_BUCKET_NAME,
"region_name": SPELUNKER_S3_REGION_NAME,
"use_ssl": SPELUNKER_S3_USE_SSL,
"verify": SPELUNKER_S3_VERIFY,
},
}
# --- Celery / RabbitMQ ---
CELERY_BROKER_URL = env(
"CELERY_BROKER_URL",

View File

@@ -32,6 +32,19 @@
# resolution at startup and returns 502 after `docker compose restart app`.
resolver 127.0.0.11 valid=10s;
# Recover the real client IP from X-Forwarded-For (set by HAProxy on Titania)
# before evaluating the RFC1918 allowlists below. nginx runs as a sidecar with
# a published port, so every proxied request arrives via Docker's NAT gateway
# (an RFC1918 address) — without this, the allowlists match that gateway and
# pass ALL external traffic, exposing /metrics and /nginx_status publicly.
# HAProxy's own health checks (e.g. to /healthz) carry no XFF and keep their
# real 10.10.x.x source, so they stay allowed.
set_real_ip_from 10.0.0.0/8;
set_real_ip_from 172.16.0.0/12;
set_real_ip_from 192.168.0.0/16;
real_ip_header X-Forwarded-For;
real_ip_recursive on;
# Preserve X-Forwarded-Proto from the upstream reverse proxy (HAProxy TLS
# termination on Titania); fall back to $scheme only if there's no upstream
# header. Inside the compose network $scheme is always `http` because HAProxy