5 Commits

Author SHA1 Message Date
dd06f923cd feat(workspaces): return 409 name_conflict instead of 500 on Library name clash
Some checks failed
CVE Scan & Docker Build / security-scan (pull_request) Successful in 3m49s
CVE Scan & Docker Build / build-and-push (pull_request) Has been cancelled
A recreate of a workspace whose Mnemosyne Library was orphaned (left behind
by a failed Daedalus delete-propagate) collides on the global Library.name
unique constraint. neomodel raised UniqueProperty unguarded, so workspace_create
500'd and ingest then 404'd forever — the queue froze silently.

Guard lib.save() and return a structured 409 with a machine code so Daedalus
can classify the failure without string-matching:
- name_conflict   — the new name-collision case
- owner_conflict, library_type_immutable — codes added to the two existing 409s

Cypher-touching paths stay covered by the manual end-to-end plan, per the
test module's stated convention.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-17 20:26:43 -04:00
142e9675b5 feat(library): allow admin delete of Daedalus-managed library via shared cascade
Admin/HTML library delete previously hard-blocked workspace-scoped
(Daedalus-managed) libraries, leaving no way to clear an orphaned Library
node — e.g. one left behind when a Daedalus workspace delete failed to
propagate. A recreate of that workspace then collides on the global
Library.name unique constraint and 500s, freezing ingest.

Allow the delete behind the existing confirm warning (low risk: source
content lives in Daedalus and is recreated + re-embedded on next sync),
and route both the API and HTML delete paths through one shared cascade.

- Add library/services/library_delete.delete_library_cascade(lib), keyed on
  Library uid so it covers global and workspace-scoped libraries. It removes
  Chunks, Images/ImageEmbeddings, Items, Collections, the Library, then GCs
  orphan-only Concepts (verbatim from the API view, re-keyed workspace_id->uid).
- workspace_detail_or_delete (API) now calls the shared helper.
- library_delete (HTML) no longer blocks workspace_id libraries; it calls the
  cascade instead of a bare lib.delete() (which leaked child nodes — also a
  latent bug for global libraries with content).
- Confirm-delete template shows a caution banner for Daedalus-managed libraries.

No migration: Mnemosyne library data is in Neo4j (neomodel); no schema change.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-17 19:37:58 -04:00
a90c6e7479 feat(metrics): add scrape-time system model health collector
All checks were successful
CVE Scan & Docker Build / security-scan (push) Successful in 3m49s
Build & Deploy Docs / build-and-deploy (push) Successful in 1m9s
CVE Scan & Docker Build / build-and-push (push) Successful in 3m32s
Add a Prometheus custom collector that probes the four system-default
models (chat, vision, embedding, reranker) at /metrics scrape time and
emits up/down, configured, and probe-latency gauges. This complements
the ingest-pipeline counters in the Celery worker, which only move
during active ingests and cannot signal model outages on an idle queue.

- New `library/health_collector.py` registers a custom collector with
  a 55s in-process cache to avoid hammering GPU endpoints on rapid
  scrapes or across multiple gunicorn workers.
- New `library/services/model_health.py` centralises the probe logic,
  resolving system-default models via SystemSettings and dispatching
  to chat/embedding/rerank endpoints with a short timeout.
- Register the collector only in the web process (gunicorn/runserver)
  via `LibraryConfig.ready`, excluding Celery, pytest, and management
  commands to prevent duplicate registration and stray probes.
- Add unit tests covering the collector cache, metric shape, and
  per-role probe dispatch.
2026-06-17 09:06:11 -04:00
4dde063299 fix(web): trust XFF for real client IP and correct port to 23081
All checks were successful
CVE Scan & Docker Build / security-scan (push) Successful in 3m41s
Build & Deploy Docs / build-and-deploy (push) Successful in 1m9s
CVE Scan & Docker Build / build-and-push (push) Successful in 3m29s
- Configure nginx `set_real_ip_from` for RFC1918 ranges and enable
  `real_ip_recursive` so allowlists evaluate the true client IP
  instead of Docker's NAT gateway, preventing public exposure of
  `/metrics` and `/nginx_status`
- Update published port from 23181 to 23081 in docker-compose
2026-06-17 06:58:36 -04:00
ec4f12d601 feat(ingest): source-bucket registry keyed on ingest source
Generalises the Daedalus-only cross-bucket fetch into a registry
(SOURCE_S3_BUCKETS) keyed on the IngestJob `source` field, so new
upstream sources (Spelunker) can ingest from their own buckets. The
ingest task now calls fetch_from_source(job.source, job.s3_key) and
falls back to "daedalus" for blank/unknown sources (backwards compatible).

Adds SPELUNKER_S3_* env vars and worker env scoping. Replaces
daedalus_s3.py with source_s3.py.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 22:30:08 -04:00
17 changed files with 674 additions and 160 deletions

View File

@@ -78,6 +78,19 @@ DAEDALUS_S3_REGION_NAME=us-east-1
DAEDALUS_S3_USE_SSL=True DAEDALUS_S3_USE_SSL=True
DAEDALUS_S3_VERIFY=True DAEDALUS_S3_VERIFY=True
# --- Spelunker S3 (cross-bucket reads for ingest, source="spelunker") ---
# Consumed by: worker only
# Spelunker scrapes web/git documents into its own bucket and posts ingest
# requests with source="spelunker". These creds should be scoped read-only
# to the Spelunker bucket in your secret manager.
SPELUNKER_S3_ENDPOINT_URL=https://nyx.helu.ca:8555
SPELUNKER_S3_ACCESS_KEY_ID=
SPELUNKER_S3_SECRET_ACCESS_KEY=
SPELUNKER_S3_BUCKET_NAME=spelunker
SPELUNKER_S3_REGION_NAME=us-east-1
SPELUNKER_S3_USE_SSL=True
SPELUNKER_S3_VERIFY=True
# --- Celery / RabbitMQ (Oberon) --------------------------------------------- # --- Celery / RabbitMQ (Oberon) ---------------------------------------------
# Consumed by: app (producer), worker (consumer). NOT mcp. # Consumed by: app (producer), worker (consumer). NOT mcp.
# Remember to percent-encode any password characters that have meaning in a # Remember to percent-encode any password characters that have meaning in a

