celery+queue+exchange+router使用方法
已于 2025年08月14日 15:02 修改
访问次数:0
好的,我来帮你把 Celery + Redis + 队列 + 交换机 + 路由 的配置和使用方法完整梳理成一份清晰流程,按概念 → 配置 → 使用 → 调试顺序总结。
一、概念梳理
| 概念 | 作用 | 说明 |
|---|---|---|
| Broker(Redis) | 任务传输 | Celery 使用 Redis 或 RabbitMQ 做任务中间件,存储待执行任务。 |
| 队列 Queue | 任务存放容器 | 任务被发送到队列里,Worker 从队列里消费。可以有默认队列,也可以自定义队列。 |
| 交换机 Exchange | 决定任务路由 | Celery 使用 AMQP 概念,如果使用 Redis,exchange 名义上存在,实际主要用 routing_key。 |
| 路由 Router | 控制任务发送到哪个队列 | 可以按任务名指定任务发到特定队列。通过 CELERY_ROUTES 配置。 |
| Worker | 执行任务 | Worker 监听指定队列,消费队列中的任务。 |
二、配置流程(Django + Celery + Redis)
1. 安装依赖
pip install celery redis
2. 在 Django 中创建 celery.py
# myproject/celery.py
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# 从 Django 配置加载 Celery 配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现 tasks.py 模块
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
3. Django settings.py 中配置 Celery
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
# 任务序列化
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_SERIALIZER = 'json'
# 队列配置
from kombu import Exchange, Queue
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('high_priority', Exchange('high_priority'), routing_key='high_priority'),
)
# 路由配置(按任务名分发队列)
CELERY_ROUTES = {
'myapp.tasks.add': {'queue': 'high_priority', 'routing_key': 'high_priority'},
}
# 可选:强制导入非 tasks.py 模块
CELERY_IMPORTS = ('myapp.celery_tasks',)
4. 创建任务模块
# myapp/tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
- 非 tasks.py 模块:如果模块名不是 tasks.py,必须用 CELERY_IMPORTS 或手动导入。
三、使用流程
1. 发送任务
from myapp.tasks import add
# 异步调用
add.delay(2, 3)
- Celery 根据 CELERY_ROUTES 决定任务进入哪个队列。
- 如果没有匹配路由,任务进入 CELERY_DEFAULT_QUEUE。
2. 启动 Worker
# 监听默认队列和自定义队列
python manage.py celery worker -l info -Q default,high_priority
- Worker 只会消费它监听的队列里的任务。
- 自定义队列必须显式在 Worker 中指定。
3. 调试
- 查看 Redis 队列里的任务:
redis-cli -n 0 LRANGE default 0 -1
redis-cli -n 0 LRANGE high_priority 0 -1
- 查看 Worker 日志:
python manage.py celery worker -l info -Q default,high_priority -E
- -E 打印任务事件,方便排查任务是否被 Worker 收到。
- 在 shell 查看已注册任务:
from myproject.celery import app
app.autodiscover_tasks()
list(app.tasks.keys())
四、完整使用流程图(文字版)
task.delay() --> 根据 CELERY_ROUTES 决定 queue --> Redis Broker 对应 list --> Worker 监听 queue --> 任务被消费 --> 执行任务
✅ 总结要点
- Broker:存任务,Redis/MQ
- 队列:任务容器,自定义队列必须 Worker 指定
- 路由:控制任务走哪条队列
- Worker:执行任务,必须监听目标队列
- 任务注册:默认扫描 tasks.py,非默认模块需 CELERY_IMPORTS 或手动导入
- 调试:检查 Redis 队列、Worker 日志、任务注册状态
如果你需要,我可以帮你画一张 Celery + Redis + 自定义队列 + 路由 + Worker 消费流程图,直观展示任务从发送到执行的全流程。
你希望我画吗?
评论(0)