Celery动态添加task
已于 2025年07月28日 00:02 修改
访问次数:0
在 Celery 中,worker 启动之后动态添加任务(即:不重启 worker 就添加新的任务函数)是一个相对高级的话题。以下是几种 实用方案:
✅ 方案一:使用共享任务 @shared_task + 动态导入模块
关键点:只要你的模块能被 Celery 找到,并注册为任务,worker 是可以动态加载新任务的,无需重启 worker,但有几个前提。
✅ 操作步骤:
- 确保 worker 启动时有 --autoscale 或 --without-gossip 以避免任务追踪缓存问题。
- 使用 @shared_task 定义新任务
- 通过 Python 脚本/接口动态将新任务模块导入(import)到当前 Celery 的上下文
示例:
1️⃣ celery_conf.py (或 celery.py)
from celery import Celery
app = Celery('myapp')
app.config_from_object('celeryconfig') # 加载配置
2️⃣ 动态创建任务(例如,在某个视图或脚本中)
from celery import shared_task
import types
import sys
def register_dynamic_task(task_name: str, code: str):
# 动态定义函数
exec(code) # 例如:def new_task(x, y): return x + y
func = locals().get(task_name)
if not func:
raise ValueError("任务函数没有定义")
# 添加为共享任务
shared = shared_task(func)
# 将任务绑定到全局模块中供 Celery 使用
mod = types.ModuleType(f"dynamic_tasks.{task_name}")
setattr(mod, task_name, shared)
sys.modules[f"dynamic_tasks.{task_name}"] = mod
return shared
示例调用:
code = '''
def dynamic_add(x, y):
return x + y
'''
register_dynamic_task("dynamic_add", code)
调用任务:
from celery import current_app
current_app.send_task("dynamic_tasks.dynamic_add", args=(1, 2))
✅ 方案二:通过 task.send_task() 调用未提前注册的任务
只要你能通过 send_task(task_name) 发送,worker 能识别并导入模块,就可以执行。
示例:
celery -A myapp worker --loglevel=info
# 假设你有 myapp/tasks/special.py 中定义了一个新任务
# special.py
from celery import shared_task
@shared_task(name='myapp.tasks.special.add')
def add(x, y):
return x + y
添加任务(不重启 worker):
from celery import Celery
app = Celery('myapp')
app.send_task('myapp.tasks.special.add', args=(3, 4))
只要 myapp.tasks.special 是 worker 路径中可以导入的模块,就能正常运行。
🛑 注意事项
- Worker 启动时只会自动注册已加载模块中的任务,新的模块需要显式导入或已被某处引用。
- 如果你使用 --without-mingle --without-gossip --without-heartbeat 启动 worker,会避免 celery 内部的缓存影响你动态注册的任务。
- 在动态代码中注册任务后,调用时使用完整路径,例如 dynamic_tasks.dynamic_add。
🧩 实际用法场景
- 多租户系统,不同租户的任务不一样。
- 在线平台支持用户上传任务代码,运行 sandbox 中的 celery 任务。
- AIOps / 自动化平台动态生成任务逻辑。
❓是否需要热更新?
如果你想做到“热更新任务代码”,还可以:
- 结合文件监控(如 watchdog)+ 自动 reload task 模块
- 使用 Redis 或数据库保存任务代码,定期动态注册
需要完整项目示例或者具体接口方式(例如 Flask/Django 中注册任务),可以告诉我,我可以继续给你写完整例子。
评论(0)