Async Task Pattern v1.0.0
Defines how Spelunker Django apps implement background task processing using Celery, RabbitMQ, Memcached, and Flower — covering fire-and-forget tasks, long-running batch jobs, signal-triggered tasks, and periodic scheduled tasks.
🐾 Red Panda Approval™
This pattern follows Red Panda Approval standards.
Why a Pattern, Not a Shared Implementation
Long-running work in Spelunker spans multiple domains, each with distinct progress-tracking and state requirements:
- A `solution_library` document embedding task needs to update `review_status` on a `Document` and count vector chunks created.
- An `rfp_manager` batch job tracks per-question progress, per-question errors, and the Celery task ID on an `RFPBatchJob` record.
- An `llm_manager` API-validation task iterates over all active APIs and accumulates model sync statistics.
- A `solution_library` documentation-source sync task fires from a View, stores `celery_task_id` on a `SyncJob`, and reports incremental progress via a callback.
Instead of a single shared implementation, this pattern defines:
- Required task interface — every task must have a namespaced name, a structured return dict, and structured logging.
- Recommended job-tracking fields — most tasks that represent a significant unit of work should have a corresponding DB job record.
- Error handling conventions — how to catch, log, and reflect failures back to the record.
- Dispatch variants — signal-triggered, admin action, view-triggered, and periodic (Beat).
- Infrastructure conventions — broker, result backend, serialization, and cache settings.
Required Task Interface
Every Celery task in Spelunker must follow this skeleton:
```python
from celery import shared_task
import logging

logger = logging.getLogger(__name__)

@shared_task(name='<app_label>.<action_name>')
def my_task(primary_id: int, user_id: int = None) -> dict:
    """One-line description of what this task does."""
    try:
        # ... do work ...
        logger.info(f"Task succeeded for {primary_id}")
        return {'success': True, 'id': primary_id}
    except Exception as e:
        logger.error(
            f"Task failed for {primary_id}: {type(e).__name__}: {e}",
            extra={'id': primary_id, 'error': str(e)},
            exc_info=True,
        )
        return {'success': False, 'id': primary_id, 'error': str(e)}
```
| Requirement | Rule |
|---|---|
| `name` | Must be `'<app_label>.<action>'`, e.g., `'solution_library.embed_document'` |
| Return value | Always a dict with at minimum `{'success': bool}` |
| Logging | Use structured `extra={}` kwargs; never swallow exceptions silently |
| Import style | Use `@shared_task`, not direct `app.task` references |
| Idempotency | Tasks must be safe to re-execute with the same arguments (broker redelivery, worker crash). Use `update_or_create`, check-before-write, or guard with the job record's status before re-processing. |
| Arguments | Pass only JSON-serialisable primitives (PKs, strings, numbers). Never pass ORM instances. |
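The return-value rule can be made concrete with a tiny helper. This is a hypothetical sketch (no such helper exists in the codebase), shown only to illustrate the convention:

```python
def task_result(success: bool, **extra) -> dict:
    """Build the standard task return dict: {'success': bool, ...}."""
    result = {'success': bool(success)}
    result.update(extra)  # e.g. id, error, chunks_created
    return result

# Usage inside a task body:
#   return task_result(True, id=primary_id)
#   return task_result(False, id=primary_id, error=str(e))
```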
Retry & Time-Limit Policy
Tasks that call external services (LLM APIs, S3, remote URLs) should declare automatic retries for transient failures. Tasks must also set time limits to prevent hung workers.
Recommended Retry Decorator
```python
@shared_task(
    name='<app_label>.<action>',
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=60,       # first retry after 60 s, then 120 s, 240 s ...
    retry_backoff_max=600,  # cap at 10 minutes
    retry_jitter=True,      # add randomness to avoid thundering herd
    max_retries=3,
    soft_time_limit=1800,   # raise SoftTimeLimitExceeded after 30 min
    time_limit=2100,        # hard-kill after 35 min
)
def my_task(self, primary_id: int, ...):
    ...
```
| Setting | Purpose | Guideline |
|---|---|---|
| `autoretry_for` | Exception classes that trigger an automatic retry | Use for transient errors only (network, timeout). Never for `ValueError` or business-logic errors. |
| `retry_backoff` | Seconds before first retry (doubles each attempt) | 60 s is a reasonable default for external API calls. |
| `max_retries` | Maximum retry attempts | 3 for API calls; 0 (no retry) for user-triggered batch jobs that track their own progress. |
| `soft_time_limit` | Raises `SoftTimeLimitExceeded` — allows graceful cleanup | Set on every task. Catch it to mark the job record as failed. |
| `time_limit` | Hard SIGKILL — last resort | Set 5–10 min above `soft_time_limit`. |
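For intuition, the delay before retry attempt n (ignoring jitter) is `retry_backoff * 2**n`, capped at `retry_backoff_max`; `retry_jitter=True` then randomises each value. A quick sketch of the schedule produced by the settings above:

```python
def backoff_schedule(retry_backoff: int = 60,
                     retry_backoff_max: int = 600,
                     max_retries: int = 3) -> list:
    """Delay in seconds before each retry attempt, jitter ignored."""
    return [min(retry_backoff * 2 ** n, retry_backoff_max)
            for n in range(max_retries)]

print(backoff_schedule())               # [60, 120, 240]
print(backoff_schedule(max_retries=5))  # [60, 120, 240, 480, 600] (capped)
```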
Handling SoftTimeLimitExceeded
```python
from celery.exceptions import SoftTimeLimitExceeded

@shared_task(bind=True, soft_time_limit=1800, time_limit=2100, ...)
def long_running_task(self, job_id: int):
    job = MyJob.objects.get(id=job_id)
    try:
        for item in items:
            process(item)
    except SoftTimeLimitExceeded:
        logger.warning(f"Job {job_id} hit soft time limit — marking as failed")
        job.status = 'failed'
        job.completed_at = timezone.now()
        job.save()
        return {'success': False, 'job_id': job_id, 'error': 'Time limit exceeded'}
```
Note: Batch jobs in `rfp_manager` do not use `autoretry_for` because they track per-question progress and should not re-run the entire batch. Instead, individual question failures are logged and the batch continues.
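That continue-on-error behaviour distils to plain Python. A framework-free sketch (the function and dict shapes are illustrative, not the actual `RFPBatchJob` API):

```python
def run_batch(items, process):
    """Process every item, recording per-item errors instead of aborting."""
    errors, succeeded = [], 0
    for index, item in enumerate(items):
        try:
            process(item)
            succeeded += 1
        except Exception as exc:
            errors.append({'index': index, 'error': str(exc)})  # keep going
    return {'success': not errors, 'successful': succeeded, 'errors': errors}
```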
Standard Values / Conventions
Task Name Registry
| App | Task name | Trigger |
|---|---|---|
| `solution_library` | `solution_library.embed_document` | Signal / admin action |
| `solution_library` | `solution_library.embed_documents_batch` | Admin action |
| `solution_library` | `solution_library.sync_documentation_source` | View / admin action |
| `solution_library` | `solution_library.sync_all_documentation_sources` | Celery Beat (periodic) |
| `rfp_manager` | `rfp_manager.summarize_information_document` | Admin action |
| `rfp_manager` | `rfp_manager.batch_generate_responder_answers` | View |
| `rfp_manager` | `rfp_manager.batch_generate_reviewer_answers` | View |
| `llm_manager` | `llm_manager.validate_all_llm_apis` | Celery Beat (periodic) |
| `llm_manager` | `llm_manager.validate_single_api` | Admin action |
Job Status Choices (DB Job Records)
```python
STATUS_PENDING = 'pending'
STATUS_PROCESSING = 'processing'
STATUS_COMPLETED = 'completed'
STATUS_FAILED = 'failed'
STATUS_CANCELLED = 'cancelled'  # optional — used by rfp_manager
```
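These constants are typically paired with display labels to form the `choices` list referenced by the job model's `status` field. A sketch (the label strings are assumptions, not from the codebase):

```python
STATUS_PENDING = 'pending'
STATUS_PROCESSING = 'processing'
STATUS_COMPLETED = 'completed'
STATUS_FAILED = 'failed'
STATUS_CANCELLED = 'cancelled'

STATUS_CHOICES = [
    (STATUS_PENDING, 'Pending'),
    (STATUS_PROCESSING, 'Processing'),
    (STATUS_COMPLETED, 'Completed'),
    (STATUS_FAILED, 'Failed'),
    (STATUS_CANCELLED, 'Cancelled'),  # only on models that support cancellation
]
```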
Recommended Job-Tracking Fields
Tasks that represent a significant unit of work should write their state to a DB model. These are the recommended fields:
```python
class MyJobModel(models.Model):
    # Celery linkage
    celery_task_id = models.CharField(
        max_length=255, blank=True,
        help_text="Celery task ID for Flower monitoring"
    )
    # Status lifecycle
    status = models.CharField(
        max_length=20, choices=STATUS_CHOICES, default=STATUS_PENDING
    )
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    # Audit
    started_by = models.ForeignKey(
        User, on_delete=models.PROTECT, related_name='+'
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    # Error accumulation
    errors = models.JSONField(default=list)

    class Meta:
        indexes = [
            models.Index(fields=['celery_task_id']),
            models.Index(fields=['-created_at']),
        ]
```
For batch jobs that process many items, add counter fields:
```python
total_items = models.IntegerField(default=0)
processed_items = models.IntegerField(default=0)
successful_items = models.IntegerField(default=0)
failed_items = models.IntegerField(default=0)

def get_progress_percentage(self) -> int:
    if self.total_items == 0:
        return 0
    return int((self.processed_items / self.total_items) * 100)

def is_stale(self, timeout_minutes: int = 30) -> bool:
    """True if stuck in pending/processing without recent updates."""
    if self.status not in (self.STATUS_PENDING, self.STATUS_PROCESSING):
        return False
    return (timezone.now() - self.updated_at).total_seconds() > (timeout_minutes * 60)
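The staleness rule depends only on the status and the elapsed time, so it can be sanity-checked without the ORM. A framework-free sketch with the clock injected for testing:

```python
from datetime import datetime, timedelta, timezone

def is_stale(status: str, updated_at: datetime,
             now: datetime, timeout_minutes: int = 30) -> bool:
    """Mirror of the model's is_stale, with `now` passed in explicitly."""
    if status not in ('pending', 'processing'):
        return False
    return (now - updated_at).total_seconds() > timeout_minutes * 60

now = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
is_stale('processing', now - timedelta(minutes=45), now)  # True: over the 30 min timeout
is_stale('completed', now - timedelta(minutes=45), now)   # False: terminal status
```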
Variant 1 — Fire-and-Forget (Signal-Triggered)
Automatically dispatch a task whenever a model record is saved. Used by `solution_library` to kick off embedding whenever a `Document` is created.
```python
# solution_library/signals.py
import logging

from django.db.models.signals import post_save
from django.dispatch import receiver
from django.conf import settings

logger = logging.getLogger(__name__)

@receiver(post_save, sender=Document)
def trigger_document_embedding(sender, instance, created, **kwargs):
    if not created:
        return
    if not getattr(settings, 'AUTO_EMBED_DOCUMENTS', True):
        return

    from solution_library.tasks import embed_document_task  # avoid circular import
    from django.db import transaction

    def _dispatch():
        try:
            task = embed_document_task.delay(
                document_id=instance.id,
                embedding_model_id=instance.embedding_model_id or None,
                user_id=None,
            )
            logger.info(f"Queued embedding task {task.id} for document {instance.id}")
        except Exception as e:
            logger.error(f"Failed to queue embedding task for document {instance.id}: {e}")

    # Dispatch AFTER the transaction commits so the worker can read the row
    transaction.on_commit(_dispatch)
```
The corresponding task updates the record's status field at start and completion:
```python
@shared_task(name='solution_library.embed_document')
def embed_document_task(document_id: int, embedding_model_id: int = None, user_id: int = None):
    document = Document.objects.get(id=document_id)
    document.review_status = 'processing'
    document.save(update_fields=['review_status', 'embedding_model'])
    # ... perform work ...
    document.review_status = 'pending'
    document.save(update_fields=['review_status'])
    return {'success': True, 'document_id': document_id, 'chunks_created': count}
```
Variant 2 — Long-Running Batch Job (View or Admin Triggered)
Used by `rfp_manager` for multi-hour batch RAG processing. The view creates the DB job record first, then dispatches the Celery task, passing the job's PK.
```python
# rfp_manager/views.py (dispatch)
from django.db import transaction

job = RFPBatchJob.objects.create(
    rfp=rfp,
    started_by=request.user,
    job_type=RFPBatchJob.JOB_TYPE_RESPONDER,
    status=RFPBatchJob.STATUS_PENDING,
)

def _dispatch():
    task = batch_generate_responder_answers.delay(rfp.pk, request.user.pk, job.pk)
    # Save the Celery task ID for Flower cross-reference
    job.celery_task_id = task.id
    job.save(update_fields=['celery_task_id'])

# IMPORTANT: dispatch after the transaction commits so the worker
# can read the job row. Without this, the worker may receive the
# message before the row is visible, causing DoesNotExist.
transaction.on_commit(_dispatch)
```
Inside the task, use `bind=True` to get the Celery task ID:
```python
@shared_task(bind=True, name='rfp_manager.batch_generate_responder_answers')
def batch_generate_responder_answers(self, rfp_id: int, user_id: int, job_id: int):
    job = RFPBatchJob.objects.get(id=job_id)
    job.status = RFPBatchJob.STATUS_PROCESSING
    job.started_at = timezone.now()
    job.celery_task_id = self.request.id  # authoritative Celery ID
    job.save()

    for item in items_to_process:
        try:
            # ... process item ...
            job.processed_questions += 1
            job.successful_questions += 1
        except Exception as e:
            job.add_error(item, str(e))
            job.processed_questions += 1
            job.failed_questions += 1
        job.save(update_fields=['processed_questions', 'successful_questions',
                                'failed_questions', 'updated_at'])

    job.status = RFPBatchJob.STATUS_COMPLETED
    job.completed_at = timezone.now()
    job.save()
    return {'success': True, 'job_id': job_id}
```
Variant 3 — Progress-Callback Task (View or Admin Triggered)
Used by `solution_library`'s `sync_documentation_source_task` when an underlying synchronous service needs to stream incremental progress updates back to the DB.
```python
@shared_task(bind=True, name='solution_library.sync_documentation_source')
def sync_documentation_source_task(self, source_id: int, user_id: int, job_id: int):
    job = SyncJob.objects.get(id=job_id)
    job.status = SyncJob.STATUS_PROCESSING
    job.started_at = timezone.now()
    job.celery_task_id = self.request.id
    job.save(update_fields=['status', 'started_at', 'celery_task_id', 'updated_at'])

    def update_progress(created, updated, skipped, processed, total):
        job.documents_created = created
        job.documents_updated = updated
        job.documents_skipped = skipped
        job.save(update_fields=['documents_created', 'documents_updated',
                                'documents_skipped', 'updated_at'])

    result = sync_documentation_source(source_id, user_id, progress_callback=update_progress)

    job.status = SyncJob.STATUS_COMPLETED if result.status == 'completed' else SyncJob.STATUS_FAILED
    job.completed_at = timezone.now()
    job.save()
    return {'success': True, 'job_id': job_id}
```
Variant 4 — Periodic Task (Celery Beat)
Used by `llm_manager` for hourly/daily API validation and by `solution_library` for nightly source syncs. Schedule via `django-celery-beat` in Django admin (no hardcoded schedules in code).
```python
@shared_task(name='llm_manager.validate_all_llm_apis')
def validate_all_llm_apis():
    """Periodic task: validate all active LLM APIs and refresh model lists."""
    active_apis = LLMApi.objects.filter(is_active=True)
    results = {'tested': 0, 'successful': 0, 'failed': 0, 'details': []}
    for api in active_apis:
        results['tested'] += 1
        try:
            result = test_llm_api(api)
            if result['success']:
                results['successful'] += 1
            else:
                results['failed'] += 1
        except Exception as e:
            results['failed'] += 1
            logger.error(f"Error validating {api.name}: {e}", exc_info=True)
    return results
```
```python
@shared_task(name='solution_library.sync_all_documentation_sources')
def sync_all_sources_task():
    """Periodic task: queue a sync for every active documentation source."""
    sources = DocumentationSource.objects.all()
    system_user = User.objects.filter(is_superuser=True).first()
    queued, skipped = 0, 0
    for source in sources:
        # Skip if an active sync job already exists
        if SyncJob.objects.filter(source=source,
                                  status__in=[SyncJob.STATUS_PENDING,
                                              SyncJob.STATUS_PROCESSING]).exists():
            skipped += 1
            continue
        job = SyncJob.objects.create(source=source, started_by=system_user,
                                     status=SyncJob.STATUS_PENDING)
        sync_documentation_source_task.delay(source.id, system_user.id, job.id)
        queued += 1
    return {'queued': queued, 'skipped': skipped}
```
Infrastructure Configuration
spelunker/celery.py — App Entry Point
```python
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "spelunker.settings")

app = Celery("spelunker")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()  # auto-discovers tasks.py in every app in INSTALLED_APPS
```
settings.py — Celery Settings
```python
# Broker and result backend — supplied via environment variables
CELERY_BROKER_URL = env('CELERY_BROKER_URL')          # amqp://spelunker:<pw>@rabbitmq:5672/spelunker
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')  # rpc://

# Serialization — JSON only (no pickle)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = env('TIME_ZONE')

# Result expiry — critical when using the rpc:// backend.
# Uncollected results accumulate in worker memory without this.
CELERY_RESULT_EXPIRES = 3600  # 1 hour; safe because we store state in DB job records

# Global time limits (can be overridden per-task with decorator args)
CELERY_TASK_SOFT_TIME_LIMIT = 1800  # 30 min soft limit → SoftTimeLimitExceeded
CELERY_TASK_TIME_LIMIT = 2100       # 35 min hard kill

# Late ack: acknowledge messages AFTER the task completes, not before.
# If a worker crashes mid-task, the broker redelivers the message.
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # fetch one task at a time per worker slot

# Separate logging level for Celery vs. application code
CELERY_LOGGING_LEVEL = env('CELERY_LOGGING_LEVEL', default='INFO')
```
`CELERY_TASK_ACKS_LATE`: Combined with idempotent tasks, this provides at-least-once delivery. If a worker process is killed (OOM, deployment), the message returns to the queue and another worker picks it up. This is why idempotency is a hard requirement.
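At-least-once delivery means a task body may run twice for the same message. A minimal, framework-free sketch of the status-guard form of idempotency (a plain dict stands in for the DB job record):

```python
def complete_job(job: dict, chunks: int) -> dict:
    """Idempotent completion: a redelivered task is a safe no-op."""
    if job.get('status') == 'completed':  # guard: already processed
        return job
    job['status'] = 'completed'
    job['chunks'] = chunks
    return job

job = {'status': 'processing'}
complete_job(job, 12)  # first delivery: writes the result
complete_job(job, 99)  # redelivery: guard fires, state unchanged
```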
settings.py — Memcached (Django Cache)
Memcached is the Django HTTP-layer cache (sessions, view caching). It is not used as a Celery result backend.
```python
CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.memcached.PyMemcacheCache",
        "LOCATION": env('KVDB_LOCATION'),  # memcached:11211
        "KEY_PREFIX": env('KVDB_PREFIX'),  # spelunker
        "TIMEOUT": 300,
    }
}
```
INSTALLED_APPS — Required
```python
INSTALLED_APPS = [
    ...
    'django_celery_beat',  # DB-backed periodic task scheduler (Beat)
    ...
]
```
docker-compose.yml — Service Topology
| Service | Image | Purpose |
|---|---|---|
| `rabbitmq` | `rabbitmq:3-management-alpine` | AMQP message broker |
| `memcached` | `memcached:1.6-alpine` | Django HTTP cache |
| `worker` | `spelunker:latest` | Celery worker (`--concurrency=4`) |
| `scheduler` | `spelunker:latest` | Celery Beat with `DatabaseScheduler` |
| `flower` | `mher/flower:latest` | Task monitoring UI (port 5555) |
Task Routing / Queues (Recommended)
By default all tasks run in the `celery` default queue. For production deployments, separate CPU-heavy work from I/O-bound work:
```python
# settings.py
CELERY_TASK_ROUTES = {
    'solution_library.embed_document': {'queue': 'embedding'},
    'solution_library.embed_documents_batch': {'queue': 'embedding'},
    'rfp_manager.batch_generate_*': {'queue': 'batch'},
    'llm_manager.validate_*': {'queue': 'default'},
}
```

```yaml
# docker-compose.yml — separate workers per queue
worker-default:
  command: celery -A spelunker worker -Q default --concurrency=4
worker-embedding:
  command: celery -A spelunker worker -Q embedding --concurrency=2
worker-batch:
  command: celery -A spelunker worker -Q batch --concurrency=2
```
This prevents a burst of embedding tasks from starving time-sensitive API validation, and lets you scale each queue independently.
Database Connection Management
Celery workers are long-lived processes, and Django DB connections can become stale between tasks. Set `CONN_MAX_AGE` to 0 (the Django default) so connections are closed after each request cycle, or use a connection pooler like PgBouncer. Celery's `worker_pool_restarts` and Django's `close_old_connections()` (called automatically by Celery's Django fixup) handle cleanup between tasks.
Domain Extension Examples
solution_library App
Three task types: single-document embed, batch embed, and documentation-source sync. The single-document task is also triggered by a `post_save` signal for automatic processing on upload.
```python
# Auto-embed on create (signal)
embed_document_task.delay(document_id=instance.id, ...)

# Manual batch from admin action
embed_documents_batch_task.delay(document_ids=[1, 2, 3], ...)

# Source sync from view (with progress callback)
sync_documentation_source_task.delay(source_id=..., user_id=..., job_id=...)
```
rfp_manager App
Two-stage pipeline: responder answers first, reviewer answers second. Each stage is a separate Celery batch job. Both check for an existing active job before dispatching to prevent duplicate runs.
```python
# Guard against duplicate jobs before dispatch
if RFPBatchJob.objects.filter(
    rfp=rfp,
    job_type=RFPBatchJob.JOB_TYPE_RESPONDER,
    status__in=[RFPBatchJob.STATUS_PENDING, RFPBatchJob.STATUS_PROCESSING]
).exists():
    # surface error to user
    ...

# Stage 1
batch_generate_responder_answers.delay(rfp.pk, user.pk, job.pk)

# Stage 2 (after Stage 1 is complete)
batch_generate_reviewer_answers.delay(rfp.pk, user.pk, job.pk)
```
llm_manager App
Stateless periodic task — no DB job record is needed because results are written directly to the `LLMApi` and `LLMModel` objects.
```python
# Triggered by Celery Beat; schedule managed via django-celery-beat admin
validate_all_llm_apis.delay()

# Triggered from admin action for a single API
validate_single_api.delay(api_id=api.pk)
```
Anti-Patterns
- ❌ Don't use the `rpc://` result backend for tasks where the caller never retrieves the result — the result accumulates in memory. Spelunker mitigates this by storing state in DB job records rather than reading Celery results. Always set `CELERY_RESULT_EXPIRES`.
- ❌ Don't pass full model instances as task arguments — pass PKs only. Celery serialises arguments as JSON; ORM objects are not JSON serialisable.
- ❌ Don't treat the dispatch-side `AsyncResult.id` as the authoritative record of the task ID. It is the same value as the in-task `self.request.id`, but the task should write it to the job record itself using `bind=True`, so the record is correct even if the dispatch-side save fails.
- ❌ Don't silence exceptions with a bare `except: pass` — always log errors and reflect failure status onto the DB record.
- ❌ Don't skip the duplicate-job guard when the task is triggered from a view or admin action. Without it, double-clicking a submit button can queue two identical jobs.
- ❌ Don't use `CELERY_TASK_SERIALIZER = 'pickle'` — JSON only, to prevent arbitrary code execution via crafted task payloads.
- ❌ Don't hardcode periodic task schedules in code via `app.conf.beat_schedule` — use `django_celery_beat` and manage schedules in Django admin so they survive deployments.
- ❌ Don't call `.delay()` inside a database transaction — use `transaction.on_commit()`. The worker may receive the message before the row is committed, causing `DoesNotExist`.
- ❌ Don't write non-idempotent tasks — workers may crash and brokers may redeliver. A re-executed task must produce the same result (or safely no-op).
- ❌ Don't omit time limits — a hung external API call (LLM, S3) will block a worker slot forever. Always set `soft_time_limit` and `time_limit`.
- ❌ Don't retry business-logic errors with `autoretry_for` — only retry transient failures (network errors, timeouts). A `ValueError` or `DoesNotExist` will never succeed on retry.
Migration / Adoption
When adding a new Celery task to an existing app:
- Create `<app>/tasks.py` using `@shared_task`, not `@app.task`.
- Name the task `'<app_label>.<action>'`.
- If the task is long-running, create a DB job model with the recommended fields above.
- Register the app in `INSTALLED_APPS` (required for `autodiscover_tasks`).
- For periodic tasks, add a schedule record via Django admin → Periodic Tasks (django-celery-beat) rather than in code.
- Add a test that confirms the task can be called synchronously with `CELERY_TASK_ALWAYS_EAGER = True`.
Settings
```python
# settings.py

# Required — broker and result backend
CELERY_BROKER_URL = env('CELERY_BROKER_URL')          # amqp://user:pw@host:5672/vhost
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')  # rpc://

# Serialization (do not change)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = env('TIME_ZONE')  # must match Django TIME_ZONE

# Result expiry — prevents unbounded memory growth with rpc:// backend
CELERY_RESULT_EXPIRES = 3600  # seconds (1 hour)

# Time limits — global defaults, overridable per-task
CELERY_TASK_SOFT_TIME_LIMIT = 1800  # SoftTimeLimitExceeded after 30 min
CELERY_TASK_TIME_LIMIT = 2100       # hard SIGKILL after 35 min

# Reliability — late ack + single prefetch for at-least-once delivery
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

# Logging
CELERY_LOGGING_LEVEL = env('CELERY_LOGGING_LEVEL', default='INFO')  # separate from app/Django level

# Optional — disable for production
# AUTO_EMBED_DOCUMENTS = True  # set False to suppress signal-triggered embedding

# Optional — task routing (see Infrastructure Configuration for queue examples)
# CELERY_TASK_ROUTES = { ... }
```
Testing
```python
from django.test import TestCase, override_settings

@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True)
class EmbedDocumentTaskTest(TestCase):
    def test_happy_path(self):
        """Task embeds a document and returns success."""
        # arrange: create Document, LLMModel fixtures
        result = embed_document_task(document_id=doc.id)
        self.assertTrue(result['success'])
        self.assertGreater(result['chunks_created'], 0)
        doc.refresh_from_db()
        self.assertEqual(doc.review_status, 'pending')

    def test_document_not_found(self):
        """Task returns success=False for a missing document ID."""
        result = embed_document_task(document_id=999999)
        self.assertFalse(result['success'])
        self.assertIn('not found', result['error'])

    def test_no_embedding_model(self):
        """Task returns success=False when no embedding model is available."""
        # arrange: no LLMModel with is_system_default=True
        result = embed_document_task(document_id=doc.id)
        self.assertFalse(result['success'])

@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True)
class BatchJobTest(TestCase):
    def test_job_reaches_completed_status(self):
        """Batch job transitions from pending → processing → completed."""
        job = RFPBatchJob.objects.create(...)
        batch_generate_responder_answers(rfp_id=rfp.pk, user_id=user.pk, job_id=job.pk)
        job.refresh_from_db()
        self.assertEqual(job.status, RFPBatchJob.STATUS_COMPLETED)

    def test_duplicate_job_guard(self):
        """A second dispatch when a job is already active is rejected by the view."""
        # arrange: one active job
        response = self.client.post(dispatch_url)
        self.assertContains(response, 'already running', status_code=400)
```