4 Commits

Author SHA1 Message Date
539d9b6c34 fix(tests): repair stale mock.patch targets after service refactors
All checks were successful
CVE Scan & Docker Build / security-scan (pull_request) Successful in 5m24s
CVE Scan & Docker Build / build-and-push (pull_request) Successful in 2m58s
Several library tests patched symbols at import paths that no longer
expose them, so they errored (AttributeError) instead of testing anything
— giving false confidence. The underlying code is correct; only the test
patch targets were stale after earlier refactors moved imports
function-local.

- test_pipeline: patch source modules (library.models.Item,
  llm_manager.models.LLMModel, library.services.parsers.DocumentParser,
  .chunker.ContentTypeChunker, .embedding_client.EmbeddingClient,
  .vision.VisionAnalyzer, .concepts.ConceptExtractor) since pipeline.py
  imports them inside methods. default_storage stays (still module-level).
- test_search_api: patch library.services.search.SearchService (the view
  imports it function-local).
- test_tasks: patch library.services.pipeline.EmbeddingPipeline (tasks.py
  imports it function-local).
- test_search_views_admin_scope: patch library.utils.neo4j_available; the
  guard moved to utils when views._all_library_uids became a thin alias.
- test_concepts: remove SampleIndexSelectionTests — _select_sample_indices
  was deleted in the document-level concept-extraction refactor (dead test).

Not addressed here: SearchAPIAuthTest / SearchAPIValidationTest return 302
instead of 401/400. Static analysis ruled out routing, middleware, and DRF
config; reproducing needs a running server (DB-backed). Flagged for sandbox
diagnosis — not a stale-patch issue.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-17 20:12:46 -04:00
a90c6e7479 feat(metrics): add scrape-time system model health collector
All checks were successful
CVE Scan & Docker Build / security-scan (push) Successful in 3m49s
Build & Deploy Docs / build-and-deploy (push) Successful in 1m9s
CVE Scan & Docker Build / build-and-push (push) Successful in 3m32s
Add a Prometheus custom collector that probes the four system-default
models (chat, vision, embedding, reranker) at /metrics scrape time and
emits up/down, configured, and probe-latency gauges. This complements
the ingest-pipeline counters in the Celery worker, which only move
during active ingests and cannot signal model outages on an idle queue.

- New `library/health_collector.py` registers a custom collector with
  a 55s in-process cache to avoid hammering GPU endpoints on rapid
  scrapes or across multiple gunicorn workers.
- New `library/services/model_health.py` centralises the probe logic,
  resolving system-default models via SystemSettings and dispatching
  to chat/embedding/rerank endpoints with a short timeout.
- Register the collector only in the web process (gunicorn/runserver)
  via `LibraryConfig.ready`, excluding Celery, pytest, and management
  commands to prevent duplicate registration and stray probes.
- Add unit tests covering the collector cache, metric shape, and
  per-role probe dispatch.
2026-06-17 09:06:11 -04:00
4dde063299 fix(web): trust XFF for real client IP and correct port to 23081
All checks were successful
CVE Scan & Docker Build / security-scan (push) Successful in 3m41s
Build & Deploy Docs / build-and-deploy (push) Successful in 1m9s
CVE Scan & Docker Build / build-and-push (push) Successful in 3m29s
- Configure nginx `set_real_ip_from` for RFC1918 ranges and enable
  `real_ip_recursive` so allowlists evaluate the true client IP
  instead of Docker's NAT gateway, preventing public exposure of
  `/metrics` and `/nginx_status`
- Update published port from 23181 to 23081 in docker-compose
2026-06-17 06:58:36 -04:00
ec4f12d601 feat(ingest): source-bucket registry keyed on ingest source
Generalises the Daedalus-only cross-bucket fetch into a registry
(SOURCE_S3_BUCKETS) keyed on the IngestJob `source` field, so new
upstream sources (Spelunker) can ingest from their own buckets. The
ingest task now calls fetch_from_source(job.source, job.s3_key) and
falls back to "daedalus" for blank/unknown sources (backwards compatible).

Adds SPELUNKER_S3_* env vars and worker env scoping. Replaces
daedalus_s3.py with source_s3.py.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 22:30:08 -04:00
19 changed files with 538 additions and 140 deletions

View File

@@ -78,6 +78,19 @@ DAEDALUS_S3_REGION_NAME=us-east-1
DAEDALUS_S3_USE_SSL=True
DAEDALUS_S3_VERIFY=True
# --- Spelunker S3 (cross-bucket reads for ingest, source="spelunker") ---
# Consumed by: worker only
# Spelunker scrapes web/git documents into its own bucket and posts ingest
# requests with source="spelunker". These creds should be scoped read-only
# to the Spelunker bucket in your secret manager.
SPELUNKER_S3_ENDPOINT_URL=https://nyx.helu.ca:8555
SPELUNKER_S3_ACCESS_KEY_ID=
SPELUNKER_S3_SECRET_ACCESS_KEY=
SPELUNKER_S3_BUCKET_NAME=spelunker
SPELUNKER_S3_REGION_NAME=us-east-1
SPELUNKER_S3_USE_SSL=True
SPELUNKER_S3_VERIFY=True
# --- Celery / RabbitMQ (Oberon) ---------------------------------------------
# Consumed by: app (producer), worker (consumer). NOT mcp.
# Remember to percent-encode any password characters that have meaning in a

