摘要:在使用前必須實例化,稱為或。在中發送消息時,該消息僅包含要執行的的名稱。每一個維護一個名稱和對應函數的映射,這稱為。雖然可以依賴于當前應用,但最佳實踐是將應用實例傳遞給任何需要它的對象,這個行為可以稱為。
Celery在使用前必須實例化,稱為application或app。app是線程安全的,具有不同配置、組件、task的多個Celery應用可以共存于同一個進程空間。
# 創建Celery應用 >>> from celery import Celery >>> app = Celery() >>> app
最后一行文本化顯示了Celery應用:包含應用所屬類的名稱,當前主模塊名,以及內存地址。唯一重要的信息是模塊名稱。
Main Name在Celery中發送task消息時,該消息僅包含要執行的task的名稱。每一個worker維護一個task名稱和對應函數的映射,這稱為task registry。
當定義一個task時,該task將注冊到本地:
>>> @app.task ... def add(x, y): ... return x + y >>> add <@task: __main__.add> >>> add.name __main__.add >>> app.tasks["__main__.add"] <@task: __main__.add>
當Celery無法檢測task函數屬于哪個模塊時,使用main模塊名生成初始task名稱。
這種方式僅適用于以下兩種場景:
定義task的模塊作為程序運行
app在python shell中創建
# tasks.py from celery import Celery app = Celery() @app.task def add(x, y): return x + y if __name__ == "__main__": app.worker_main()
如果直接運行tasks.py,task名將以__main__為前綴,但如果tasks.py被其他程序導入,task名將以tasks為前綴。如下:
>>> from tasks import add >>> add.name tasks.add
也可以直接指定主模塊名:
>>> app = Celery("tasks") >>> app.main "tasks" >>> @app.task ... def add(x, y): ... return x + y >>> add.name tasks.addConfiguration
可以通過直接設置,或使用專用配置模塊對Celery進行配置。
通過app.conf屬性查看或直接設置配置:
>>> app.conf.timezone "Europe/London" >>> app.conf.enable_utc = True
或用app.conf.update方法一次更新多個配置:
>>> app.conf.update( ... enable_utc=True, ... timezone="Europe/London", ...)config_from_object
app.config_from_object()方法從配置模塊或對象中導入配置。需要注意的是:調用config_from_object()方法將重置在這之前配置的任何設置。
使用模塊名
app.config_from_object()方法接收python模塊的完全限定名(fully qualified name)或具體到其中的某個屬性名,例如"celeryconfig", "myproj.config.celery", 或"myproj.config:CeleryConfig":
from celery import Celery app = Celery() app.config_from_object("celeryconfig")
只要能夠正常執行import celeryconfig,app就能正常配置。
使用模塊對象
也可以傳入一個已導入的模塊對象,但不建議這樣做。
import celeryconfig from celery import Celery app = Celery() app.config_from_object(celeryconfig)
更推薦使用模塊名的方式,因為這樣在使用prefork pool時不需要序列化該模塊。如果在實際應用中出現配置問題或序列化錯誤,請嘗試使用模塊名的方式。
使用配置類或對象
from celery import Celery app = Celery() class Config: enable_utc = True timezone = "Europe/London" app.config_from_object(Config)config_from_envvar
app.config_from_envvar()方法從環境變量中接收配置模塊名。
import os from celery import Celery #: Set default configuration module name os.environ.setdefault("CELERY_CONFIG_MODULE", "celeryconfig") app = Celery() app.config_from_envvar("CELERY_CONFIG_MODULE")
通過環境變量指定配置模塊:
$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l infoCensored configuration
如果要顯示Celery配置,可能需要過濾某些敏感信息如密碼、密鑰等。Celery提供了幾種用于幫助顯示配置的實用方法。
humanize()
該方法按行返回字符串形式的配置,默認只包含改動過的配置,如果要顯示內置的默認配置,設置with_defaults參數為True:
>>> app.conf.humanize(with_defaults=False, censored=True)
table()
該方法返回字典形式的配置:
>>> app.conf.table(with_defaults=False, censored=True)
Celery不會移除所有的敏感信息,因為它使用正則表達式匹配鍵并判斷是否移除。如果用戶添加了包含敏感信息的自定義配置,可以使用Celery可能標記為敏感配置的名稱來命名(API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE)。
Laziness應用實例是惰性的。
創建Celery實例只會執行以下操作:
創建用于event的logical clock instance
創建task registry
設置為當前應用(除非禁用了set_as_current參數)
調用app.on_init()回調函數(默認不執行任何操作)
app.task()裝飾器不會在task定義時立即創建task,而是在task使用時或finalized應用后創建。
下例說明了在使用task或訪問其屬性前,都不會創建task:
>>> @app.task >>> def add(x, y): ... return x + y >>> type(add)>>> add.__evaluated__() False >>> add # <-- causes repr(add) to happen <@task: __main__.add> >>> add.__evaluated__() True
應用的Finalization指顯式地調用app.finalize()方法或隱式地訪問app.tasks屬性。
finalized應用將會:
復制必須在應用間共享的task。task默認是共享的,但如果禁用了task裝飾器的shared屬性,將屬于應用私有。
評估所有待處理的task裝飾器
確保所有task綁定到當前應用。將task綁定到某個應用,以便可以從配置中讀取默認值。
Breaking the chain雖然可以依賴于當前應用,但最佳實踐是將應用實例傳遞給任何需要它的對象,這個行為可以稱為app chain。
# 依賴于當前應用(bad) from celery import current_app class Scheduler(object): def run(self): app = current_app
# 傳遞應用實例(good) class Scheduler(object): def __init__(self, app): self.app = app
在開發模式設置CELERY_TRACE_APP環境變量,可以在應用鏈斷開時拋出異常:
$ CELERY_TRACE_APP=1 celery worker -l infoAbstract Tasks
使用task()裝飾器創建的task都繼承自celery.app.task模塊的Task基類。繼承該類可以自定義task類:
from celery import Task # 或者 from celery.app.task import Task class DebugTask(Task): def __call__(self, *args, **kwargs): print("TASK STARTING: {0.name}[{0.request.id}]".format(self)) return super(DebugTask, self).__call__(*args, **kwargs)
如果要重寫__call__()方法,記得調用super。這樣在task直接調用時會執行基類的默認事件。
Task基類是特殊的,因為它并未綁定到任何特定的應用。一旦task綁定到應用,它將讀取配置以設置默認值等。
通過base參數指定基類
@app.task(base=DebugTask) def add(x, y): return x + y
通過app.Task屬性指定基類
>>> from celery import Celery, Task >>> app = Celery() >>> class MyBaseTask(Task): ... queue = "hipri" >>> app.Task = MyBaseTask >>> app.Task>>> @app.task ... def add(x, y): ... return x + y >>> add <@task: __main__.add> >>> add.__class__.mro() [ >, , , ]
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/43221.html
摘要:所以這就現實了在中使用的應用上下文。要引入請求上下文,需要考慮這兩個問題如何在中產生請求上下文。中有和可以產生請求上下文。具體的思路還是在中重載類,通過,在的上下文環境下執行。將他們傳入,生成偽造的請求上下文可以覆蓋大多數的使用情況。 其實我只是想把郵件發送這個動作移到Celery中執行。既然用到了Celery,那么每次發郵件都單獨開一個線程似乎有點多余,異步任務還是交給Celery吧...
摘要:使用異步框架,例如等等,裝飾異步任務。它是一個專注于實時處理的任務隊列,同時也支持任務調度。不存儲任務狀態。標識要使用的默認序列化方法的字符串。指定該任務的結果存儲后端用于此任務。 概述: ????????我們考慮一個場景,公司有一個需求,現在需要做一套web系統,而這套系統某些功能需要使用...
摘要:中常用的幾個框架有等,今天來總結一下和的不同。本文使用的環境是。文件可以加載路由信息和項目配置信息,文件負責啟動項目。以上就簡單的比較了和幾個方面的不同,它們各有優缺點,實際工作中可以根據不同的需求選擇不同的框架進行開發。 python中常用的幾個web框架有django, tornado, flask等,今天來總結一下django和tornado的不同。工作中django和torna...
摘要:基于網,分享項目的組網架構和部署。項目組網架構架構說明流項目訪問分為兩個流,通過分兩個端口暴露給外部使用數據流用戶訪問網站。通過進行配置,使用作為異步隊列來存儲任務,并將處理結果存儲在中。 基于Raindrop網,分享項目的組網架構和部署。 項目組網架構 showImg(https://cloud.githubusercontent.com/assets/7239657/1015704...
摘要:主要是為了實現系統之間的雙向解耦而實現的。問題及優化隊列過長問題使用上述方案的異步非阻塞可能會依賴于的任務隊列長度,若隊列中的任務過多,則可能導致長時間等待,降低效率。 Tornado和Celery介紹 1.Tornado Tornado是一個用python編寫的一個強大的、可擴展的異步HTTP服務器,同時也是一個web開發框架。tornado是一個非阻塞式web服務器,其速度相當快。...
閱讀 3494·2019-08-30 15:53
閱讀 3409·2019-08-29 16:54
閱讀 2197·2019-08-29 16:41
閱讀 2404·2019-08-23 16:10
閱讀 3382·2019-08-23 15:04
閱讀 1351·2019-08-23 13:58
閱讀 351·2019-08-23 11:40
閱讀 2457·2019-08-23 10:26