View File

@@ -339,6 +339,13 @@ services:
- DAEDALUS_S3_REGION_NAME=${DAEDALUS_S3_REGION_NAME} - DAEDALUS_S3_REGION_NAME=${DAEDALUS_S3_REGION_NAME}
- DAEDALUS_S3_USE_SSL=${DAEDALUS_S3_USE_SSL} - DAEDALUS_S3_USE_SSL=${DAEDALUS_S3_USE_SSL}
- DAEDALUS_S3_VERIFY=${DAEDALUS_S3_VERIFY} - DAEDALUS_S3_VERIFY=${DAEDALUS_S3_VERIFY}
- SPELUNKER_S3_ENDPOINT_URL=${SPELUNKER_S3_ENDPOINT_URL}
- SPELUNKER_S3_ACCESS_KEY_ID=${SPELUNKER_S3_ACCESS_KEY_ID}
- SPELUNKER_S3_SECRET_ACCESS_KEY=${SPELUNKER_S3_SECRET_ACCESS_KEY}
- SPELUNKER_S3_BUCKET_NAME=${SPELUNKER_S3_BUCKET_NAME}
- SPELUNKER_S3_REGION_NAME=${SPELUNKER_S3_REGION_NAME}
- SPELUNKER_S3_USE_SSL=${SPELUNKER_S3_USE_SSL}
- SPELUNKER_S3_VERIFY=${SPELUNKER_S3_VERIFY}
# Celery / RabbitMQ # Celery / RabbitMQ
- CELERY_BROKER_URL=${CELERY_BROKER_URL} - CELERY_BROKER_URL=${CELERY_BROKER_URL}
- CELERY_RESULT_BACKEND=${CELERY_RESULT_BACKEND} - CELERY_RESULT_BACKEND=${CELERY_RESULT_BACKEND}
@@ -370,7 +377,7 @@ services:
retries: 3 retries: 3
start_period: 60s start_period: 60s
# ── Web: nginx reverse proxy, public port 23181 ──────────────────────────── # ── Web: nginx reverse proxy, public port 23081 ────────────────────────────
# No Django env — nginx only knows how to route. Public listener is # No Django env — nginx only knows how to route. Public listener is
# templated into the conf file by Ansible if the port ever needs to change. # templated into the conf file by Ansible if the port ever needs to change.
web: web:
@@ -383,7 +390,7 @@ services:
mcp: mcp:
condition: service_healthy condition: service_healthy
ports: ports:
- "23181:80" - "23081:80"
volumes: volumes:
- ./nginx/mnemosyne.conf:/etc/nginx/conf.d/default.conf:ro - ./nginx/mnemosyne.conf:/etc/nginx/conf.d/default.conf:ro
- static:/var/www/static:ro - static:/var/www/static:ro

View File