View File

@@ -339,6 +339,13 @@ services:
- DAEDALUS_S3_REGION_NAME=${DAEDALUS_S3_REGION_NAME}
- DAEDALUS_S3_USE_SSL=${DAEDALUS_S3_USE_SSL}
- DAEDALUS_S3_VERIFY=${DAEDALUS_S3_VERIFY}
- SPELUNKER_S3_ENDPOINT_URL=${SPELUNKER_S3_ENDPOINT_URL}
- SPELUNKER_S3_ACCESS_KEY_ID=${SPELUNKER_S3_ACCESS_KEY_ID}
- SPELUNKER_S3_SECRET_ACCESS_KEY=${SPELUNKER_S3_SECRET_ACCESS_KEY}
- SPELUNKER_S3_BUCKET_NAME=${SPELUNKER_S3_BUCKET_NAME}
- SPELUNKER_S3_REGION_NAME=${SPELUNKER_S3_REGION_NAME}
- SPELUNKER_S3_USE_SSL=${SPELUNKER_S3_USE_SSL}
- SPELUNKER_S3_VERIFY=${SPELUNKER_S3_VERIFY}
# Celery / RabbitMQ
- CELERY_BROKER_URL=${CELERY_BROKER_URL}
- CELERY_RESULT_BACKEND=${CELERY_RESULT_BACKEND}
@@ -370,7 +377,7 @@ services:
retries: 3
start_period: 60s
# ── Web: nginx reverse proxy, public port 23181 ────────────────────────────
# ── Web: nginx reverse proxy, public port 23081 ────────────────────────────
# No Django env — nginx only knows how to route. Public listener is
# templated into the conf file by Ansible if the port ever needs to change.
web:
@@ -383,7 +390,7 @@ services:
mcp:
condition: service_healthy
ports:
- "23181:80"
- "23081:80"
volumes:
- ./nginx/mnemosyne.conf:/etc/nginx/conf.d/default.conf:ro
- static:/var/www/static:ro

View File

