摘要:是一個基于的分布式調度系統,文檔在這最近有個需求想要動態的添加任務而不用重啟服務找了一圈沒找到什么好辦法也有可能是文檔沒看仔細,所以只能自己實現囉為動態添加任務,首先我想到的是傳遞一個函數進去,讓某個特定任務去執行這個傳遞過去的函數,就像這
celery是一個基于Python的分布式調度系統,文檔在這 ,最近有個需求,想要動態的添加任務而不用重啟celery服務,找了一圈沒找到什么好辦法(也有可能是文檔沒看仔細),所以只能自己實現囉
為celery動態添加任務,首先我想到的是傳遞一個函數進去,讓某個特定任務去執行這個傳遞過去的函數,就像這樣
@app.task def execute(func, *args, **kwargs): return func(*args, **kwargs)
很可惜,會出現這樣的錯誤
kombu.exceptions.EncodeError: Object of type "function" is not JSON serializable
換一種序列化方式
@app.task(serializer="pickle") def execute(func, *args, **kwargs): return func(*args, **kwargs)
結果又出現一大串錯誤信息
ERROR/MainProcess] Pool callback raised exception: ContentDisallowed("Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)",) Traceback (most recent call last): File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__ return obj.__dict__[self.__name__] KeyError: "chord" During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__ return obj.__dict__[self.__name__] KeyError: "_payload"
換一種思路
func = import_string(func)
不知道這樣是否可以,結果測試: No
哎,流年不利.
最后一直測試,一直測試,終于找到了一種辦法,直接上代碼
from importlib import import_module, reload app.conf.CELERY_IMPORTS = ["task", "task.all_task"] def import_string(import_name): import_name = str(import_name).replace(":", ".") modules = import_name.split(".") mod = import_module(modules[0]) for comp in modules[1:]: if not hasattr(mod, comp): reload(mod) mod = getattr(mod, comp) return mod @app.task def execute(func, *args, **kwargs): func = import_string(func) return func(*args, **kwargs)
項目結構是這樣的
├── celery_app.py
├── config.py
├── task
│?? ├── all_task.py
│?? ├── __init__.py
注意: 任務必須大于等于兩層目錄
以后每次添加任務都可以先添加到all_task.py里,調用時不用再重啟celery服務
# task/all_task.py def ee(c, d): return c, d, "你好" # example from celery_app import execute execute.delay("task.all_task.ee", 2, 444)
ok,另外發現celery也支持任務定時調用,就像這樣
execute.apply_async(args=["task.all_task.aa"], eta=datetime(2017, 7, 9, 8, 12, 0))
簡單實現一個任務重復調用的功能
@app.task def interval(func, seconds, args=(), task_id=None): next_run_time = current_time() + timedelta(seconds=seconds) kwargs = dict(args=(func, seconds, args), eta=next_run_time) if task_id is not None: kwargs.update(task_id=task_id) interval.apply_async(**kwargs) func = import_string(func) return func(*args)
大概意思就是先計算下次運行的時間,然后把任務添加到celery隊列里,這里有個task_id有些問題,因為假設添加了每隔3s執行一個任務,
它的task_id默認會使用uuid生成,如果想要再移除這個任務就不太方便,自定task_id可能會好一些,另外也許需要判斷task_id是否存在
AsyncResult(task_id).state
ok,再獻上一個好用的函數
from inspect import getmembers, isfunction def get_tasks(module="task"): return [{ "name": "task:{}".format(f[1].__name__), "doc": f[1].__doc__, } for f in getmembers(import_module(module), isfunction)]
就這樣.
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/38697.html
摘要:所以這就現實了在中使用的應用上下文。要引入請求上下文,需要考慮這兩個問題如何在中產生請求上下文。中有和可以產生請求上下文。具體的思路還是在中重載類,通過,在的上下文環境下執行。將他們傳入,生成偽造的請求上下文可以覆蓋大多數的使用情況。 其實我只是想把郵件發送這個動作移到Celery中執行。既然用到了Celery,那么每次發郵件都單獨開一個線程似乎有點多余,異步任務還是交給Celery吧...
摘要:使用異步框架,例如等等,裝飾異步任務。它是一個專注于實時處理的任務隊列,同時也支持任務調度。不存儲任務狀態。標識要使用的默認序列化方法的字符串。指定該任務的結果存儲后端用于此任務。 概述: ????????我們考慮一個場景,公司有一個需求,現在需要做一套web系統,而這套系統某些功能需要使用...
摘要:的簡介是一個基于分布式消息傳輸的異步任務隊列,它專注于實時處理,同時也支持任務調度。目前支持等作為消息代理,但適用于生產環境的只有和官方推薦。任務處理完后保存狀態信息和結果,以供查詢。 celery的簡介 ??celery是一個基于分布式消息傳輸的異步任務隊列,它專注于實時處理,同時也支持任務調度。它的執行單元為任務(task),利用多線程,如Eventlet,gevent等,它們能被...
閱讀 2784·2021-09-01 10:30
閱讀 1680·2019-08-30 15:52
閱讀 965·2019-08-29 18:40
閱讀 1116·2019-08-28 18:30
閱讀 2392·2019-08-23 17:19
閱讀 1321·2019-08-23 16:25
閱讀 2700·2019-08-23 16:18
閱讀 2977·2019-08-23 13:53