celery+queue+exchange+router使用方法

好的,我来帮你把 Celery + Redis + 队列 + 交换机 + 路由 的配置和使用方法完整梳理成一份清晰流程,按概念 → 配置 → 使用 → 调试顺序总结。



一、概念梳理

概念作用说明
Broker(Redis)任务传输Celery 使用 Redis 或 RabbitMQ 做任务中间件,存储待执行任务。
队列 Queue任务存放容器任务被发送到队列里,Worker 从队列里消费。可以有默认队列,也可以自定义队列。
交换机 Exchange决定任务路由Celery 使用 AMQP 概念,如果使用 Redis,exchange 名义上存在,实际主要用 routing_key。
路由 Router控制任务发送到哪个队列可以按任务名指定任务发到特定队列。通过 CELERY_ROUTES 配置。
Worker执行任务Worker 监听指定队列,消费队列中的任务。


二、配置流程(Django + Celery + Redis)

1. 安装依赖

pip install celery redis


2. 在 Django 中创建 celery.py

# myproject/celery.py
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# 从 Django 配置加载 Celery 配置
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现 tasks.py 模块
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


3. Django settings.py 中配置 Celery

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# 任务序列化
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_SERIALIZER = 'json'

# 队列配置
from kombu import Exchange, Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('high_priority'), routing_key='high_priority'),
)

# 路由配置(按任务名分发队列)
CELERY_ROUTES = {
    'myapp.tasks.add': {'queue': 'high_priority', 'routing_key': 'high_priority'},
}

# 可选:强制导入非 tasks.py 模块
CELERY_IMPORTS = ('myapp.celery_tasks',)


4. 创建任务模块

# myapp/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y
  • 非 tasks.py 模块:如果模块名不是 tasks.py,必须用 CELERY_IMPORTS 或手动导入。


三、使用流程

1. 发送任务

from myapp.tasks import add

# 异步调用
add.delay(2, 3)
  • Celery 根据 CELERY_ROUTES 决定任务进入哪个队列。
  • 如果没有匹配路由,任务进入 CELERY_DEFAULT_QUEUE。


2. 启动 Worker

# 监听默认队列和自定义队列
python manage.py celery worker -l info -Q default,high_priority
  • Worker 只会消费它监听的队列里的任务。
  • 自定义队列必须显式在 Worker 中指定。


3. 调试

  1. 查看 Redis 队列里的任务:
redis-cli -n 0 LRANGE default 0 -1
redis-cli -n 0 LRANGE high_priority 0 -1
  1. 查看 Worker 日志:
python manage.py celery worker -l info -Q default,high_priority -E
  • -E 打印任务事件,方便排查任务是否被 Worker 收到。
  1. 在 shell 查看已注册任务:
from myproject.celery import app
app.autodiscover_tasks()
list(app.tasks.keys())


四、完整使用流程图(文字版)

task.delay() --> 根据 CELERY_ROUTES 决定 queue --> Redis Broker 对应 list --> Worker 监听 queue --> 任务被消费 --> 执行任务


✅ 总结要点

  1. Broker:存任务,Redis/MQ
  2. 队列:任务容器,自定义队列必须 Worker 指定
  3. 路由:控制任务走哪条队列
  4. Worker:执行任务,必须监听目标队列
  5. 任务注册:默认扫描 tasks.py,非默认模块需 CELERY_IMPORTS 或手动导入
  6. 调试:检查 Redis 队列、Worker 日志、任务注册状态


如果你需要,我可以帮你画一张 Celery + Redis + 自定义队列 + 路由 + Worker 消费流程图,直观展示任务从发送到执行的全流程。

你希望我画吗?

文章标签:

评论(0)