@@ -17,12 +17,14 @@ across users.
import logging import logging
from neomodel import db from neomodel import db
from neomodel.exceptions import UniqueProperty
from rest_framework import status from rest_framework import status
from rest_framework.decorators import api_view, permission_classes from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response from rest_framework.response import Response
from library.content_types import get_library_type_config from library.content_types import get_library_type_config
from library.services.library_delete import delete_library_cascade
from .serializers import WorkspaceCreateSerializer, WorkspaceStatusSerializer from .serializers import WorkspaceCreateSerializer, WorkspaceStatusSerializer
@@ -84,7 +86,10 @@ def workspace_create(request):
data["workspace_id"], request.user.username, data["workspace_id"], request.user.username,
) )
return Response( return Response(
{"detail": "Workspace id is already in use."}, {
"detail": "Workspace id is already in use.",
"code": "owner_conflict",
},
status=status.HTTP_409_CONFLICT, status=status.HTTP_409_CONFLICT,
) )
if existing.library_type != data["library_type"]: if existing.library_type != data["library_type"]:
@@ -94,7 +99,8 @@ def workspace_create(request):
"library_type is immutable for an existing workspace " "library_type is immutable for an existing workspace "
f"(have '{existing.library_type}', " f"(have '{existing.library_type}', "
f"got '{data['library_type']}')." f"got '{data['library_type']}')."
) ),
"code": "library_type_immutable",
}, },
status=status.HTTP_409_CONFLICT, status=status.HTTP_409_CONFLICT,
) )
@@ -119,7 +125,29 @@ def workspace_create(request):
reranker_instruction=defaults["reranker_instruction"], reranker_instruction=defaults["reranker_instruction"],
llm_context_prompt=defaults["llm_context_prompt"], llm_context_prompt=defaults["llm_context_prompt"],
) )
lib.save() try:
lib.save()
except UniqueProperty:
# Library.name is globally unique. A name collision here almost always
# means an orphaned Library survived a failed Daedalus workspace delete
# (the old node kept the name), and the recreate under a new
# workspace_id now clashes. Surface a clean 409 instead of a 500 so
# Daedalus can record + report it; the operator clears the orphan
# (admin delete) or renames the workspace.
logger.warning(
"workspace_create name_conflict workspace_id=%s name=%s",
data["workspace_id"], data["name"],
)
return Response(
{
"detail": (
f"A library named '{data['name']}' already exists in "
"Mnemosyne."
),
"code": "name_conflict",
},
status=status.HTTP_409_CONFLICT,
)
logger.info( logger.info(
"Workspace created workspace_id=%s library_uid=%s library_type=%s", "Workspace created workspace_id=%s library_uid=%s library_type=%s",
data["workspace_id"], lib.uid, lib.library_type, data["workspace_id"], lib.uid, lib.library_type,
@@ -165,74 +193,15 @@ def workspace_detail_or_delete(request, workspace_id):
if lib is None: if lib is None:
return Response(status=status.HTTP_204_NO_CONTENT) return Response(status=status.HTTP_204_NO_CONTENT)
library_uid = lib.uid # Delete the Library and everything reachable + unique to it, plus
library_name = lib.name # orphan-Concept GC. Shared with the admin/HTML delete path.
result = delete_library_cascade(lib)
# Step 1-4: delete chunks, items, collections, then the library itself.
# We collect Item s3_keys first so the caller can clean up S3
# asynchronously (a future enhancement — for now, the keys are logged).
s3_rows, _ = db.cypher_query(
"MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(:Collection)"
"-[:CONTAINS]->(i:Item) RETURN i.uid, i.s3_key",
{"wsid": workspace_id},
)
item_s3_keys = [(r[0], r[1]) for r in s3_rows if r[1]]
db.cypher_query(
"""
MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(:Collection)
-[:CONTAINS]->(i:Item)-[:HAS_CHUNK]->(c:Chunk)
DETACH DELETE c
""",
{"wsid": workspace_id},
)
db.cypher_query(
"""
MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(:Collection)
-[:CONTAINS]->(i:Item)-[:HAS_IMAGE]->(img:Image)
OPTIONAL MATCH (img)-[:HAS_EMBEDDING]->(emb:ImageEmbedding)
DETACH DELETE img, emb
""",
{"wsid": workspace_id},
)
db.cypher_query(
"""
MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(:Collection)
-[:CONTAINS]->(i:Item)
DETACH DELETE i
""",
{"wsid": workspace_id},
)
db.cypher_query(
"""
MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(col:Collection)
DETACH DELETE col
""",
{"wsid": workspace_id},
)
db.cypher_query(
"MATCH (l:Library {workspace_id: $wsid}) DETACH DELETE l",
{"wsid": workspace_id},
)
# Step 5: orphan Concept garbage collection.
orphan_result, _ = db.cypher_query(
"""
MATCH (con:Concept)
WHERE NOT (con)<-[:REFERENCES]-() AND NOT (con)<-[:MENTIONS]-()
AND NOT (con)<-[:DEPICTS]-()
WITH con
DETACH DELETE con
RETURN count(con) AS deleted
"""
)
orphans_deleted = orphan_result[0][0] if orphan_result else 0
logger.info( logger.info(
"Workspace deleted workspace_id=%s library_uid=%s name=%s " "Workspace deleted workspace_id=%s library_uid=%s name=%s "
"items=%d orphans_deleted=%d", "items=%d orphans_deleted=%d",
workspace_id, library_uid, library_name, workspace_id, result["library_uid"], result["name"],
len(item_s3_keys), orphans_deleted, result["item_count"], result["orphans_deleted"],
) )
return Response(status=status.HTTP_204_NO_CONTENT) return Response(status=status.HTTP_204_NO_CONTENT)

View File

@@ -88,6 +88,29 @@ def _should_skip_probe() -> bool:
return False return False
def _is_web_process() -> bool:
"""
True when running inside the web (gunicorn / runserver) process.
The reachability collector must only register here: ``/metrics`` is served
by the web process, and registering in the Celery worker would both probe
the GPU endpoints from a process whose metrics nobody scrapes and risk
duplicate registration. Celery launches via ``celery`` argv; management
commands are excluded above.
"""
argv0 = sys.argv[0]
if "celery" in argv0 or (len(sys.argv) >= 2 and sys.argv[1] == "celery"):
return False
if "pytest" in argv0 or "PYTEST_CURRENT_TEST" in os.environ:
return False
# gunicorn (prod) or runserver (dev).
if "gunicorn" in argv0:
return True
if len(sys.argv) >= 2 and sys.argv[1] == "runserver":
return True
return False
def _run_startup_probe(): def _run_startup_probe():
""" """
Emit ERROR/WARNING logs if the stack is misconfigured for search. Emit ERROR/WARNING logs if the stack is misconfigured for search.
@@ -199,4 +222,7 @@ class LibraryConfig(AppConfig):
verbose_name = "Library" verbose_name = "Library"
def ready(self): def ready(self):
pass if _is_web_process():
from library.health_collector import register
register()

View File

@@ -0,0 +1,99 @@
"""
Scrape-time Prometheus collector for system-default model reachability.
The ingest-pipeline counters in ``library/metrics.py`` live in the Celery
worker process and only move during an active ingest, so they cannot signal
"models down" on an idle queue. This collector runs in the **web** process
(where ``/metrics`` is served by ``django_prometheus``) and probes the four
system-default models at scrape time, emitting an up/down gauge that is
present regardless of queue activity.
Probe results are cached for a short TTL so rapid scrapes — or multiple
gunicorn workers each scraped in turn — cannot hammer the GPU endpoints.
"""
import logging
import threading
import time
from prometheus_client.core import GaugeMetricFamily
from library.services.model_health import probe_system_models
logger = logging.getLogger(__name__)
# Cache probe results so repeated scrapes don't re-probe the router. The
# value is comfortably above a 15s scrape_interval but bounded so a recovered
# model shows green within a minute.
_CACHE_TTL_SECONDS = 55
_lock = threading.Lock()
_cache: dict = {"ts": 0.0, "results": None}
def _cached_probe() -> list[dict]:
"""Return probe results, re-probing only when the cache has expired."""
now = time.monotonic()
with _lock:
if _cache["results"] is not None and (now - _cache["ts"]) < _CACHE_TTL_SECONDS:
return _cache["results"]
try:
results = probe_system_models()
except Exception as exc: # never let a probe failure break /metrics
logger.warning("Model health probe failed during scrape: %s", exc)
# Serve the stale cache if we have one; otherwise report nothing.
return _cache["results"] or []
_cache["ts"] = now
_cache["results"] = results
return results
class SystemModelHealthCollector:
"""prometheus_client custom collector for system-default model health."""
def collect(self):
results = _cached_probe()
up = GaugeMetricFamily(
"mnemosyne_system_default_model_up",
"System-default model endpoint reachable (1) or not (0)",
labels=["role", "model", "api"],
)
configured = GaugeMetricFamily(
"mnemosyne_system_default_model_configured",
"A system-default model is configured for this role (1) or not (0)",
labels=["role"],
)
latency = GaugeMetricFamily(
"mnemosyne_system_default_model_probe_latency_seconds",
"Latency of the last reachability probe for this role",
labels=["role"],
)
for r in results:
role = r["role"]
configured.add_metric([role], 1 if r["configured"] else 0)
if not r["configured"]:
continue
up.add_metric(
[role, r["model_name"] or "", r["api_name"] or ""],
1 if r["ok"] else 0,
)
if r["latency_ms"] is not None:
latency.add_metric([role], r["latency_ms"] / 1000.0)
yield configured
yield up
yield latency
def register():
"""Register the collector against the default registry (idempotent)."""
from prometheus_client import REGISTRY
# Guard against duplicate registration (autoreload, repeated ready()).
for collector in list(getattr(REGISTRY, "_collector_to_names", {})):
if isinstance(collector, SystemModelHealthCollector):
return
REGISTRY.register(SystemModelHealthCollector())
logger.info("Registered SystemModelHealthCollector on Prometheus default registry")

View File

@@ -1,70 +0,0 @@
"""
Cross-bucket S3 helper for ingesting files from the Daedalus S3 bucket
into Mnemosyne's own bucket.
Daedalus uploads files to its own bucket (configured per Daedalus deployment)
and posts an ingest request to Mnemosyne with the s3_key. This module fetches
that file using read-only Daedalus credentials and writes it to Mnemosyne's
bucket via the standard `default_storage` backend so the rest of the pipeline
(parsing, chunking, embedding) works unchanged.
"""
import logging
import boto3
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
logger = logging.getLogger(__name__)
def _daedalus_s3_client():
"""Build a boto3 S3 client pointed at the Daedalus bucket."""
return boto3.client(
"s3",
endpoint_url=settings.DAEDALUS_S3_ENDPOINT_URL or None,
aws_access_key_id=settings.DAEDALUS_S3_ACCESS_KEY_ID,
aws_secret_access_key=settings.DAEDALUS_S3_SECRET_ACCESS_KEY,
region_name=settings.DAEDALUS_S3_REGION_NAME,
use_ssl=settings.DAEDALUS_S3_USE_SSL,
verify=settings.DAEDALUS_S3_VERIFY,
)
def fetch_from_daedalus(daedalus_s3_key: str) -> bytes:
"""
Read a file from the Daedalus S3 bucket.
:param daedalus_s3_key: Object key in the Daedalus bucket.
:returns: File bytes.
:raises: botocore exceptions on failure (caller decides retry).
"""
client = _daedalus_s3_client()
bucket = settings.DAEDALUS_S3_BUCKET_NAME
logger.debug(
"Fetching from Daedalus S3 bucket=%s key=%s", bucket, daedalus_s3_key
)
response = client.get_object(Bucket=bucket, Key=daedalus_s3_key)
data = response["Body"].read()
logger.info(
"Fetched from Daedalus S3 bucket=%s key=%s size=%d",
bucket, daedalus_s3_key, len(data),
)
return data
def copy_into_mnemosyne(data: bytes, mnemosyne_s3_key: str) -> str:
"""
Write bytes into Mnemosyne's S3 bucket via the default_storage backend.
:param data: File bytes (already in memory).
:param mnemosyne_s3_key: Target object key in Mnemosyne's bucket.
:returns: The actual key written (may differ from requested if
`file_overwrite=False` and the key existed).
"""
saved_key = default_storage.save(mnemosyne_s3_key, ContentFile(data))
logger.info(
"Wrote to Mnemosyne S3 key=%s size=%d", saved_key, len(data),
)
return saved_key

View File

@@ -0,0 +1,108 @@
"""
Shared Library deletion cascade.
Deletes a Library node and everything reachable AND unique to it
(Collections, Items, Chunks, Images + ImageEmbeddings), then garbage-collects
Concepts that are no longer referenced by any other Library.
Keyed on the Library ``uid`` so it works for *both* global libraries
(``workspace_id`` is null) and workspace-scoped libraries. This is the single
source of truth used by:
* the Daedalus integration API (``DELETE /library/api/workspaces/{id}/``), and
* the admin/HTML delete view (``library_delete``).
Concept-safe: orphan-only Concept GC happens at the end. Concepts still
referenced by another library (workspace or global) are preserved.
"""
import logging
from neomodel import db
logger = logging.getLogger(__name__)
def delete_library_cascade(lib) -> dict:
"""Delete ``lib`` and all content reachable and unique to it.
:param lib: A ``library.models.Library`` node instance.
:returns: Dict with ``library_uid``, ``name``, ``item_count``,
``item_s3_keys`` (list of ``(uid, s3_key)`` for async S3 cleanup),
and ``orphans_deleted`` (Concept GC count).
"""
library_uid = lib.uid
library_name = lib.name
# Collect Item s3_keys first so the caller can clean up S3 asynchronously
# (a future enhancement — for now, the keys are returned/logged).
s3_rows, _ = db.cypher_query(
"MATCH (l:Library {uid: $uid})-[:CONTAINS]->(:Collection)"
"-[:CONTAINS]->(i:Item) RETURN i.uid, i.s3_key",
{"uid": library_uid},
)
item_s3_keys = [(r[0], r[1]) for r in s3_rows if r[1]]
db.cypher_query(
"""
MATCH (l:Library {uid: $uid})-[:CONTAINS]->(:Collection)
-[:CONTAINS]->(i:Item)-[:HAS_CHUNK]->(c:Chunk)
DETACH DELETE c
""",
{"uid": library_uid},
)
db.cypher_query(
"""
MATCH (l:Library {uid: $uid})-[:CONTAINS]->(:Collection)
-[:CONTAINS]->(i:Item)-[:HAS_IMAGE]->(img:Image)
OPTIONAL MATCH (img)-[:HAS_EMBEDDING]->(emb:ImageEmbedding)
DETACH DELETE img, emb
""",
{"uid": library_uid},
)
db.cypher_query(
"""
MATCH (l:Library {uid: $uid})-[:CONTAINS]->(:Collection)
-[:CONTAINS]->(i:Item)
DETACH DELETE i
""",
{"uid": library_uid},
)
db.cypher_query(
"""
MATCH (l:Library {uid: $uid})-[:CONTAINS]->(col:Collection)
DETACH DELETE col
""",
{"uid": library_uid},
)
db.cypher_query(
"MATCH (l:Library {uid: $uid}) DETACH DELETE l",
{"uid": library_uid},
)
# Orphan Concept garbage collection: drop Concepts no longer referenced
# by any Item (REFERENCES/MENTIONS) or Image (DEPICTS).
orphan_result, _ = db.cypher_query(
"""
MATCH (con:Concept)
WHERE NOT (con)<-[:REFERENCES]-() AND NOT (con)<-[:MENTIONS]-()
AND NOT (con)<-[:DEPICTS]-()
WITH con
DETACH DELETE con
RETURN count(con) AS deleted
"""
)
orphans_deleted = orphan_result[0][0] if orphan_result else 0
logger.info(
"Library cascade-deleted library_uid=%s name=%s items=%d orphans_deleted=%d",
library_uid, library_name, len(item_s3_keys), orphans_deleted,
)
return {
"library_uid": library_uid,
"name": library_name,
"item_count": len(item_s3_keys),
"item_s3_keys": item_s3_keys,
"orphans_deleted": orphans_deleted,
}

View File

@@ -0,0 +1,119 @@
"""
System-default model reachability probes.
Provides a cheap, bounded liveness check for the four system-default models
(embedding, chat, vision, reranker) so the embedding dashboard and the
scrape-time Prometheus collector can surface "model not responding" without
running an ingest.
The probe deliberately hits ``GET {base_url}/models`` as its primary check:
on an OpenAI-compatible router (e.g. the llama-router) this answers instantly
without loading a model, so repeated probes never burn GPU time. This mirrors
the GPU-avoidance principle in ``mcp_server/tools/health.py``.
"""
import logging
import time
from typing import Optional
import requests
logger = logging.getLogger(__name__)
# api_type values whose endpoints expose an OpenAI-compatible ``/models`` list.
_OPENAI_COMPATIBLE = {"openai", "azure", "ollama", "llama-cpp", "vllm"}
# (role, getter method name) pairs — order is the dashboard/metrics order.
ROLE_GETTERS = [
("embedding", "get_system_embedding_model"),
("chat", "get_system_chat_model"),
("vision", "get_system_vision_model"),
("reranker", "get_system_reranker_model"),
]
def probe_api(api, timeout: int = 5) -> tuple[bool, str]:
"""Check whether an ``LLMApi`` endpoint is responding.
Args:
api: ``LLMApi`` instance (provides base_url, api_key, api_type).
timeout: Per-request timeout in seconds.
Returns:
``(ok, detail)`` — ok is True if the endpoint answered acceptably;
detail is a short human-readable status (HTTP code, error, or "ok").
"""
base_url = api.base_url.rstrip("/")
headers = {}
if api.api_key:
headers["Authorization"] = f"Bearer {api.api_key}"
if api.api_type not in _OPENAI_COMPATIBLE:
# bedrock / anthropic have no equivalent cheap unauthenticated list;
# treat a reachable host as the liveness signal via a HEAD on base_url.
try:
resp = requests.head(base_url, headers=headers, timeout=timeout)
return True, f"reachable (HTTP {resp.status_code})"
except requests.RequestException as exc:
return False, type(exc).__name__
url = f"{base_url}/models"
try:
resp = requests.get(url, headers=headers, timeout=timeout)
except requests.Timeout:
return False, f"timeout after {timeout}s"
except requests.RequestException as exc:
return False, type(exc).__name__
if resp.status_code == 200:
return True, "ok"
return False, f"HTTP {resp.status_code}"
def probe_system_models(timeout: int = 5) -> list[dict]:
"""Probe all four system-default models for reachability.
Returns:
One dict per role with keys: ``role``, ``configured``, ``model_name``,
``api_name``, ``base_url``, ``ok``, ``detail``, ``latency_ms``.
For an unconfigured role, ``configured`` is False and the probe is
skipped (``ok`` is None).
"""
from llm_manager.models import LLMModel
results: list[dict] = []
for role, getter_name in ROLE_GETTERS:
model = getattr(LLMModel, getter_name)()
if model is None:
results.append(
{
"role": role,
"configured": False,
"model_name": None,
"api_name": None,
"base_url": None,
"ok": None,
"detail": "not configured",
"latency_ms": None,
}
)
continue
api = model.api
start = time.monotonic()
ok, detail = probe_api(api, timeout=timeout)
latency_ms = round((time.monotonic() - start) * 1000, 1)
results.append(
{
"role": role,
"configured": True,
"model_name": model.name,
"api_name": api.name,
"base_url": api.base_url,
"ok": ok,
"detail": detail,
"latency_ms": latency_ms,
}
)
return results

View File

@@ -0,0 +1,88 @@
"""
Cross-bucket S3 helper for ingesting files from upstream source buckets
into Mnemosyne's own bucket.
Each upstream system (Daedalus, Spelunker, ...) writes files to its own S3
bucket and posts an ingest request to Mnemosyne carrying a `source`
identifier and the object key. This module looks up that source's read
credentials in settings.SOURCE_S3_BUCKETS, fetches the file, and writes it
into Mnemosyne's bucket via the standard `default_storage` backend so the
rest of the pipeline (parsing, chunking, embedding) works unchanged.
"""
import logging
import boto3
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
logger = logging.getLogger(__name__)
DEFAULT_SOURCE = "daedalus"
def _source_config(source: str) -> dict:
"""
Resolve the S3 read config for an ingest source.
Unknown or blank sources fall back to DEFAULT_SOURCE for backwards
compatibility with pre-registry ingest requests.
"""
registry = settings.SOURCE_S3_BUCKETS
key = source if source in registry else DEFAULT_SOURCE
if key not in registry:
raise RuntimeError(
f"No S3 bucket configured for source '{source}' "
f"(and no '{DEFAULT_SOURCE}' fallback)."
)
return registry[key]
def _source_s3_client(config: dict):
"""Build a boto3 S3 client pointed at a source bucket."""
return boto3.client(
"s3",
endpoint_url=config["endpoint_url"] or None,
aws_access_key_id=config["access_key_id"],
aws_secret_access_key=config["secret_access_key"],
region_name=config["region_name"],
use_ssl=config["use_ssl"],
verify=config["verify"],
)
def fetch_from_source(source: str, s3_key: str) -> bytes:
"""
Read a file from an upstream source's S3 bucket.
:param source: Ingest source identifier (e.g. "daedalus", "spelunker").
:param s3_key: Object key in the source bucket.
:returns: File bytes.
:raises: botocore exceptions on failure (caller decides retry).
"""
config = _source_config(source)
client = _source_s3_client(config)
bucket = config["bucket_name"]
logger.debug("Fetching from source=%s bucket=%s key=%s", source, bucket, s3_key)
response = client.get_object(Bucket=bucket, Key=s3_key)
data = response["Body"].read()
logger.info(
"Fetched from source=%s bucket=%s key=%s size=%d",
source, bucket, s3_key, len(data),
)
return data
def copy_into_mnemosyne(data: bytes, mnemosyne_s3_key: str) -> str:
"""
Write bytes into Mnemosyne's S3 bucket via the default_storage backend.
:param data: File bytes (already in memory).
:param mnemosyne_s3_key: Target object key in Mnemosyne's bucket.
:returns: The actual key written (may differ from requested if
`file_overwrite=False` and the key existed).
"""
saved_key = default_storage.save(mnemosyne_s3_key, ContentFile(data))
logger.info("Wrote to Mnemosyne S3 key=%s size=%d", saved_key, len(data))
return saved_key

View File

@@ -347,9 +347,9 @@ def ingest_from_daedalus(self, job_id: str):
from datetime import datetime, timezone from datetime import datetime, timezone
from library.models import IngestJob, Item, Library from library.models import IngestJob, Item, Library
from library.services.daedalus_s3 import ( from library.services.source_s3 import (
copy_into_mnemosyne, copy_into_mnemosyne,
fetch_from_daedalus, fetch_from_source,
) )
from library.services.pipeline import EmbeddingPipeline from library.services.pipeline import EmbeddingPipeline
@@ -401,11 +401,11 @@ def ingest_from_daedalus(self, job_id: str):
) )
_delete_item_and_chunks(prior_item_uid) _delete_item_and_chunks(prior_item_uid)
# --- 3. Fetch from Daedalus, copy into Mnemosyne bucket --- # --- 3. Fetch from the source bucket, copy into Mnemosyne bucket ---
job.progress = "copying" job.progress = "copying"
job.save(update_fields=["progress"]) job.save(update_fields=["progress"])
data = fetch_from_daedalus(job.s3_key) data = fetch_from_source(job.source, job.s3_key)
# --- 4. Create Item node --- # --- 4. Create Item node ---
ext = _normalize_file_type(job.file_type) ext = _normalize_file_type(job.file_type)

