Celery使用介绍
已于 2025年04月25日 14:05 修改
访问次数:0
Celery 是一个强大的 分布式任务队列,非常适合用于处理异步任务、定时任务以及后台任务等。它支持多种消息代理(如 Redis、RabbitMQ)和结果存储(如 Redis、数据库等)。以下是 Celery 使用的一个完整总结:
🛠️ Celery 基本使用方法
1. 安装 Celery 和消息代理
pip install celery
pip install redis # 以 Redis 为例
2. 配置 Celery(celery.py)
在你的项目根目录下创建一个 celery.py 文件,设置 Celery 配置。
# your_project/celery.py
from celery import Celery
# 设置 Django 项目的默认设置
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
app = Celery('your_project')
# 从 Django 设置加载配置(可以根据需要修改)
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现任务
app.autodiscover_tasks()
在项目的 __init__.py 文件中引入:
from .celery import app as celery_app
__all__ = ('celery_app',)
3. 配置 Django 设置(settings.py)
# settings.py
# 设置消息代理(此处以 Redis 为例)
CELERY_BROKER_URL = 'redis://localhost:6379/0'
# 任务结果存储后端(也可以使用 Redis、数据库等)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1'
# 默认时区
CELERY_TIMEZONE = 'Asia/Shanghai'
# 使用 Django Celery Beat 来定时任务
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
4. 编写 Celery 任务
创建任务文件 tasks.py,例如在 Django 应用的目录下:
# tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
5. 调用任务
5.1 延迟调用任务(delay)
使用 delay 是触发 Celery 任务的最简单方法,它会立即将任务发送到 Celery Worker。
from myapp.tasks import add
# 异步执行任务
add.delay(4, 6)
5.2 使用 apply_async (更多控制)
apply_async 允许你更细致地控制任务执行,比如延迟执行、任务过期等。
from myapp.tasks import add
from datetime import timedelta
# 延迟 10 秒后执行
add.apply_async(args=[4, 6], countdown=10)
# 在指定时间执行(例如:5分钟后)
from datetime import datetime
eta_time = datetime(2023, 5, 1, 10, 30)
add.apply_async(args=[4, 6], eta=eta_time)
6. 启动 Celery Worker
使用以下命令启动 Celery Worker,处理任务:
celery -A your_project worker --loglevel=info
6.1 启动 Celery Beat(如果需要定时任务)
celery -A your_project beat --loglevel=info
7. 定时任务(Celery Beat)
Celery Beat 允许你设置周期性任务,例如每分钟执行、每天执行等。
7.1 在 settings.py 中配置定时任务
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'every-10-seconds': {
'task': 'myapp.tasks.add',
'schedule': 10.0, # 每10秒执行一次
'args': (4, 6),
},
'every-midnight': {
'task': 'myapp.tasks.add',
'schedule': crontab(minute=0, hour=0), # 每天午夜执行
'args': (5, 5),
},
}
7.2 在 admin.py 中配置 Celery Beat
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from django.utils.timezone import now
# 创建周期性任务
schedule, created = IntervalSchedule.objects.get_or_create(every=10, period=IntervalSchedule.SECONDS)
PeriodicTask.objects.create(interval=schedule, name="Task every 10 seconds", task="myapp.tasks.add", args='[4, 6]')
你还可以使用 Django 管理后台(Admin)来创建和管理定时任务。
🔑 高级功能
1. 任务重试机制(retry)
Celery 支持任务失败后的重试。你可以在任务函数中设置重试逻辑。
from celery import shared_task
from celery.exceptions import MaxRetriesExceededError
@shared_task(bind=True, max_retries=3)
def my_task(self, x):
try:
# 假设任务有可能失败
result = x / 0
except ZeroDivisionError as exc:
raise self.retry(exc=exc, countdown=5) # 5秒后重试
2. 任务结果(get())
如果你需要获取任务的返回结果,可以使用 get() 方法。它会阻塞直到任务执行完毕并返回结果。
result = add.apply_async(args=[4, 6])
print(result.get()) # 获取任务结果
3. 任务路由(queue)
你可以将任务发送到特定的队列,使用不同的队列来处理不同类型的任务。
add.apply_async(args=[4, 6], queue='queue_name')
4. 任务优先级(priority)
任务可以设置优先级,值越小优先级越高。
add.apply_async(args=[4, 6], priority=5)
🧠 总结
常见任务调度流程:
- 初始化 Celery:配置消息代理和结果存储。
- 定义任务:使用 @shared_task 装饰器定义任务函数。
- 异步调用任务:使用 delay() 或 apply_async() 异步执行任务。
- 启动 Worker:启动 Celery Worker 进程来处理任务。
- 定时任务:配置 Celery Beat 来定期执行任务。
常用配置:
- 消息代理:CELERY_BROKER_URL(通常使用 Redis 或 RabbitMQ)
- 结果存储:CELERY_RESULT_BACKEND(通常使用 Redis)
- 定时任务:使用 celery-beat,可以设置周期性任务。
Celery 可以帮助你处理后台任务、延迟任务、定时任务等,适用于各种需要异步或定时执行的场景。
如果你有具体的任务场景或问题,欢迎继续提问,我可以为你提供更详细的代码示例或解决方案!
评论(0)