Files
mnemosyne/mnemosyne/library/api/views.py
Robert Helewka 93639188d3
Some checks failed
CVE Scan & Docker Build / build-and-push (push) Has been cancelled
CVE Scan & Docker Build / security-scan (push) Has been cancelled
Build & Deploy Docs / build-and-deploy (push) Successful in 1m10s
feat: rework auth model with UserToken and Daedalus/Pallas integration
- Rename MCPToken to UserToken across models, views, and tests
- Update URL names from mcp-token-* to token-*
- Add Daedalus/Pallas integration design doc (v2)
- Switch docker-compose to build local mnemosyne:local image via shared
  build config instead of pulling from git.helu.ca
2026-05-23 19:50:29 -04:00

912 lines
30 KiB
Python

"""
DRF API views for the library app.
All views are function-based per Red Panda Standards.
"""
import hashlib
import logging
import os
from django.core.files.base import ContentFile
from django.core.files.storage import default_storage
from rest_framework import status
from rest_framework.decorators import api_view, parser_classes, permission_classes
from rest_framework.parsers import FormParser, JSONParser, MultiPartParser
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from library.content_types import get_library_type_config
from .serializers import (
CollectionSerializer,
ConceptSerializer,
IngestJobSerializer,
IngestRequestSerializer,
ItemSerializer,
LibrarySerializer,
SearchRequestSerializer,
SearchResponseSerializer,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Library API
# ---------------------------------------------------------------------------
@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated])
def library_list_create(request):
"""List all libraries or create a new one.
GET supports ``?include_workspace=false`` (default ``true``) to filter out
libraries that belong to a Daedalus workspace — the Daedalus library
registry uses this to mirror only the user-managed catalog.
GET supports ``?with_item_count=true`` (default ``false``) to attach
a per-library ``item_count``. Off by default because the count is a
Cypher aggregate; on for the Daedalus-side registry poll.
"""
from library.models import Library
if request.method == "GET":
include_workspace = request.GET.get("include_workspace", "true").lower() != "false"
with_count = request.GET.get("with_item_count", "false").lower() == "true"
if include_workspace:
libraries = list(Library.nodes.order_by("name"))
else:
libraries = list(Library.nodes.filter(workspace_id__isnull=True).order_by("name"))
data = LibrarySerializer(libraries, many=True).data
if with_count and libraries:
from neomodel import db
uids = [lib.uid for lib in libraries]
rows, _ = db.cypher_query(
"MATCH (l:Library) WHERE l.uid IN $uids "
"OPTIONAL MATCH (l)-[:CONTAINS]->(:Collection)-[:CONTAINS]->(i:Item) "
"RETURN l.uid, count(i)",
{"uids": uids},
)
counts = {uid: count for (uid, count) in rows}
for entry in data:
entry["item_count"] = counts.get(entry["uid"], 0)
return Response(data)
# POST — create
serializer = LibrarySerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
# Populate defaults from content-type config if not provided
library_type = data["library_type"]
defaults = get_library_type_config(library_type)
lib = Library(
name=data["name"],
library_type=library_type,
description=data.get("description", ""),
chunking_config=data.get("chunking_config") or defaults["chunking_config"],
embedding_instruction=(
data.get("embedding_instruction") or defaults["embedding_instruction"]
),
reranker_instruction=(
data.get("reranker_instruction") or defaults["reranker_instruction"]
),
llm_context_prompt=(
data.get("llm_context_prompt") or defaults["llm_context_prompt"]
),
)
lib.save()
return Response(LibrarySerializer(lib).data, status=status.HTTP_201_CREATED)
@api_view(["GET", "PUT", "DELETE"])
@permission_classes([IsAuthenticated])
def library_detail(request, uid):
"""Retrieve, update, or delete a library."""
from library.models import Library
try:
lib = Library.nodes.get(uid=uid)
except Library.DoesNotExist:
return Response(
{"detail": "Library not found."}, status=status.HTTP_404_NOT_FOUND
)
if request.method == "GET":
return Response(LibrarySerializer(lib).data)
if request.method == "PUT":
serializer = LibrarySerializer(data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
for field in [
"name",
"library_type",
"description",
"chunking_config",
"embedding_instruction",
"reranker_instruction",
"llm_context_prompt",
]:
if field in data:
setattr(lib, field, data[field])
lib.save()
return Response(LibrarySerializer(lib).data)
# DELETE
lib.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
# ---------------------------------------------------------------------------
# Collection API
# ---------------------------------------------------------------------------
@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated])
def collection_list_create(request):
"""List all collections or create a new one."""
from library.models import Collection, Library
if request.method == "GET":
# Optionally filter by library_uid query param
library_uid = request.query_params.get("library_uid")
if library_uid:
try:
lib = Library.nodes.get(uid=library_uid)
collections = lib.collections.all()
except Library.DoesNotExist:
return Response(
{"detail": "Library not found."}, status=status.HTTP_404_NOT_FOUND
)
else:
collections = Collection.nodes.all()
serializer = CollectionSerializer(collections, many=True)
return Response(serializer.data)
# POST
serializer = CollectionSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
col = Collection(
name=data["name"],
description=data.get("description", ""),
metadata=data.get("metadata", {}),
)
col.save()
# Connect to library if library_uid provided
library_uid = data.get("library_uid")
if library_uid:
try:
lib = Library.nodes.get(uid=library_uid)
lib.collections.connect(col)
col.library.connect(lib)
except Library.DoesNotExist:
pass
return Response(CollectionSerializer(col).data, status=status.HTTP_201_CREATED)
@api_view(["GET", "PUT", "DELETE"])
@permission_classes([IsAuthenticated])
def collection_detail(request, uid):
"""Retrieve, update, or delete a collection."""
from library.models import Collection
try:
col = Collection.nodes.get(uid=uid)
except Collection.DoesNotExist:
return Response(
{"detail": "Collection not found."}, status=status.HTTP_404_NOT_FOUND
)
if request.method == "GET":
return Response(CollectionSerializer(col).data)
if request.method == "PUT":
serializer = CollectionSerializer(data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
for field in ["name", "description", "metadata"]:
if field in data:
setattr(col, field, data[field])
col.save()
return Response(CollectionSerializer(col).data)
col.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
# ---------------------------------------------------------------------------
# Item API
# ---------------------------------------------------------------------------
@api_view(["GET", "POST"])
@permission_classes([IsAuthenticated])
def item_list_create(request):
"""List all items or create a new one."""
from library.models import Collection, Item
if request.method == "GET":
collection_uid = request.query_params.get("collection_uid")
if collection_uid:
try:
col = Collection.nodes.get(uid=collection_uid)
items = col.items.all()
except Collection.DoesNotExist:
return Response(
{"detail": "Collection not found."},
status=status.HTTP_404_NOT_FOUND,
)
else:
items = Item.nodes.all()
serializer = ItemSerializer(items, many=True)
return Response(serializer.data)
# POST
serializer = ItemSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
item = Item(
title=data["title"],
item_type=data.get("item_type", ""),
file_type=data.get("file_type", ""),
metadata=data.get("metadata", {}),
)
item.save()
collection_uid = data.get("collection_uid")
if collection_uid:
try:
col = Collection.nodes.get(uid=collection_uid)
col.items.connect(item)
except Collection.DoesNotExist:
pass
return Response(ItemSerializer(item).data, status=status.HTTP_201_CREATED)
@api_view(["GET", "PUT", "DELETE"])
@permission_classes([IsAuthenticated])
def item_detail(request, uid):
"""Retrieve, update, or delete an item."""
from library.models import Item
try:
item = Item.nodes.get(uid=uid)
except Item.DoesNotExist:
return Response(
{"detail": "Item not found."}, status=status.HTTP_404_NOT_FOUND
)
if request.method == "GET":
return Response(ItemSerializer(item).data)
if request.method == "PUT":
serializer = ItemSerializer(data=request.data, partial=True)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
for field in ["title", "item_type", "file_type", "metadata"]:
if field in data:
setattr(item, field, data[field])
item.save()
return Response(ItemSerializer(item).data)
item.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
# ---------------------------------------------------------------------------
# Item Upload (Phase 2)
# ---------------------------------------------------------------------------
@api_view(["POST"])
@permission_classes([IsAuthenticated])
@parser_classes([MultiPartParser, FormParser])
def item_upload(request):
"""
Upload a file to create a new Item and trigger embedding.
Expects multipart form data with:
- file: The document file
- title: Item title
- collection_uid: (optional) UID of parent collection
- auto_embed: (optional) Whether to auto-trigger embedding (default: true)
"""
from library.models import Collection, Item
uploaded_file = request.FILES.get("file")
if not uploaded_file:
return Response(
{"detail": "No file provided."}, status=status.HTTP_400_BAD_REQUEST
)
title = request.data.get("title", uploaded_file.name)
collection_uid = request.data.get("collection_uid", "")
auto_embed = request.data.get("auto_embed", "true").lower() in ("true", "1", "yes")
# Determine file type from extension
_, ext = os.path.splitext(uploaded_file.name)
file_type = ext.lstrip(".").lower()
# Read file data
file_data = uploaded_file.read()
content_hash = hashlib.sha256(file_data).hexdigest()
# Create Item node
item = Item(
title=title,
file_type=file_type,
file_size=len(file_data),
content_hash=content_hash,
embedding_status="pending",
)
item.save()
# Store file in S3
s3_key = f"items/{item.uid}/original.{file_type}"
try:
default_storage.save(s3_key, ContentFile(file_data))
item.s3_key = s3_key
item.save()
except Exception as exc:
logger.error("Failed to store file to S3: %s", exc)
item.delete()
return Response(
{"detail": f"File storage failed: {exc}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
# Connect to collection if specified
if collection_uid:
try:
col = Collection.nodes.get(uid=collection_uid)
col.items.connect(item)
except Exception:
logger.warning("Collection not found: %s", collection_uid)
# Auto-trigger embedding
task_id = None
if auto_embed:
try:
from library.tasks import embed_item
task = embed_item.delay(item.uid, request.user.id)
task_id = task.id
logger.info(
"Auto-triggered embedding item_uid=%s task_id=%s",
item.uid,
task_id,
)
except Exception as exc:
logger.warning("Failed to queue embedding task: %s", exc)
return Response(
{
**ItemSerializer(item).data,
"task_id": task_id,
},
status=status.HTTP_201_CREATED,
)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def item_reembed(request, uid):
"""Trigger re-embedding for an existing Item."""
from library.models import Item
try:
item = Item.nodes.get(uid=uid)
except Item.DoesNotExist:
return Response(
{"detail": "Item not found."}, status=status.HTTP_404_NOT_FOUND
)
try:
from library.tasks import reembed_item
task = reembed_item.delay(uid, request.user.id)
return Response(
{
"detail": "Re-embedding queued.",
"item_uid": uid,
"task_id": task.id,
}
)
except Exception as exc:
logger.error("Failed to queue reembed task: %s", exc)
return Response(
{"detail": f"Failed to queue task: {exc}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def item_status(request, uid):
"""Get embedding status for an Item."""
from library.models import Item
try:
item = Item.nodes.get(uid=uid)
except Item.DoesNotExist:
return Response(
{"detail": "Item not found."}, status=status.HTTP_404_NOT_FOUND
)
return Response(
{
"uid": item.uid,
"title": item.title,
"embedding_status": item.embedding_status,
"embedding_model_name": item.embedding_model_name,
"chunk_count": item.chunk_count,
"image_count": item.image_count,
"error_message": item.error_message,
}
)
# ---------------------------------------------------------------------------
# Search API (Phase 3)
# ---------------------------------------------------------------------------
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def search(request):
"""
Full hybrid search: vector + fulltext + graph → fusion → re-ranking.
Accepts JSON body with query, optional filters, and search parameters.
Returns ranked candidates with scores and metadata.
"""
from django.conf import settings as django_settings
from library.services.search import SearchRequest, SearchService
from library.utils import all_library_uids
serializer = SearchRequestSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
# This DRF endpoint is gated by ``IsAuthenticated`` against a
# Django session, not an MCP bearer. The session is trusted;
# expose every library to the request. MCP-bearer callers go
# through ``mcp_server`` and get a narrower ``resolved_libraries``
# materialized by the auth middleware.
search_request = SearchRequest(
query=data["query"],
library_uid=data.get("library_uid") or None,
library_type=data.get("library_type") or None,
collection_uid=data.get("collection_uid") or None,
resolved_libraries=all_library_uids(),
search_types=data.get("search_types", ["vector", "fulltext", "graph"]),
limit=data.get("limit", getattr(django_settings, "SEARCH_DEFAULT_LIMIT", 20)),
vector_top_k=getattr(django_settings, "SEARCH_VECTOR_TOP_K", 50),
fulltext_top_k=getattr(django_settings, "SEARCH_FULLTEXT_TOP_K", 30),
rerank=data.get("rerank", True),
include_images=data.get("include_images", True),
)
service = SearchService(user=request.user)
response = service.search(search_request)
return Response(SearchResponseSerializer(response).data)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def search_vector(request):
"""Vector-only search (debugging endpoint)."""
from django.conf import settings as django_settings
from library.services.search import SearchRequest, SearchService
from library.utils import all_library_uids
serializer = SearchRequestSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
search_request = SearchRequest(
query=data["query"],
library_uid=data.get("library_uid") or None,
library_type=data.get("library_type") or None,
collection_uid=data.get("collection_uid") or None,
resolved_libraries=all_library_uids(),
search_types=["vector"],
limit=data.get("limit", 20),
vector_top_k=getattr(django_settings, "SEARCH_VECTOR_TOP_K", 50),
rerank=False,
include_images=False,
)
service = SearchService(user=request.user)
response = service.search(search_request)
return Response(SearchResponseSerializer(response).data)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def search_fulltext(request):
"""Full-text-only search (debugging endpoint)."""
from django.conf import settings as django_settings
from library.services.search import SearchRequest, SearchService
from library.utils import all_library_uids
serializer = SearchRequestSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
search_request = SearchRequest(
query=data["query"],
library_uid=data.get("library_uid") or None,
library_type=data.get("library_type") or None,
collection_uid=data.get("collection_uid") or None,
resolved_libraries=all_library_uids(),
search_types=["fulltext"],
limit=data.get("limit", 20),
fulltext_top_k=getattr(django_settings, "SEARCH_FULLTEXT_TOP_K", 30),
rerank=False,
include_images=False,
)
service = SearchService(user=request.user)
response = service.search(search_request)
return Response(SearchResponseSerializer(response).data)
# ---------------------------------------------------------------------------
# Concept API (Phase 3)
# ---------------------------------------------------------------------------
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def concept_list(request):
"""List or search concepts."""
from library.models import Concept
query = request.query_params.get("q", "")
limit = min(int(request.query_params.get("limit", 50)), 100)
if query:
# Search via fulltext index
try:
from neomodel import db
results, _ = db.cypher_query(
"CALL db.index.fulltext.queryNodes('concept_name_fulltext', $query) "
"YIELD node, score "
"RETURN node.uid AS uid, node.name AS name, "
" node.concept_type AS concept_type, score "
"ORDER BY score DESC LIMIT $limit",
{"query": query, "limit": limit},
)
concepts = [
{"uid": r[0], "name": r[1], "concept_type": r[2] or "", "score": r[3]}
for r in results
]
return Response({"concepts": concepts, "count": len(concepts)})
except Exception as exc:
logger.error("Concept search failed: %s", exc)
return Response(
{"detail": f"Search failed: {exc}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
else:
try:
concepts = Concept.nodes.order_by("name")[:limit]
return Response(
{
"concepts": ConceptSerializer(concepts, many=True).data,
"count": len(concepts),
}
)
except Exception as exc:
logger.error("Concept list failed: %s", exc)
return Response(
{"detail": f"Failed: {exc}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def concept_graph(request, uid):
"""Get a concept's neighborhood graph (connected concepts, chunks, items)."""
try:
from neomodel import db
# Get the concept and its connections
results, _ = db.cypher_query(
"MATCH (c:Concept {uid: $uid}) "
"OPTIONAL MATCH (c)<-[:MENTIONS]-(chunk:Chunk)<-[:HAS_CHUNK]-(item:Item) "
"OPTIONAL MATCH (c)<-[:DEPICTS]-(img:Image)<-[:HAS_IMAGE]-(img_item:Item) "
"OPTIONAL MATCH (c)-[:RELATED_TO]-(related:Concept) "
"RETURN c.uid AS uid, c.name AS name, c.concept_type AS concept_type, "
" collect(DISTINCT {uid: item.uid, title: item.title})[..20] AS items, "
" collect(DISTINCT {uid: related.uid, name: related.name, "
" concept_type: related.concept_type}) AS related_concepts, "
" count(DISTINCT chunk) AS chunk_count, "
" count(DISTINCT img) AS image_count",
{"uid": uid},
)
if not results or not results[0][0]:
return Response(
{"detail": "Concept not found."}, status=status.HTTP_404_NOT_FOUND
)
row = results[0]
# Filter out null entries from collected lists
items = [i for i in (row[3] or []) if i.get("uid")]
related = [r for r in (row[4] or []) if r.get("uid")]
return Response(
{
"uid": row[0],
"name": row[1],
"concept_type": row[2] or "",
"items": items,
"related_concepts": related,
"chunk_count": row[5] or 0,
"image_count": row[6] or 0,
}
)
except Exception as exc:
logger.error("Concept graph query failed: %s", exc)
return Response(
{"detail": f"Failed: {exc}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
# ---------------------------------------------------------------------------
# Ingest API (Daedalus integration)
# ---------------------------------------------------------------------------
def _job_for_user_or_none(job_id, username):
"""Load an ``IngestJob`` visible to ``username``, else ``None``.
Visibility: the job's ``library_uid`` must resolve to a Library with
``owner_username`` either null (global) or matching ``username``.
Callers translate ``None`` into a 404 with generic wording — cross-
user reads must not disclose existence.
"""
from library.models import IngestJob, Library
try:
job = IngestJob.objects.get(pk=job_id)
except IngestJob.DoesNotExist:
return None
try:
lib = Library.nodes.get(uid=job.library_uid)
except Library.DoesNotExist:
return None
if lib.owner_username and lib.owner_username != username:
return None
return job
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def ingest_create(request):
"""
Accept a file (already in S3) for ingestion + embedding.
Daedalus calls this after committing a file to its own S3 bucket.
We resolve the target Library (by workspace_id or library_uid),
enforce idempotency on (library, source_ref, content_hash), create
an IngestJob row, and dispatch a Celery task.
Idempotency:
- Same source_ref + same content_hash → return existing completed job
(no new task dispatched).
- Same source_ref + different content_hash → dispatch a new task that
will supersede the prior Item.
- New source_ref → fresh ingest.
"""
import uuid
from library.models import IngestJob, Library
from library.tasks import ingest_from_daedalus
serializer = IngestRequestSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
# --- Resolve target Library ---
workspace_id = data.get("workspace_id") or ""
library_uid = data.get("library_uid") or ""
if workspace_id:
try:
lib = Library.nodes.get(workspace_id=workspace_id)
except Library.DoesNotExist:
return Response(
{"detail": f"Workspace '{workspace_id}' not registered."},
status=status.HTTP_404_NOT_FOUND,
)
else:
try:
lib = Library.nodes.get(uid=library_uid)
except Library.DoesNotExist:
return Response(
{"detail": f"Library '{library_uid}' not found."},
status=status.HTTP_404_NOT_FOUND,
)
# --- Owner-scope (workspace-scoped libraries only) ---
# Global libraries (owner_username is null) stay shared; workspace
# libraries are visible only to their creating user. Cross-user
# callers get the same wording as the not-found branch above so the
# endpoint doesn't disclose existence across users.
if lib.owner_username and lib.owner_username != request.user.username:
return Response(
{"detail": f"Workspace '{workspace_id or library_uid}' not registered."},
status=status.HTTP_404_NOT_FOUND,
)
# --- Idempotency check on (library, source_ref, content_hash) ---
source_ref = data.get("source_ref") or ""
content_hash = data["content_hash"]
if source_ref:
existing = (
IngestJob.objects
.filter(
library_uid=lib.uid,
source_ref=source_ref,
content_hash=content_hash,
status__in=["pending", "processing", "completed"],
)
.order_by("-created_at")
.first()
)
if existing is not None:
logger.info(
"Ingest idempotent hit job_id=%s source_ref=%s status=%s",
existing.id, source_ref, existing.status,
)
return Response(
IngestJobSerializer(existing).data,
status=status.HTTP_200_OK,
)
# --- Create job + dispatch ---
job = IngestJob.objects.create(
id=f"job_{uuid.uuid4().hex[:24]}",
library_uid=lib.uid,
s3_key=data["s3_key"],
title=data["title"],
file_type=data.get("file_type", ""),
file_size=data.get("file_size", 0),
content_hash=content_hash,
source=data.get("source", ""),
source_ref=source_ref,
collection_uid=data.get("collection_uid", ""),
status="pending",
progress="queued",
)
try:
async_result = ingest_from_daedalus.delay(job.id)
job.celery_task_id = async_result.id
job.save(update_fields=["celery_task_id"])
except Exception as exc:
logger.error("Failed to dispatch ingest task job_id=%s: %s", job.id, exc)
job.status = "failed"
job.error = f"dispatch failed: {exc}"
job.save(update_fields=["status", "error"])
return Response(
IngestJobSerializer(job).data,
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
logger.info(
"Ingest dispatched job_id=%s library_uid=%s source_ref=%s task_id=%s",
job.id, lib.uid, source_ref, job.celery_task_id,
)
return Response(
IngestJobSerializer(job).data,
status=status.HTTP_202_ACCEPTED,
)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def ingest_job_detail(request, job_id):
"""Get the current status of an IngestJob."""
job = _job_for_user_or_none(job_id, request.user.username)
if job is None:
return Response(
{"detail": "Job not found."}, status=status.HTTP_404_NOT_FOUND
)
return Response(IngestJobSerializer(job).data)
@api_view(["POST"])
@permission_classes([IsAuthenticated])
def ingest_job_retry(request, job_id):
"""Re-dispatch a failed IngestJob."""
from library.tasks import ingest_from_daedalus
job = _job_for_user_or_none(job_id, request.user.username)
if job is None:
return Response(
{"detail": "Job not found."}, status=status.HTTP_404_NOT_FOUND
)
if job.status not in ("failed", "completed"):
return Response(
{"detail": f"Job is currently {job.status}; cannot retry."},
status=status.HTTP_409_CONFLICT,
)
job.status = "pending"
job.progress = "queued"
job.error = ""
job.save(update_fields=["status", "progress", "error"])
async_result = ingest_from_daedalus.delay(job.id)
job.celery_task_id = async_result.id
job.save(update_fields=["celery_task_id"])
logger.info("Ingest retry dispatched job_id=%s task_id=%s", job.id, async_result.id)
return Response(IngestJobSerializer(job).data, status=status.HTTP_202_ACCEPTED)
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def ingest_job_list(request):
"""List recent IngestJob rows, optionally filtered by status / library_uid.
Scoped to libraries the caller owns (plus global libraries that have
no ``owner_username``). A ``library_uid`` query param the caller has
no access to silently returns an empty list — same wording as a
not-found job.
"""
from library.models import IngestJob
from library.utils import library_uids_for_user
visible_uids = library_uids_for_user(request.user.username)
qs = IngestJob.objects.filter(library_uid__in=visible_uids)
status_filter = request.query_params.get("status")
library_uid = request.query_params.get("library_uid")
limit = min(int(request.query_params.get("limit", 50)), 200)
if status_filter:
qs = qs.filter(status=status_filter)
if library_uid:
qs = qs.filter(library_uid=library_uid)
jobs = list(qs.order_by("-created_at")[:limit])
return Response(
{
"jobs": IngestJobSerializer(jobs, many=True).data,
"count": len(jobs),
}
)