
关于
Django + Celery 异步任务模式——配置、任务设计、Beat 调度、重试、Canvas 工作流、监控和测试。适用于向 Django 应用添加后台作业、定时任务或异步处理。
name: django-celery description: Django + Celery 异步任务模式——配置、任务设计、定时调度、重试、Canvas 工作流、监控和测试。在向 Django 应用添加后台作业、定时任务或异步处理时使用。 origin: ECC
Django + Celery 异步任务模式
在 Django 中使用 Celery 配合 Redis 或 RabbitMQ 进行生产级后台任务处理的模式。
何时激活
- 向 Django 应用添加后台作业或异步处理
- 实现周期性/定时任务
- 将慢操作(邮件、PDF 生成、API 调用)从请求周期中卸载
- 设置 Celery Beat 进行类 cron 调度
- 调试任务失败、重试或队列积压
- 为 Celery 任务编写测试
项目设置
安装
pip install celery[redis] django-celery-results django-celery-beat
celery.py — 应用入口
# config/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.development')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks() # Discovers tasks.py in each INSTALLED_APP
@app.task(bind=True, ignore_result=True)
def debug_task(self):
print(f'Request: {self.request!r}')
# config/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
Django 设置
# config/settings/base.py
# Broker(生产环境推荐 Redis)
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://localhost:6379/0')
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='django-db')
# 序列化
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# 任务行为
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60 # 硬限制:30 分钟
CELERY_TASK_SOFT_TIME_LIMIT = 25 * 60 # 软限制:发送 SoftTimeLimitExceeded
CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # 防止 worker 囤积长任务
CELERY_TASK_ACKS_LATE = True # worker 崩溃时重新入队
# 结果持久化
CELERY_RESULT_EXPIRES = 60 * 60 * 24 # 保留结果 24 小时
# Beat 调度器(周期性任务)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# 已安装应用
INSTALLED_APPS += [
'django_celery_results',
'django_celery_beat',
]
运行 Worker
# 启动 worker(开发环境)
celery -A config worker --loglevel=info
# 启动 beat 调度器(周期性任务)
celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# 合并 worker + beat(仅开发环境,生产环境禁止)
celery -A config worker --beat --loglevel=info
# 生产环境:多 worker 并发
celery -A config worker --loglevel=warning --concurrency=4 -Q default,high_priority
任务设计模式
基本任务
# apps/notifications/tasks.py
from celery import shared_task
import logging
logger = logging.getLogger(__name__)
@shared_task(name='notifications.send_welcome_email')
def send_welcome_email(user_id: int) -> None:
"""Send welcome email to newly registered user."""
from apps.users.models import User
from apps.notifications.services import EmailService
try:
user = User.objects.get(pk=user_id)
except User.DoesNotExist:
logger.warning('send_welcome_email: user %s not found', user_id)
return
EmailService.send_welcome(user)
logger.info('Welcome email sent to user %s', user_id)
可重试任务
@shared_task(
bind=True,
name='integrations.sync_to_crm',
max_retries=5,
default_retry_delay=60,
autoretry_for=(ConnectionError, TimeoutError),
retry_backoff=True,
retry_backoff_max=600,
retry_jitter=True,
)
def sync_contact_to_crm(self, contact_id: int) -> dict:
"""Sync contact to external CRM with retry on transient failures."""
from apps.crm.services import CRMClient
try:
result = CRMClient().sync(contact_id)
return result
except CRMClient.RateLimitError as exc:
raise self.retry(exc=exc, countdown=int(exc.retry_after))
幂等任务模式
设计任务使其可以安全地使用相同输入多次运行:
@shared_task(name='orders.mark_shipped')
def mark_order_shipped(order_id: int, tracking_number: str) -> None:
"""Mark order as shipped - safe to run multiple times."""
from apps.orders.models import Order
updated = Order.objects.filter(
pk=order_id,
status=Order.Status.PROCESSING,
).update(
status=Order.Status.SHIPPED,
tracking_number=tracking_number,
)
if not updated:
logger.info('Order %s already shipped or not in PROCESSING', order_id)
兼容工具
Claude CodeCursor
标签
后端开发

