# Async Task Pattern v1.0.0

Defines how Spelunker Django apps implement background task processing using Celery, RabbitMQ, Memcached, and Flower — covering fire-and-forget tasks, long-running batch jobs, signal-triggered tasks, and periodic scheduled tasks.

## 🐾 Red Panda Approval™

This pattern follows Red Panda Approval standards.

---

## Why a Pattern, Not a Shared Implementation

Long-running work in Spelunker spans multiple domains, each with distinct progress-tracking and state requirements:

- A `solution_library` document embedding task needs to update `review_status` on a `Document` and count vector chunks created.
- An `rfp_manager` batch job tracks per-question progress, per-question errors, and the Celery task ID on an `RFPBatchJob` record.
- An `llm_manager` API-validation task iterates over all active APIs and accumulates model sync statistics.
- A `solution_library` documentation-source sync task fires from a View, stores `celery_task_id` on a `SyncJob`, and reports incremental progress via a callback.

Instead of a single shared implementation, this pattern defines:

- **Required task interface** — every task must have a namespaced name, a structured return dict, and structured logging.
- **Recommended job-tracking fields** — most tasks that represent a significant unit of work should have a corresponding DB job record.
- **Error handling conventions** — how to catch, log, and reflect failures back to the record.
- **Dispatch variants** — signal-triggered, admin action, view-triggered, and periodic (Beat).
- **Infrastructure conventions** — broker, result backend, serialization, and cache settings.

---

## Required Task Interface

Every Celery task in Spelunker **must**:

```python
from celery import shared_task
import logging

logger = logging.getLogger(__name__)

@shared_task(name='<app_label>.<task_name>')
def my_task(primary_id: int, user_id: int = None) -> dict:
    """One-line description of what this task does."""
    try:
        # ... do work ...
        logger.info(f"Task succeeded for {primary_id}")
        return {'success': True, 'id': primary_id}
    except Exception as e:
        logger.error(
            f"Task failed for {primary_id}: {type(e).__name__}: {e}",
            extra={'id': primary_id, 'error': str(e)},
            exc_info=True,
        )
        return {'success': False, 'id': primary_id, 'error': str(e)}
```

| Requirement | Rule |
|---|---|
| `name` | Must be `'<app_label>.<task_name>'`, e.g., `'solution_library.embed_document'` |
| Return value | Always a dict with at minimum `{'success': bool}` |
| Logging | Use structured `extra={}` kwargs; never swallow exceptions silently |
| Import style | Use `@shared_task`, not direct `app.task` references |
| Idempotency | Tasks **must** be safe to re-execute with the same arguments (broker redelivery, worker crash). Use `update_or_create`, check-before-write, or guard with the job record's status before re-processing. |
| Arguments | Pass only JSON-serialisable primitives (PKs, strings, numbers). Never pass ORM instances. |

---

## Retry & Time-Limit Policy

Tasks that call external services (LLM APIs, S3, remote URLs) should declare automatic retries for transient failures. Tasks must also set time limits to prevent hung workers.

### Recommended Retry Decorator

```python
@shared_task(
    name='<app_label>.<task_name>',
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=60,        # first retry after 60 s, then 120 s, 240 s …
    retry_backoff_max=600,   # cap at 10 minutes
    retry_jitter=True,       # add randomness to avoid thundering herd
    max_retries=3,
    soft_time_limit=1800,    # raise SoftTimeLimitExceeded after 30 min
    time_limit=2100,         # hard-kill after 35 min
)
def my_task(self, primary_id: int, ...):
    ...
```

| Setting | Purpose | Guideline |
|---|---|---|
| `autoretry_for` | Exception classes that trigger an automatic retry | Use for **transient** errors only (network, timeout). Never for `ValueError` or business-logic errors. |
| `retry_backoff` | Seconds before first retry (doubles each attempt) | 60 s is a reasonable default for external API calls. |
| `max_retries` | Maximum retry attempts | 3 for API calls; 0 (no retry) for user-triggered batch jobs that track their own progress. |
| `soft_time_limit` | Raises `SoftTimeLimitExceeded` — allows graceful cleanup | Set on every task. Catch it to mark the job record as failed. |
| `time_limit` | Hard `SIGKILL` — last resort | Set 5–10 min above `soft_time_limit`. |

### Handling `SoftTimeLimitExceeded`

```python
from celery.exceptions import SoftTimeLimitExceeded
from django.utils import timezone

@shared_task(bind=True, soft_time_limit=1800, time_limit=2100, ...)
def long_running_task(self, job_id: int):
    job = MyJob.objects.get(id=job_id)
    try:
        for item in items:
            process(item)
    except SoftTimeLimitExceeded:
        logger.warning(f"Job {job_id} hit soft time limit — marking as failed")
        job.status = 'failed'
        job.completed_at = timezone.now()
        job.save()
        return {'success': False, 'job_id': job_id, 'error': 'Time limit exceeded'}
```

> **Note:** Batch jobs in `rfp_manager` do **not** use `autoretry_for` because they track per-question progress and should not re-run the entire batch. Instead, individual question failures are logged and the batch continues.
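As a sanity check on the `retry_backoff` / `retry_backoff_max` settings above, the retry schedule can be computed directly. The sketch below implements the exponential-backoff formula with jitter disabled; `retry_delay` is an illustrative helper, not a Celery API:

```python
def retry_delay(retry_number: int, backoff: int = 60, backoff_max: int = 600) -> int:
    """Seconds to wait before retry `retry_number` (0-based).

    The delay doubles each attempt and is capped at backoff_max.
    With retry_jitter=True, Celery instead draws a random value
    in [0, delay] to avoid thundering herds.
    """
    return min(backoff * (2 ** retry_number), backoff_max)

schedule = [retry_delay(n) for n in range(3)]  # max_retries=3 → retries 0, 1, 2
print(schedule)       # [60, 120, 240]
print(retry_delay(5)  # deep retries hit the 600 s cap
      )
```

With `max_retries=3`, the worst case adds roughly 60 + 120 + 240 s of waiting before final failure — worth keeping in mind when reasoning about end-to-end latency for retried tasks.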
---

## Standard Values / Conventions

### Task Name Registry

| App | Task name | Trigger |
|---|---|---|
| `solution_library` | `solution_library.embed_document` | Signal / admin action |
| `solution_library` | `solution_library.embed_documents_batch` | Admin action |
| `solution_library` | `solution_library.sync_documentation_source` | View / admin action |
| `solution_library` | `solution_library.sync_all_documentation_sources` | Celery Beat (periodic) |
| `rfp_manager` | `rfp_manager.summarize_information_document` | Admin action |
| `rfp_manager` | `rfp_manager.batch_generate_responder_answers` | View |
| `rfp_manager` | `rfp_manager.batch_generate_reviewer_answers` | View |
| `llm_manager` | `llm_manager.validate_all_llm_apis` | Celery Beat (periodic) |
| `llm_manager` | `llm_manager.validate_single_api` | Admin action |

### Job Status Choices (DB Job Records)

```python
STATUS_PENDING = 'pending'
STATUS_PROCESSING = 'processing'
STATUS_COMPLETED = 'completed'
STATUS_FAILED = 'failed'
STATUS_CANCELLED = 'cancelled'  # optional — used by rfp_manager
```

---

## Recommended Job-Tracking Fields

Tasks that represent a significant unit of work should write their state to a DB model.
These are the recommended fields:

```python
class MyJobModel(models.Model):
    # Celery linkage
    celery_task_id = models.CharField(
        max_length=255, blank=True,
        help_text="Celery task ID for Flower monitoring"
    )

    # Status lifecycle
    status = models.CharField(
        max_length=20, choices=STATUS_CHOICES, default=STATUS_PENDING
    )
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    # Audit
    started_by = models.ForeignKey(
        User, on_delete=models.PROTECT, related_name='+'
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    # Error accumulation
    errors = models.JSONField(default=list)

    class Meta:
        indexes = [
            models.Index(fields=['celery_task_id']),
            models.Index(fields=['-created_at']),
        ]
```

For batch jobs that process many items, add counter fields:

```python
total_items = models.IntegerField(default=0)
processed_items = models.IntegerField(default=0)
successful_items = models.IntegerField(default=0)
failed_items = models.IntegerField(default=0)

def get_progress_percentage(self) -> int:
    if self.total_items == 0:
        return 0
    return int((self.processed_items / self.total_items) * 100)

def is_stale(self, timeout_minutes: int = 30) -> bool:
    """True if stuck in pending/processing without recent updates."""
    if self.status not in (self.STATUS_PENDING, self.STATUS_PROCESSING):
        return False
    return (timezone.now() - self.updated_at).total_seconds() > (timeout_minutes * 60)
```

---

## Variant 1 — Fire-and-Forget (Signal-Triggered)

Automatically dispatch a task whenever a model record is saved. Used by `solution_library` to kick off embedding whenever a `Document` is created.
```python
# solution_library/signals.py
import logging

from django.db.models.signals import post_save
from django.dispatch import receiver
from django.conf import settings

logger = logging.getLogger(__name__)

@receiver(post_save, sender=Document)
def trigger_document_embedding(sender, instance, created, **kwargs):
    if not created:
        return
    if not getattr(settings, 'AUTO_EMBED_DOCUMENTS', True):
        return

    from solution_library.tasks import embed_document_task  # avoid circular import
    from django.db import transaction

    def _dispatch():
        try:
            task = embed_document_task.delay(
                document_id=instance.id,
                embedding_model_id=instance.embedding_model_id or None,
                user_id=None,
            )
            logger.info(f"Queued embedding task {task.id} for document {instance.id}")
        except Exception as e:
            logger.error(f"Failed to queue embedding task for document {instance.id}: {e}")

    # Dispatch AFTER the transaction commits so the worker can read the row
    transaction.on_commit(_dispatch)
```

The corresponding task updates the record's status field at start and completion:

```python
@shared_task(name='solution_library.embed_document')
def embed_document_task(document_id: int, embedding_model_id: int = None, user_id: int = None):
    document = Document.objects.get(id=document_id)
    document.review_status = 'processing'
    document.save(update_fields=['review_status'])

    # ... perform work ...

    document.review_status = 'pending'
    document.save(update_fields=['review_status'])
    return {'success': True, 'document_id': document_id, 'chunks_created': count}
```

---

## Variant 2 — Long-Running Batch Job (View or Admin Triggered)

Used by `rfp_manager` for multi-hour batch RAG processing. The outer transaction creates the DB job record first, then dispatches the Celery task, passing the job's PK.
```python
# rfp_manager/views.py (dispatch)
from django.db import transaction

job = RFPBatchJob.objects.create(
    rfp=rfp,
    started_by=request.user,
    job_type=RFPBatchJob.JOB_TYPE_RESPONDER,
    status=RFPBatchJob.STATUS_PENDING,
)

def _dispatch():
    task = batch_generate_responder_answers.delay(rfp.pk, request.user.pk, job.pk)
    # Save the Celery task ID for Flower cross-reference
    job.celery_task_id = task.id
    job.save(update_fields=['celery_task_id'])

# IMPORTANT: dispatch after the transaction commits so the worker
# can read the job row. Without this, the worker may receive the
# message before the row is visible, causing DoesNotExist.
transaction.on_commit(_dispatch)
```

Inside the task, use `bind=True` to get the Celery task ID:

```python
@shared_task(bind=True, name='rfp_manager.batch_generate_responder_answers')
def batch_generate_responder_answers(self, rfp_id: int, user_id: int, job_id: int):
    job = RFPBatchJob.objects.get(id=job_id)
    job.status = RFPBatchJob.STATUS_PROCESSING
    job.started_at = timezone.now()
    job.celery_task_id = self.request.id  # authoritative Celery ID
    job.save()

    for item in items_to_process:
        try:
            # ... process item ...
            job.processed_questions += 1
            job.successful_questions += 1
            job.save(update_fields=['processed_questions', 'successful_questions', 'updated_at'])
        except Exception as e:
            job.add_error(item, str(e))

    job.status = RFPBatchJob.STATUS_COMPLETED
    job.completed_at = timezone.now()
    job.save()
    return {'success': True, 'job_id': job_id}
```

---

## Variant 3 — Progress-Callback Task (View or Admin Triggered)

Used by `solution_library`'s `sync_documentation_source_task` when an underlying synchronous service needs to stream incremental progress updates back to the DB.
```python
@shared_task(bind=True, name='solution_library.sync_documentation_source')
def sync_documentation_source_task(self, source_id: int, user_id: int, job_id: int):
    job = SyncJob.objects.get(id=job_id)
    job.status = SyncJob.STATUS_PROCESSING
    job.started_at = timezone.now()
    job.celery_task_id = self.request.id
    job.save(update_fields=['status', 'started_at', 'celery_task_id', 'updated_at'])

    def update_progress(created, updated, skipped, processed, total):
        job.documents_created = created
        job.documents_updated = updated
        job.documents_skipped = skipped
        job.save(update_fields=['documents_created', 'documents_updated', 'documents_skipped', 'updated_at'])

    result = sync_documentation_source(source_id, user_id, progress_callback=update_progress)

    job.status = SyncJob.STATUS_COMPLETED if result.status == 'completed' else SyncJob.STATUS_FAILED
    job.completed_at = timezone.now()
    job.save()
    return {'success': True, 'job_id': job_id}
```

---

## Variant 4 — Periodic Task (Celery Beat)

Used by `llm_manager` for hourly/daily API validation and by `solution_library` for nightly source syncs. Schedule via django-celery-beat in Django admin (no hardcoded schedules in code).
```python
@shared_task(name='llm_manager.validate_all_llm_apis')
def validate_all_llm_apis():
    """Periodic task: validate all active LLM APIs and refresh model lists."""
    active_apis = LLMApi.objects.filter(is_active=True)
    results = {'tested': 0, 'successful': 0, 'failed': 0, 'details': []}
    for api in active_apis:
        results['tested'] += 1
        try:
            result = test_llm_api(api)
            if result['success']:
                results['successful'] += 1
            else:
                results['failed'] += 1
        except Exception as e:
            results['failed'] += 1
            logger.error(f"Error validating {api.name}: {e}", exc_info=True)
    return results


@shared_task(name='solution_library.sync_all_documentation_sources')
def sync_all_sources_task():
    """Periodic task: queue a sync for every documentation source."""
    sources = DocumentationSource.objects.all()
    system_user = User.objects.filter(is_superuser=True).first()
    queued = 0
    skipped = 0
    for source in sources:
        # Skip if an active sync job already exists
        if SyncJob.objects.filter(source=source, status__in=[SyncJob.STATUS_PENDING, SyncJob.STATUS_PROCESSING]).exists():
            skipped += 1
            continue
        job = SyncJob.objects.create(source=source, started_by=system_user, status=SyncJob.STATUS_PENDING)
        sync_documentation_source_task.delay(source.id, system_user.id, job.id)
        queued += 1
    return {'queued': queued, 'skipped': skipped}
```

---

## Infrastructure Configuration

### `spelunker/celery.py` — App Entry Point

```python
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "spelunker.settings")

app = Celery("spelunker")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()  # auto-discovers tasks.py in every INSTALLED_APP
```

### `settings.py` — Celery Settings

```python
# Broker and result backend — supplied via environment variables
CELERY_BROKER_URL = env('CELERY_BROKER_URL')          # amqp://spelunker:@rabbitmq:5672/spelunker
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')  # rpc://

# Serialization — JSON only (no pickle)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = env('TIME_ZONE')

# Result expiry — critical when using the rpc:// backend.
# Uncollected results accumulate in worker memory without this.
CELERY_RESULT_EXPIRES = 3600  # 1 hour; safe because we store state in DB job records

# Global time limits (can be overridden per-task with decorator args)
CELERY_TASK_SOFT_TIME_LIMIT = 1800  # 30 min soft limit → SoftTimeLimitExceeded
CELERY_TASK_TIME_LIMIT = 2100       # 35 min hard kill

# Late ack: acknowledge messages AFTER the task completes, not before.
# If a worker crashes mid-task, the broker redelivers the message.
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # fetch one task at a time per worker slot

# Separate logging level for Celery vs. application code
CELERY_LOGGING_LEVEL = env('CELERY_LOGGING_LEVEL', default='INFO')
```

> **`CELERY_TASK_ACKS_LATE`**: Combined with idempotent tasks, this provides at-least-once delivery. If a worker process is killed (OOM, deployment), the message returns to the queue and another worker picks it up. This is why idempotency is a hard requirement.

### `settings.py` — Memcached (Django Cache)

Memcached is the Django HTTP-layer cache (sessions, view caching). It is **not** used as a Celery result backend.

```python
CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.memcached.PyMemcacheCache",
        "LOCATION": env('KVDB_LOCATION'),  # memcached:11211
        "KEY_PREFIX": env('KVDB_PREFIX'),  # spelunker
        "TIMEOUT": 300,
    }
}
```

### `INSTALLED_APPS` — Required

```python
INSTALLED_APPS = [
    ...
    'django_celery_beat',  # DB-backed periodic task scheduler (Beat)
    ...
]
```

### `docker-compose.yml` — Service Topology

| Service | Image | Purpose |
|---|---|---|
| `rabbitmq` | `rabbitmq:3-management-alpine` | AMQP message broker |
| `memcached` | `memcached:1.6-alpine` | Django HTTP cache |
| `worker` | `spelunker:latest` | Celery worker (`--concurrency=4`) |
| `scheduler` | `spelunker:latest` | Celery Beat with `DatabaseScheduler` |
| `flower` | `mher/flower:latest` | Task monitoring UI (port 5555) |

### Task Routing / Queues (Recommended)

By default all tasks run in the `celery` default queue. For production deployments, separate CPU-heavy work from I/O-bound work:

```python
# settings.py
CELERY_TASK_ROUTES = {
    'solution_library.embed_document': {'queue': 'embedding'},
    'solution_library.embed_documents_batch': {'queue': 'embedding'},
    'rfp_manager.batch_generate_*': {'queue': 'batch'},
    'llm_manager.validate_*': {'queue': 'default'},
}
```

```yaml
# docker-compose.yml — separate workers per queue
worker-default:
  command: celery -A spelunker worker -Q default --concurrency=4
worker-embedding:
  command: celery -A spelunker worker -Q embedding --concurrency=2
worker-batch:
  command: celery -A spelunker worker -Q batch --concurrency=2
```

This prevents a burst of embedding tasks from starving time-sensitive API validation, and lets you scale each queue independently.

### Database Connection Management

Celery workers are long-lived processes, so Django DB connections can become stale between tasks. Set `CONN_MAX_AGE` to `0` (the Django default) so connections are closed after each request cycle, or use a connection pooler like PgBouncer. Between tasks, Celery's Django fixup automatically calls Django's `close_old_connections()` to discard stale connections.

---

## Domain Extension Examples

### `solution_library` App

Three task types: single-document embed, batch embed, and documentation-source sync. The single-document task is also triggered by a `post_save` signal for automatic processing on upload.
```python
# Auto-embed on create (signal)
embed_document_task.delay(document_id=instance.id, ...)

# Manual batch from admin action
embed_documents_batch_task.delay(document_ids=[1, 2, 3], ...)

# Source sync from view (with progress callback)
sync_documentation_source_task.delay(source_id=..., user_id=..., job_id=...)
```

### `rfp_manager` App

Two-stage pipeline: responder answers first, reviewer answers second. Each stage is a separate Celery batch job. Both check for an existing active job before dispatching to prevent duplicate runs.

```python
# Guard against duplicate jobs before dispatch
if RFPBatchJob.objects.filter(
    rfp=rfp,
    job_type=RFPBatchJob.JOB_TYPE_RESPONDER,
    status__in=[RFPBatchJob.STATUS_PENDING, RFPBatchJob.STATUS_PROCESSING]
).exists():
    # surface error to user
    ...

# Stage 1
batch_generate_responder_answers.delay(rfp.pk, user.pk, job.pk)

# Stage 2 (after Stage 1 is complete)
batch_generate_reviewer_answers.delay(rfp.pk, user.pk, job.pk)
```

### `llm_manager` App

Stateless periodic task — no DB job record needed because results are written directly to the `LLMApi` and `LLMModel` objects.

```python
# Triggered by Celery Beat; schedule managed via django-celery-beat admin
validate_all_llm_apis.delay()

# Triggered from admin action for a single API
validate_single_api.delay(api_id=api.pk)
```

---

## Anti-Patterns

- ❌ Don't use the `rpc://` result backend for tasks whose callers never retrieve the result — unread results accumulate in memory. Spelunker mitigates this by storing state in DB job records rather than reading Celery results. Always set `CELERY_RESULT_EXPIRES`.
- ❌ Don't pass full model instances as task arguments — pass PKs only. Celery serialises arguments as JSON; ORM objects are not JSON-serialisable.
- ❌ Don't treat the dispatch-side task ID as authoritative. The dispatch `AsyncResult.id` and the in-task `self.request.id` are the same value; write it to the job record from **inside** the task using `bind=True` as the authoritative source.
- ❌ Don't silence exceptions with bare `except: pass` — always log errors and reflect failure status onto the DB record.
- ❌ Don't skip the duplicate-job guard when the task is triggered from a view or admin action. Without it, double-clicking a submit button can queue two identical jobs.
- ❌ Don't use `CELERY_TASK_SERIALIZER = 'pickle'` — JSON only, to prevent arbitrary code execution via crafted task payloads.
- ❌ Don't hardcode periodic task schedules in code via `app.conf.beat_schedule` — use `django_celery_beat` and manage schedules in Django admin so they survive deployments.
- ❌ Don't call `.delay()` inside a database transaction — use `transaction.on_commit()`. The worker may receive the message before the row is committed, causing `DoesNotExist`.
- ❌ Don't write non-idempotent tasks — workers may crash and brokers may redeliver. A re-executed task must produce the same result (or safely no-op).
- ❌ Don't omit time limits — a hung external API call (LLM, S3) will block a worker slot forever. Always set `soft_time_limit` and `time_limit`.
- ❌ Don't retry business-logic errors with `autoretry_for` — only retry **transient** failures (network errors, timeouts). A `ValueError` or `DoesNotExist` will never succeed on retry.

---

## Migration / Adoption

When adding a new Celery task to an existing app:

1. Create `<app>/tasks.py` using `@shared_task`, not `@app.task`.
2. Name the task `'<app_label>.<task_name>'`.
3. If the task is long-running, create a DB job model with the recommended fields above.
4. Register the app in `INSTALLED_APPS` (required for `autodiscover_tasks`).
5. For periodic tasks, add a schedule record via Django admin → Periodic Tasks (django-celery-beat) rather than in code.
6. Add a test that confirms the task can be called synchronously with `CELERY_TASK_ALWAYS_EAGER = True`.
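Eager-mode tests from step 6 can share a single assertion on the structured return contract defined in the Required Task Interface. A minimal pure-Python sketch of such a checker (the helper name is illustrative, not part of the codebase):

```python
def is_valid_task_result(result) -> bool:
    """Check the minimum contract: a dict containing a boolean 'success' key.

    Per the interface example, failed results also carry an 'error' string;
    when present, that key must be a string.
    """
    if not isinstance(result, dict):
        return False
    if not isinstance(result.get('success'), bool):
        return False
    if result['success'] is False and 'error' in result and not isinstance(result['error'], str):
        return False
    return True

print(is_valid_task_result({'success': True, 'id': 7}))  # True
print(is_valid_task_result({'status': 'ok'}))            # False — no 'success' key
print(is_valid_task_result({'success': 1}))              # False — 1 is an int, not a bool
```

Asserting `is_valid_task_result(result)` in every task test keeps the return-dict convention from drifting as new tasks are added.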
---

## Settings

```python
# settings.py

# Required — broker and result backend
CELERY_BROKER_URL = env('CELERY_BROKER_URL')          # amqp://user:pw@host:5672/vhost
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')  # rpc://

# Serialization (do not change)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = env('TIME_ZONE')  # must match Django TIME_ZONE

# Result expiry — prevents unbounded memory growth with rpc:// backend
CELERY_RESULT_EXPIRES = 3600  # seconds (1 hour)

# Time limits — global defaults, overridable per-task
CELERY_TASK_SOFT_TIME_LIMIT = 1800  # SoftTimeLimitExceeded after 30 min
CELERY_TASK_TIME_LIMIT = 2100       # hard SIGKILL after 35 min

# Reliability — late ack + single prefetch for at-least-once delivery
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

# Logging
CELERY_LOGGING_LEVEL = env('CELERY_LOGGING_LEVEL', default='INFO')  # separate from app/Django level

# Optional — disable for production
# AUTO_EMBED_DOCUMENTS = True  # set False to suppress signal-triggered embedding

# Optional — task routing (see Infrastructure Configuration for queue examples)
# CELERY_TASK_ROUTES = { ... }
```

---

## Testing

```python
from django.test import TestCase, override_settings

@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True)
class EmbedDocumentTaskTest(TestCase):

    def test_happy_path(self):
        """Task embeds a document and returns success."""
        # arrange: create Document, LLMModel fixtures
        result = embed_document_task(document_id=doc.id)
        self.assertTrue(result['success'])
        self.assertGreater(result['chunks_created'], 0)
        doc.refresh_from_db()
        self.assertEqual(doc.review_status, 'pending')

    def test_document_not_found(self):
        """Task returns success=False for a missing document ID."""
        result = embed_document_task(document_id=999999)
        self.assertFalse(result['success'])
        self.assertIn('not found', result['error'])

    def test_no_embedding_model(self):
        """Task returns success=False when no embedding model is available."""
        # arrange: no LLMModel with is_system_default=True
        result = embed_document_task(document_id=doc.id)
        self.assertFalse(result['success'])


@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True)
class BatchJobTest(TestCase):

    def test_job_reaches_completed_status(self):
        """Batch job transitions from pending → processing → completed."""
        job = RFPBatchJob.objects.create(...)
        batch_generate_responder_answers(rfp_id=rfp.pk, user_id=user.pk, job_id=job.pk)
        job.refresh_from_db()
        self.assertEqual(job.status, RFPBatchJob.STATUS_COMPLETED)

    def test_duplicate_job_guard(self):
        """A second dispatch when a job is already active is rejected by the view."""
        # arrange: one active job
        response = self.client.post(dispatch_url)
        self.assertContains(response, 'already running', status_code=400)
```
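The duplicate-job guard exercised by `test_duplicate_job_guard` reduces to a small predicate over the statuses of existing jobs for the same target. A pure-Python sketch of that decision (in production it is the `.filter(status__in=...).exists()` query shown under `rfp_manager`; names here are illustrative):

```python
# Statuses that mean "a job is still in flight" per the status choices above.
ACTIVE_STATUSES = {'pending', 'processing'}

def may_dispatch(existing_statuses) -> bool:
    """Allow a new batch job only when no job for the same target
    is still pending or processing. Finished states (completed,
    failed, cancelled) do not block a re-run."""
    return not any(status in ACTIVE_STATUSES for status in existing_statuses)

print(may_dispatch([]))                           # True — nothing queued yet
print(may_dispatch(['completed', 'failed']))      # True — only finished jobs
print(may_dispatch(['completed', 'processing']))  # False — an active job exists
```

Keeping the guard status set identical to the `is_stale()` active-status set ensures a stale-but-stuck job blocks dispatch only until it is swept to `failed`.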