View File

@@ -0,0 +1,18 @@
{% comment %}
Reachability badge for a system-default model. Expects `h` = one entry from
the `model_health` dict (keys: configured, ok, detail, latency_ms). Renders
nothing when the role is absent from model_health (probe failed entirely).
Text-only badges to match the existing dashboard palette (no emoji per house
HTML rule).
{% endcomment %}
{% if h %}
{% if not h.configured %}
<span class="badge badge-ghost badge-sm ml-2" title="No system-default model set for this role">NOT CONFIGURED</span>
{% elif h.ok %}
<span class="badge badge-success badge-sm ml-2" title="{{ h.detail }}">REACHABLE</span>
{% if h.latency_ms is not None %}<span class="text-xs opacity-50 ml-1">{{ h.latency_ms }} ms</span>{% endif %}
{% else %}
<span class="badge badge-error badge-sm ml-2" title="Probe detail: {{ h.detail }}">NOT RESPONDING</span>
<span class="text-xs opacity-60 ml-1">{{ h.detail }}</span>
{% endif %}
{% endif %}

View File

@@ -28,6 +28,7 @@
{% if system_embedding_model.supports_multimodal %} {% if system_embedding_model.supports_multimodal %}
<span class="badge badge-accent badge-sm ml-1">Multimodal</span> <span class="badge badge-accent badge-sm ml-1">Multimodal</span>
{% endif %} {% endif %}
{% include "library/_model_health_badge.html" with h=model_health.embedding %}
{% else %} {% else %}
<div class="flex items-center gap-2"> <div class="flex items-center gap-2">
<span class="badge badge-error">NOT CONFIGURED</span> <span class="badge badge-error">NOT CONFIGURED</span>
@@ -41,6 +42,7 @@
<td> <td>
{% if system_chat_model %} {% if system_chat_model %}
<span class="font-semibold">{{ system_chat_model.api.name }}: {{ system_chat_model.name }}</span> <span class="font-semibold">{{ system_chat_model.api.name }}: {{ system_chat_model.name }}</span>
{% include "library/_model_health_badge.html" with h=model_health.chat %}
{% else %} {% else %}
<span class="text-sm opacity-60">Not configured — concept extraction disabled</span> <span class="text-sm opacity-60">Not configured — concept extraction disabled</span>
{% endif %} {% endif %}
@@ -51,6 +53,7 @@
<td> <td>
{% if system_reranker_model %} {% if system_reranker_model %}
<span class="font-semibold">{{ system_reranker_model.api.name }}: {{ system_reranker_model.name }}</span> <span class="font-semibold">{{ system_reranker_model.api.name }}: {{ system_reranker_model.name }}</span>
{% include "library/_model_health_badge.html" with h=model_health.reranker %}
{% else %} {% else %}
<span class="text-sm opacity-60">Not configured — Phase 3</span> <span class="text-sm opacity-60">Not configured — Phase 3</span>
{% endif %} {% endif %}
@@ -64,6 +67,7 @@
{% if system_vision_model.supports_vision %} {% if system_vision_model.supports_vision %}
<span class="badge badge-accent badge-sm ml-1">Vision</span> <span class="badge badge-accent badge-sm ml-1">Vision</span>
{% endif %} {% endif %}
{% include "library/_model_health_badge.html" with h=model_health.vision %}
{% else %} {% else %}
<span class="text-sm opacity-60">Not configured — image analysis disabled</span> <span class="text-sm opacity-60">Not configured — image analysis disabled</span>
{% endif %} {% endif %}

