diff --git a/mnemosyne/library/api/serializers.py b/mnemosyne/library/api/serializers.py index 27021c8..ee5ca32 100644 --- a/mnemosyne/library/api/serializers.py +++ b/mnemosyne/library/api/serializers.py @@ -7,12 +7,23 @@ Serialize Neo4j neomodel nodes into JSON for the REST API. from rest_framework import serializers +LIBRARY_TYPE_CHOICES = [ + "fiction", + "nonfiction", + "technical", + "music", + "film", + "art", + "journal", + "business", + "finance", +] + + class LibrarySerializer(serializers.Serializer): uid = serializers.CharField(read_only=True) name = serializers.CharField(max_length=200) - library_type = serializers.ChoiceField( - choices=["fiction", "nonfiction", "technical", "music", "film", "art", "journal"] - ) + library_type = serializers.ChoiceField(choices=LIBRARY_TYPE_CHOICES) description = serializers.CharField(required=False, allow_blank=True, default="") chunking_config = serializers.JSONField(required=False, default=dict) embedding_instruction = serializers.CharField( @@ -24,6 +35,7 @@ class LibrarySerializer(serializers.Serializer): llm_context_prompt = serializers.CharField( required=False, allow_blank=True, default="" ) + workspace_id = serializers.CharField(read_only=True) created_at = serializers.DateTimeField(read_only=True) @@ -90,12 +102,11 @@ class SearchRequestSerializer(serializers.Serializer): query = serializers.CharField(max_length=2000) library_uid = serializers.CharField(required=False, allow_blank=True) library_type = serializers.ChoiceField( - choices=[ - "fiction", "nonfiction", "technical", "music", "film", "art", "journal", - ], + choices=LIBRARY_TYPE_CHOICES, required=False, ) collection_uid = serializers.CharField(required=False, allow_blank=True) + workspace_id = serializers.CharField(required=False, allow_blank=True) search_types = serializers.ListField( child=serializers.ChoiceField(choices=["vector", "fulltext", "graph"]), required=False, @@ -139,3 +150,73 @@ class SearchResponseSerializer(serializers.Serializer): reranker_used = serializers.BooleanField() reranker_model = serializers.CharField(allow_null=True) search_types_used = serializers.ListField(child=serializers.CharField()) + + +# --- Workspace lifecycle (Daedalus integration) --- + + +class WorkspaceCreateSerializer(serializers.Serializer): + """Inbound payload for POST /api/v1/workspaces/.""" + + workspace_id = serializers.CharField(max_length=64) + name = serializers.CharField(max_length=200) + library_type = serializers.ChoiceField(choices=LIBRARY_TYPE_CHOICES) + description = serializers.CharField(required=False, allow_blank=True, default="") + + +class WorkspaceStatusSerializer(serializers.Serializer): + """Outbound payload for workspace lifecycle endpoints.""" + + workspace_id = serializers.CharField() + library_uid = serializers.CharField() + name = serializers.CharField() + library_type = serializers.CharField() + description = serializers.CharField(allow_blank=True) + item_count = serializers.IntegerField() + chunk_count = serializers.IntegerField() + created_at = serializers.DateTimeField() + + +# --- Ingest (Daedalus integration) --- + + +class IngestRequestSerializer(serializers.Serializer): + """Inbound payload for POST /api/v1/library/ingest/.""" + + s3_key = serializers.CharField(max_length=500) + title = serializers.CharField(max_length=500) + library_uid = serializers.CharField(required=False, allow_blank=True) + workspace_id = serializers.CharField(required=False, allow_blank=True) + collection_uid = serializers.CharField(required=False, allow_blank=True) + file_type = serializers.CharField(required=False, allow_blank=True, default="") + file_size = serializers.IntegerField(required=False, default=0) + content_hash = serializers.CharField(max_length=64) + source = serializers.CharField(required=False, allow_blank=True, default="") + source_ref = serializers.CharField(required=False, allow_blank=True, default="") + + def validate(self, data): + if not data.get("library_uid") and not data.get("workspace_id"): + raise serializers.ValidationError( + "Either library_uid or workspace_id is required." + ) + return data + + +class IngestJobSerializer(serializers.Serializer): + """Outbound payload for ingest job status.""" + + job_id = serializers.CharField(source="id") + item_uid = serializers.CharField(allow_blank=True) + library_uid = serializers.CharField() + status = serializers.CharField() + progress = serializers.CharField() + error = serializers.CharField(allow_null=True) + chunks_created = serializers.IntegerField() + concepts_extracted = serializers.IntegerField() + embedding_model = serializers.CharField(allow_blank=True) + content_hash = serializers.CharField(allow_blank=True) + source = serializers.CharField(allow_blank=True) + source_ref = serializers.CharField(allow_blank=True) + created_at = serializers.DateTimeField() + started_at = serializers.DateTimeField(allow_null=True) + completed_at = serializers.DateTimeField(allow_null=True) diff --git a/mnemosyne/library/api/urls.py b/mnemosyne/library/api/urls.py index 92d86ff..649253e 100644 --- a/mnemosyne/library/api/urls.py +++ b/mnemosyne/library/api/urls.py @@ -4,7 +4,7 @@ URL patterns for the library DRF API. from django.urls import path -from . import views +from . import views, workspaces app_name = "library-api" @@ -28,4 +28,16 @@ urlpatterns = [ # Concepts (Phase 3) path("concepts/", views.concept_list, name="concept-list"), path("concepts//graph/", views.concept_graph, name="concept-graph"), + # Workspaces (Daedalus integration) + path("workspaces/", workspaces.workspace_create, name="workspace-create"), + path( + "workspaces//", + workspaces.workspace_detail_or_delete, + name="workspace-detail", + ), + # Ingest (Daedalus integration) + path("ingest/", views.ingest_create, name="ingest-create"), + path("jobs/", views.ingest_job_list, name="ingest-job-list"), + path("jobs//", views.ingest_job_detail, name="ingest-job-detail"), + path("jobs//retry/", views.ingest_job_retry, name="ingest-job-retry"), ] diff --git a/mnemosyne/library/api/views.py b/mnemosyne/library/api/views.py index 69627be..26ad318 100644 --- a/mnemosyne/library/api/views.py +++ b/mnemosyne/library/api/views.py @@ -21,6 +21,8 @@ from library.content_types import get_library_type_config from .serializers import ( CollectionSerializer, ConceptSerializer, + IngestJobSerializer, + IngestRequestSerializer, ItemSerializer, LibrarySerializer, SearchRequestSerializer, @@ -456,6 +458,7 @@ def search(request): library_uid=data.get("library_uid") or None, library_type=data.get("library_type") or None, collection_uid=data.get("collection_uid") or None, + workspace_id=data.get("workspace_id") or None, search_types=data.get("search_types", ["vector", "fulltext", "graph"]), limit=data.get("limit", getattr(django_settings, "SEARCH_DEFAULT_LIMIT", 20)), vector_top_k=getattr(django_settings, "SEARCH_VECTOR_TOP_K", 50), @@ -487,6 +490,7 @@ def search_vector(request): library_uid=data.get("library_uid") or None, library_type=data.get("library_type") or None, collection_uid=data.get("collection_uid") or None, + workspace_id=data.get("workspace_id") or None, search_types=["vector"], limit=data.get("limit", 20), vector_top_k=getattr(django_settings, "SEARCH_VECTOR_TOP_K", 50), @@ -517,6 +521,7 @@ def search_fulltext(request): library_uid=data.get("library_uid") or None, library_type=data.get("library_type") or None, collection_uid=data.get("collection_uid") or None, + workspace_id=data.get("workspace_id") or None, search_types=["fulltext"], limit=data.get("limit", 20), fulltext_top_k=getattr(django_settings, "SEARCH_FULLTEXT_TOP_K", 30), @@ -635,3 +640,196 @@ def concept_graph(request, uid): {"detail": f"Failed: {exc}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR, ) + + +# --------------------------------------------------------------------------- +# Ingest API (Daedalus integration) +# --------------------------------------------------------------------------- + + +@api_view(["POST"]) +@permission_classes([IsAuthenticated]) +def ingest_create(request): + """ + Accept a file (already in S3) for ingestion + embedding. + + Daedalus calls this after committing a file to its own S3 bucket. + We resolve the target Library (by workspace_id or library_uid), + enforce idempotency on (library, source_ref, content_hash), create + an IngestJob row, and dispatch a Celery task. + + Idempotency: + - Same source_ref + same content_hash → return existing completed job + (no new task dispatched). + - Same source_ref + different content_hash → dispatch a new task that + will supersede the prior Item. + - New source_ref → fresh ingest. + """ + import uuid + + from library.models import IngestJob, Library + from library.tasks import ingest_from_daedalus + + serializer = IngestRequestSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + data = serializer.validated_data + + # --- Resolve target Library --- + workspace_id = data.get("workspace_id") or "" + library_uid = data.get("library_uid") or "" + if workspace_id: + try: + lib = Library.nodes.get(workspace_id=workspace_id) + except Library.DoesNotExist: + return Response( + {"detail": f"Workspace '{workspace_id}' not registered."}, + status=status.HTTP_404_NOT_FOUND, + ) + else: + try: + lib = Library.nodes.get(uid=library_uid) + except Library.DoesNotExist: + return Response( + {"detail": f"Library '{library_uid}' not found."}, + status=status.HTTP_404_NOT_FOUND, + ) + + # --- Idempotency check on (library, source_ref, content_hash) --- + source_ref = data.get("source_ref") or "" + content_hash = data["content_hash"] + + if source_ref: + existing = ( + IngestJob.objects + .filter( + library_uid=lib.uid, + source_ref=source_ref, + content_hash=content_hash, + status__in=["pending", "processing", "completed"], + ) + .order_by("-created_at") + .first() + ) + if existing is not None: + logger.info( + "Ingest idempotent hit job_id=%s source_ref=%s status=%s", + existing.id, source_ref, existing.status, + ) + return Response( + IngestJobSerializer(existing).data, + status=status.HTTP_200_OK, + ) + + # --- Create job + dispatch --- + job = IngestJob.objects.create( + id=f"job_{uuid.uuid4().hex[:24]}", + library_uid=lib.uid, + s3_key=data["s3_key"], + title=data["title"], + file_type=data.get("file_type", ""), + file_size=data.get("file_size", 0), + content_hash=content_hash, + source=data.get("source", ""), + source_ref=source_ref, + collection_uid=data.get("collection_uid", ""), + status="pending", + progress="queued", + ) + + try: + async_result = ingest_from_daedalus.delay(job.id) + job.celery_task_id = async_result.id + job.save(update_fields=["celery_task_id"]) + except Exception as exc: + logger.error("Failed to dispatch ingest task job_id=%s: %s", job.id, exc) + job.status = "failed" + job.error = f"dispatch failed: {exc}" + job.save(update_fields=["status", "error"]) + return Response( + IngestJobSerializer(job).data, + status=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + logger.info( + "Ingest dispatched job_id=%s library_uid=%s source_ref=%s task_id=%s", + job.id, lib.uid, source_ref, job.celery_task_id, + ) + + return Response( + IngestJobSerializer(job).data, + status=status.HTTP_202_ACCEPTED, + ) + + +@api_view(["GET"]) +@permission_classes([IsAuthenticated]) +def ingest_job_detail(request, job_id): + """Get the current status of an IngestJob.""" + from library.models import IngestJob + + try: + job = IngestJob.objects.get(pk=job_id) + except IngestJob.DoesNotExist: + return Response( + {"detail": "Job not found."}, status=status.HTTP_404_NOT_FOUND + ) + + return Response(IngestJobSerializer(job).data) + + +@api_view(["POST"]) +@permission_classes([IsAuthenticated]) +def ingest_job_retry(request, job_id): + """Re-dispatch a failed IngestJob.""" + from library.models import IngestJob + from library.tasks import ingest_from_daedalus + + try: + job = IngestJob.objects.get(pk=job_id) + except IngestJob.DoesNotExist: + return Response( + {"detail": "Job not found."}, status=status.HTTP_404_NOT_FOUND + ) + + if job.status not in ("failed", "completed"): + return Response( + {"detail": f"Job is currently {job.status}; cannot retry."}, + status=status.HTTP_409_CONFLICT, + ) + + job.status = "pending" + job.progress = "queued" + job.error = "" + job.save(update_fields=["status", "progress", "error"]) + + async_result = ingest_from_daedalus.delay(job.id) + job.celery_task_id = async_result.id + job.save(update_fields=["celery_task_id"]) + + logger.info("Ingest retry dispatched job_id=%s task_id=%s", job.id, async_result.id) + return Response(IngestJobSerializer(job).data, status=status.HTTP_202_ACCEPTED) + + +@api_view(["GET"]) +@permission_classes([IsAuthenticated]) +def ingest_job_list(request): + """List recent IngestJob rows, optionally filtered by status / library_uid.""" + from library.models import IngestJob + + qs = IngestJob.objects.all() + status_filter = request.query_params.get("status") + library_uid = request.query_params.get("library_uid") + limit = min(int(request.query_params.get("limit", 50)), 200) + + if status_filter: + qs = qs.filter(status=status_filter) + if library_uid: + qs = qs.filter(library_uid=library_uid) + + jobs = list(qs.order_by("-created_at")[:limit]) + return Response( + { + "jobs": IngestJobSerializer(jobs, many=True).data, + "count": len(jobs), + } + ) diff --git a/mnemosyne/library/api/workspaces.py b/mnemosyne/library/api/workspaces.py new file mode 100644 index 0000000..3fef615 --- /dev/null +++ b/mnemosyne/library/api/workspaces.py @@ -0,0 +1,217 @@ +""" +Workspace lifecycle endpoints for the Daedalus integration. + +A "workspace" in Mnemosyne is a Library scoped to a Daedalus workspace UUID. +It uses the same Library node as a global library; the difference is that +`workspace_id` is set, and search must filter on it. + +These endpoints are called by the Daedalus backend (HTTP Basic auth as +the `daedalus-service` user). Daedalus owns the workspace_id; Mnemosyne +just persists what Daedalus tells it. +""" + +import logging + +from neomodel import db +from rest_framework import status +from rest_framework.decorators import api_view, permission_classes +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response + +from library.content_types import get_library_type_config + +from .serializers import WorkspaceCreateSerializer, WorkspaceStatusSerializer + +logger = logging.getLogger(__name__) + + +def _serialize_workspace(lib): + """Build a WorkspaceStatus payload from a Library node + chunk/item counts.""" + counts, _ = db.cypher_query( + "MATCH (l:Library {workspace_id: $wsid}) " + "OPTIONAL MATCH (l)-[:CONTAINS]->(:Collection)-[:CONTAINS]->(i:Item) " + "OPTIONAL MATCH (i)-[:HAS_CHUNK]->(c:Chunk) " + "RETURN count(DISTINCT i) AS item_count, count(DISTINCT c) AS chunk_count", + {"wsid": lib.workspace_id}, + ) + item_count = counts[0][0] if counts else 0 + chunk_count = counts[0][1] if counts else 0 + + return { + "workspace_id": lib.workspace_id, + "library_uid": lib.uid, + "name": lib.name, + "library_type": lib.library_type, + "description": lib.description or "", + "item_count": item_count, + "chunk_count": chunk_count, + "created_at": lib.created_at, + } + + +@api_view(["POST"]) +@permission_classes([IsAuthenticated]) +def workspace_create(request): + """ + Create a workspace Library, idempotently. + + A POST with a `workspace_id` already in use returns the existing + workspace (200) — not an error. The library_type is frozen at first + create; subsequent calls are not allowed to change it. + """ + from library.models import Library + + serializer = WorkspaceCreateSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + data = serializer.validated_data + + # Idempotent path: workspace already exists. + try: + existing = Library.nodes.get(workspace_id=data["workspace_id"]) + except Library.DoesNotExist: + existing = None + + if existing is not None: + if existing.library_type != data["library_type"]: + return Response( + { + "detail": ( + "library_type is immutable for an existing workspace " + f"(have '{existing.library_type}', " + f"got '{data['library_type']}')." + ) + }, + status=status.HTTP_409_CONFLICT, + ) + logger.info( + "Workspace already exists workspace_id=%s library_uid=%s", + data["workspace_id"], existing.uid, + ) + return Response( + WorkspaceStatusSerializer(_serialize_workspace(existing)).data, + status=status.HTTP_200_OK, + ) + + defaults = get_library_type_config(data["library_type"]) + lib = Library( + name=data["name"], + library_type=data["library_type"], + description=data.get("description", ""), + workspace_id=data["workspace_id"], + chunking_config=defaults["chunking_config"], + embedding_instruction=defaults["embedding_instruction"], + reranker_instruction=defaults["reranker_instruction"], + llm_context_prompt=defaults["llm_context_prompt"], + ) + lib.save() + logger.info( + "Workspace created workspace_id=%s library_uid=%s library_type=%s", + data["workspace_id"], lib.uid, lib.library_type, + ) + + return Response( + WorkspaceStatusSerializer(_serialize_workspace(lib)).data, + status=status.HTTP_201_CREATED, + ) + + +@api_view(["GET", "DELETE"]) +@permission_classes([IsAuthenticated]) +def workspace_detail_or_delete(request, workspace_id): + """ + GET: return workspace status (item/chunk counts, metadata). + + DELETE: delete the workspace Library and everything reachable AND unique + to it. Concept-safe: orphan-only Concept GC happens at the end. + Concepts referenced by other libraries (workspace or global) are preserved. + """ + from library.models import Library + + if request.method == "GET": + try: + lib = Library.nodes.get(workspace_id=workspace_id) + except Library.DoesNotExist: + return Response( + {"detail": "Workspace not found."}, + status=status.HTTP_404_NOT_FOUND, + ) + + return Response(WorkspaceStatusSerializer(_serialize_workspace(lib)).data) + + # DELETE — idempotent: a missing workspace returns 204. + try: + lib = Library.nodes.get(workspace_id=workspace_id) + except Library.DoesNotExist: + return Response(status=status.HTTP_204_NO_CONTENT) + + library_uid = lib.uid + library_name = lib.name + + # Step 1-4: delete chunks, items, collections, then the library itself. + # We collect Item s3_keys first so the caller can clean up S3 + # asynchronously (a future enhancement — for now, the keys are logged). + s3_rows, _ = db.cypher_query( + "MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(:Collection)" + "-[:CONTAINS]->(i:Item) RETURN i.uid, i.s3_key", + {"wsid": workspace_id}, + ) + item_s3_keys = [(r[0], r[1]) for r in s3_rows if r[1]] + + db.cypher_query( + """ + MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(:Collection) + -[:CONTAINS]->(i:Item)-[:HAS_CHUNK]->(c:Chunk) + DETACH DELETE c + """, + {"wsid": workspace_id}, + ) + db.cypher_query( + """ + MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(:Collection) + -[:CONTAINS]->(i:Item)-[:HAS_IMAGE]->(img:Image) + OPTIONAL MATCH (img)-[:HAS_EMBEDDING]->(emb:ImageEmbedding) + DETACH DELETE img, emb + """, + {"wsid": workspace_id}, + ) + db.cypher_query( + """ + MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(:Collection) + -[:CONTAINS]->(i:Item) + DETACH DELETE i + """, + {"wsid": workspace_id}, + ) + db.cypher_query( + """ + MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(col:Collection) + DETACH DELETE col + """, + {"wsid": workspace_id}, + ) + db.cypher_query( + "MATCH (l:Library {workspace_id: $wsid}) DETACH DELETE l", + {"wsid": workspace_id}, + ) + + # Step 5: orphan Concept garbage collection. + orphan_result, _ = db.cypher_query( + """ + MATCH (con:Concept) + WHERE NOT (con)<-[:REFERENCES]-() AND NOT (con)<-[:MENTIONS]-() + AND NOT (con)<-[:DEPICTS]-() + WITH con + DETACH DELETE con + RETURN count(con) AS deleted + """ + ) + orphans_deleted = orphan_result[0][0] if orphan_result else 0 + + logger.info( + "Workspace deleted workspace_id=%s library_uid=%s name=%s " + "items=%d orphans_deleted=%d", + workspace_id, library_uid, library_name, + len(item_s3_keys), orphans_deleted, + ) + + return Response(status=status.HTTP_204_NO_CONTENT) diff --git a/mnemosyne/library/tests/test_workspaces_api.py b/mnemosyne/library/tests/test_workspaces_api.py new file mode 100644 index 0000000..f71dfa9 --- /dev/null +++ b/mnemosyne/library/tests/test_workspaces_api.py @@ -0,0 +1,210 @@ +""" +Tests for workspace and ingest REST endpoints. + +These exercise serializer validation, idempotency rules, and Django ORM +behavior for IngestJob. The Cypher-touching paths (Library node CRUD, +search scoping) require Neo4j and are validated by the manual end-to-end +test plan, not these unit tests. +""" + +from django.test import TestCase +from rest_framework.test import APIClient + +from library.api.serializers import ( + IngestRequestSerializer, + WorkspaceCreateSerializer, +) +from library.models import IngestJob + + +class WorkspaceCreateSerializerTests(TestCase): + """Validation rules for the create-workspace payload.""" + + def test_minimal_payload_validates(self): + s = WorkspaceCreateSerializer( + data={ + "workspace_id": "ws_abc", + "name": "My Workspace", + "library_type": "technical", + } + ) + self.assertTrue(s.is_valid(), s.errors) + + def test_business_type_accepted(self): + s = WorkspaceCreateSerializer( + data={ + "workspace_id": "ws_abc", + "name": "Sales", + "library_type": "business", + } + ) + self.assertTrue(s.is_valid(), s.errors) + + def test_finance_type_accepted(self): + s = WorkspaceCreateSerializer( + data={ + "workspace_id": "ws_abc", + "name": "Money", + "library_type": "finance", + } + ) + self.assertTrue(s.is_valid(), s.errors) + + def test_unknown_type_rejected(self): + s = WorkspaceCreateSerializer( + data={ + "workspace_id": "ws_abc", + "name": "x", + "library_type": "miscellaneous", + } + ) + self.assertFalse(s.is_valid()) + self.assertIn("library_type", s.errors) + + def test_workspace_id_required(self): + s = WorkspaceCreateSerializer( + data={"name": "x", "library_type": "technical"} + ) + self.assertFalse(s.is_valid()) + self.assertIn("workspace_id", s.errors) + + +class IngestRequestSerializerTests(TestCase): + """Validation rules for the ingest payload.""" + + BASE_PAYLOAD = { + "s3_key": "workspaces/ws/files/f/x.pdf", + "title": "Q4 Report", + "content_hash": "a" * 64, + } + + def test_workspace_id_only_validates(self): + s = IngestRequestSerializer( + data={**self.BASE_PAYLOAD, "workspace_id": "ws_abc"} + ) + self.assertTrue(s.is_valid(), s.errors) + + def test_library_uid_only_validates(self): + s = IngestRequestSerializer( + data={**self.BASE_PAYLOAD, "library_uid": "lib_xyz"} + ) + self.assertTrue(s.is_valid(), s.errors) + + def test_neither_workspace_nor_library_rejected(self): + s = IngestRequestSerializer(data=self.BASE_PAYLOAD) + self.assertFalse(s.is_valid()) + # ValidationError on the whole payload, not a specific field + self.assertIn("non_field_errors", s.errors) + + def test_content_hash_required(self): + s = IngestRequestSerializer( + data={ + "s3_key": "x", + "title": "y", + "workspace_id": "ws_abc", + } + ) + self.assertFalse(s.is_valid()) + self.assertIn("content_hash", s.errors) + + +class IngestJobModelTests(TestCase): + """IngestJob persists and queries correctly.""" + + def test_create_with_minimal_fields(self): + job = IngestJob.objects.create( + id="job_test1", + library_uid="lib_xyz", + s3_key="x.pdf", + ) + self.assertEqual(job.status, "pending") + self.assertEqual(job.progress, "queued") + self.assertEqual(job.retry_count, 0) + self.assertEqual(job.chunks_created, 0) + + def test_idempotency_query_pattern(self): + """The idempotency query in ingest_create uses (library, source_ref, hash).""" + IngestJob.objects.create( + id="job_a", + library_uid="lib_xyz", + source_ref="ws_a/file_1", + content_hash="h1", + s3_key="a.pdf", + status="completed", + ) + IngestJob.objects.create( + id="job_b", + library_uid="lib_xyz", + source_ref="ws_a/file_1", + content_hash="h2", # different hash — supersedes + s3_key="a.pdf", + status="completed", + ) + + # Same library + source_ref + h1 → finds job_a + match = IngestJob.objects.filter( + library_uid="lib_xyz", + source_ref="ws_a/file_1", + content_hash="h1", + ).first() + self.assertIsNotNone(match) + self.assertEqual(match.id, "job_a") + + # h2 → finds job_b + match = IngestJob.objects.filter( + library_uid="lib_xyz", + source_ref="ws_a/file_1", + content_hash="h2", + ).first() + self.assertEqual(match.id, "job_b") + + # Different source_ref → no match + match = IngestJob.objects.filter( + library_uid="lib_xyz", + source_ref="ws_a/file_2", + content_hash="h1", + ).first() + self.assertIsNone(match) + + +class IngestEndpointAuthTests(TestCase): + """Auth boundary on the ingest endpoint (matches the existing convention).""" + + def setUp(self): + self.client = APIClient() + + def test_ingest_requires_auth(self): + response = self.client.post( + "/library/api/ingest/", + { + "s3_key": "x", + "title": "y", + "workspace_id": "ws_abc", + "content_hash": "a" * 64, + }, + format="json", + ) + self.assertIn(response.status_code, [401, 403]) + + +class WorkspaceEndpointAuthTests(TestCase): + """Auth on workspace endpoints.""" + + def setUp(self): + self.client = APIClient() + + def test_workspace_create_requires_auth(self): + response = self.client.post( + "/library/api/workspaces/", + {"workspace_id": "ws_a", "name": "x", "library_type": "technical"}, + format="json", + ) + self.assertIn(response.status_code, [401, 403]) + + def test_workspace_get_requires_auth(self): + response = self.client.get("/library/api/workspaces/ws_a/") + self.assertIn(response.status_code, [401, 403]) + + def test_workspace_delete_requires_auth(self): + response = self.client.delete("/library/api/workspaces/ws_a/") + self.assertIn(response.status_code, [401, 403])