Celery动态添加task

在 Celery 中,worker 启动之后动态添加任务(即:不重启 worker 就添加新的任务函数)是一个相对高级的话题。以下是几种 实用方案



✅ 方案一:使用共享任务 @shared_task + 动态导入模块

关键点:只要你的模块能被 Celery 找到,并注册为任务,worker 是可以动态加载新任务的,无需重启 worker,但有几个前提。

✅ 操作步骤:

  1. 确保 worker 启动时有 --autoscale 或 --without-gossip 以避免任务追踪缓存问题。
  2. 使用 @shared_task 定义新任务
  3. 通过 Python 脚本/接口动态将新任务模块导入(import)到当前 Celery 的上下文


示例:

1️⃣ celery_conf.py (或 celery.py)

from celery import Celery

app = Celery('myapp')
app.config_from_object('celeryconfig')  # 加载配置

2️⃣ 动态创建任务(例如,在某个视图或脚本中)

from celery import shared_task
import types
import sys

def register_dynamic_task(task_name: str, code: str):
    # 动态定义函数
    exec(code)  # 例如:def new_task(x, y): return x + y

    func = locals().get(task_name)
    if not func:
        raise ValueError("任务函数没有定义")

    # 添加为共享任务
    shared = shared_task(func)

    # 将任务绑定到全局模块中供 Celery 使用
    mod = types.ModuleType(f"dynamic_tasks.{task_name}")
    setattr(mod, task_name, shared)
    sys.modules[f"dynamic_tasks.{task_name}"] = mod

    return shared

示例调用:

code = '''
def dynamic_add(x, y):
    return x + y
'''

register_dynamic_task("dynamic_add", code)

调用任务:

from celery import current_app

current_app.send_task("dynamic_tasks.dynamic_add", args=(1, 2))


✅ 方案二:通过 task.send_task() 调用未提前注册的任务

只要你能通过 send_task(task_name) 发送,worker 能识别并导入模块,就可以执行。

示例:

celery -A myapp worker --loglevel=info
# 假设你有 myapp/tasks/special.py 中定义了一个新任务
# special.py
from celery import shared_task

@shared_task(name='myapp.tasks.special.add')
def add(x, y):
    return x + y

添加任务(不重启 worker):

from celery import Celery

app = Celery('myapp')
app.send_task('myapp.tasks.special.add', args=(3, 4))

只要 myapp.tasks.special 是 worker 路径中可以导入的模块,就能正常运行。



🛑 注意事项

  1. Worker 启动时只会自动注册已加载模块中的任务,新的模块需要显式导入或已被某处引用。
  2. 如果你使用 --without-mingle --without-gossip --without-heartbeat 启动 worker,会避免 celery 内部的缓存影响你动态注册的任务。
  3. 在动态代码中注册任务后,调用时使用完整路径,例如 dynamic_tasks.dynamic_add。


🧩 实际用法场景

  • 多租户系统,不同租户的任务不一样。
  • 在线平台支持用户上传任务代码,运行 sandbox 中的 celery 任务。
  • AIOps / 自动化平台动态生成任务逻辑。


❓是否需要热更新?

如果你想做到“热更新任务代码”,还可以:

  • 结合文件监控(如 watchdog)+ 自动 reload task 模块
  • 使用 Redis 或数据库保存任务代码,定期动态注册


需要完整项目示例或者具体接口方式(例如 Flask/Django 中注册任务),可以告诉我,我可以继续给你写完整例子。

文章标签:

评论(0)