View File

@@ -12,6 +12,18 @@
<div class="alert alert-warning mb-6"> <div class="alert alert-warning mb-6">
<span>Are you sure you want to delete <strong>{{ library.name }}</strong>? This action cannot be undone.</span> <span>Are you sure you want to delete <strong>{{ library.name }}</strong>? This action cannot be undone.</span>
</div> </div>
{% if library.workspace_id %}
<div class="alert alert-error mb-6">
<span>
<strong>This Library is managed by Daedalus</strong>
(workspace <code>{{ library.workspace_id }}</code>).
Deleting it here removes its embedded content from Mnemosyne, but the
source files still live in Daedalus — it will be <strong>recreated and
re-embedded on the next Daedalus sync</strong>. Use this to clear an
orphaned Library that is blocking workspace re-registration.
</span>
</div>
{% endif %}
<form method="post"> <form method="post">
{% csrf_token %} {% csrf_token %}
<div class="flex gap-2"> <div class="flex gap-2">

View File

@@ -0,0 +1,61 @@
"""Tests for the source-bucket registry resolution."""
from unittest.mock import MagicMock, patch
from django.test import SimpleTestCase, override_settings
from library.services import source_s3
_REGISTRY = {
"daedalus": {
"endpoint_url": "https://nyx.test:8555",
"access_key_id": "dae_key",
"secret_access_key": "dae_secret",
"bucket_name": "daedalus",
"region_name": "us-east-1",
"use_ssl": True,
"verify": True,
},
"spelunker": {
"endpoint_url": "https://nyx.test:8555",
"access_key_id": "spel_key",
"secret_access_key": "spel_secret",
"bucket_name": "spelunker",
"region_name": "us-east-1",
"use_ssl": True,
"verify": True,
},
}
@override_settings(SOURCE_S3_BUCKETS=_REGISTRY)
class SourceConfigTest(SimpleTestCase):
def test_known_source(self):
cfg = source_s3._source_config("spelunker")
self.assertEqual(cfg["bucket_name"], "spelunker")
self.assertEqual(cfg["access_key_id"], "spel_key")
def test_unknown_source_falls_back_to_daedalus(self):
cfg = source_s3._source_config("totally-unknown")
self.assertEqual(cfg["bucket_name"], "daedalus")
def test_blank_source_falls_back_to_daedalus(self):
cfg = source_s3._source_config("")
self.assertEqual(cfg["bucket_name"], "daedalus")
@patch("library.services.source_s3.boto3.client")
def test_fetch_uses_source_bucket(self, mock_boto_client):
client = MagicMock()
body = MagicMock()
body.read.return_value = b"file-bytes"
client.get_object.return_value = {"Body": body}
mock_boto_client.return_value = client
data = source_s3.fetch_from_source("spelunker", "3/readme.md")
self.assertEqual(data, b"file-bytes")
client.get_object.assert_called_once_with(
Bucket="spelunker", Key="3/readme.md"
)
# boto client built with the spelunker source credentials
_, kwargs = mock_boto_client.call_args
self.assertEqual(kwargs["aws_access_key_id"], "spel_key")

