feat(api): add workspace + ingest REST endpoints for Daedalus

Adds the REST API surface that Daedalus calls to manage workspace
lifecycle and dispatch file ingestion. All endpoints under /library/api/:

  POST   /workspaces/                   create workspace (idempotent on
                                        workspace_id; library_type frozen)
  GET    /workspaces/{workspace_id}/    workspace status with item/chunk
                                        counts
  DELETE /workspaces/{workspace_id}/    delete workspace + reachable
                                        content; concept-safe (orphan-only
                                        Concept GC; concepts referenced
                                        elsewhere are preserved)

  POST   /ingest/                       queue a file for ingest. Idempotent
                                        on (library, source_ref, hash):
                                        same triple → return existing job;
                                        new hash → supersede.
  GET    /jobs/{job_id}/                poll job status
  POST   /jobs/{job_id}/retry/          re-dispatch a failed job
  GET    /jobs/?status=&library_uid=    list recent jobs

Workspace-Library lookup uses the unique workspace_id index added in the
schema commit. Concept GC runs as a separate transaction after item/chunk
delete so partial failures don't leave the global graph corrupted.

Tests cover serializer validation, IngestJob ORM behavior, the
(library, source_ref, hash) idempotency query pattern, and auth
boundaries on every new endpoint. Cypher correctness is validated by
manual end-to-end testing — no live Neo4j in unit tests.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-29 06:27:08 -04:00
parent c485a8560c
commit f2af28d96d
5 changed files with 725 additions and 7 deletions

View File

