摘要:文檔中文文檔官方文檔定時服務與結合使用簡介是一個自帶電池的的任務隊列。追蹤任務在不同狀態間的遷移,并檢視返回值。
文檔
中文文檔
官方文檔
celery定時服務、celery與django結合使用
簡介Celery 是一個“自帶電池”的的任務隊列。它易于使用,所以你可以無視其所解決問題的復雜程度而輕松入門。它遵照最佳實踐設計,所以你的產品可以擴展,或與其他語言集成,并且它自帶了在生產環境中運行這樣一個系統所需的工具和支持。
Celery 的最基礎部分。包括:
選擇和安裝消息傳輸方式(中間人)----broker,如RabbitMQ,redis等。
RabbitMQ的安裝:sudo apt-get install rabbitmq-server
本文使用redis
官方推薦RabbitMQ
當然部分nosql也可以
安裝 Celery 并創建第一個任務
運行職程并調用任務。
追蹤任務在不同狀態間的遷移,并檢視返回值。
安裝pip install celery簡單使用 定義任務
tasks.py
from celery import Celery #第一個參數是你的celery名稱 #backen 用于存儲結果 #broker 用于存儲消息隊列 app = Celery("tasks",backend="redis://:password@host:port/db", broker="redis://:password@host:port/db") @app.task def add(x, y): return x + y
Celery 的第一個參數是當前模塊的名稱,這個參數是必須的,這樣的話名稱可以自動生成。第二個參數是中間人關鍵字參數,指定你所使用的消息中間人的 URL,此處使用了 RabbitMQ,也是默認的選項。更多可選的中間人見上面的 選擇中間人 一節。例如,對于 RabbitMQ 你可以寫 amqp://localhost ,而對于 Redis 你可以寫 redis://localhost .
你定義了一個單一任務,稱為 add ,返回兩個數字的和。
啟動celery服務步驟:
啟動任務工作者worker
講任務放入celery隊列
worker讀取隊列,并執行任務
啟動一個工作者,創建一個任務隊列
// -A 指定celery名稱,loglevel制定log級別,只有大于或等于該級別才會輸出到日志文件 celery -A tasks worker --loglevel=info
如果你沒有安裝redis庫,請先pip install redis
使用celery現在我們已經有一個celery隊列了,我門只需要將工作所需的參數放入隊列即可
from tasks import add #調用任務會返回一個 AsyncResult 實例,可用于檢查任務的狀態,等待任務完成或獲取返回值(如果任務失敗,則為異常和回溯)。 #但這個功能默認是不開啟的,你需要設置一個 Celery 的結果后端(即backen,我們在tasks.py中已經設置了,backen就是用來存儲我們的計算結果) result=add.delay(4, 4) #如果任務已經完成 if(result.ready()): #獲取任務執行結果 print(result.get(timeout=1))
常用接口
tasks.add(4,6) ---> 本地執行
tasks.add.delay(3,4) --> worker執行
t=tasks.add.delay(3,4) --> t.get() 獲取結果,或卡住,阻塞
t.ready()---> False:未執行完,True:已執行完
t.get(propagate=False) 拋出簡單異常,但程序不會停止
t.traceback 追蹤完整異常
使用配置使用配置來運行,對于正式項目來說可維護性更好。配置可以使用app.config.XXXXX_XXX="XXX"的形式如app.conf.CELERY_TASK_SERIALIZER = "json"來進行配置
配置資料
配置文件config.py
#broker BROKER_URL = "redis://:password@host:port/db" #backen CELERY_RESULT_BACKEND = "redis://:password@host:port/db" #導入任務,如tasks.py CELERY_IMPORTS = ("tasks", ) #列化任務載荷的默認的序列化方式 CELERY_TASK_SERIALIZER = "json" #結果序列化方式 CELERY_RESULT_SERIALIZER = "json" CELERY_ACCEPT_CONTENT=["json"] #時間地區與形式 CELERY_TIMEZONE = "Europe/Oslo" #時間是否使用utc形式 CELERY_ENABLE_UTC = True #設置任務的優先級或任務每分鐘最多執行次數 CELERY_ROUTES = { # 如果設置了低優先級,則可能很久都沒結果 #"tasks.add": "low-priority", #"tasks.add": {"rate_limit": "10/m"}, #"tasks.add": {"rate_limit": "10/s"}, #"*": {"rate_limit": "10/s"} } #borker池,默認是10 BROKER_POOL_LIMIT = 10 #任務過期時間,單位為s,默認為一天 CELERY_TASK_RESULT_EXPIRES = 3600 #backen緩存結果的數目,默認5000 CELERY_MAX_CACHED_RESULTS = 10000開啟服務
celery.py
from celery import Celery #指定名稱 app = Celery("mycelery") #加載配置模塊 app.config_from_object("config") if __name__=="__main__": app.start()任務定義
tasks.py
from .celery import app @app.task def add(a, b): return a + b啟動
// -l 是 --loglevel的簡寫 celery -A mycelery worker -l info執行/調用服務
from tasks import add #調用任務會返回一個 AsyncResult 實例,可用于檢查任務的狀態,等待任務完成或獲取返回值(如果任務失敗,則為異常和回溯)。 #但這個功能默認是不開啟的,你需要設置一個 Celery 的結果后端(即backen,我們在tasks.py中已經設置了,backen就是用來存儲我們的計算結果) result=add.delay(4, 4) #如果任務已經完成 if(result.ready()): #獲取任務執行結果 print(result.get(timeout = 1))分布式
啟動多個celery worker,這樣即使一個worker掛掉了其他worker也能繼續提供服務
方法一
// 啟動三個worker:w1,w2,w3 celery multi start w1 -A project -l info celery multi start w2 -A project -l info celery multi start w3 -A project -l info // 立即停止w1,w2,即便現在有正在處理的任務 celery multi stop w1 w2 // 重啟w1 celery multi restart w1 -A project -l info // celery multi stopwait w1 w2 w3 # 待任務執行完,停止
方法二
// 啟動多個worker,但是不指定worker名字 // 你可以在同一臺機器上運行多個worker,但要為每個worker指定一個節點名字,使用--hostname或-n選項 // concurrency指定處理進程數,默認與cpu數量相同,因此一般無需指定 $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h $ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h錯誤處理
celery可以指定在發生錯誤的情況下進行自定義的處理
config.py
def my_on_failure(self, exc, task_id, args, kwargs, einfo): print("Oh no! Task failed: {0!r}".format(exc)) // 對所有類型的任務,當發生執行失敗的時候所執行的操作 CELERY_ANNOTATIONS = {"*": {"on_failure": my_on_failure}}
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/44540.html
摘要:是什么是一個由編寫的簡單靈活可靠的用來處理大量信息的分布式系統它同時提供操作和維護分布式系統所需的工具。專注于實時任務處理,支持任務調度。說白了,它是一個分布式隊列的管理工具,我們可以用提供的接口快速實現并管理一個分布式的任務隊列。 Celery 是什么? Celery 是一個由 Python 編寫的簡單、靈活、可靠的用來處理大量信息的分布式系統,它同時提供操作和維護分布式系統所需的工...
摘要:我們將窗口切換到的啟動窗口,會看到多了兩條日志這說明任務已經被調度并執行成功。本文標題為異步任務神器簡明筆記本文鏈接為參考資料使用之美分布式任務隊列的介紹思誠之道異步任務神器簡明筆記 Celery 在程序的運行過程中,我們經常會碰到一些耗時耗資源的操作,為了避免它們阻塞主程序的運行,我們經常會采用多線程或異步任務。比如,在 Web 開發中,對新用戶的注冊,我們通常會給他發一封激活郵件,...
摘要:介紹應用舉例是一個基于開發的分布式異步消息任務隊列,通過它可以輕松的實現任務的異步處理,如果你的業務場景中需要用到異步任務,就可以考慮使用你想對臺機器執行一條批量命令,可能會花很長時間,但你不想讓你的程序等著結果返回,? celery 1.celery介紹 1.1 celery應用舉例 Celery 是一個 基于python開發的分布式異步消息任務隊列,通過...
摘要:今天介紹一下如何在項目中使用搭建一個有兩個節點的任務隊列一個主節點一個子節點主節點發布任務,子節點收到任務并執行。 今天介紹一下如何在django項目中使用celery搭建一個有兩個節點的任務隊列(一個主節點一個子節點;主節點發布任務,子節點收到任務并執行。搭建3個或者以上的節點就類似了),使用到了celery,rabbitmq。這里不會單獨介紹celery和rabbitmq中的知識了...
閱讀 3881·2021-11-24 11:14
閱讀 3321·2021-11-22 13:53
閱讀 3883·2021-11-11 16:54
閱讀 1546·2021-10-13 09:49
閱讀 1211·2021-10-08 10:05
閱讀 3392·2021-09-22 15:57
閱讀 1754·2021-08-16 11:01
閱讀 965·2019-08-30 15:55