View File

@@ -319,20 +319,20 @@ def library_delete(request, uid):
messages.error(request, f"Library not found: {e}") messages.error(request, f"Library not found: {e}")
return redirect("library:library-list") return redirect("library:library-list")
# Daedalus owns the lifecycle of workspace-scoped libraries — they can # Daedalus owns the lifecycle of workspace-scoped libraries. Deleting one
# only be deleted via DELETE /library/api/workspaces/{workspace_id}/. # here is allowed but discouraged: the confirm page warns that Daedalus
# Block the human delete path so a stray click can't desync state. # still holds the source content and will recreate + re-embed it on the
if lib.workspace_id: # next sync. The risk is low (no data loss — only re-embedding cost), and
messages.error( # this is the supported escape hatch for clearing an orphaned Library that
request, # blocks workspace re-registration.
f'"{lib.name}" is managed by Daedalus workspace '
f"{lib.workspace_id}. Delete it from Daedalus, not here.",
)
return redirect("library:library-detail", uid=uid)
if request.method == "POST": if request.method == "POST":
name = lib.name name = lib.name
lib.delete() # Use the shared cascade so child nodes (Collections/Items/Chunks/
# Images) and orphan Concepts are removed too — a bare lib.delete()
# would leak them.
from .services.library_delete import delete_library_cascade
delete_library_cascade(lib)
messages.success(request, f'Library "{name}" deleted.') messages.success(request, f'Library "{name}" deleted.')
return redirect("library:library-list") return redirect("library:library-list")
return render(request, "library/library_confirm_delete.html", {"library": lib}) return render(request, "library/library_confirm_delete.html", {"library": lib})
@@ -729,6 +729,16 @@ def embedding_dashboard(request):
except Exception as exc: except Exception as exc:
logger.warning("Could not load system models: %s", exc) logger.warning("Could not load system models: %s", exc)
# Reachability of the system-default models (keyed by role for the
# template). A probe failure must never 500 the dashboard.
context["model_health"] = {}
try:
from library.services.model_health import probe_system_models
context["model_health"] = {r["role"]: r for r in probe_system_models()}
except Exception as exc:
logger.warning("Could not probe system model health: %s", exc)
# Get item status counts and node counts from Neo4j # Get item status counts and node counts from Neo4j
if neo4j_available(): if neo4j_available():
context["neo4j_available"] = True context["neo4j_available"] = True

