mnemosyne/docs/Pattern_Async-TASK_V1-00.md


Async Task Pattern v1.0.0

Defines how Spelunker Django apps implement background task processing using Celery, RabbitMQ, Memcached, and Flower — covering fire-and-forget tasks, long-running batch jobs, signal-triggered tasks, and periodic scheduled tasks.

🐾 Red Panda Approval™

This pattern follows Red Panda Approval standards.


Why a Pattern, Not a Shared Implementation

Long-running work in Spelunker spans multiple domains, each with distinct progress-tracking and state requirements:

  • A solution_library document embedding task needs to update review_status on a Document and count vector chunks created.
  • An rfp_manager batch job tracks per-question progress, per-question errors, and the Celery task ID on an RFPBatchJob record.
  • An llm_manager API-validation task iterates over all active APIs and accumulates model sync statistics.
  • A solution_library documentation-source sync task fires from a View, stores celery_task_id on a SyncJob, and reports incremental progress via a callback.

These requirements differ too much for one shared implementation to cover, so this pattern instead defines:

  • Required task interface — every task must have a namespaced name, a structured return dict, and structured logging.
  • Recommended job-tracking fields — most tasks that represent a significant unit of work should have a corresponding DB job record.
  • Error handling conventions — how to catch, log, and reflect failures back to the record.
  • Dispatch variants — signal-triggered, admin action, view-triggered, and periodic (Beat).
  • Infrastructure conventions — broker, result backend, serialization, and cache settings.

Required Task Interface

Every Celery task in Spelunker must follow this basic shape:

from celery import shared_task
import logging

logger = logging.getLogger(__name__)

@shared_task(name='<app_label>.<action_name>')
def my_task(primary_id: int, user_id: int = None) -> dict:
    """One-line description of what this task does."""
    try:
        # ... do work ...
        logger.info(f"Task succeeded for {primary_id}")
        return {'success': True, 'id': primary_id}

    except Exception as e:
        logger.error(
            f"Task failed for {primary_id}: {type(e).__name__}: {e}",
            extra={'id': primary_id, 'error': str(e)},
            exc_info=True,
        )
        return {'success': False, 'id': primary_id, 'error': str(e)}
| Requirement | Rule |
| --- | --- |
| name | Must be '<app_label>.<action>', e.g., 'solution_library.embed_document' |
| Return value | Always a dict with at minimum {'success': bool} |
| Logging | Use structured extra={} kwargs; never swallow exceptions silently |
| Import style | Use @shared_task, not direct app.task references |
| Idempotency | Tasks must be safe to re-execute with the same arguments (broker redelivery, worker crash). Use update_or_create, check-before-write, or guard with the job record's status before re-processing. |
| Arguments | Pass only JSON-serialisable primitives (PKs, strings, numbers). Never pass ORM instances. |
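
The two mechanical rules above (namespaced name, structured return dict) are easy to assert in a test suite. The following is a minimal sketch, not part of Spelunker: the helper names are hypothetical, and the lower-snake-case restriction in the regular expression is an assumption that goes slightly beyond what the rule states.

```python
import re

# Hypothetical helpers: they only check the naming and return-value rules.
# Assumption: app labels and action names are lower snake case.
TASK_NAME_RE = re.compile(r"^[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*$")


def is_valid_task_name(name: str) -> bool:
    """True if name looks like '<app_label>.<action>'."""
    return bool(TASK_NAME_RE.match(name))


def is_valid_result(result: object) -> bool:
    """True if result is a dict whose 'success' key holds a real bool."""
    return isinstance(result, dict) and isinstance(result.get("success"), bool)
```

A test could loop over the registered task names and call is_valid_task_name on each, so a task that falls back to an auto-generated module path is caught early.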

Retry & Time-Limit Policy

Tasks that call external services (LLM APIs, S3, remote URLs) should declare automatic retries for transient failures. Tasks must also set time limits to prevent hung workers.

@shared_task(
    name='<app_label>.<action>',
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=60,        # first retry after 60 s, then 120 s, 240 s …
    retry_backoff_max=600,   # cap at 10 minutes
    retry_jitter=True,       # add randomness to avoid thundering herd
    max_retries=3,
    soft_time_limit=1800,    # raise SoftTimeLimitExceeded after 30 min
    time_limit=2100,         # hard-kill after 35 min
)
def my_task(self, primary_id: int, ...):
    ...
| Setting | Purpose | Guideline |
| --- | --- | --- |
| autoretry_for | Exception classes that trigger an automatic retry | Use for transient errors only (network, timeout). Never for ValueError or business-logic errors. |
| retry_backoff | Seconds before first retry (doubles each attempt) | 60 s is a reasonable default for external API calls. |
| max_retries | Maximum retry attempts | 3 for API calls; 0 (no retry) for user-triggered batch jobs that track their own progress. |
| soft_time_limit | Raises SoftTimeLimitExceeded; allows graceful cleanup | Set on every task. Catch it to mark the job record as failed. |
| time_limit | Hard SIGKILL; last resort | Set 5 to 10 min above soft_time_limit. |
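
To make the retry settings above concrete, the sketch below computes the delay ceiling for each attempt under the doubling rule described for retry_backoff and retry_backoff_max. It is an illustration of the arithmetic, not Celery's own code, and the function name is hypothetical. With retry_jitter=True the actual delay is expected to be a random value up to each ceiling.

```python
def backoff_schedule(factor: int, max_retries: int, maximum: int) -> list[int]:
    """Upper bound, in seconds, of the delay before each retry attempt."""
    # Attempt 0 waits `factor` seconds; each later attempt doubles, capped at `maximum`.
    return [min(maximum, factor * 2 ** attempt) for attempt in range(max_retries)]


print(backoff_schedule(factor=60, max_retries=3, maximum=600))  # [60, 120, 240]
```

With the values shown in the decorator, the 600 s cap is never reached; it only matters if max_retries is raised to 5 or more.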

Handling SoftTimeLimitExceeded

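from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
from django.utils import timezone

@shared_task(bind=True, soft_time_limit=1800, time_limit=2100, ...)
def long_running_task(self, job_id: int):
    job = MyJob.objects.get(id=job_id)
    try:
        for item in items:   # items: whatever the job iterates over
            process(item)
    except SoftTimeLimitExceeded:
        # Raised inside the task when the soft limit expires; the gap before
        # the hard limit leaves time to record the failure.
        logger.warning(f"Job {job_id} hit soft time limit; marking as failed")
        job.status = 'failed'
        job.completed_at = timezone.now()
        job.save()
        return {'success': False, 'job_id': job_id, 'error': 'Time limit exceeded'}

    return {'success': True, 'job_id': job_id}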

Note: Batch jobs in rfp_manager do not use autoretry_for because they track per-question progress and should not re-run the entire batch. Instead, individual question failures are logged and the batch continues.


Standard Values / Conventions

Task Name Registry

| App | Task name | Trigger |
| --- | --- | --- |
| solution_library | solution_library.embed_document | Signal / admin action |
| solution_library | solution_library.embed_documents_batch | Admin action |
| solution_library | solution_library.sync_documentation_source | View / admin action |
| solution_library | solution_library.sync_all_documentation_sources | Celery Beat (periodic) |
| rfp_manager | rfp_manager.summarize_information_document | Admin action |
| rfp_manager | rfp_manager.batch_generate_responder_answers | View |
| rfp_manager | rfp_manager.batch_generate_reviewer_answers | View |
| llm_manager | llm_manager.validate_all_llm_apis | Celery Beat (periodic) |
| llm_manager | llm_manager.validate_single_api | Admin action |

Job Status Choices (DB Job Records)

STATUS_PENDING    = 'pending'
STATUS_PROCESSING = 'processing'
STATUS_COMPLETED  = 'completed'
STATUS_FAILED     = 'failed'
STATUS_CANCELLED  = 'cancelled'   # optional — used by rfp_manager

Tasks that represent a significant unit of work should write their state to a DB model. These are the recommended fields:

class MyJobModel(models.Model):
    # Celery linkage
    celery_task_id = models.CharField(
        max_length=255, blank=True,
        help_text="Celery task ID for Flower monitoring"
    )

    # Status lifecycle
    status = models.CharField(
        max_length=20, choices=STATUS_CHOICES, default=STATUS_PENDING
    )
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    # Audit
    started_by = models.ForeignKey(
        User, on_delete=models.PROTECT, related_name='+'
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    # Error accumulation
    errors = models.JSONField(default=list)

    class Meta:
        indexes = [
            models.Index(fields=['celery_task_id']),
            models.Index(fields=['-created_at']),
        ]

For batch jobs that process many items, add counter fields:

    total_items      = models.IntegerField(default=0)
    processed_items  = models.IntegerField(default=0)
    successful_items = models.IntegerField(default=0)
    failed_items     = models.IntegerField(default=0)

    def get_progress_percentage(self) -> int:
        if self.total_items == 0:
            return 0
        return int((self.processed_items / self.total_items) * 100)

    def is_stale(self, timeout_minutes: int = 30) -> bool:
        """True if stuck in pending/processing without recent updates."""
        if self.status not in (self.STATUS_PENDING, self.STATUS_PROCESSING):
            return False
        return (timezone.now() - self.updated_at).total_seconds() > (timeout_minutes * 60)
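
The progress and staleness helpers are plain arithmetic, so their logic can be checked without a database. The sketch below mirrors them on a dataclass; JobSnapshot is a hypothetical stand-in for a job row, and passing the current time in as an argument (rather than calling timezone.now()) is a choice made here to keep the example deterministic.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class JobSnapshot:
    """Hypothetical stand-in for a job row; field names follow the model above."""
    status: str
    total_items: int
    processed_items: int
    updated_at: datetime

    def get_progress_percentage(self) -> int:
        if self.total_items == 0:
            return 0  # avoid division by zero before the total is known
        return int((self.processed_items / self.total_items) * 100)

    def is_stale(self, now: datetime, timeout_minutes: int = 30) -> bool:
        """True if still pending/processing with no update within the timeout."""
        if self.status not in ("pending", "processing"):
            return False
        return (now - self.updated_at) > timedelta(minutes=timeout_minutes)


now = datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc)
stuck = JobSnapshot("processing", total_items=4, processed_items=3,
                    updated_at=now - timedelta(minutes=45))
print(stuck.get_progress_percentage(), stuck.is_stale(now))  # 75 True
```

Note that is_stale only works as intended if every progress write also refreshes updated_at, which is why the batch examples include 'updated_at' in update_fields.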

Variant 1 — Fire-and-Forget (Signal-Triggered)

Automatically dispatch a task whenever a model record is saved. Used by solution_library to kick off embedding whenever a Document is created.

# solution_library/signals.py
import logging

from django.conf import settings
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver

from solution_library.models import Document   # assumed module path

logger = logging.getLogger(__name__)


@receiver(post_save, sender=Document)
def trigger_document_embedding(sender, instance, created, **kwargs):
    # Only newly created documents are embedded automatically
    if not created:
        return
    if not getattr(settings, 'AUTO_EMBED_DOCUMENTS', True):
        return

    from solution_library.tasks import embed_document_task   # avoid circular import

    def _dispatch():
        try:
            task = embed_document_task.delay(
                document_id=instance.id,
                embedding_model_id=instance.embedding_model_id or None,
                user_id=None,
            )
            logger.info(f"Queued embedding task {task.id} for document {instance.id}")
        except Exception as e:
            # A broker outage must not break the save that triggered the signal
            logger.error(
                f"Failed to queue embedding task for document {instance.id}: {e}",
                exc_info=True,
            )

    # Dispatch AFTER the transaction commits so the worker can read the row
    transaction.on_commit(_dispatch)

The corresponding task updates the record's status field at start and completion:

@shared_task(name='solution_library.embed_document')
def embed_document_task(document_id: int, embedding_model_id: int = None, user_id: int = None):
    try:
        document = Document.objects.get(id=document_id)
    except Document.DoesNotExist:
        # Report the failure in the return dict instead of raising
        return {'success': False, 'document_id': document_id,
                'error': f'Document {document_id} not found'}

    document.review_status = 'processing'
    document.save(update_fields=['review_status'])

    # ... perform work; the elided code sets `count` to the number of chunks created ...

    document.review_status = 'pending'
    document.save(update_fields=['review_status'])
    return {'success': True, 'document_id': document_id, 'chunks_created': count}

Variant 2 — Long-Running Batch Job (View or Admin Triggered)

Used by rfp_manager for multi-hour batch RAG processing. The outer transaction creates the DB job record first, then dispatches the Celery task, passing the job's PK.

# rfp_manager/views.py  (dispatch)
from django.db import transaction

job = RFPBatchJob.objects.create(
    rfp=rfp,
    started_by=request.user,
    job_type=RFPBatchJob.JOB_TYPE_RESPONDER,
    status=RFPBatchJob.STATUS_PENDING,
)

def _dispatch():
    task = batch_generate_responder_answers.delay(rfp.pk, request.user.pk, job.pk)
    # Save the Celery task ID for Flower cross-reference
    job.celery_task_id = task.id
    job.save(update_fields=['celery_task_id'])

# IMPORTANT: dispatch after the transaction commits so the worker
# can read the job row. Without this, the worker may receive the
# message before the row is visible, causing DoesNotExist.
transaction.on_commit(_dispatch)
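
The ordering problem that transaction.on_commit() solves can be shown with a toy model. The sketch below is not Django code: ToyTransaction is a hypothetical class in which rows become visible to other processes only at commit time, and the "worker" is reduced to a membership check.

```python
class ToyTransaction:
    """Toy model of a transaction: rows are visible to others only after commit."""

    def __init__(self, visible_rows: set):
        self.visible_rows = visible_rows   # what a worker process could read
        self._pending_rows = set()
        self._callbacks = []

    def create(self, pk: int) -> None:
        self._pending_rows.add(pk)

    def on_commit(self, callback) -> None:
        self._callbacks.append(callback)

    def commit(self) -> None:
        self.visible_rows.update(self._pending_rows)
        for callback in self._callbacks:
            callback()


visible_rows = set()
worker_found_row = []

tx = ToyTransaction(visible_rows)
tx.create(pk=1)
# Dispatching right away: the worker looks up the row before the commit.
worker_found_row.append(1 in visible_rows)
# Deferring the dispatch: the lookup runs only once the row is committed.
tx.on_commit(lambda: worker_found_row.append(1 in visible_rows))
tx.commit()

print(worker_found_row)  # [False, True]
```

The first lookup corresponds to the DoesNotExist failure described in the comment above; the second corresponds to the deferred dispatch.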

Inside the task, use bind=True to get the Celery task ID:

@shared_task(bind=True, name='rfp_manager.batch_generate_responder_answers')
def batch_generate_responder_answers(self, rfp_id: int, user_id: int, job_id: int):
    job = RFPBatchJob.objects.get(id=job_id)
    job.status = RFPBatchJob.STATUS_PROCESSING
    job.started_at = timezone.now()
    job.celery_task_id = self.request.id   # authoritative Celery ID
    job.save()

    for item in items_to_process:
        try:
            # ... process item ...
            job.processed_questions += 1
            job.successful_questions += 1
            job.save(update_fields=['processed_questions', 'successful_questions', 'updated_at'])
        except Exception as e:
            job.add_error(item, str(e))

    job.status = RFPBatchJob.STATUS_COMPLETED
    job.completed_at = timezone.now()
    job.save()
    return {'success': True, 'job_id': job_id}
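
The core of this variant is a loop that records each failure and keeps going. The sketch below isolates that loop in plain Python so the counting logic can be tested on its own; run_batch and its dict keys are hypothetical names, and unlike the task above it also counts failed items explicitly.

```python
from typing import Callable, Iterable


def run_batch(items: Iterable[str], process: Callable[[str], None]) -> dict:
    """Process every item; record failures instead of aborting the batch."""
    stats = {"processed": 0, "successful": 0, "failed": 0, "errors": []}
    for item in items:
        try:
            process(item)
            stats["successful"] += 1
        except Exception as exc:  # one bad item must not stop the rest
            stats["failed"] += 1
            stats["errors"].append({"item": item, "error": str(exc)})
        finally:
            stats["processed"] += 1
    # 'success' here means the batch ran to the end; per-item failures are in 'errors'.
    stats["success"] = True
    return stats


def flaky(item: str) -> None:
    if item == "q2":
        raise ValueError("no context found")


print(run_batch(["q1", "q2", "q3"], flaky)["failed"])  # 1
```

In the real task each counter update would be followed by a save with update_fields, so that a progress page or is_stale() check sees fresh values.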

Variant 3 — Progress-Callback Task (View or Admin Triggered)

Used by solution_library's sync_documentation_source_task when an underlying synchronous service needs to stream incremental progress updates back to the DB.

@shared_task(bind=True, name='solution_library.sync_documentation_source')
def sync_documentation_source_task(self, source_id: int, user_id: int, job_id: int):
    job = SyncJob.objects.get(id=job_id)
    job.status = SyncJob.STATUS_PROCESSING
    job.started_at = timezone.now()
    job.celery_task_id = self.request.id
    job.save(update_fields=['status', 'started_at', 'celery_task_id', 'updated_at'])

    def update_progress(created, updated, skipped, processed, total):
        job.documents_created = created
        job.documents_updated = updated
        job.documents_skipped = skipped
        job.save(update_fields=['documents_created', 'documents_updated',
                                'documents_skipped', 'updated_at'])

    result = sync_documentation_source(source_id, user_id, progress_callback=update_progress)

    succeeded = result.status == 'completed'
    job.status = SyncJob.STATUS_COMPLETED if succeeded else SyncJob.STATUS_FAILED
    job.completed_at = timezone.now()
    job.save()
    # Report the real outcome rather than always returning success=True
    return {'success': succeeded, 'job_id': job_id}
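
The service side of the callback contract is not shown in this document. The following is a minimal sketch of what such a service could look like, assuming the five-argument callback signature used above (created, updated, skipped, processed, total); sync_documents, its arguments, and its dict return value are hypothetical and stand in for the real sync_documentation_source.

```python
from typing import Callable, Optional

ProgressCallback = Callable[[int, int, int, int, int], None]


def sync_documents(docs: list[dict], existing: dict[str, str],
                   progress_callback: Optional[ProgressCallback] = None) -> dict:
    """Hypothetical sync service: compare incoming docs with stored content."""
    created = updated = skipped = 0
    total = len(docs)
    for processed, doc in enumerate(docs, start=1):
        if doc["key"] not in existing:
            created += 1
        elif existing[doc["key"]] != doc["content"]:
            updated += 1
        else:
            skipped += 1
        existing[doc["key"]] = doc["content"]
        # Report after every document so the caller can persist progress
        if progress_callback is not None:
            progress_callback(created, updated, skipped, processed, total)
    return {"status": "completed", "created": created,
            "updated": updated, "skipped": skipped}


calls = []
result = sync_documents(
    [{"key": "a", "content": "1"}, {"key": "b", "content": "2"}, {"key": "c", "content": "3"}],
    existing={"a": "1", "b": "old"},
    progress_callback=lambda *args: calls.append(args),
)
print(calls[-1])  # (1, 1, 1, 3, 3)
```

Keeping the service unaware of Celery and of the job model means it can still be called synchronously (for example from a management command) with no callback at all.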

Variant 4 — Periodic Task (Celery Beat)

Used by llm_manager for hourly/daily API validation and by solution_library for nightly source syncs. Schedule via django-celery-beat in Django admin (no hardcoded schedules in code).

@shared_task(name='llm_manager.validate_all_llm_apis')
def validate_all_llm_apis():
    """Periodic task: validate all active LLM APIs and refresh model lists."""
    active_apis = LLMApi.objects.filter(is_active=True)
    # 'success' is included to satisfy the required return-dict interface
    results = {'success': True, 'tested': 0, 'successful': 0, 'failed': 0, 'details': []}

    for api in active_apis:
        results['tested'] += 1
        try:
            result = test_llm_api(api)
            if result['success']:
                results['successful'] += 1
            else:
                results['failed'] += 1
        except Exception as e:
            results['failed'] += 1
            logger.error(f"Error validating {api.name}: {e}", exc_info=True)

    return results


@shared_task(name='solution_library.sync_all_documentation_sources')
def sync_all_sources_task():
    """Periodic task: queue a sync for every documentation source with no active job."""
    sources = DocumentationSource.objects.all()
    system_user = User.objects.filter(is_superuser=True).first()
    if system_user is None:
        logger.error("No superuser available to own periodic sync jobs")
        return {'success': False, 'error': 'No superuser found'}

    queued = skipped = 0
    for source in sources:
        # Skip if an active sync job already exists
        if SyncJob.objects.filter(source=source,
                                   status__in=[SyncJob.STATUS_PENDING,
                                               SyncJob.STATUS_PROCESSING]).exists():
            skipped += 1
            continue

        job = SyncJob.objects.create(source=source, started_by=system_user,
                                     status=SyncJob.STATUS_PENDING)
        sync_documentation_source_task.delay(source.id, system_user.id, job.id)
        queued += 1

    return {'success': True, 'queued': queued, 'skipped': skipped}

Infrastructure Configuration

spelunker/celery.py — App Entry Point

import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "spelunker.settings")

app = Celery("spelunker")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()   # auto-discovers tasks.py in every INSTALLED_APP

settings.py — Celery Settings

# Broker and result backend — supplied via environment variables
CELERY_BROKER_URL      = env('CELERY_BROKER_URL')     # amqp://spelunker:<pw>@rabbitmq:5672/spelunker
CELERY_RESULT_BACKEND  = env('CELERY_RESULT_BACKEND')  # rpc://

# Serialization — JSON only (no pickle)
CELERY_ACCEPT_CONTENT  = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE        = env('TIME_ZONE')

# Result expiry — critical when using rpc:// backend.
# Uncollected results accumulate in worker memory without this.
CELERY_RESULT_EXPIRES  = 3600   # 1 hour; safe because we store state in DB job records

# Global time limits (can be overridden per-task with decorator args)
CELERY_TASK_SOFT_TIME_LIMIT = 1800   # 30 min soft limit → SoftTimeLimitExceeded
CELERY_TASK_TIME_LIMIT      = 2100   # 35 min hard kill

# Late ack: acknowledge messages AFTER task completes, not before.
# If a worker crashes mid-task, the broker redelivers the message.
CELERY_TASK_ACKS_LATE  = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1   # fetch one task at a time per worker slot

# Separate logging level for Celery vs. application code
CELERY_LOGGING_LEVEL = env('CELERY_LOGGING_LEVEL', default='INFO')

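CELERY_TASK_ACKS_LATE: Combined with idempotent tasks, this provides at-least-once delivery. If a worker goes down or loses its broker connection mid-task (deployment, node failure), the unacknowledged message returns to the queue and another worker picks it up. When only the child process running the task is killed (for example by the OOM killer), Celery still acknowledges the message unless CELERY_TASK_REJECT_ON_WORKER_LOST is also enabled. Because redelivery means a task can run more than once, idempotency is a hard requirement.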

settings.py — Memcached (Django Cache)

Memcached is the Django HTTP-layer cache (sessions, view caching). It is not used as a Celery result backend.

CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.memcached.PyMemcacheCache",
        "LOCATION": env('KVDB_LOCATION'),    # memcached:11211
        "KEY_PREFIX": env('KVDB_PREFIX'),    # spelunker
        "TIMEOUT": 300,
    }
}

INSTALLED_APPS — Required

INSTALLED_APPS = [
    ...
    'django_celery_beat',   # DB-backed periodic task scheduler (Beat)
    ...
]

docker-compose.yml — Service Topology

| Service | Image | Purpose |
| --- | --- | --- |
| rabbitmq | rabbitmq:3-management-alpine | AMQP message broker |
| memcached | memcached:1.6-alpine | Django HTTP cache |
| worker | spelunker:latest | Celery worker (--concurrency=4) |
| scheduler | spelunker:latest | Celery Beat with DatabaseScheduler |
| flower | mher/flower:latest | Task monitoring UI (port 5555) |

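By default, Celery sends every task to a single queue named celery. For production deployments, separate CPU-heavy work from I/O-bound work, and name the fallback queue explicitly so that tasks without a route still land on a queue that a worker consumes:

# settings.py
CELERY_TASK_DEFAULT_QUEUE = 'default'   # unrouted tasks go here instead of 'celery'
CELERY_TASK_ROUTES = {
    'solution_library.embed_document':           {'queue': 'embedding'},
    'solution_library.embed_documents_batch':    {'queue': 'embedding'},
    'rfp_manager.batch_generate_*':              {'queue': 'batch'},
    'llm_manager.validate_*':                    {'queue': 'default'},
}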
# docker-compose.yml — separate workers per queue
worker-default:
  command: celery -A spelunker worker -Q default --concurrency=4

worker-embedding:
  command: celery -A spelunker worker -Q embedding --concurrency=2

worker-batch:
  command: celery -A spelunker worker -Q batch --concurrency=2

This prevents a burst of embedding tasks from starving time-sensitive API validation, and lets you scale each queue independently.
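
When reviewing a routing table it helps to see which queue each registered task name resolves to, especially for names that match no pattern. The sketch below approximates the glob matching with the standard library's fnmatch; it is a checking aid under that assumption, not Celery's router, and queue_for is a hypothetical helper. The fallback of "celery" reflects Celery's built-in default queue name, which applies unless the default queue is overridden in settings.

```python
from fnmatch import fnmatchcase

# Same patterns as the routing table above, flattened to pattern -> queue.
TASK_ROUTES = {
    "solution_library.embed_document": "embedding",
    "solution_library.embed_documents_batch": "embedding",
    "rfp_manager.batch_generate_*": "batch",
    "llm_manager.validate_*": "default",
}


def queue_for(task_name: str, fallback: str = "celery") -> str:
    """Return the queue of the first matching pattern, else the fallback queue."""
    for pattern, queue in TASK_ROUTES.items():
        if fnmatchcase(task_name, pattern):
            return queue
    return fallback


for name in ("rfp_manager.batch_generate_reviewer_answers",
             "solution_library.sync_documentation_source"):
    print(name, "->", queue_for(name))
```

The second name falls through to the fallback queue, which is a reminder that some worker must consume that queue or those tasks will sit unprocessed.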

Database Connection Management

Celery workers are long-lived processes, so Django DB connections can go stale between tasks. Keep CONN_MAX_AGE at 0 (the Django default) so connections are closed after each task, or put a connection pooler such as PgBouncer in front of the database. Celery's Django fixup closes unusable or obsolete connections around each task, in the same way Django's close_old_connections() does at request boundaries.


Domain Extension Examples

solution_library App

Three task types: single-document embed, batch embed, and documentation-source sync. The single-document task is also triggered by a post_save signal for automatic processing on upload.

# Auto-embed on create (signal)
embed_document_task.delay(document_id=instance.id, ...)

# Manual batch from admin action
embed_documents_batch_task.delay(document_ids=[1, 2, 3], ...)

# Source sync from view (with progress callback)
sync_documentation_source_task.delay(source_id=..., user_id=..., job_id=...)

rfp_manager App

Two-stage pipeline: responder answers first, reviewer answers second. Each stage is a separate Celery batch job. Both check for an existing active job before dispatching to prevent duplicate runs.

# Guard against duplicate jobs before dispatch
if RFPBatchJob.objects.filter(
    rfp=rfp,
    job_type=RFPBatchJob.JOB_TYPE_RESPONDER,
    status__in=[RFPBatchJob.STATUS_PENDING, RFPBatchJob.STATUS_PROCESSING]
).exists():
    # surface error to user
    ...

# Stage 1
batch_generate_responder_answers.delay(rfp.pk, user.pk, job.pk)

# Stage 2 (after Stage 1 is complete)
batch_generate_reviewer_answers.delay(rfp.pk, user.pk, job.pk)
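
The duplicate-job guard reduces to a single predicate: is there already a job of this type for this RFP in an active state? The sketch below expresses that predicate over plain dicts so it can be unit-tested without the ORM; can_start_job and the dict keys are hypothetical. It does not address two requests racing between the check and the create, which would need a database-level safeguard.

```python
ACTIVE_STATUSES = {"pending", "processing"}


def can_start_job(existing_jobs: list[dict], rfp_id: int, job_type: str) -> bool:
    """False if an active job of the same type already exists for this RFP."""
    return not any(
        job["rfp_id"] == rfp_id
        and job["job_type"] == job_type
        and job["status"] in ACTIVE_STATUSES
        for job in existing_jobs
    )


jobs = [
    {"rfp_id": 7, "job_type": "responder", "status": "processing"},
    {"rfp_id": 7, "job_type": "reviewer", "status": "completed"},
]
print(can_start_job(jobs, rfp_id=7, job_type="responder"))  # False
print(can_start_job(jobs, rfp_id=7, job_type="reviewer"))   # True
```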

llm_manager App

Stateless periodic task — no DB job record needed because results are written directly to the LLMApi and LLMModel objects.

# Triggered by Celery Beat; schedule managed via django-celery-beat admin
validate_all_llm_apis.delay()

# Triggered from admin action for a single API
validate_single_api.delay(api_id=api.pk)

Anti-Patterns

  • Don't use rpc:// result backend for tasks where the caller never retrieves the result — the result accumulates in memory. Spelunker mitigates this by storing state in DB job records rather than reading Celery results. Always set CELERY_RESULT_EXPIRES.
  • Don't pass full model instances as task arguments — pass PKs only. Celery serialises arguments as JSON; ORM objects are not JSON serialisable.
  • Don't rely only on the celery_task_id saved by the dispatch call. The AsyncResult.id returned at dispatch and the in-task self.request.id are the same value; re-save it from inside the task (using bind=True) and treat that write as the authoritative source.
  • Don't silence exceptions with bare except: pass — always log errors and reflect failure status onto the DB record.
  • Don't skip the duplicate-job guard when the task is triggered from a view or admin action. Without it, double-clicking a submit button can queue two identical jobs.
  • Don't use CELERY_TASK_SERIALIZER = 'pickle' — JSON only, to prevent arbitrary code execution via crafted task payloads.
  • Don't hardcode periodic task schedules in code via app.conf.beat_schedule — use django_celery_beat and manage schedules in Django admin so they survive deployments.
  • Don't call .delay() inside a database transaction — use transaction.on_commit(). The worker may receive the message before the row is committed, causing DoesNotExist.
  • Don't write non-idempotent tasks — workers may crash and brokers may redeliver. A re-executed task must produce the same result (or safely no-op).
  • Don't omit time limits — a hung external API call (LLM, S3) will block a worker slot forever. Always set soft_time_limit and time_limit.
  • Don't retry business-logic errors with autoretry_for — only retry transient failures (network errors, timeouts). A ValueError or DoesNotExist will never succeed on retry.

Migration / Adoption

When adding a new Celery task to an existing app:

  1. Create <app>/tasks.py using @shared_task, not @app.task.
  2. Name the task '<app_label>.<action>'.
  3. If the task is long-running, create a DB job model with the recommended fields above.
  4. Register the app in INSTALLED_APPS (required for autodiscover_tasks).
  5. For periodic tasks, add a schedule record via Django admin → Periodic Tasks (django-celery-beat) rather than in code.
  6. Add a test that confirms the task can be called synchronously with CELERY_TASK_ALWAYS_EAGER = True.

Settings

# settings.py

# Required — broker and result backend
CELERY_BROKER_URL     = env('CELERY_BROKER_URL')      # amqp://user:pw@host:5672/vhost
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')  # rpc://

# Serialization (do not change)
CELERY_ACCEPT_CONTENT   = ['json']
CELERY_TASK_SERIALIZER  = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE         = env('TIME_ZONE')             # must match Django TIME_ZONE

# Result expiry — prevents unbounded memory growth with rpc:// backend
CELERY_RESULT_EXPIRES   = 3600                        # seconds (1 hour)

# Time limits — global defaults, overridable per-task
CELERY_TASK_SOFT_TIME_LIMIT = 1800                    # SoftTimeLimitExceeded after 30 min
CELERY_TASK_TIME_LIMIT      = 2100                    # hard SIGKILL after 35 min

# Reliability — late ack + single prefetch for at-least-once delivery
CELERY_TASK_ACKS_LATE              = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

# Logging
CELERY_LOGGING_LEVEL = env('CELERY_LOGGING_LEVEL', default='INFO')  # separate from app/Django level

# Optional — disable for production
# AUTO_EMBED_DOCUMENTS = True  # set False to suppress signal-triggered embedding

# Optional — task routing (see Infrastructure Configuration for queue examples)
# CELERY_TASK_ROUTES = { ... }

Testing

from django.test import TestCase, override_settings


@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True)
class EmbedDocumentTaskTest(TestCase):
    def test_happy_path(self):
        """Task embeds a document and returns success."""
        # arrange: create Document, LLMModel fixtures
        result = embed_document_task(document_id=doc.id)
        self.assertTrue(result['success'])
        self.assertGreater(result['chunks_created'], 0)
        doc.refresh_from_db()
        self.assertEqual(doc.review_status, 'pending')

    def test_document_not_found(self):
        """Task returns success=False for a missing document ID."""
        result = embed_document_task(document_id=999999)
        self.assertFalse(result['success'])
        self.assertIn('not found', result['error'])

    def test_no_embedding_model(self):
        """Task returns success=False when no embedding model is available."""
        # arrange: no LLMModel with is_system_default=True
        result = embed_document_task(document_id=doc.id)
        self.assertFalse(result['success'])


@override_settings(CELERY_TASK_ALWAYS_EAGER=True, CELERY_TASK_EAGER_PROPAGATES=True)
class BatchJobTest(TestCase):
    def test_job_reaches_completed_status(self):
        """Batch job transitions from pending → processing → completed."""
        job = RFPBatchJob.objects.create(...)
        batch_generate_responder_answers(rfp_id=rfp.pk, user_id=user.pk, job_id=job.pk)
        job.refresh_from_db()
        self.assertEqual(job.status, RFPBatchJob.STATUS_COMPLETED)

    def test_duplicate_job_guard(self):
        """A second dispatch when a job is already active is rejected by the view."""
        # arrange: one active job
        response = self.client.post(dispatch_url)
        self.assertContains(response, 'already running', status_code=400)