@@ -7,12 +7,23 @@ Serialize Neo4j neomodel nodes into JSON for the REST API.
from rest_framework import serializers from rest_framework import serializers
LIBRARY_TYPE_CHOICES = [
"fiction",
"nonfiction",
"technical",
"music",
"film",
"art",
"journal",
"business",
"finance",
]
class LibrarySerializer(serializers.Serializer): class LibrarySerializer(serializers.Serializer):
uid = serializers.CharField(read_only=True) uid = serializers.CharField(read_only=True)
name = serializers.CharField(max_length=200) name = serializers.CharField(max_length=200)
library_type = serializers.ChoiceField( library_type = serializers.ChoiceField(choices=LIBRARY_TYPE_CHOICES)
choices=["fiction", "nonfiction", "technical", "music", "film", "art", "journal"]
)
description = serializers.CharField(required=False, allow_blank=True, default="") description = serializers.CharField(required=False, allow_blank=True, default="")
chunking_config = serializers.JSONField(required=False, default=dict) chunking_config = serializers.JSONField(required=False, default=dict)
embedding_instruction = serializers.CharField( embedding_instruction = serializers.CharField(
@@ -24,6 +35,7 @@ class LibrarySerializer(serializers.Serializer):
llm_context_prompt = serializers.CharField( llm_context_prompt = serializers.CharField(
required=False, allow_blank=True, default="" required=False, allow_blank=True, default=""
) )
workspace_id = serializers.CharField(read_only=True)
created_at = serializers.DateTimeField(read_only=True) created_at = serializers.DateTimeField(read_only=True)
@@ -90,12 +102,11 @@ class SearchRequestSerializer(serializers.Serializer):
query = serializers.CharField(max_length=2000) query = serializers.CharField(max_length=2000)
library_uid = serializers.CharField(required=False, allow_blank=True) library_uid = serializers.CharField(required=False, allow_blank=True)
library_type = serializers.ChoiceField( library_type = serializers.ChoiceField(
choices=[ choices=LIBRARY_TYPE_CHOICES,
"fiction", "nonfiction", "technical", "music", "film", "art", "journal",
],
required=False, required=False,
) )
collection_uid = serializers.CharField(required=False, allow_blank=True) collection_uid = serializers.CharField(required=False, allow_blank=True)
workspace_id = serializers.CharField(required=False, allow_blank=True)
search_types = serializers.ListField( search_types = serializers.ListField(
child=serializers.ChoiceField(choices=["vector", "fulltext", "graph"]), child=serializers.ChoiceField(choices=["vector", "fulltext", "graph"]),
required=False, required=False,
@@ -139,3 +150,73 @@ class SearchResponseSerializer(serializers.Serializer):
reranker_used = serializers.BooleanField() reranker_used = serializers.BooleanField()
reranker_model = serializers.CharField(allow_null=True) reranker_model = serializers.CharField(allow_null=True)
search_types_used = serializers.ListField(child=serializers.CharField()) search_types_used = serializers.ListField(child=serializers.CharField())
# --- Workspace lifecycle (Daedalus integration) ---
class WorkspaceCreateSerializer(serializers.Serializer):
"""Inbound payload for POST /api/v1/workspaces/."""
workspace_id = serializers.CharField(max_length=64)
name = serializers.CharField(max_length=200)
library_type = serializers.ChoiceField(choices=LIBRARY_TYPE_CHOICES)
description = serializers.CharField(required=False, allow_blank=True, default="")
class WorkspaceStatusSerializer(serializers.Serializer):
"""Outbound payload for workspace lifecycle endpoints."""
workspace_id = serializers.CharField()
library_uid = serializers.CharField()
name = serializers.CharField()
library_type = serializers.CharField()
description = serializers.CharField(allow_blank=True)
item_count = serializers.IntegerField()
chunk_count = serializers.IntegerField()
created_at = serializers.DateTimeField()
# --- Ingest (Daedalus integration) ---
class IngestRequestSerializer(serializers.Serializer):
"""Inbound payload for POST /api/v1/library/ingest/."""
s3_key = serializers.CharField(max_length=500)
title = serializers.CharField(max_length=500)
library_uid = serializers.CharField(required=False, allow_blank=True)
workspace_id = serializers.CharField(required=False, allow_blank=True)
collection_uid = serializers.CharField(required=False, allow_blank=True)
file_type = serializers.CharField(required=False, allow_blank=True, default="")
file_size = serializers.IntegerField(required=False, default=0)
content_hash = serializers.CharField(max_length=64)
source = serializers.CharField(required=False, allow_blank=True, default="")
source_ref = serializers.CharField(required=False, allow_blank=True, default="")
def validate(self, data):
if not data.get("library_uid") and not data.get("workspace_id"):
raise serializers.ValidationError(
"Either library_uid or workspace_id is required."
)
return data
class IngestJobSerializer(serializers.Serializer):
"""Outbound payload for ingest job status."""
job_id = serializers.CharField(source="id")
item_uid = serializers.CharField(allow_blank=True)
library_uid = serializers.CharField()
status = serializers.CharField()
progress = serializers.CharField()
error = serializers.CharField(allow_null=True)
chunks_created = serializers.IntegerField()
concepts_extracted = serializers.IntegerField()
embedding_model = serializers.CharField(allow_blank=True)
content_hash = serializers.CharField(allow_blank=True)
source = serializers.CharField(allow_blank=True)
source_ref = serializers.CharField(allow_blank=True)
created_at = serializers.DateTimeField()
started_at = serializers.DateTimeField(allow_null=True)
completed_at = serializers.DateTimeField(allow_null=True)

View File

@@ -4,7 +4,7 @@ URL patterns for the library DRF API.
from django.urls import path from django.urls import path
from . import views from . import views, workspaces
app_name = "library-api" app_name = "library-api"
@@ -28,4 +28,16 @@ urlpatterns = [
# Concepts (Phase 3) # Concepts (Phase 3)
path("concepts/", views.concept_list, name="concept-list"), path("concepts/", views.concept_list, name="concept-list"),
path("concepts/<str:uid>/graph/", views.concept_graph, name="concept-graph"), path("concepts/<str:uid>/graph/", views.concept_graph, name="concept-graph"),
# Workspaces (Daedalus integration)
path("workspaces/", workspaces.workspace_create, name="workspace-create"),
path(
"workspaces/<str:workspace_id>/",
workspaces.workspace_detail_or_delete,
name="workspace-detail",
),
# Ingest (Daedalus integration)
path("ingest/", views.ingest_create, name="ingest-create"),
path("jobs/", views.ingest_job_list, name="ingest-job-list"),
path("jobs/<str:job_id>/", views.ingest_job_detail, name="ingest-job-detail"),
path("jobs/<str:job_id>/retry/", views.ingest_job_retry, name="ingest-job-retry"),
] ]

View File

@@ -21,6 +21,8 @@ from library.content_types import get_library_type_config
from .serializers import ( from .serializers import (
CollectionSerializer, CollectionSerializer,
ConceptSerializer, ConceptSerializer,
IngestJobSerializer,
IngestRequestSerializer,
ItemSerializer, ItemSerializer,
LibrarySerializer, LibrarySerializer,
SearchRequestSerializer, SearchRequestSerializer,
@@ -456,6 +458,7 @@ def search(request):
library_uid=data.get("library_uid") or None, library_uid=data.get("library_uid") or None,
library_type=data.get("library_type") or None, library_type=data.get("library_type") or None,
collection_uid=data.get("collection_uid") or None, collection_uid=data.get("collection_uid") or None,
workspace_id=data.get("workspace_id") or None,
search_types=data.get("search_types", ["vector", "fulltext", "graph"]), search_types=data.get("search_types", ["vector", "fulltext", "graph"]),
limit=data.get("limit", getattr(django_settings, "SEARCH_DEFAULT_LIMIT", 20)), limit=data.get("limit", getattr(django_settings, "SEARCH_DEFAULT_LIMIT", 20)),
vector_top_k=getattr(django_settings, "SEARCH_VECTOR_TOP_K", 50), vector_top_k=getattr(django_settings, "SEARCH_VECTOR_TOP_K", 50),
@@ -487,6 +490,7 @@ def search_vector(request):
library_uid=data.get("library_uid") or None, library_uid=data.get("library_uid") or None,
library_type=data.get("library_type") or None, library_type=data.get("library_type") or None,
collection_uid=data.get("collection_uid") or None, collection_uid=data.get("collection_uid") or None,
workspace_id=data.get("workspace_id") or None,
search_types=["vector"], search_types=["vector"],
limit=data.get("limit", 20), limit=data.get("limit", 20),
vector_top_k=getattr(django_settings, "SEARCH_VECTOR_TOP_K", 50), vector_top_k=getattr(django_settings, "SEARCH_VECTOR_TOP_K", 50),
@@ -517,6 +521,7 @@ def search_fulltext(request):
library_uid=data.get("library_uid") or None, library_uid=data.get("library_uid") or None,
library_type=data.get("library_type") or None, library_type=data.get("library_type") or None,
collection_uid=data.get("collection_uid") or None, collection_uid=data.get("collection_uid") or None,
workspace_id=data.get("workspace_id") or None,
search_types=["fulltext"], search_types=["fulltext"],
limit=data.get("limit", 20), limit=data.get("limit", 20),
fulltext_top_k=getattr(django_settings, "SEARCH_FULLTEXT_TOP_K", 30), fulltext_top_k=getattr(django_settings, "SEARCH_FULLTEXT_TOP_K", 30),
@@ -635,3 +640,196 @@ def concept_graph(request, uid):
{"detail": f"Failed: {exc}"}, {"detail": f"Failed: {exc}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR, status=status.HTTP_500_INTERNAL_SERVER_ERROR,
) )
# ---------------------------------------------------------------------------
# Ingest API (Daedalus integration)
# ---------------------------------------------------------------------------
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def ingest_create(request):
"""
Accept a file (already in S3) for ingestion + embedding.
Daedalus calls this after committing a file to its own S3 bucket.
We resolve the target Library (by workspace_id or library_uid),
enforce idempotency on (library, source_ref, content_hash), create
an IngestJob row, and dispatch a Celery task.
Idempotency:
- Same source_ref + same content_hash → return existing completed job
(no new task dispatched).
- Same source_ref + different content_hash → dispatch a new task that
will supersede the prior Item.
- New source_ref → fresh ingest.
"""
import uuid
from library.models import IngestJob, Library
from library.tasks import ingest_from_daedalus
serializer = IngestRequestSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
# --- Resolve target Library ---
workspace_id = data.get("workspace_id") or ""
library_uid = data.get("library_uid") or ""
if workspace_id:
try:
lib = Library.nodes.get(workspace_id=workspace_id)
except Library.DoesNotExist:
return Response(
{"detail": f"Workspace '{workspace_id}' not registered."},
status=status.HTTP_404_NOT_FOUND,
)
else:
try:
lib = Library.nodes.get(uid=library_uid)
except Library.DoesNotExist:
return Response(
{"detail": f"Library '{library_uid}' not found."},
status=status.HTTP_404_NOT_FOUND,
)
# --- Idempotency check on (library, source_ref, content_hash) ---
source_ref = data.get("source_ref") or ""
content_hash = data["content_hash"]
if source_ref:
existing = (
IngestJob.objects
.filter(
library_uid=lib.uid,
source_ref=source_ref,
content_hash=content_hash,
status__in=["pending", "processing", "completed"],
)
.order_by("-created_at")
.first()
)
if existing is not None:
logger.info(
"Ingest idempotent hit job_id=%s source_ref=%s status=%s",
existing.id, source_ref, existing.status,
)
return Response(
IngestJobSerializer(existing).data,
status=status.HTTP_200_OK,
)
# --- Create job + dispatch ---
job = IngestJob.objects.create(
id=f"job_{uuid.uuid4().hex[:24]}",
library_uid=lib.uid,
s3_key=data["s3_key"],
title=data["title"],
file_type=data.get("file_type", ""),
file_size=data.get("file_size", 0),
content_hash=content_hash,
source=data.get("source", ""),
source_ref=source_ref,
collection_uid=data.get("collection_uid", ""),
status="pending",
progress="queued",
)
try:
async_result = ingest_from_daedalus.delay(job.id)
job.celery_task_id = async_result.id
job.save(update_fields=["celery_task_id"])
except Exception as exc:
logger.error("Failed to dispatch ingest task job_id=%s: %s", job.id, exc)
job.status = "failed"
job.error = f"dispatch failed: {exc}"
job.save(update_fields=["status", "error"])
return Response(
IngestJobSerializer(job).data,
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
logger.info(
"Ingest dispatched job_id=%s library_uid=%s source_ref=%s task_id=%s",
job.id, lib.uid, source_ref, job.celery_task_id,
)
return Response(
IngestJobSerializer(job).data,
status=status.HTTP_202_ACCEPTED,
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def ingest_job_detail(request, job_id):
"""Get the current status of an IngestJob."""
from library.models import IngestJob
try:
job = IngestJob.objects.get(pk=job_id)
except IngestJob.DoesNotExist:
return Response(
{"detail": "Job not found."}, status=status.HTTP_404_NOT_FOUND
)
return Response(IngestJobSerializer(job).data)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def ingest_job_retry(request, job_id):
"""Re-dispatch a failed IngestJob."""
from library.models import IngestJob
from library.tasks import ingest_from_daedalus
try:
job = IngestJob.objects.get(pk=job_id)
except IngestJob.DoesNotExist:
return Response(
{"detail": "Job not found."}, status=status.HTTP_404_NOT_FOUND
)
if job.status not in ("failed", "completed"):
return Response(
{"detail": f"Job is currently {job.status}; cannot retry."},
status=status.HTTP_409_CONFLICT,
)
job.status = "pending"
job.progress = "queued"
job.error = ""
job.save(update_fields=["status", "progress", "error"])
async_result = ingest_from_daedalus.delay(job.id)
job.celery_task_id = async_result.id
job.save(update_fields=["celery_task_id"])
logger.info("Ingest retry dispatched job_id=%s task_id=%s", job.id, async_result.id)
return Response(IngestJobSerializer(job).data, status=status.HTTP_202_ACCEPTED)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def ingest_job_list(request):
"""List recent IngestJob rows, optionally filtered by status / library_uid."""
from library.models import IngestJob
qs = IngestJob.objects.all()
status_filter = request.query_params.get("status")
library_uid = request.query_params.get("library_uid")
limit = min(int(request.query_params.get("limit", 50)), 200)
if status_filter:
qs = qs.filter(status=status_filter)
if library_uid:
qs = qs.filter(library_uid=library_uid)
jobs = list(qs.order_by("-created_at")[:limit])
return Response(
{
"jobs": IngestJobSerializer(jobs, many=True).data,
"count": len(jobs),
}
)

View File

@@ -0,0 +1,217 @@
"""
Workspace lifecycle endpoints for the Daedalus integration.
A "workspace" in Mnemosyne is a Library scoped to a Daedalus workspace UUID.
It uses the same Library node as a global library; the difference is that
`workspace_id` is set, and search must filter on it.
These endpoints are called by the Daedalus backend (HTTP Basic auth as
the `daedalus-service` user). Daedalus owns the workspace_id; Mnemosyne
just persists what Daedalus tells it.
"""
import logging
from neomodel import db
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from library.content_types import get_library_type_config
from .serializers import WorkspaceCreateSerializer, WorkspaceStatusSerializer
logger = logging.getLogger(__name__)
def _serialize_workspace(lib):
"""Build a WorkspaceStatus payload from a Library node + chunk/item counts."""
counts, _ = db.cypher_query(
"MATCH (l:Library {workspace_id: $wsid}) "
"OPTIONAL MATCH (l)-[:CONTAINS]->(:Collection)-[:CONTAINS]->(i:Item) "
"OPTIONAL MATCH (i)-[:HAS_CHUNK]->(c:Chunk) "
"RETURN count(DISTINCT i) AS item_count, count(DISTINCT c) AS chunk_count",
{"wsid": lib.workspace_id},
)
item_count = counts[0][0] if counts else 0
chunk_count = counts[0][1] if counts else 0
return {
"workspace_id": lib.workspace_id,
"library_uid": lib.uid,
"name": lib.name,
"library_type": lib.library_type,
"description": lib.description or "",
"item_count": item_count,
"chunk_count": chunk_count,
"created_at": lib.created_at,
}
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def workspace_create(request):
"""
Create a workspace Library, idempotently.
A POST with a `workspace_id` already in use returns the existing
workspace (200) — not an error. The library_type is frozen at first
create; subsequent calls are not allowed to change it.
"""
from library.models import Library
serializer = WorkspaceCreateSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
# Idempotent path: workspace already exists.
try:
existing = Library.nodes.get(workspace_id=data["workspace_id"])
except Library.DoesNotExist:
existing = None
if existing is not None:
if existing.library_type != data["library_type"]:
return Response(
{
"detail": (
"library_type is immutable for an existing workspace "
f"(have '{existing.library_type}', "
f"got '{data['library_type']}')."
)
},
status=status.HTTP_409_CONFLICT,
)
logger.info(
"Workspace already exists workspace_id=%s library_uid=%s",
data["workspace_id"], existing.uid,
)
return Response(
WorkspaceStatusSerializer(_serialize_workspace(existing)).data,
status=status.HTTP_200_OK,
)
defaults = get_library_type_config(data["library_type"])
lib = Library(
name=data["name"],
library_type=data["library_type"],
description=data.get("description", ""),
workspace_id=data["workspace_id"],
chunking_config=defaults["chunking_config"],
embedding_instruction=defaults["embedding_instruction"],
reranker_instruction=defaults["reranker_instruction"],
llm_context_prompt=defaults["llm_context_prompt"],
)
lib.save()
logger.info(
"Workspace created workspace_id=%s library_uid=%s library_type=%s",
data["workspace_id"], lib.uid, lib.library_type,
)
return Response(
WorkspaceStatusSerializer(_serialize_workspace(lib)).data,
status=status.HTTP_201_CREATED,
)
@api_view(["GET", "DELETE"])
@permission_classes([IsAuthenticated])
def workspace_detail_or_delete(request, workspace_id):
"""
GET: return workspace status (item/chunk counts, metadata).
DELETE: delete the workspace Library and everything reachable AND unique
to it. Concept-safe: orphan-only Concept GC happens at the end.
Concepts referenced by other libraries (workspace or global) are preserved.
"""
from library.models import Library
if request.method == "GET":
try:
lib = Library.nodes.get(workspace_id=workspace_id)
except Library.DoesNotExist:
return Response(
{"detail": "Workspace not found."},
status=status.HTTP_404_NOT_FOUND,
)
return Response(WorkspaceStatusSerializer(_serialize_workspace(lib)).data)
# DELETE — idempotent: a missing workspace returns 204.
try:
lib = Library.nodes.get(workspace_id=workspace_id)
except Library.DoesNotExist:
return Response(status=status.HTTP_204_NO_CONTENT)
library_uid = lib.uid
library_name = lib.name
# Step 1-4: delete chunks, items, collections, then the library itself.
# We collect Item s3_keys first so the caller can clean up S3
# asynchronously (a future enhancement — for now, the keys are logged).
s3_rows, _ = db.cypher_query(
"MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(:Collection)"
"-[:CONTAINS]->(i:Item) RETURN i.uid, i.s3_key",
{"wsid": workspace_id},
)
item_s3_keys = [(r[0], r[1]) for r in s3_rows if r[1]]
db.cypher_query(
"""
MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(:Collection)
-[:CONTAINS]->(i:Item)-[:HAS_CHUNK]->(c:Chunk)
DETACH DELETE c
""",
{"wsid": workspace_id},
)
db.cypher_query(
"""
MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(:Collection)
-[:CONTAINS]->(i:Item)-[:HAS_IMAGE]->(img:Image)
OPTIONAL MATCH (img)-[:HAS_EMBEDDING]->(emb:ImageEmbedding)
DETACH DELETE img, emb
""",
{"wsid": workspace_id},
)
db.cypher_query(
"""
MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(:Collection)
-[:CONTAINS]->(i:Item)
DETACH DELETE i
""",
{"wsid": workspace_id},
)
db.cypher_query(
"""
MATCH (l:Library {workspace_id: $wsid})-[:CONTAINS]->(col:Collection)
DETACH DELETE col
""",
{"wsid": workspace_id},
)
db.cypher_query(
"MATCH (l:Library {workspace_id: $wsid}) DETACH DELETE l",
{"wsid": workspace_id},
)
# Step 5: orphan Concept garbage collection.
orphan_result, _ = db.cypher_query(
"""
MATCH (con:Concept)
WHERE NOT (con)<-[:REFERENCES]-() AND NOT (con)<-[:MENTIONS]-()
AND NOT (con)<-[:DEPICTS]-()
WITH con
DETACH DELETE con
RETURN count(con) AS deleted
"""
)
orphans_deleted = orphan_result[0][0] if orphan_result else 0
logger.info(
"Workspace deleted workspace_id=%s library_uid=%s name=%s "
"items=%d orphans_deleted=%d",
workspace_id, library_uid, library_name,
len(item_s3_keys), orphans_deleted,
)
return Response(status=status.HTTP_204_NO_CONTENT)

View File

@@ -0,0 +1,210 @@
"""
Tests for workspace and ingest REST endpoints.
These exercise serializer validation, idempotency rules, and Django ORM
behavior for IngestJob. The Cypher-touching paths (Library node CRUD,
search scoping) require Neo4j and are validated by the manual end-to-end
test plan, not these unit tests.
"""
from django.test import TestCase
from rest_framework.test import APIClient
from library.api.serializers import (
IngestRequestSerializer,
WorkspaceCreateSerializer,
)
from library.models import IngestJob
class WorkspaceCreateSerializerTests(TestCase):
"""Validation rules for the create-workspace payload."""
def test_minimal_payload_validates(self):
s = WorkspaceCreateSerializer(
data={
"workspace_id": "ws_abc",
"name": "My Workspace",
"library_type": "technical",
}
)
self.assertTrue(s.is_valid(), s.errors)
def test_business_type_accepted(self):
s = WorkspaceCreateSerializer(
data={
"workspace_id": "ws_abc",
"name": "Sales",
"library_type": "business",
}
)
self.assertTrue(s.is_valid(), s.errors)
def test_finance_type_accepted(self):
s = WorkspaceCreateSerializer(
data={
"workspace_id": "ws_abc",
"name": "Money",
"library_type": "finance",
}
)
self.assertTrue(s.is_valid(), s.errors)
def test_unknown_type_rejected(self):
s = WorkspaceCreateSerializer(
data={
"workspace_id": "ws_abc",
"name": "x",
"library_type": "miscellaneous",
}
)
self.assertFalse(s.is_valid())
self.assertIn("library_type", s.errors)
def test_workspace_id_required(self):
s = WorkspaceCreateSerializer(
data={"name": "x", "library_type": "technical"}
)
self.assertFalse(s.is_valid())
self.assertIn("workspace_id", s.errors)
class IngestRequestSerializerTests(TestCase):
"""Validation rules for the ingest payload."""
BASE_PAYLOAD = {
"s3_key": "workspaces/ws/files/f/x.pdf",
"title": "Q4 Report",
"content_hash": "a" * 64,
}
def test_workspace_id_only_validates(self):
s = IngestRequestSerializer(
data={**self.BASE_PAYLOAD, "workspace_id": "ws_abc"}
)
self.assertTrue(s.is_valid(), s.errors)
def test_library_uid_only_validates(self):
s = IngestRequestSerializer(
data={**self.BASE_PAYLOAD, "library_uid": "lib_xyz"}
)
self.assertTrue(s.is_valid(), s.errors)
def test_neither_workspace_nor_library_rejected(self):
s = IngestRequestSerializer(data=self.BASE_PAYLOAD)
self.assertFalse(s.is_valid())
# ValidationError on the whole payload, not a specific field
self.assertIn("non_field_errors", s.errors)
def test_content_hash_required(self):
s = IngestRequestSerializer(
data={
"s3_key": "x",
"title": "y",
"workspace_id": "ws_abc",
}
)
self.assertFalse(s.is_valid())
self.assertIn("content_hash", s.errors)
class IngestJobModelTests(TestCase):
"""IngestJob persists and queries correctly."""
def test_create_with_minimal_fields(self):
job = IngestJob.objects.create(
id="job_test1",
library_uid="lib_xyz",
s3_key="x.pdf",
)
self.assertEqual(job.status, "pending")
self.assertEqual(job.progress, "queued")
self.assertEqual(job.retry_count, 0)
self.assertEqual(job.chunks_created, 0)
def test_idempotency_query_pattern(self):
"""The idempotency query in ingest_create uses (library, source_ref, hash)."""
IngestJob.objects.create(
id="job_a",
library_uid="lib_xyz",
source_ref="ws_a/file_1",
content_hash="h1",
s3_key="a.pdf",
status="completed",
)
IngestJob.objects.create(
id="job_b",
library_uid="lib_xyz",
source_ref="ws_a/file_1",
content_hash="h2", # different hash — supersedes
s3_key="a.pdf",
status="completed",
)
# Same library + source_ref + h1 → finds job_a
match = IngestJob.objects.filter(
library_uid="lib_xyz",
source_ref="ws_a/file_1",
content_hash="h1",
).first()
self.assertIsNotNone(match)
self.assertEqual(match.id, "job_a")
# h2 → finds job_b
match = IngestJob.objects.filter(
library_uid="lib_xyz",
source_ref="ws_a/file_1",
content_hash="h2",
).first()
self.assertEqual(match.id, "job_b")
# Different source_ref → no match
match = IngestJob.objects.filter(
library_uid="lib_xyz",
source_ref="ws_a/file_2",
content_hash="h1",
).first()
self.assertIsNone(match)
class IngestEndpointAuthTests(TestCase):
"""Auth boundary on the ingest endpoint (matches the existing convention)."""
def setUp(self):
self.client = APIClient()
def test_ingest_requires_auth(self):
response = self.client.post(
"/library/api/ingest/",
{
"s3_key": "x",
"title": "y",
"workspace_id": "ws_abc",
"content_hash": "a" * 64,
},
format="json",
)
self.assertIn(response.status_code, [401, 403])
class WorkspaceEndpointAuthTests(TestCase):
"""Auth on workspace endpoints."""
def setUp(self):
self.client = APIClient()
def test_workspace_create_requires_auth(self):
response = self.client.post(
"/library/api/workspaces/",
{"workspace_id": "ws_a", "name": "x", "library_type": "technical"},
format="json",
)
self.assertIn(response.status_code, [401, 403])
def test_workspace_get_requires_auth(self):
response = self.client.get("/library/api/workspaces/ws_a/")
self.assertIn(response.status_code, [401, 403])
def test_workspace_delete_requires_auth(self):
response = self.client.delete("/library/api/workspaces/ws_a/")
self.assertIn(response.status_code, [401, 403])