View File

@@ -241,10 +241,15 @@ else:
}, },
} }
# --- Daedalus S3 (cross-bucket reads for ingest) --- # --- Source S3 buckets (cross-bucket reads for ingest) ---
# Mnemosyne ingests files written to Daedalus's S3 bucket. These vars # Mnemosyne ingests files that upstream systems write to their own S3
# configure read access; the file is copied into AWS_STORAGE_BUCKET_NAME # buckets. Each ingest request carries a `source` identifier; the Celery
# (Mnemosyne's own bucket) by the Celery ingest task before processing. # ingest task looks up that source's read credentials here, fetches the
# file, and copies it into AWS_STORAGE_BUCKET_NAME (Mnemosyne's own bucket)
# before processing. Add a new entry per upstream source.
#
# Backwards compatible: the "daedalus" entry is built from the existing
# DAEDALUS_S3_* env vars.
DAEDALUS_S3_ENDPOINT_URL = env("DAEDALUS_S3_ENDPOINT_URL", default="") DAEDALUS_S3_ENDPOINT_URL = env("DAEDALUS_S3_ENDPOINT_URL", default="")
DAEDALUS_S3_ACCESS_KEY_ID = env("DAEDALUS_S3_ACCESS_KEY_ID", default="") DAEDALUS_S3_ACCESS_KEY_ID = env("DAEDALUS_S3_ACCESS_KEY_ID", default="")
DAEDALUS_S3_SECRET_ACCESS_KEY = env("DAEDALUS_S3_SECRET_ACCESS_KEY", default="") DAEDALUS_S3_SECRET_ACCESS_KEY = env("DAEDALUS_S3_SECRET_ACCESS_KEY", default="")
@@ -253,6 +258,38 @@ DAEDALUS_S3_REGION_NAME = env("DAEDALUS_S3_REGION_NAME", default="us-east-1")
DAEDALUS_S3_USE_SSL = env.bool("DAEDALUS_S3_USE_SSL", default=False) DAEDALUS_S3_USE_SSL = env.bool("DAEDALUS_S3_USE_SSL", default=False)
DAEDALUS_S3_VERIFY = env.bool("DAEDALUS_S3_VERIFY", default=True) DAEDALUS_S3_VERIFY = env.bool("DAEDALUS_S3_VERIFY", default=True)
# Spelunker writes scraped/extracted documents to its own bucket.
SPELUNKER_S3_ENDPOINT_URL = env("SPELUNKER_S3_ENDPOINT_URL", default="")
SPELUNKER_S3_ACCESS_KEY_ID = env("SPELUNKER_S3_ACCESS_KEY_ID", default="")
SPELUNKER_S3_SECRET_ACCESS_KEY = env("SPELUNKER_S3_SECRET_ACCESS_KEY", default="")
SPELUNKER_S3_BUCKET_NAME = env("SPELUNKER_S3_BUCKET_NAME", default="spelunker")
SPELUNKER_S3_REGION_NAME = env("SPELUNKER_S3_REGION_NAME", default="us-east-1")
SPELUNKER_S3_USE_SSL = env.bool("SPELUNKER_S3_USE_SSL", default=False)
SPELUNKER_S3_VERIFY = env.bool("SPELUNKER_S3_VERIFY", default=True)
# Registry keyed by the ingest `source` field. Unknown/blank sources fall
# back to "daedalus" for backwards compatibility.
SOURCE_S3_BUCKETS = {
"daedalus": {
"endpoint_url": DAEDALUS_S3_ENDPOINT_URL,
"access_key_id": DAEDALUS_S3_ACCESS_KEY_ID,
"secret_access_key": DAEDALUS_S3_SECRET_ACCESS_KEY,
"bucket_name": DAEDALUS_S3_BUCKET_NAME,
"region_name": DAEDALUS_S3_REGION_NAME,
"use_ssl": DAEDALUS_S3_USE_SSL,
"verify": DAEDALUS_S3_VERIFY,
},
"spelunker": {
"endpoint_url": SPELUNKER_S3_ENDPOINT_URL,
"access_key_id": SPELUNKER_S3_ACCESS_KEY_ID,
"secret_access_key": SPELUNKER_S3_SECRET_ACCESS_KEY,
"bucket_name": SPELUNKER_S3_BUCKET_NAME,
"region_name": SPELUNKER_S3_REGION_NAME,
"use_ssl": SPELUNKER_S3_USE_SSL,
"verify": SPELUNKER_S3_VERIFY,
},
}
# --- Celery / RabbitMQ --- # --- Celery / RabbitMQ ---
CELERY_BROKER_URL = env( CELERY_BROKER_URL = env(
"CELERY_BROKER_URL", "CELERY_BROKER_URL",

View File

@@ -32,6 +32,19 @@
# resolution at startup and returns 502 after `docker compose restart app`. # resolution at startup and returns 502 after `docker compose restart app`.
resolver 127.0.0.11 valid=10s; resolver 127.0.0.11 valid=10s;
# Recover the real client IP from X-Forwarded-For (set by HAProxy on Titania)
# before evaluating the RFC1918 allowlists below. nginx runs as a sidecar with
# a published port, so every proxied request arrives via Docker's NAT gateway
# (an RFC1918 address) — without this, the allowlists match that gateway and
# pass ALL external traffic, exposing /metrics and /nginx_status publicly.
# HAProxy's own health checks (e.g. to /healthz) carry no XFF and keep their
# real 10.10.x.x source, so they stay allowed.
set_real_ip_from 10.0.0.0/8;
set_real_ip_from 172.16.0.0/12;
set_real_ip_from 192.168.0.0/16;
real_ip_header X-Forwarded-For;
real_ip_recursive on;
# Preserve X-Forwarded-Proto from the upstream reverse proxy (HAProxy TLS # Preserve X-Forwarded-Proto from the upstream reverse proxy (HAProxy TLS
# termination on Titania); fall back to $scheme only if there's no upstream # termination on Titania); fall back to $scheme only if there's no upstream
# header. Inside the compose network $scheme is always `http` because HAProxy # header. Inside the compose network $scheme is always `http` because HAProxy