# Async Task Pattern v1.0.0

Defines how Spelunker Django apps implement background task processing using Celery, RabbitMQ, Memcached, and Flower — covering fire-and-forget tasks, long-running batch jobs, signal-triggered tasks, and periodic scheduled tasks.

## 🐾 Red Panda Approval™

This pattern follows Red Panda Approval standards.

---
## Why a Pattern, Not a Shared Implementation

Long-running work in Spelunker spans multiple domains, each with distinct progress-tracking and state requirements:

- A `solution_library` document embedding task needs to update `review_status` on a `Document` and count vector chunks created.
- An `rfp_manager` batch job tracks per-question progress, per-question errors, and the Celery task ID on an `RFPBatchJob` record.
- An `llm_manager` API-validation task iterates over all active APIs and accumulates model sync statistics.
- A `solution_library` documentation-source sync task fires from a view, stores `celery_task_id` on a `SyncJob`, and reports incremental progress via a callback.

Rather than forcing these into one shared implementation, this pattern defines:

- **Required task interface** — every task must have a namespaced name, a structured return dict, and structured logging.
- **Recommended job-tracking fields** — most tasks that represent a significant unit of work should have a corresponding DB job record.
- **Error handling conventions** — how to catch, log, and reflect failures back to the record.
- **Dispatch variants** — signal-triggered, admin action, view-triggered, and periodic (Beat).
- **Infrastructure conventions** — broker, result backend, serialization, and cache settings.

---
## Required Task Interface

Every Celery task in Spelunker **must** follow this shape:

```python
import logging

from celery import shared_task

logger = logging.getLogger(__name__)


@shared_task(name='<app_label>.<action_name>')
def my_task(primary_id: int, user_id: int | None = None) -> dict:
    """One-line description of what this task does."""
    try:
        # ... do work ...
        logger.info(f"Task succeeded for {primary_id}")
        return {'success': True, 'id': primary_id}

    except Exception as e:
        logger.error(
            f"Task failed for {primary_id}: {type(e).__name__}: {e}",
            extra={'id': primary_id, 'error': str(e)},
            exc_info=True,
        )
        return {'success': False, 'id': primary_id, 'error': str(e)}
```

| Requirement | Rule |
|---|---|
| `name` | Must be `'<app_label>.<action>'`, e.g., `'solution_library.embed_document'` |
| Return value | Always a dict with at minimum `{'success': bool}` |
| Logging | Use structured `extra={}` kwargs; never swallow exceptions silently |
| Import style | Use `@shared_task`, not direct `app.task` references |
| Idempotency | Tasks **must** be safe to re-execute with the same arguments (broker redelivery, worker crash). Use `update_or_create`, check-before-write, or guard with the job record's status before re-processing. |
| Arguments | Pass only JSON-serialisable primitives (PKs, strings, numbers). Never pass ORM instances. |

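The idempotency requirement is easiest to see framework-free. A minimal sketch of the status-guard approach — `jobs` is a stand-in dict for the job table, and all names here are illustrative, not project code:

```python
# Simulate: broker redelivers the same message; the guard makes the
# second execution a safe no-op instead of double-processing.
jobs = {42: {'status': 'pending', 'runs': 0}}

def run_job(job_id: int) -> dict:
    job = jobs.get(job_id)
    if job is None:
        return {'success': False, 'id': job_id, 'error': 'not found'}
    # Guard: only a pending job may start; anything else no-ops.
    if job['status'] != 'pending':
        return {'success': True, 'id': job_id, 'skipped': True}
    job['status'] = 'processing'
    job['runs'] += 1          # ... do work ...
    job['status'] = 'completed'
    return {'success': True, 'id': job_id, 'skipped': False}

first = run_job(42)
second = run_job(42)   # simulated broker redelivery
```

A redelivered message finds the record no longer `pending` and safely no-ops; the same shape works against the real job table with `select_for_update()` or a conditional `UPDATE` to close the race between the check and the write.
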
---
## Retry & Time-Limit Policy

Tasks that call external services (LLM APIs, S3, remote URLs) should declare automatic retries for transient failures. Tasks must also set time limits to prevent hung workers.

### Recommended Retry Decorator

```python
@shared_task(
    name='<app_label>.<action>',
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=60,        # first retry after 60 s, then 120 s, 240 s …
    retry_backoff_max=600,   # cap at 10 minutes
    retry_jitter=True,       # add randomness to avoid thundering herd
    max_retries=3,
    soft_time_limit=1800,    # raise SoftTimeLimitExceeded after 30 min
    time_limit=2100,         # hard-kill after 35 min
)
def my_task(self, primary_id: int, ...):
    ...
```

| Setting | Purpose | Guideline |
|---|---|---|
| `autoretry_for` | Exception classes that trigger an automatic retry | Use for **transient** errors only (network, timeout). Never for `ValueError` or business-logic errors. |
| `retry_backoff` | Seconds before first retry (doubles each attempt) | 60 s is a reasonable default for external API calls. |
| `max_retries` | Maximum retry attempts | 3 for API calls; 0 (no retry) for user-triggered batch jobs that track their own progress. |
| `soft_time_limit` | Raises `SoftTimeLimitExceeded` — allows graceful cleanup | Set on every task. Catch it to mark the job record as failed. |
| `time_limit` | Hard `SIGKILL` — last resort | Set 5–10 min above `soft_time_limit`. |

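With jitter disabled, the delays derived from these settings double from `retry_backoff` up to the `retry_backoff_max` cap. A quick sketch of the arithmetic (with `retry_jitter=True`, Celery instead picks a random delay up to each of these values):

```python
def backoff_schedule(base: int = 60, cap: int = 600, max_retries: int = 3) -> list[int]:
    """Retry delays implied by retry_backoff/retry_backoff_max, jitter off."""
    return [min(base * (2 ** n), cap) for n in range(max_retries)]

delays = backoff_schedule()                    # [60, 120, 240]
longer = backoff_schedule(max_retries=5)       # doubling until the 600 s cap
```
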
### Handling `SoftTimeLimitExceeded`

```python
from celery.exceptions import SoftTimeLimitExceeded


@shared_task(bind=True, soft_time_limit=1800, time_limit=2100, ...)
def long_running_task(self, job_id: int):
    job = MyJob.objects.get(id=job_id)
    try:
        for item in items:
            process(item)
    except SoftTimeLimitExceeded:
        logger.warning(f"Job {job_id} hit soft time limit — marking as failed")
        job.status = 'failed'
        job.completed_at = timezone.now()
        job.save()
        return {'success': False, 'job_id': job_id, 'error': 'Time limit exceeded'}
```

> **Note:** Batch jobs in `rfp_manager` do **not** use `autoretry_for` because they track per-question progress and should not re-run the entire batch. Instead, individual question failures are logged and the batch continues.

---
## Standard Values / Conventions

### Task Name Registry

| App | Task name | Trigger |
|---|---|---|
| `solution_library` | `solution_library.embed_document` | Signal / admin action |
| `solution_library` | `solution_library.embed_documents_batch` | Admin action |
| `solution_library` | `solution_library.sync_documentation_source` | View / admin action |
| `solution_library` | `solution_library.sync_all_documentation_sources` | Celery Beat (periodic) |
| `rfp_manager` | `rfp_manager.summarize_information_document` | Admin action |
| `rfp_manager` | `rfp_manager.batch_generate_responder_answers` | View |
| `rfp_manager` | `rfp_manager.batch_generate_reviewer_answers` | View |
| `llm_manager` | `llm_manager.validate_all_llm_apis` | Celery Beat (periodic) |
| `llm_manager` | `llm_manager.validate_single_api` | Admin action |

### Job Status Choices (DB Job Records)

```python
STATUS_PENDING = 'pending'
STATUS_PROCESSING = 'processing'
STATUS_COMPLETED = 'completed'
STATUS_FAILED = 'failed'
STATUS_CANCELLED = 'cancelled'  # optional — used by rfp_manager
```

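These constants are typically wired into a `choices` list on the job model. A minimal sketch of that wiring (the `STATUS_CHOICES` and `ACTIVE_STATUSES` names are assumptions shown for illustration, not confirmed project code):

```python
STATUS_PENDING = 'pending'
STATUS_PROCESSING = 'processing'
STATUS_COMPLETED = 'completed'
STATUS_FAILED = 'failed'
STATUS_CANCELLED = 'cancelled'

# (stored value, human-readable label) pairs, the shape Django's `choices` expects
STATUS_CHOICES = [
    (STATUS_PENDING, 'Pending'),
    (STATUS_PROCESSING, 'Processing'),
    (STATUS_COMPLETED, 'Completed'),
    (STATUS_FAILED, 'Failed'),
    (STATUS_CANCELLED, 'Cancelled'),
]

# Statuses that count as "in flight" — useful for duplicate-job guards
ACTIVE_STATUSES = [STATUS_PENDING, STATUS_PROCESSING]
```
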
---
## Recommended Job-Tracking Fields

Tasks that represent a significant unit of work should write their state to a DB model. These are the recommended fields:

```python
class MyJobModel(models.Model):
    # Celery linkage
    celery_task_id = models.CharField(
        max_length=255, blank=True,
        help_text="Celery task ID for Flower monitoring"
    )

    # Status lifecycle
    status = models.CharField(
        max_length=20, choices=STATUS_CHOICES, default=STATUS_PENDING
    )
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    # Audit
    started_by = models.ForeignKey(
        User, on_delete=models.PROTECT, related_name='+'
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    # Error accumulation
    errors = models.JSONField(default=list)

    class Meta:
        indexes = [
            models.Index(fields=['celery_task_id']),
            models.Index(fields=['-created_at']),
        ]
```

For batch jobs that process many items, add counter fields:

```python
total_items = models.IntegerField(default=0)
processed_items = models.IntegerField(default=0)
successful_items = models.IntegerField(default=0)
failed_items = models.IntegerField(default=0)


def get_progress_percentage(self) -> int:
    if self.total_items == 0:
        return 0
    return int((self.processed_items / self.total_items) * 100)


def is_stale(self, timeout_minutes: int = 30) -> bool:
    """True if stuck in pending/processing without recent updates."""
    if self.status not in (self.STATUS_PENDING, self.STATUS_PROCESSING):
        return False
    return (timezone.now() - self.updated_at).total_seconds() > (timeout_minutes * 60)
```

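`is_stale()` pairs naturally with a periodic sweep that fails jobs whose worker died without writing a final status. A framework-free sketch of that sweep logic — plain dicts stand in for job rows, and the ORM/task wiring is omitted:

```python
from datetime import datetime, timedelta, timezone

def is_stale(status: str, updated_at: datetime, timeout_minutes: int = 30) -> bool:
    """Mirror of MyJobModel.is_stale() as a plain function."""
    if status not in ('pending', 'processing'):
        return False
    return (datetime.now(timezone.utc) - updated_at).total_seconds() > timeout_minutes * 60

now = datetime.now(timezone.utc)
jobs = [
    {'id': 1, 'status': 'processing', 'updated_at': now - timedelta(hours=2)},  # worker died
    {'id': 2, 'status': 'processing', 'updated_at': now},                       # still alive
    {'id': 3, 'status': 'completed',  'updated_at': now - timedelta(hours=5)},  # finished
]

# The sweep: mark stale pending/processing jobs as failed
swept = [j['id'] for j in jobs if is_stale(j['status'], j['updated_at'])]
for j in jobs:
    if j['id'] in swept:
        j['status'] = 'failed'
```

In a real deployment this would run as its own small Beat task over `MyJobModel.objects.filter(status__in=[...])`.
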
---
## Variant 1 — Fire-and-Forget (Signal-Triggered)

Automatically dispatch a task whenever a model record is saved. Used by `solution_library` to kick off embedding whenever a `Document` is created.

```python
# solution_library/signals.py
import logging

from django.conf import settings
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver

logger = logging.getLogger(__name__)


@receiver(post_save, sender=Document)
def trigger_document_embedding(sender, instance, created, **kwargs):
    if not created:
        return
    if not getattr(settings, 'AUTO_EMBED_DOCUMENTS', True):
        return

    from solution_library.tasks import embed_document_task  # avoid circular import

    def _dispatch():
        try:
            task = embed_document_task.delay(
                document_id=instance.id,
                embedding_model_id=instance.embedding_model_id or None,
                user_id=None,
            )
            logger.info(f"Queued embedding task {task.id} for document {instance.id}")
        except Exception as e:
            logger.error(f"Failed to queue embedding task for document {instance.id}: {e}")

    # Dispatch AFTER the transaction commits so the worker can read the row
    transaction.on_commit(_dispatch)
```

The corresponding task updates the record's status field at start and completion:

```python
@shared_task(name='solution_library.embed_document')
def embed_document_task(document_id: int, embedding_model_id: int | None = None, user_id: int | None = None):
    document = Document.objects.get(id=document_id)
    document.review_status = 'processing'
    document.save(update_fields=['review_status'])

    # ... perform work ...

    document.review_status = 'pending'
    document.save(update_fields=['review_status'])
    return {'success': True, 'document_id': document_id, 'chunks_created': count}
```

---
## Variant 2 — Long-Running Batch Job (View or Admin Triggered)

Used by `rfp_manager` for multi-hour batch RAG processing. The outer transaction creates the DB job record first, then dispatches the Celery task, passing the job's PK.

```python
# rfp_manager/views.py (dispatch)
from django.db import transaction

job = RFPBatchJob.objects.create(
    rfp=rfp,
    started_by=request.user,
    job_type=RFPBatchJob.JOB_TYPE_RESPONDER,
    status=RFPBatchJob.STATUS_PENDING,
)

def _dispatch():
    task = batch_generate_responder_answers.delay(rfp.pk, request.user.pk, job.pk)
    # Save the Celery task ID for Flower cross-reference
    job.celery_task_id = task.id
    job.save(update_fields=['celery_task_id'])

# IMPORTANT: dispatch after the transaction commits so the worker
# can read the job row. Without this, the worker may receive the
# message before the row is visible, causing DoesNotExist.
transaction.on_commit(_dispatch)
```

Inside the task, use `bind=True` to get the Celery task ID:

```python
@shared_task(bind=True, name='rfp_manager.batch_generate_responder_answers')
def batch_generate_responder_answers(self, rfp_id: int, user_id: int, job_id: int):
    job = RFPBatchJob.objects.get(id=job_id)
    job.status = RFPBatchJob.STATUS_PROCESSING
    job.started_at = timezone.now()
    job.celery_task_id = self.request.id  # authoritative Celery ID
    job.save()

    for item in items_to_process:
        try:
            # ... process item ...
            job.processed_questions += 1
            job.successful_questions += 1
            job.save(update_fields=['processed_questions', 'successful_questions', 'updated_at'])
        except Exception as e:
            job.add_error(item, str(e))

    job.status = RFPBatchJob.STATUS_COMPLETED
    job.completed_at = timezone.now()
    job.save()
    return {'success': True, 'job_id': job_id}
```

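The `add_error()` helper used above is not defined in this document. A plausible framework-free sketch of what it would do — append a structured entry to the `errors` JSON list and bump the failure counters (the class and field names here are assumptions mirroring this pattern's job-tracking fields, not confirmed project code):

```python
from datetime import datetime, timezone

class BatchJobSketch:
    """Stand-in for the error-tracking slice of a batch job record."""

    def __init__(self):
        self.errors = []             # maps to models.JSONField(default=list)
        self.processed_questions = 0
        self.failed_questions = 0

    def add_error(self, item_id, message: str) -> None:
        # Structured entry so the UI can show per-item failures
        self.errors.append({
            'item': item_id,
            'error': message,
            'timestamp': datetime.now(timezone.utc).isoformat(),
        })
        self.processed_questions += 1
        self.failed_questions += 1
        # on the real model this would end with:
        # self.save(update_fields=['errors', 'processed_questions', 'failed_questions', 'updated_at'])

job = BatchJobSketch()
job.add_error(7, 'LLM call timed out')
```
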
---
## Variant 3 — Progress-Callback Task (View or Admin Triggered)

Used by `solution_library`'s `sync_documentation_source_task` when an underlying synchronous service needs to stream incremental progress updates back to the DB.

```python
@shared_task(bind=True, name='solution_library.sync_documentation_source')
def sync_documentation_source_task(self, source_id: int, user_id: int, job_id: int):
    job = SyncJob.objects.get(id=job_id)
    job.status = SyncJob.STATUS_PROCESSING
    job.started_at = timezone.now()
    job.celery_task_id = self.request.id
    job.save(update_fields=['status', 'started_at', 'celery_task_id', 'updated_at'])

    def update_progress(created, updated, skipped, processed, total):
        job.documents_created = created
        job.documents_updated = updated
        job.documents_skipped = skipped
        job.save(update_fields=['documents_created', 'documents_updated',
                                'documents_skipped', 'updated_at'])

    result = sync_documentation_source(source_id, user_id, progress_callback=update_progress)

    job.status = SyncJob.STATUS_COMPLETED if result.status == 'completed' else SyncJob.STATUS_FAILED
    job.completed_at = timezone.now()
    job.save()
    return {'success': True, 'job_id': job_id}
```

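Because progress lives on the job row, a status endpoint only needs to read the record — no Celery result-backend calls. A framework-free sketch of the payload such an endpoint might build (field names follow the `SyncJob` counters above; the view wiring itself is omitted and hypothetical):

```python
def progress_payload(job: dict) -> dict:
    """Build the JSON a polling endpoint would return from a job row."""
    total = (job['documents_created']
             + job['documents_updated']
             + job['documents_skipped'])
    return {
        'status': job['status'],
        'documents_created': job['documents_created'],
        'documents_updated': job['documents_updated'],
        'documents_skipped': job['documents_skipped'],
        'total_processed': total,
    }

payload = progress_payload({
    'status': 'processing',
    'documents_created': 12,
    'documents_updated': 3,
    'documents_skipped': 5,
})
```

A browser can poll such an endpoint every few seconds while `status` is `pending` or `processing`.
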
---
## Variant 4 — Periodic Task (Celery Beat)

Used by `llm_manager` for hourly/daily API validation and by `solution_library` for nightly source syncs. Schedule via django-celery-beat in Django admin (no hardcoded schedules in code).

```python
@shared_task(name='llm_manager.validate_all_llm_apis')
def validate_all_llm_apis():
    """Periodic task: validate all active LLM APIs and refresh model lists."""
    active_apis = LLMApi.objects.filter(is_active=True)
    results = {'tested': 0, 'successful': 0, 'failed': 0, 'details': []}

    for api in active_apis:
        results['tested'] += 1
        try:
            result = test_llm_api(api)
            if result['success']:
                results['successful'] += 1
            else:
                results['failed'] += 1
        except Exception as e:
            results['failed'] += 1
            logger.error(f"Error validating {api.name}: {e}", exc_info=True)

    return results


@shared_task(name='solution_library.sync_all_documentation_sources')
def sync_all_sources_task():
    """Periodic task: queue a sync for every documentation source not already syncing."""
    sources = DocumentationSource.objects.all()
    system_user = User.objects.filter(is_superuser=True).first()
    queued, skipped = 0, 0

    for source in sources:
        # Skip if an active sync job already exists
        if SyncJob.objects.filter(
            source=source,
            status__in=[SyncJob.STATUS_PENDING, SyncJob.STATUS_PROCESSING],
        ).exists():
            skipped += 1
            continue

        job = SyncJob.objects.create(source=source, started_by=system_user,
                                     status=SyncJob.STATUS_PENDING)
        sync_documentation_source_task.delay(source.id, system_user.id, job.id)
        queued += 1

    return {'queued': queued, 'skipped': skipped}
```

---
## Infrastructure Configuration

### `spelunker/celery.py` — App Entry Point

```python
import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "spelunker.settings")

app = Celery("spelunker")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()  # auto-discovers tasks.py in every app in INSTALLED_APPS
```

### `settings.py` — Celery Settings

```python
# Broker and result backend — supplied via environment variables
CELERY_BROKER_URL = env('CELERY_BROKER_URL')          # amqp://spelunker:<pw>@rabbitmq:5672/spelunker
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')  # rpc://

# Serialization — JSON only (no pickle)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = env('TIME_ZONE')

# Result expiry — critical when using the rpc:// backend.
# Uncollected results accumulate in worker memory without this.
CELERY_RESULT_EXPIRES = 3600  # 1 hour; safe because we store state in DB job records

# Global time limits (can be overridden per-task with decorator args)
CELERY_TASK_SOFT_TIME_LIMIT = 1800  # 30 min soft limit → SoftTimeLimitExceeded
CELERY_TASK_TIME_LIMIT = 2100       # 35 min hard kill

# Late ack: acknowledge messages AFTER the task completes, not before.
# If a worker crashes mid-task, the broker redelivers the message.
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # fetch one task at a time per worker slot

# Separate logging level for Celery vs. application code
CELERY_LOGGING_LEVEL = env('CELERY_LOGGING_LEVEL', default='INFO')
```

> **`CELERY_TASK_ACKS_LATE`**: Combined with idempotent tasks, this provides at-least-once delivery. If a worker process is killed (OOM, deployment), the message returns to the queue and another worker picks it up. This is why idempotency is a hard requirement.

### `settings.py` — Memcached (Django Cache)

Memcached is the Django HTTP-layer cache (sessions, view caching). It is **not** used as a Celery result backend.

```python
CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.memcached.PyMemcacheCache",
        "LOCATION": env('KVDB_LOCATION'),  # memcached:11211
        "KEY_PREFIX": env('KVDB_PREFIX'),  # spelunker
        "TIMEOUT": 300,
    }
}
```

### `INSTALLED_APPS` — Required

```python
INSTALLED_APPS = [
    ...
    'django_celery_beat',  # DB-backed periodic task scheduler (Beat)
    ...
]
```

### `docker-compose.yml` — Service Topology

| Service | Image | Purpose |
|---|---|---|
| `rabbitmq` | `rabbitmq:3-management-alpine` | AMQP message broker |
| `memcached` | `memcached:1.6-alpine` | Django HTTP cache |
| `worker` | `spelunker:latest` | Celery worker (`--concurrency=4`) |
| `scheduler` | `spelunker:latest` | Celery Beat with `DatabaseScheduler` |
| `flower` | `mher/flower:latest` | Task monitoring UI (port 5555) |

### Task Routing / Queues (Recommended)

By default all tasks run in the default `celery` queue. For production deployments, separate CPU-heavy work from I/O-bound work:

```python
# settings.py
CELERY_TASK_ROUTES = {
    'solution_library.embed_document': {'queue': 'embedding'},
    'solution_library.embed_documents_batch': {'queue': 'embedding'},
    'rfp_manager.batch_generate_*': {'queue': 'batch'},
    'llm_manager.validate_*': {'queue': 'default'},
}
```

```yaml
# docker-compose.yml — separate workers per queue
worker-default:
  command: celery -A spelunker worker -Q default --concurrency=4

worker-embedding:
  command: celery -A spelunker worker -Q embedding --concurrency=2

worker-batch:
  command: celery -A spelunker worker -Q batch --concurrency=2
```

This prevents a burst of embedding tasks from starving time-sensitive API validation, and lets you scale each queue independently.

### Database Connection Management

Celery workers are long-lived processes, and Django DB connections can go stale between tasks. Keep `CONN_MAX_AGE` at `0` (the Django default) so connections are not held open indefinitely, or use a connection pooler such as PgBouncer. Celery's Django fixup calls Django's `close_old_connections()` around each task, which handles per-task cleanup automatically.

---
## Domain Extension Examples

### `solution_library` App

Three task types: single-document embed, batch embed, and documentation-source sync. The single-document task is also triggered by a `post_save` signal for automatic processing on upload.

```python
# Auto-embed on create (signal)
embed_document_task.delay(document_id=instance.id, ...)

# Manual batch from admin action
embed_documents_batch_task.delay(document_ids=[1, 2, 3], ...)

# Source sync from view (with progress callback)
sync_documentation_source_task.delay(source_id=..., user_id=..., job_id=...)
```

### `rfp_manager` App

Two-stage pipeline: responder answers first, reviewer answers second. Each stage is a separate Celery batch job. Both check for an existing active job before dispatching to prevent duplicate runs.

```python
# Guard against duplicate jobs before dispatch
if RFPBatchJob.objects.filter(
    rfp=rfp,
    job_type=RFPBatchJob.JOB_TYPE_RESPONDER,
    status__in=[RFPBatchJob.STATUS_PENDING, RFPBatchJob.STATUS_PROCESSING]
).exists():
    # surface error to user
    ...

# Stage 1
batch_generate_responder_answers.delay(rfp.pk, user.pk, job.pk)

# Stage 2 (after Stage 1 is complete)
batch_generate_reviewer_answers.delay(rfp.pk, user.pk, job.pk)
```

### `llm_manager` App

Stateless periodic task — no DB job record needed because results are written directly to the `LLMApi` and `LLMModel` objects.

```python
# Triggered by Celery Beat; schedule managed via django-celery-beat admin
validate_all_llm_apis.delay()

# Triggered from admin action for a single API
validate_single_api.delay(api_id=api.pk)
```

---
## Anti-Patterns

- ❌ Don't use the `rpc://` result backend for tasks where the caller never retrieves the result — the result accumulates in memory. Spelunker mitigates this by storing state in DB job records rather than reading Celery results. Always set `CELERY_RESULT_EXPIRES`.
- ❌ Don't pass full model instances as task arguments — pass PKs only. Celery serialises arguments as JSON; ORM objects are not JSON serialisable.
- ❌ Don't treat the dispatch-side task ID as authoritative. The dispatch `AsyncResult.id` and the in-task `self.request.id` are the same value, but always re-save it from **inside** the task using `bind=True` — that write is the authoritative one and cannot be lost to a dispatch-side race.
- ❌ Don't silence exceptions with a bare `except: pass` — always log errors and reflect failure status onto the DB record.
- ❌ Don't skip the duplicate-job guard when the task is triggered from a view or admin action. Without it, double-clicking a submit button can queue two identical jobs.
- ❌ Don't use `CELERY_TASK_SERIALIZER = 'pickle'` — JSON only, to prevent arbitrary code execution via crafted task payloads.
- ❌ Don't hardcode periodic task schedules in code via `app.conf.beat_schedule` — use `django_celery_beat` and manage schedules in Django admin so they survive deployments.
- ❌ Don't call `.delay()` inside a database transaction — use `transaction.on_commit()`. The worker may receive the message before the row is committed, causing `DoesNotExist`.
- ❌ Don't write non-idempotent tasks — workers may crash and brokers may redeliver. A re-executed task must produce the same result (or safely no-op).
- ❌ Don't omit time limits — a hung external API call (LLM, S3) will block a worker slot forever. Always set `soft_time_limit` and `time_limit`.
- ❌ Don't retry business-logic errors with `autoretry_for` — only retry **transient** failures (network errors, timeouts). A `ValueError` or `DoesNotExist` will never succeed on retry.

---
## Migration / Adoption

When adding a new Celery task to an existing app:

1. Create `<app>/tasks.py` using `@shared_task`, not `@app.task`.
2. Name the task `'<app_label>.<action>'`.
3. If the task is long-running, create a DB job model with the recommended fields above.
4. Register the app in `INSTALLED_APPS` (required for `autodiscover_tasks`).
5. For periodic tasks, add a schedule record via Django admin → Periodic Tasks (django-celery-beat) rather than in code.
6. Add a test that confirms the task can be called synchronously with `CELERY_TASK_ALWAYS_EAGER = True`.

---
## Settings

```python
# settings.py

# Required — broker and result backend
CELERY_BROKER_URL = env('CELERY_BROKER_URL')          # amqp://user:pw@host:5672/vhost
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')  # rpc://

# Serialization (do not change)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = env('TIME_ZONE')  # must match Django TIME_ZONE

# Result expiry — prevents unbounded memory growth with rpc:// backend
CELERY_RESULT_EXPIRES = 3600  # seconds (1 hour)

# Time limits — global defaults, overridable per-task
CELERY_TASK_SOFT_TIME_LIMIT = 1800  # SoftTimeLimitExceeded after 30 min
CELERY_TASK_TIME_LIMIT = 2100       # hard SIGKILL after 35 min

# Reliability — late ack + single prefetch for at-least-once delivery
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

# Logging
CELERY_LOGGING_LEVEL = env('CELERY_LOGGING_LEVEL', default='INFO')  # separate from app/Django level

# Optional — set False to suppress signal-triggered embedding
# AUTO_EMBED_DOCUMENTS = True

# Optional — task routing (see Infrastructure Configuration for queue examples)
# CELERY_TASK_ROUTES = { ... }
```

---
## Testing

```python
from django.test import TestCase, override_settings


@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True)
class EmbedDocumentTaskTest(TestCase):
    def test_happy_path(self):
        """Task embeds a document and returns success."""
        # arrange: create Document, LLMModel fixtures
        result = embed_document_task(document_id=doc.id)
        self.assertTrue(result['success'])
        self.assertGreater(result['chunks_created'], 0)
        doc.refresh_from_db()
        self.assertEqual(doc.review_status, 'pending')

    def test_document_not_found(self):
        """Task returns success=False for a missing document ID."""
        result = embed_document_task(document_id=999999)
        self.assertFalse(result['success'])
        self.assertIn('not found', result['error'])

    def test_no_embedding_model(self):
        """Task returns success=False when no embedding model is available."""
        # arrange: no LLMModel with is_system_default=True
        result = embed_document_task(document_id=doc.id)
        self.assertFalse(result['success'])


@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True)
class BatchJobTest(TestCase):
    def test_job_reaches_completed_status(self):
        """Batch job transitions from pending → processing → completed."""
        job = RFPBatchJob.objects.create(...)
        batch_generate_responder_answers(rfp_id=rfp.pk, user_id=user.pk, job_id=job.pk)
        job.refresh_from_db()
        self.assertEqual(job.status, RFPBatchJob.STATUS_COMPLETED)

    def test_duplicate_job_guard(self):
        """A second dispatch when a job is already active is rejected by the view."""
        # arrange: one active job
        response = self.client.post(dispatch_url)
        self.assertContains(response, 'already running', status_code=400)
```
|