
name: django-celery
description: Django + Celery async task patterns (configuration, task design, beat scheduling, retries, canvas workflows, monitoring, and testing). Use when adding background jobs, scheduled tasks, or async processing to a Django app.
origin: ECC
Django + Celery Async Task Patterns
Production-grade patterns for background task processing in Django using Celery with Redis or RabbitMQ.
When to Activate
- Adding background jobs or async processing to a Django app
- Implementing periodic/scheduled tasks
- Offloading slow operations (email, PDF generation, API calls) from the request cycle
- Setting up Celery Beat for cron-like scheduling
- Debugging task failures, retries, or queue backlogs
- Writing tests for Celery tasks
Project Setup
Installation
pip install "celery[redis]" django-celery-results django-celery-beat
celery.py — App Entrypoint
# config/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.development')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks() # Discovers tasks.py in each INSTALLED_APP
@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')
# config/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
Django Settings
# config/settings/base.py
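# Broker (Redis recommended for production)
# Assumption: `env` is a settings helper such as django-environ's `environ.Env()`, defined earlier in this module
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://localhost:6379/0')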
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='django-db')
# Serialization
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# Task behavior
CELERY_TASK_TRACK_STARTED = True
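CELERY_TASK_TIME_LIMIT = 30 * 60  # Hard limit: the process running the task is killed after 30 min
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60  # Soft limit: raises SoftTimeLimitExceeded inside the task after 25 min
CELERY_WORKER_PREFETCH_MULTIPLIER = 1  # Reserve one task per worker process so long tasks do not hold up queued ones
CELERY_TASK_ACKS_LATE = True  # Acknowledge after execution so the broker can redeliver if the worker dies mid-task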
# Result persistence
CELERY_RESULT_EXPIRES = 60 * 60 * 24 # Keep results 24 hours
# Beat scheduler (for periodic tasks)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# Installed apps
INSTALLED_APPS += [
    'django_celery_results',
    'django_celery_beat',
]
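Because `celery.py` loads configuration with `namespace='CELERY'`, only settings whose names start with `CELERY_` are read, and the rest of the name maps to Celery's lowercase setting (`CELERY_TASK_TIME_LIMIT` becomes `task_time_limit`). The sketch below imitates that naming rule in plain Python to make the mapping concrete. `to_celery_keys` is a hypothetical helper for illustration, not part of Celery.

```python
def to_celery_keys(settings: dict, namespace: str = "CELERY") -> dict:
    """Imitate how namespaced Django settings map to Celery setting names."""
    prefix = f"{namespace}_"
    return {
        name[len(prefix):].lower(): value
        for name, value in settings.items()
        if name.startswith(prefix)
    }


django_settings = {
    "CELERY_BROKER_URL": "redis://localhost:6379/0",
    "CELERY_TASK_TIME_LIMIT": 30 * 60,
    "DEBUG": True,  # no CELERY_ prefix, so Celery never sees it
}

celery_config = to_celery_keys(django_settings)
print(celery_config)
```

A setting that is missing the prefix is ignored without any error, which is a common reason a value in `settings.py` appears to have no effect.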
Running Workers
# Start worker (development)
celery -A config worker --loglevel=info
# Start beat scheduler (periodic tasks)
celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# Combined worker + beat (dev only, never production)
celery -A config worker --beat --loglevel=info
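# Production: multiple workers with concurrency
# Queue names must match your routing; Celery's default queue is named "celery" unless CELERY_TASK_DEFAULT_QUEUE is set
celery -A config worker --loglevel=warning --concurrency=4 -Q default,high_priority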
Task Design Patterns
Basic Task
# apps/notifications/tasks.py
from celery import shared_task
import logging
logger = logging.getLogger(__name__)
@shared_task(name='notifications.send_welcome_email')
def send_welcome_email(user_id: int) -> None:
    """Send welcome email to newly registered user."""
    # Local imports avoid circular imports when the tasks module loads
    from apps.users.models import User
    from apps.notifications.services import EmailService

    try:
        user = User.objects.get(pk=user_id)
    except User.DoesNotExist:
        logger.warning('send_welcome_email: user %s not found', user_id)
        return  # Do not raise: a retry cannot succeed if the user no longer exists

    EmailService.send_welcome(user)
    logger.info('Welcome email sent to user %s', user_id)
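`send_welcome_email` takes a `user_id` rather than a `User` instance. With the JSON serializer configured in the settings, task arguments have to survive JSON serialization, which a primary key does and a model object does not. A minimal stdlib sketch of the difference, where `FakeUser` and `can_send_as_task_args` are hypothetical names used only for illustration:

```python
import json
from dataclasses import dataclass


@dataclass
class FakeUser:
    """Hypothetical stand-in for a Django model instance."""
    pk: int
    email: str


def can_send_as_task_args(*args) -> bool:
    """Return True if the arguments survive JSON serialization."""
    try:
        json.dumps(args)
        return True
    except TypeError:
        return False


user = FakeUser(pk=42, email="ada@example.com")
print(can_send_as_task_args(user.pk))  # True: an int is JSON-serializable
print(can_send_as_task_args(user))     # False: arbitrary objects are not
```

Passing the key also means the task reads the current row when it runs, not a copy captured when the task was enqueued.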
Retryable Task
from celery import shared_task


@shared_task(
    bind=True,
    name='integrations.sync_to_crm',
    max_retries=5,
    default_retry_delay=60,  # used by self.retry() when no countdown is given; automatic retries use the backoff below
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,  # exponential backoff for automatic retries (1s, 2s, 4s, ...)
    retry_backoff_max=600,  # cap at 10 minutes
    retry_jitter=True,  # randomise to avoid thundering herd
)
def sync_contact_to_crm(self, contact_id: int) -> dict:
    """Sync contact to external CRM with retry on transient failures."""
    from apps.crm.services import CRMClient

    try:
        return CRMClient().sync(contact_id)
    except CRMClient.RateLimitError as exc:
        # Use the retry delay from the response header
        raise self.retry(exc=exc, countdown=int(exc.retry_after))
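To see what `retry_backoff`, `retry_backoff_max`, and `retry_jitter` imply for timing, the sketch below computes an exponential backoff schedule with a cap and optional full jitter. It approximates the behaviour Celery documents for these options, assuming a backoff factor of 1 second when `retry_backoff=True`. `backoff_delay` is a hypothetical helper, not Celery's own function.

```python
import random


def backoff_delay(retries: int, factor: int = 1, maximum: int = 600,
                  jitter: bool = False) -> int:
    """Delay in seconds before retry number `retries` (0-based)."""
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick uniformly between 0 and the computed delay
        delay = random.randint(0, delay)
    return delay


# Without jitter, five retries wait 1, 2, 4, 8, 16 seconds
schedule = [backoff_delay(n) for n in range(5)]
print(schedule)

# The cap applies once 2**n exceeds the maximum
print(backoff_delay(10))  # 600
```

With `max_retries=5` and a factor of 1, the cap of 600 seconds is never reached. It only matters if `retry_backoff` is set to a larger number or the retry count is raised.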
Idempotent Task Pattern
Design tasks so they can safely run multiple times with the same inputs:
@shared_task(name='orders.mark_shipped')
def mark_order_shipped(order_id: int, tracking_number: str) -> None:
    """Mark order as shipped; safe to run multiple times."""
    from apps.orders.models import Order

    updated = Order.objects.filter(
        pk=order_id,
        status=Order.Status.PROCESSING,  # Guard: only update if not already shipped
    ).update(
        status
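The guard in the `filter()` call is what makes the task safe to repeat: the update only matches rows still in the processing state, so a second delivery changes nothing. A self-contained sketch of the same idea using `sqlite3` instead of the Django ORM follows. The `orders` table, its columns, and the `mark_shipped` helper are assumptions for illustration.

```python
import sqlite3


def mark_shipped(conn: sqlite3.Connection, order_id: int, tracking: str) -> int:
    """Conditionally mark an order shipped; return the number of rows changed."""
    cur = conn.execute(
        "UPDATE orders SET status = 'shipped', tracking_number = ? "
        "WHERE id = ? AND status = 'processing'",  # guard clause
        (tracking, order_id),
    )
    conn.commit()
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, tracking_number TEXT)"
)
conn.execute("INSERT INTO orders (id, status) VALUES (1, 'processing')")

first = mark_shipped(conn, 1, "TRACK-1")
second = mark_shipped(conn, 1, "TRACK-2")  # duplicate delivery with different data
print(first, second)  # 1 0
```

The row count plays the role of `updated` in the task: zero means the order was already shipped or does not exist, so any follow-up side effects can be skipped.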
Compatible Tools
Claude Code, Cursor
Tags
Backend