@@ -88,6 +88,29 @@ def _should_skip_probe() -> bool:
return False
def _is_web_process() -> bool:
"""
True when running inside the web (gunicorn / runserver) process.
The reachability collector must only register here: ``/metrics`` is served
by the web process, and registering in the Celery worker would both probe
the GPU endpoints from a process whose metrics nobody scrapes and risk
duplicate registration. Celery launches via ``celery`` argv; management
commands are excluded above.
"""
argv0 = sys.argv[0]
if "celery" in argv0 or (len(sys.argv) >= 2 and sys.argv[1] == "celery"):
return False
if "pytest" in argv0 or "PYTEST_CURRENT_TEST" in os.environ:
return False
# gunicorn (prod) or runserver (dev).
if "gunicorn" in argv0:
return True
if len(sys.argv) >= 2 and sys.argv[1] == "runserver":
return True
return False
def _run_startup_probe():
"""
Emit ERROR/WARNING logs if the stack is misconfigured for search.
@@ -199,4 +222,7 @@ class LibraryConfig(AppConfig):
verbose_name = "Library"
def ready(self):
pass
if _is_web_process():
from library.health_collector import register
register()

View File

@@ -0,0 +1,99 @@
"""
Scrape-time Prometheus collector for system-default model reachability.
The ingest-pipeline counters in ``library/metrics.py`` live in the Celery
worker process and only move during an active ingest, so they cannot signal
"models down" on an idle queue. This collector runs in the **web** process
(where ``/metrics`` is served by ``django_prometheus``) and probes the four
system-default models at scrape time, emitting an up/down gauge that is
present regardless of queue activity.
Probe results are cached for a short TTL so rapid scrapes — or multiple
gunicorn workers each scraped in turn — cannot hammer the GPU endpoints.
"""
import logging
import threading
import time
from prometheus_client.core import GaugeMetricFamily
from library.services.model_health import probe_system_models
logger = logging.getLogger(__name__)
# Cache probe results so repeated scrapes don't re-probe the router. The
# value is comfortably above a 15s scrape_interval but bounded so a recovered
# model shows green within a minute.
_CACHE_TTL_SECONDS = 55
_lock = threading.Lock()
_cache: dict = {"ts": 0.0, "results": None}
def _cached_probe() -> list[dict]:
"""Return probe results, re-probing only when the cache has expired."""
now = time.monotonic()
with _lock:
if _cache["results"] is not None and (now - _cache["ts"]) < _CACHE_TTL_SECONDS:
return _cache["results"]
try:
results = probe_system_models()
except Exception as exc: # never let a probe failure break /metrics
logger.warning("Model health probe failed during scrape: %s", exc)
# Serve the stale cache if we have one; otherwise report nothing.
return _cache["results"] or []
_cache["ts"] = now
_cache["results"] = results
return results
class SystemModelHealthCollector:
"""prometheus_client custom collector for system-default model health."""
def collect(self):
results = _cached_probe()
up = GaugeMetricFamily(
"mnemosyne_system_default_model_up",
"System-default model endpoint reachable (1) or not (0)",
labels=["role", "model", "api"],
)
configured = GaugeMetricFamily(
"mnemosyne_system_default_model_configured",
"A system-default model is configured for this role (1) or not (0)",
labels=["role"],
)
latency = GaugeMetricFamily(
"mnemosyne_system_default_model_probe_latency_seconds",
"Latency of the last reachability probe for this role",
labels=["role"],
)
for r in results:
role = r["role"]
configured.add_metric([role], 1 if r["configured"] else 0)
if not r["configured"]:
continue
up.add_metric(
[role, r["model_name"] or "", r["api_name"] or ""],
1 if r["ok"] else 0,
)
if r["latency_ms"] is not None:
latency.add_metric([role], r["latency_ms"] / 1000.0)
yield configured
yield up
yield latency
def register():
"""Register the collector against the default registry (idempotent)."""
from prometheus_client import REGISTRY
# Guard against duplicate registration (autoreload, repeated ready()).
for collector in list(getattr(REGISTRY, "_collector_to_names", {})):
if isinstance(collector, SystemModelHealthCollector):
return
REGISTRY.register(SystemModelHealthCollector())
logger.info("Registered SystemModelHealthCollector on Prometheus default registry")

View File

@@ -1,70 +0,0 @@
"""
Cross-bucket S3 helper for ingesting files from the Daedalus S3 bucket
into Mnemosyne's own bucket.
Daedalus uploads files to its own bucket (configured per Daedalus deployment)
and posts an ingest request to Mnemosyne with the s3_key. This module fetches
that file using read-only Daedalus credentials and writes it to Mnemosyne's
bucket via the standard `default_storage` backend so the rest of the pipeline
(parsing, chunking, embedding) works unchanged.
"""
import logging
import boto3
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
logger = logging.getLogger(__name__)
def _daedalus_s3_client():
"""Build a boto3 S3 client pointed at the Daedalus bucket."""
return boto3.client(
"s3",
endpoint_url=settings.DAEDALUS_S3_ENDPOINT_URL or None,
aws_access_key_id=settings.DAEDALUS_S3_ACCESS_KEY_ID,
aws_secret_access_key=settings.DAEDALUS_S3_SECRET_ACCESS_KEY,
region_name=settings.DAEDALUS_S3_REGION_NAME,
use_ssl=settings.DAEDALUS_S3_USE_SSL,
verify=settings.DAEDALUS_S3_VERIFY,
)
def fetch_from_daedalus(daedalus_s3_key: str) -> bytes:
"""
Read a file from the Daedalus S3 bucket.
:param daedalus_s3_key: Object key in the Daedalus bucket.
:returns: File bytes.
:raises: botocore exceptions on failure (caller decides retry).
"""
client = _daedalus_s3_client()
bucket = settings.DAEDALUS_S3_BUCKET_NAME
logger.debug(
"Fetching from Daedalus S3 bucket=%s key=%s", bucket, daedalus_s3_key
)
response = client.get_object(Bucket=bucket, Key=daedalus_s3_key)
data = response["Body"].read()
logger.info(
"Fetched from Daedalus S3 bucket=%s key=%s size=%d",
bucket, daedalus_s3_key, len(data),
)
return data
def copy_into_mnemosyne(data: bytes, mnemosyne_s3_key: str) -> str:
"""
Write bytes into Mnemosyne's S3 bucket via the default_storage backend.
:param data: File bytes (already in memory).
:param mnemosyne_s3_key: Target object key in Mnemosyne's bucket.
:returns: The actual key written (may differ from requested if
`file_overwrite=False` and the key existed).
"""
saved_key = default_storage.save(mnemosyne_s3_key, ContentFile(data))
logger.info(
"Wrote to Mnemosyne S3 key=%s size=%d", saved_key, len(data),
)
return saved_key

View File

@@ -0,0 +1,119 @@
"""
System-default model reachability probes.
Provides a cheap, bounded liveness check for the four system-default models
(embedding, chat, vision, reranker) so the embedding dashboard and the
scrape-time Prometheus collector can surface "model not responding" without
running an ingest.
The probe deliberately hits ``GET {base_url}/models`` as its primary check:
on an OpenAI-compatible router (e.g. the llama-router) this answers instantly
without loading a model, so repeated probes never burn GPU time. This mirrors
the GPU-avoidance principle in ``mcp_server/tools/health.py``.
"""
import logging
import time
from typing import Optional
import requests
logger = logging.getLogger(__name__)
# api_type values whose endpoints expose an OpenAI-compatible ``/models`` list.
_OPENAI_COMPATIBLE = {"openai", "azure", "ollama", "llama-cpp", "vllm"}
# (role, getter method name) pairs — order is the dashboard/metrics order.
ROLE_GETTERS = [
("embedding", "get_system_embedding_model"),
("chat", "get_system_chat_model"),
("vision", "get_system_vision_model"),
("reranker", "get_system_reranker_model"),
]
def probe_api(api, timeout: int = 5) -> tuple[bool, str]:
"""Check whether an ``LLMApi`` endpoint is responding.
Args:
api: ``LLMApi`` instance (provides base_url, api_key, api_type).
timeout: Per-request timeout in seconds.
Returns:
``(ok, detail)`` — ok is True if the endpoint answered acceptably;
detail is a short human-readable status (HTTP code, error, or "ok").
"""
base_url = api.base_url.rstrip("/")
headers = {}
if api.api_key:
headers["Authorization"] = f"Bearer {api.api_key}"
if api.api_type not in _OPENAI_COMPATIBLE:
# bedrock / anthropic have no equivalent cheap unauthenticated list;
# treat a reachable host as the liveness signal via a HEAD on base_url.
try:
resp = requests.head(base_url, headers=headers, timeout=timeout)
return True, f"reachable (HTTP {resp.status_code})"
except requests.RequestException as exc:
return False, type(exc).__name__
url = f"{base_url}/models"
try:
resp = requests.get(url, headers=headers, timeout=timeout)
except requests.Timeout:
return False, f"timeout after {timeout}s"
except requests.RequestException as exc:
return False, type(exc).__name__
if resp.status_code == 200:
return True, "ok"
return False, f"HTTP {resp.status_code}"
def probe_system_models(timeout: int = 5) -> list[dict]:
"""Probe all four system-default models for reachability.
Returns:
One dict per role with keys: ``role``, ``configured``, ``model_name``,
``api_name``, ``base_url``, ``ok``, ``detail``, ``latency_ms``.
For an unconfigured role, ``configured`` is False and the probe is
skipped (``ok`` is None).
"""
from llm_manager.models import LLMModel
results: list[dict] = []
for role, getter_name in ROLE_GETTERS:
model = getattr(LLMModel, getter_name)()
if model is None:
results.append(
{
"role": role,
"configured": False,
"model_name": None,
"api_name": None,
"base_url": None,
"ok": None,
"detail": "not configured",
"latency_ms": None,
}
)
continue
api = model.api
start = time.monotonic()
ok, detail = probe_api(api, timeout=timeout)
latency_ms = round((time.monotonic() - start) * 1000, 1)
results.append(
{
"role": role,
"configured": True,
"model_name": model.name,
"api_name": api.name,
"base_url": api.base_url,
"ok": ok,
"detail": detail,
"latency_ms": latency_ms,
}
)
return results

View File

@@ -0,0 +1,88 @@
"""
Cross-bucket S3 helper for ingesting files from upstream source buckets
into Mnemosyne's own bucket.
Each upstream system (Daedalus, Spelunker, ...) writes files to its own S3
bucket and posts an ingest request to Mnemosyne carrying a `source`
identifier and the object key. This module looks up that source's read
credentials in settings.SOURCE_S3_BUCKETS, fetches the file, and writes it
into Mnemosyne's bucket via the standard `default_storage` backend so the
rest of the pipeline (parsing, chunking, embedding) works unchanged.
"""
import logging
import boto3
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
logger = logging.getLogger(__name__)
DEFAULT_SOURCE = "daedalus"
def _source_config(source: str) -> dict:
"""
Resolve the S3 read config for an ingest source.
Unknown or blank sources fall back to DEFAULT_SOURCE for backwards
compatibility with pre-registry ingest requests.
"""
registry = settings.SOURCE_S3_BUCKETS
key = source if source in registry else DEFAULT_SOURCE
if key not in registry:
raise RuntimeError(
f"No S3 bucket configured for source '{source}' "
f"(and no '{DEFAULT_SOURCE}' fallback)."
)
return registry[key]
def _source_s3_client(config: dict):
"""Build a boto3 S3 client pointed at a source bucket."""
return boto3.client(
"s3",
endpoint_url=config["endpoint_url"] or None,
aws_access_key_id=config["access_key_id"],
aws_secret_access_key=config["secret_access_key"],
region_name=config["region_name"],
use_ssl=config["use_ssl"],
verify=config["verify"],
)
def fetch_from_source(source: str, s3_key: str) -> bytes:
"""
Read a file from an upstream source's S3 bucket.
:param source: Ingest source identifier (e.g. "daedalus", "spelunker").
:param s3_key: Object key in the source bucket.
:returns: File bytes.
:raises: botocore exceptions on failure (caller decides retry).
"""
config = _source_config(source)
client = _source_s3_client(config)
bucket = config["bucket_name"]
logger.debug("Fetching from source=%s bucket=%s key=%s", source, bucket, s3_key)
response = client.get_object(Bucket=bucket, Key=s3_key)
data = response["Body"].read()
logger.info(
"Fetched from source=%s bucket=%s key=%s size=%d",
source, bucket, s3_key, len(data),
)
return data
def copy_into_mnemosyne(data: bytes, mnemosyne_s3_key: str) -> str:
"""
Write bytes into Mnemosyne's S3 bucket via the default_storage backend.
:param data: File bytes (already in memory).
:param mnemosyne_s3_key: Target object key in Mnemosyne's bucket.
:returns: The actual key written (may differ from requested if
`file_overwrite=False` and the key existed).
"""
saved_key = default_storage.save(mnemosyne_s3_key, ContentFile(data))
logger.info("Wrote to Mnemosyne S3 key=%s size=%d", saved_key, len(data))
return saved_key

View File

@@ -347,9 +347,9 @@ def ingest_from_daedalus(self, job_id: str):
from datetime import datetime, timezone
from library.models import IngestJob, Item, Library
from library.services.daedalus_s3 import (
from library.services.source_s3 import (
copy_into_mnemosyne,
fetch_from_daedalus,
fetch_from_source,
)
from library.services.pipeline import EmbeddingPipeline
@@ -401,11 +401,11 @@ def ingest_from_daedalus(self, job_id: str):
)
_delete_item_and_chunks(prior_item_uid)
# --- 3. Fetch from Daedalus, copy into Mnemosyne bucket ---
# --- 3. Fetch from the source bucket, copy into Mnemosyne bucket ---
job.progress = "copying"
job.save(update_fields=["progress"])
data = fetch_from_daedalus(job.s3_key)
data = fetch_from_source(job.source, job.s3_key)
# --- 4. Create Item node ---
ext = _normalize_file_type(job.file_type)

View File

@@ -0,0 +1,18 @@
{% comment %}
Reachability badge for a system-default model. Expects `h` = one entry from
the `model_health` dict (keys: configured, ok, detail, latency_ms). Renders
nothing when the role is absent from model_health (probe failed entirely).
Text-only badges to match the existing dashboard palette (no emoji per house
HTML rule).
{% endcomment %}
{% if h %}
{% if not h.configured %}
<span class="badge badge-ghost badge-sm ml-2" title="No system-default model set for this role">NOT CONFIGURED</span>
{% elif h.ok %}
<span class="badge badge-success badge-sm ml-2" title="{{ h.detail }}">REACHABLE</span>
{% if h.latency_ms is not None %}<span class="text-xs opacity-50 ml-1">{{ h.latency_ms }} ms</span>{% endif %}
{% else %}
<span class="badge badge-error badge-sm ml-2" title="Probe detail: {{ h.detail }}">NOT RESPONDING</span>
<span class="text-xs opacity-60 ml-1">{{ h.detail }}</span>
{% endif %}
{% endif %}

View File

@@ -28,6 +28,7 @@
{% if system_embedding_model.supports_multimodal %}
<span class="badge badge-accent badge-sm ml-1">Multimodal</span>
{% endif %}
{% include "library/_model_health_badge.html" with h=model_health.embedding %}
{% else %}
<div class="flex items-center gap-2">
<span class="badge badge-error">NOT CONFIGURED</span>
@@ -41,6 +42,7 @@
<td>
{% if system_chat_model %}
<span class="font-semibold">{{ system_chat_model.api.name }}: {{ system_chat_model.name }}</span>
{% include "library/_model_health_badge.html" with h=model_health.chat %}
{% else %}
<span class="text-sm opacity-60">Not configured — concept extraction disabled</span>
{% endif %}
@@ -51,6 +53,7 @@
<td>
{% if system_reranker_model %}
<span class="font-semibold">{{ system_reranker_model.api.name }}: {{ system_reranker_model.name }}</span>
{% include "library/_model_health_badge.html" with h=model_health.reranker %}
{% else %}
<span class="text-sm opacity-60">Not configured — Phase 3</span>
{% endif %}
@@ -64,6 +67,7 @@
{% if system_vision_model.supports_vision %}
<span class="badge badge-accent badge-sm ml-1">Vision</span>
{% endif %}
{% include "library/_model_health_badge.html" with h=model_health.vision %}
{% else %}
<span class="text-sm opacity-60">Not configured — image analysis disabled</span>
{% endif %}

View File

@@ -48,30 +48,3 @@ class ConceptExtractionParsingTests(TestCase):
result = self.extractor._parse_concept_response(response)
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["name"], "valid")
class SampleIndexSelectionTests(TestCase):
"""Tests for sample index selection."""
def setUp(self):
self.extractor = ConceptExtractor(MagicMock())
def test_small_total_returns_all(self):
indices = self.extractor._select_sample_indices(5, max_samples=10)
self.assertEqual(indices, [0, 1, 2, 3, 4])
def test_equal_total_returns_all(self):
indices = self.extractor._select_sample_indices(10, max_samples=10)
self.assertEqual(indices, list(range(10)))
def test_large_total_returns_max_samples(self):
indices = self.extractor._select_sample_indices(100, max_samples=10)
self.assertEqual(len(indices), 10)
# Should be evenly spaced
self.assertEqual(indices[0], 0)
self.assertEqual(indices[-1], 90)
def test_returns_integers(self):
indices = self.extractor._select_sample_indices(50, max_samples=7)
for idx in indices:
self.assertIsInstance(idx, int)

View File

@@ -48,7 +48,7 @@ class EmbeddingPipelineInitTests(TestCase):
class PipelineItemNotFoundTests(TestCase):
"""Tests for handling missing items."""
@patch("library.services.pipeline.Item")
@patch("library.models.Item")
def test_process_nonexistent_item_raises(self, mock_item_cls):
mock_item_cls.nodes.get.side_effect = Exception("Not found")
@@ -57,7 +57,7 @@ class PipelineItemNotFoundTests(TestCase):
pipeline.process_item("nonexistent-uid")
self.assertIn("Item not found", str(ctx.exception))
@patch("library.services.pipeline.Item")
@patch("library.models.Item")
def test_reprocess_nonexistent_item_raises(self, mock_item_cls):
mock_item_cls.nodes.get.side_effect = Exception("Not found")
@@ -69,9 +69,9 @@ class PipelineItemNotFoundTests(TestCase):
class PipelineNoEmbeddingModelTests(TestCase):
"""Tests for handling missing system embedding model."""
@patch("library.services.pipeline.LLMModel")
@patch("llm_manager.models.LLMModel")
@patch("library.services.pipeline.default_storage")
@patch("library.services.pipeline.DocumentParser")
@patch("library.services.parsers.DocumentParser")
def test_no_embedding_model_raises(self, mock_parser, mock_storage, mock_llm):
"""Pipeline raises ValueError if no system embedding model is configured."""
mock_llm.get_system_embedding_model.return_value = None
@@ -86,7 +86,7 @@ class PipelineNoEmbeddingModelTests(TestCase):
mock_item.chunks.all.return_value = []
mock_item.images.all.return_value = []
with patch("library.services.pipeline.Item") as mock_item_cls:
with patch("library.models.Item") as mock_item_cls:
mock_item_cls.nodes.get.return_value = mock_item
# Mock S3 read
@@ -166,11 +166,11 @@ class PipelineVisionStageTests(TestCase):
item.images.all.return_value = []
return item
@patch("library.services.pipeline.ConceptExtractor")
@patch("library.services.pipeline.EmbeddingClient")
@patch("library.services.pipeline.ContentTypeChunker")
@patch("library.services.pipeline.DocumentParser")
@patch("library.services.pipeline.LLMModel")
@patch("library.services.concepts.ConceptExtractor")
@patch("library.services.embedding_client.EmbeddingClient")
@patch("library.services.chunker.ContentTypeChunker")
@patch("library.services.parsers.DocumentParser")
@patch("llm_manager.models.LLMModel")
@patch("library.services.pipeline.default_storage")
def test_no_vision_model_marks_images_skipped(
self, mock_storage, mock_llm, mock_parser_cls,
@@ -227,12 +227,12 @@ class PipelineVisionStageTests(TestCase):
img_node.save.assert_called()
self.assertEqual(result["images_analyzed"], 0)
@patch("library.services.pipeline.VisionAnalyzer")
@patch("library.services.pipeline.ConceptExtractor")
@patch("library.services.pipeline.EmbeddingClient")
@patch("library.services.pipeline.ContentTypeChunker")
@patch("library.services.pipeline.DocumentParser")
@patch("library.services.pipeline.LLMModel")
@patch("library.services.vision.VisionAnalyzer")
@patch("library.services.concepts.ConceptExtractor")
@patch("library.services.embedding_client.EmbeddingClient")
@patch("library.services.chunker.ContentTypeChunker")
@patch("library.services.parsers.DocumentParser")
@patch("llm_manager.models.LLMModel")
@patch("library.services.pipeline.default_storage")
def test_vision_model_triggers_analysis(
self, mock_storage, mock_llm, mock_parser_cls,
@@ -287,7 +287,7 @@ class PipelineVisionStageTests(TestCase):
mock_vision_cls.assert_called_once_with(mock_vision_model, user=None)
mock_analyzer.analyze_images.assert_called_once()
@patch("library.services.pipeline.LLMModel")
@patch("llm_manager.models.LLMModel")
def test_no_images_skips_vision_entirely(self, mock_llm):
"""When there are no images, vision stage is a no-op regardless of model."""
mock_vision_model = MagicMock()
@@ -309,10 +309,10 @@ class PipelineVisionStageTests(TestCase):
patch.object(pipeline, "_store_chunks", return_value=[]), \
patch.object(pipeline, "_store_images", return_value=[]), \
patch.object(pipeline, "_associate_images_with_chunks"), \
patch("library.services.pipeline.DocumentParser") as mock_parser_cls, \
patch("library.services.pipeline.ContentTypeChunker") as mock_chunker_cls, \
patch("library.services.pipeline.EmbeddingClient"), \
patch("library.services.pipeline.VisionAnalyzer") as mock_vision_cls:
patch("library.services.parsers.DocumentParser") as mock_parser_cls, \
patch("library.services.chunker.ContentTypeChunker") as mock_chunker_cls, \
patch("library.services.embedding_client.EmbeddingClient"), \
patch("library.services.vision.VisionAnalyzer") as mock_vision_cls:
mock_parser = MagicMock()
mock_parser.parse_bytes.return_value = MagicMock(images=[], text_blocks=[])

View File

@@ -100,7 +100,7 @@ class SearchAPIResponseTest(TestCase):
self.client = APIClient()
self.client.force_authenticate(user=self.user)
@patch("library.api.views.SearchService")
@patch("library.services.search.SearchService")
def test_successful_search_response_format(self, MockService):
"""Successful search returns expected JSON structure."""
mock_response = SearchResponse(
@@ -159,7 +159,7 @@ class SearchAPIResponseTest(TestCase):
self.assertEqual(image["image_uid"], "img1")
self.assertEqual(image["image_type"], "diagram")
@patch("library.api.views.SearchService")
@patch("library.services.search.SearchService")
def test_vector_only_endpoint(self, MockService):
"""Vector-only endpoint sets correct search types."""
mock_response = SearchResponse(
@@ -184,7 +184,7 @@ class SearchAPIResponseTest(TestCase):
self.assertEqual(call_args.search_types, ["vector"])
self.assertFalse(call_args.rerank)
@patch("library.api.views.SearchService")
@patch("library.services.search.SearchService")
def test_fulltext_only_endpoint(self, MockService):
"""Fulltext-only endpoint sets correct search types."""
mock_response = SearchResponse(
@@ -208,7 +208,7 @@ class SearchAPIResponseTest(TestCase):
self.assertEqual(call_args.search_types, ["fulltext"])
self.assertFalse(call_args.rerank)
@patch("library.api.views.SearchService")
@patch("library.services.search.SearchService")
def test_reranker_skip_reason_surfaced_in_json(self, MockService):
"""``reranker_skip_reason`` propagates through the JSON API."""
mock_response = SearchResponse(

View File

@@ -48,7 +48,7 @@ class AllLibraryUidsHelperTests(TestCase):
def test_returns_empty_when_neo4j_unavailable(self):
"""Helper must not touch ``Library.nodes`` if Neo4j is down."""
with patch("library.views.neo4j_available", return_value=False):
with patch("library.utils.neo4j_available", return_value=False):
self.assertEqual(views._all_library_uids(), [])
def test_returns_every_library_uid(self):
@@ -62,7 +62,7 @@ class AllLibraryUidsHelperTests(TestCase):
fake_nodes.all.return_value = fake_libs
fake_library_cls = SimpleNamespace(nodes=fake_nodes)
with patch("library.views.neo4j_available", return_value=True), \
with patch("library.utils.neo4j_available", return_value=True), \
patch.dict("sys.modules", {"library.models": SimpleNamespace(Library=fake_library_cls)}):
result = views._all_library_uids()
@@ -83,7 +83,7 @@ class AllLibraryUidsHelperTests(TestCase):
fake_nodes.all.return_value = fake_libs
fake_library_cls = SimpleNamespace(nodes=fake_nodes)
with patch("library.views.neo4j_available", return_value=True), \
with patch("library.utils.neo4j_available", return_value=True), \
patch.dict("sys.modules", {"library.models": SimpleNamespace(Library=fake_library_cls)}):
result = views._all_library_uids()
@@ -95,7 +95,7 @@ class AllLibraryUidsHelperTests(TestCase):
fake_nodes.all.side_effect = RuntimeError("neo4j blew up")
fake_library_cls = SimpleNamespace(nodes=fake_nodes)
with patch("library.views.neo4j_available", return_value=True), \
with patch("library.utils.neo4j_available", return_value=True), \
patch.dict("sys.modules", {"library.models": SimpleNamespace(Library=fake_library_cls)}):
self.assertEqual(views._all_library_uids(), [])

View File

@@ -0,0 +1,61 @@
"""Tests for the source-bucket registry resolution."""
from unittest.mock import MagicMock, patch
from django.test import SimpleTestCase, override_settings
from library.services import source_s3
_REGISTRY = {
"daedalus": {
"endpoint_url": "https://nyx.test:8555",
"access_key_id": "dae_key",
"secret_access_key": "dae_secret",
"bucket_name": "daedalus",
"region_name": "us-east-1",
"use_ssl": True,
"verify": True,
},
"spelunker": {
"endpoint_url": "https://nyx.test:8555",
"access_key_id": "spel_key",
"secret_access_key": "spel_secret",
"bucket_name": "spelunker",
"region_name": "us-east-1",
"use_ssl": True,
"verify": True,
},
}
@override_settings(SOURCE_S3_BUCKETS=_REGISTRY)
class SourceConfigTest(SimpleTestCase):
def test_known_source(self):
cfg = source_s3._source_config("spelunker")
self.assertEqual(cfg["bucket_name"], "spelunker")
self.assertEqual(cfg["access_key_id"], "spel_key")
def test_unknown_source_falls_back_to_daedalus(self):
cfg = source_s3._source_config("totally-unknown")
self.assertEqual(cfg["bucket_name"], "daedalus")
def test_blank_source_falls_back_to_daedalus(self):
cfg = source_s3._source_config("")
self.assertEqual(cfg["bucket_name"], "daedalus")
@patch("library.services.source_s3.boto3.client")
def test_fetch_uses_source_bucket(self, mock_boto_client):
client = MagicMock()
body = MagicMock()
body.read.return_value = b"file-bytes"
client.get_object.return_value = {"Body": body}
mock_boto_client.return_value = client
data = source_s3.fetch_from_source("spelunker", "3/readme.md")
self.assertEqual(data, b"file-bytes")
client.get_object.assert_called_once_with(
Bucket="spelunker", Key="3/readme.md"
)
# boto client built with the spelunker source credentials
_, kwargs = mock_boto_client.call_args
self.assertEqual(kwargs["aws_access_key_id"], "spel_key")

View File

@@ -13,7 +13,7 @@ from django.test import TestCase, override_settings
class EmbedItemTaskTests(TestCase):
"""Tests for the embed_item task."""
@patch("library.tasks.EmbeddingPipeline")
@patch("library.services.pipeline.EmbeddingPipeline")
def test_embed_item_success(self, mock_pipeline_cls):
from library.tasks import embed_item
@@ -31,7 +31,7 @@ class EmbedItemTaskTests(TestCase):
self.assertEqual(result["item_uid"], "test-uid-123")
mock_pipeline.process_item.assert_called_once()
@patch("library.tasks.EmbeddingPipeline")
@patch("library.services.pipeline.EmbeddingPipeline")
def test_embed_item_failure(self, mock_pipeline_cls):
from library.tasks import embed_item
@@ -49,7 +49,7 @@ class EmbedItemTaskTests(TestCase):
class ReembedItemTaskTests(TestCase):
"""Tests for the reembed_item task."""
@patch("library.tasks.EmbeddingPipeline")
@patch("library.services.pipeline.EmbeddingPipeline")
def test_reembed_item_success(self, mock_pipeline_cls):
from library.tasks import reembed_item

View File

@@ -729,6 +729,16 @@ def embedding_dashboard(request):
except Exception as exc:
logger.warning("Could not load system models: %s", exc)
# Reachability of the system-default models (keyed by role for the
# template). A probe failure must never 500 the dashboard.
context["model_health"] = {}
try:
from library.services.model_health import probe_system_models
context["model_health"] = {r["role"]: r for r in probe_system_models()}
except Exception as exc:
logger.warning("Could not probe system model health: %s", exc)
# Get item status counts and node counts from Neo4j
if neo4j_available():
context["neo4j_available"] = True

View File

@@ -241,10 +241,15 @@ else:
},
}
# --- Daedalus S3 (cross-bucket reads for ingest) ---
# Mnemosyne ingests files written to Daedalus's S3 bucket. These vars
# configure read access; the file is copied into AWS_STORAGE_BUCKET_NAME
# (Mnemosyne's own bucket) by the Celery ingest task before processing.
# --- Source S3 buckets (cross-bucket reads for ingest) ---
# Mnemosyne ingests files that upstream systems write to their own S3
# buckets. Each ingest request carries a `source` identifier; the Celery
# ingest task looks up that source's read credentials here, fetches the
# file, and copies it into AWS_STORAGE_BUCKET_NAME (Mnemosyne's own bucket)
# before processing. Add a new entry per upstream source.
#
# Backwards compatible: the "daedalus" entry is built from the existing
# DAEDALUS_S3_* env vars.
DAEDALUS_S3_ENDPOINT_URL = env("DAEDALUS_S3_ENDPOINT_URL", default="")
DAEDALUS_S3_ACCESS_KEY_ID = env("DAEDALUS_S3_ACCESS_KEY_ID", default="")
DAEDALUS_S3_SECRET_ACCESS_KEY = env("DAEDALUS_S3_SECRET_ACCESS_KEY", default="")
@@ -253,6 +258,38 @@ DAEDALUS_S3_REGION_NAME = env("DAEDALUS_S3_REGION_NAME", default="us-east-1")
DAEDALUS_S3_USE_SSL = env.bool("DAEDALUS_S3_USE_SSL", default=False)
DAEDALUS_S3_VERIFY = env.bool("DAEDALUS_S3_VERIFY", default=True)
# Spelunker writes scraped/extracted documents to its own bucket.
SPELUNKER_S3_ENDPOINT_URL = env("SPELUNKER_S3_ENDPOINT_URL", default="")
SPELUNKER_S3_ACCESS_KEY_ID = env("SPELUNKER_S3_ACCESS_KEY_ID", default="")
SPELUNKER_S3_SECRET_ACCESS_KEY = env("SPELUNKER_S3_SECRET_ACCESS_KEY", default="")
SPELUNKER_S3_BUCKET_NAME = env("SPELUNKER_S3_BUCKET_NAME", default="spelunker")
SPELUNKER_S3_REGION_NAME = env("SPELUNKER_S3_REGION_NAME", default="us-east-1")
SPELUNKER_S3_USE_SSL = env.bool("SPELUNKER_S3_USE_SSL", default=False)
SPELUNKER_S3_VERIFY = env.bool("SPELUNKER_S3_VERIFY", default=True)
# Registry keyed by the ingest `source` field. Unknown/blank sources fall
# back to "daedalus" for backwards compatibility.
SOURCE_S3_BUCKETS = {
"daedalus": {
"endpoint_url": DAEDALUS_S3_ENDPOINT_URL,
"access_key_id": DAEDALUS_S3_ACCESS_KEY_ID,
"secret_access_key": DAEDALUS_S3_SECRET_ACCESS_KEY,
"bucket_name": DAEDALUS_S3_BUCKET_NAME,
"region_name": DAEDALUS_S3_REGION_NAME,
"use_ssl": DAEDALUS_S3_USE_SSL,
"verify": DAEDALUS_S3_VERIFY,
},
"spelunker": {
"endpoint_url": SPELUNKER_S3_ENDPOINT_URL,
"access_key_id": SPELUNKER_S3_ACCESS_KEY_ID,
"secret_access_key": SPELUNKER_S3_SECRET_ACCESS_KEY,
"bucket_name": SPELUNKER_S3_BUCKET_NAME,
"region_name": SPELUNKER_S3_REGION_NAME,
"use_ssl": SPELUNKER_S3_USE_SSL,
"verify": SPELUNKER_S3_VERIFY,
},
}
# --- Celery / RabbitMQ ---
CELERY_BROKER_URL = env(
"CELERY_BROKER_URL",

View File

@@ -32,6 +32,19 @@
# resolution at startup and returns 502 after `docker compose restart app`.
resolver 127.0.0.11 valid=10s;
# Recover the real client IP from X-Forwarded-For (set by HAProxy on Titania)
# before evaluating the RFC1918 allowlists below. nginx runs as a sidecar with
# a published port, so every proxied request arrives via Docker's NAT gateway
# (an RFC1918 address) — without this, the allowlists match that gateway and
# pass ALL external traffic, exposing /metrics and /nginx_status publicly.
# HAProxy's own health checks (e.g. to /healthz) carry no XFF and keep their
# real 10.10.x.x source, so they stay allowed.
set_real_ip_from 10.0.0.0/8;
set_real_ip_from 172.16.0.0/12;
set_real_ip_from 192.168.0.0/16;
real_ip_header X-Forwarded-For;
real_ip_recursive on;
# Preserve X-Forwarded-Proto from the upstream reverse proxy (HAProxy TLS
# termination on Titania); fall back to $scheme only if there's no upstream
# header. Inside the compose network $scheme is always `http` because HAProxy