Celery使用介绍

Celery 是一个强大的 分布式任务队列,非常适合用于处理异步任务、定时任务以及后台任务等。它支持多种消息代理(如 Redis、RabbitMQ)和结果存储(如 Redis、数据库等)。以下是 Celery 使用的一个完整总结:



🛠️ Celery 基本使用方法

1. 安装 Celery 和消息代理

pip install celery
pip install redis  # 以 Redis 为例


2. 配置 Celery(celery.py

在你的项目根目录下创建一个 celery.py 文件,设置 Celery 配置。

# your_project/celery.py
from celery import Celery

# 设置 Django 项目的默认设置
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')

app = Celery('your_project')

# 从 Django 设置加载配置(可以根据需要修改)
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现任务
app.autodiscover_tasks()

在项目的 __init__.py 文件中引入:

from .celery import app as celery_app

__all__ = ('celery_app',)


3. 配置 Django 设置(settings.py

# settings.py

# 设置消息代理(此处以 Redis 为例)
CELERY_BROKER_URL = 'redis://localhost:6379/0'

# 任务结果存储后端(也可以使用 Redis、数据库等)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'

# 默认时区
CELERY_TIMEZONE = 'Asia/Shanghai'

# 使用 Django Celery Beat 来定时任务
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'


4. 编写 Celery 任务

创建任务文件 tasks.py,例如在 Django 应用的目录下:

# tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y


5. 调用任务

5.1 延迟调用任务(delay

使用 delay 是触发 Celery 任务的最简单方法,它会立即将任务发送到 Celery Worker。

from myapp.tasks import add

# 异步执行任务
add.delay(4, 6)

5.2 使用 apply_async (更多控制)

apply_async 允许你更细致地控制任务执行,比如延迟执行、任务过期等。

from myapp.tasks import add
from datetime import timedelta

# 延迟 10 秒后执行
add.apply_async(args=[4, 6], countdown=10)

# 在指定时间执行(例如:5分钟后)
from datetime import datetime
eta_time = datetime(2023, 5, 1, 10, 30)
add.apply_async(args=[4, 6], eta=eta_time)


6. 启动 Celery Worker

使用以下命令启动 Celery Worker,处理任务:

celery -A your_project worker --loglevel=info

6.1 启动 Celery Beat(如果需要定时任务)

celery -A your_project beat --loglevel=info


7. 定时任务(Celery Beat)

Celery Beat 允许你设置周期性任务,例如每分钟执行、每天执行等。

7.1 在 settings.py 中配置定时任务

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'every-10-seconds': {
        'task': 'myapp.tasks.add',
        'schedule': 10.0,  # 每10秒执行一次
        'args': (4, 6),
    },
    'every-midnight': {
        'task': 'myapp.tasks.add',
        'schedule': crontab(minute=0, hour=0),  # 每天午夜执行
        'args': (5, 5),
    },
}

7.2 在 admin.py 中配置 Celery Beat

from django_celery_beat.models import PeriodicTask, IntervalSchedule
from django.utils.timezone import now

# 创建周期性任务
schedule, created = IntervalSchedule.objects.get_or_create(every=10, period=IntervalSchedule.SECONDS)
PeriodicTask.objects.create(interval=schedule, name="Task every 10 seconds", task="myapp.tasks.add", args='[4, 6]')

你还可以使用 Django 管理后台(Admin)来创建和管理定时任务。



🔑 高级功能

1. 任务重试机制retry

Celery 支持任务失败后的重试。你可以在任务函数中设置重试逻辑。

from celery import shared_task
from celery.exceptions import MaxRetriesExceededError

@shared_task(bind=True, max_retries=3)
def my_task(self, x):
    try:
        # 假设任务有可能失败
        result = x / 0
    except ZeroDivisionError as exc:
        raise self.retry(exc=exc, countdown=5)  # 5秒后重试

2. 任务结果get()

如果你需要获取任务的返回结果,可以使用 get() 方法。它会阻塞直到任务执行完毕并返回结果。

result = add.apply_async(args=[4, 6])
print(result.get())  # 获取任务结果

3. 任务路由queue

你可以将任务发送到特定的队列,使用不同的队列来处理不同类型的任务。

add.apply_async(args=[4, 6], queue='queue_name')

4. 任务优先级priority

任务可以设置优先级,值越小优先级越高。

add.apply_async(args=[4, 6], priority=5)


🧠 总结

常见任务调度流程:

  1. 初始化 Celery:配置消息代理和结果存储。
  2. 定义任务:使用 @shared_task 装饰器定义任务函数。
  3. 异步调用任务:使用 delay() 或 apply_async() 异步执行任务。
  4. 启动 Worker:启动 Celery Worker 进程来处理任务。
  5. 定时任务:配置 Celery Beat 来定期执行任务。

常用配置:

  • 消息代理:CELERY_BROKER_URL(通常使用 Redis 或 RabbitMQ)
  • 结果存储:CELERY_RESULT_BACKEND(通常使用 Redis)
  • 定时任务:使用 celery-beat,可以设置周期性任务。

Celery 可以帮助你处理后台任务、延迟任务、定时任务等,适用于各种需要异步或定时执行的场景。

如果你有具体的任务场景或问题,欢迎继续提问,我可以为你提供更详细的代码示例或解决方案!

文章标签:

评论(0)