摘要:使用異步框架,例如等等,裝飾異步任務。它是一個專注于實時處理的任務隊列,同時也支持任務調度。不存儲任務狀態。標識要使用的默認序列化方法的字符串。指定該任務的結果存儲后端用于此任務。
????????我們考慮一個場景,公司有一個需求,現在需要做一套web系統,而這套系統某些功能需要使用一些開源工具的sdk和api,或是運行一些耗時比較大的任務(單個大任務下可能有多個小任務),需要一段時間才能提供執行結果,而前端同事要求不能讓用戶在頁面等待,需要馬上提供一個返回結果給他,任務執行完后可以拿到最終結果,并且用戶退出web界面或瀏覽器異常關閉之后,再次返回界面,執行的過程不會中斷,并且支持多用戶同時執行不同操作的需要。
? ? ? ? 很明顯,這是一個-異步多線程-的場景,在Python中可以想到的有:
????????1.引入Asyncio模塊,利用多協程實現。
????????2.使用Threading模塊,自己編寫線程任務,線程等待,睡眠,釋放線程的過程。
????????3.使用異步框架,例如Cerely、Tornado、Twisted等等,裝飾異步任務。
? ? ? ? 這里邊最便捷且開發效率最高的應該是使用異步框架,咱們選擇使用Celery來實現這個需求。
? ? ? ? 截圖與描述來自celery官網:Celery - Distributed Task Queue — Celery 5.2.0 documentation
????????Celery 是一個簡單、靈活且可靠的分布式系統,用于處理大量消息,同時為操作提供維護此類系統所需的工具。
????????它是一個專注于實時處理的任務隊列,同時也支持任務調度。
????????Celery 擁有龐大而多樣化的用戶和貢獻者社區,您應該加入我們的 IRC?或我們的郵件列表。
????????Celery 是開源的,并在BSD 許可下獲得許可。
????????我們除了需要Celery做異步任務的處理,還需要一個中間件來充當消費者,并保存最終的任務處理結果(消費結果),這里有很多中間件可以選,例如常用的消息中間件,rabbitmq,kafka等,還可以使用mysql,redis等作為消費者并保存消費結果(因為最終的處理結果要返回給前端同事),樓主最終選擇了redis。
????????這里不再贅述windows下安裝redis步驟,只介紹linux下安裝redis與配置,我的機器是centos7.6:
????????yum方式安裝(注意:這樣安裝的redis不是最新版本的,如有對版本要求比較高的,建議去官網下載源碼包去手動安裝,官網地址:Redis,最新版本:6.2.6)
yum -y install redis
????????安裝完成之后配置redis.conf文件:
vi /etc/redis.conf
????????修改這一行,改成 0.0.0.0,這樣別的應用和組件才可以訪問到redis的服務與端口:
????????同理,redis的默認端口也可以在此配置里修改:
????????還有一些關閉匿名訪問,設置密碼等配置的修改,項目若要上到公網環境下,建議配置。
????????啟動并測試redis服務功能是否正常:
????????啟動redis:????????
redis-cli -h 0.0.0.0
????????測試redis:
1 redis> set name "zzz"2 3 OK4 5 redis> get name6 7 "zzz"
????????記住,代碼并沒有實際引用redis,但也需要安裝redis模塊,否則會報錯。(redis模塊版本不要太高,高了也會報錯,這些坑都是樓主親自趟過的,我這里使用2.10.6)
pip install redis==2.10.6
????????windos和linux下都可以使用pip安裝:
?pip install celery==3.1.25
? ? ? ? 我的項目目錄:(celeryconfig.py與__init__.py文件為celery與redis配置文件):
??????????
????????在項目中先創建一個名為config的python目錄,并在__init__.py中導入celery模塊并配置:
__init__.py:
from celery import Celery,platformsplatforms.C_FORCE_ROOT = Trueapp = Celery("prod") # 創建 Celery 實例app.config_from_object("kernel.config.celeryconfig") # 通過 Celery 實例加載配置模塊
????????platforms.C_FORCE_ROOT = True 這個配置一定要有,否則會報權限問題。
????????在config目錄下的celeryconfig.py中配置任務隊列消費者與消費結果保存在redis的地址:
celeryconfig.py:
## celery配置BROKER_URL = "redis://redis-host:6379/1" # 指定 Broker消費者,我們使用redis 1號數據庫CELERY_RESULT_BACKEND = "redis://redis-host:6379/2" # 指定 Backend,最終消費結果,我們使用redis 2號數據庫CELERY_TIMEZONE = "Asia/Shanghai" # 指定時區,默認是 UTCCELERY_IMPORTS = ( # 指定導入的任務模塊 "kernel.views.api" ## 異步任務代碼文件路徑即可)
????????至此,前期需要的工具準備工作全部完畢,我們開始我們的開發任務。
? ? ? ? 樓主因為主要負責后端這塊,這里選擇使用flask來寫,整體的項目模塊與版本,大概羅列下:
????????????????????????Python 3.5.4
????????????????????????Mysql ?5.5.64????????
????????????????????????Celery==3.1.25
????????????????????????Flask==1.1.4
????????????????????????Redis==2.10.6
????????這時我們與前端同事再次詳細溝通了下,初步約定如下:
? ? ? ? 1.前端通過form表單傳數據給后端,格式為json,分析:需要解析json數據。
????????2.因為存在長耗時的任務,要求一旦前端請求過來,后端要馬上返回一個中間結果給前端(這樣解決了前端頁面等待的問題),分析:需要馬上提供一個返回結果。
? ? ? ? 3.前端最終要拿到任務的最終執行結果,分析:我們需要把長耗時異步任務的最終結果推送給前端,需要任務代碼最后推送執行結果。(自己先定義回調接口去測試)
????????項目名稱-kernel-view-api.py,與celery配置下的任務模塊對應。
?api.py:
# -*- coding: utf-8 -*-import json, sysimport loggingimport requestsimport datetime,pymysqlimport os,subprocessfrom flask import render_template, Blueprint, request, g, abort, url_for, jsonify, session, redirect,Responsefrom kernel.models.playbook import PlayBook_filefrom kernel.utils import render_response, Retvalfrom kernel.models import dbfrom sqlalchemy import or_,textimport gitlab ## 導入gitlab模塊from kernel.config import app, cmdb_config,hcacp_configimport pymysql,uuid,hashlib,timefrom datetime import timezonebp = Blueprint("test", __name__) ## 藍圖自己定義,這里只是實例化log = logging.getLogger(__name__) ## 日志自己定義,這里只是實例化class status: ## 定義一些狀態碼 success = 0 warning = 1 pending = 2 faild = -1## 回調接口@bp.route("/test/callback/", methods=["GET", "POST"])def ansible_aaa(): data1 = request.get_data(as_text=True) # data2 = json.loads(data1) log.info(data1) return data1@bp.route("/test/add/", methods=["POST", "GET"])def devops_add(): """ 獲取form表單json數據 """ # return True try: data = request.get_data() _data = json.loads((str(data, "utf-8"))) print(_data) except Exception as requestdata_except: log.error("獲取表單數據異常,異常原因:%s" % requestdata_except) return render_response(status.faild, u"獲取表單數據異常,異常原因:%s" % requestdata_except, {}) ## 獲取標識tag的結果 try: """ 工單json數據要帶工單標識符select_tag: create_project:新建項目申請工單 """ select_tag = _data.get("select_tag") except Exception as request_select_tag_except: log.error("獲取表單需求標識select_tag異常,異常原因:%s" % request_select_tag_except) return render_response(status.faild, u"獲取表單需求標識select_tag異常,異常原因:%s" % request_select_tag_except, {}) try: """ !--當參數select_tag == create_project 時,建立項目--! """ if select_tag == "create_project": projname = _data.get("projname") add_project_result = add_project.delay(cmdb_config, _data) return render_response(status.pending, u"devops系統添加項目工單任務執行中--pending--", {"項目中文名稱": projname}) except Exception as do_celery_job_except: log.error("執行異步celery任務異常,異常原因:%s" % do_celery_job_except) return render_response(status.faild, u"執行異步celery任務異常,異常原因:%s" % do_celery_job_except, {})
這里代表前端請求過來之后,馬上返回一個執行結果,滿足需求2:
在devops_add接口里執行異步任務:? ? ? ??
????????add_project_result = add_project.delay(cmdb_config, _data)
官網的示例:
? ? ? ? ## 1.擴號里為異步任務所需的參數
????????## 2.add_project_result?是異步任務執行的對象,包含很多屬性方法,下邊介紹一些常用的:
????????獲取任務結果和狀態:
????????add_project_result =?task.apply_async()
????????add_project_result.ready()?????#?查看任務狀態,返回布爾值,??任務執行完成,?返回?True,?否則返回?False.
????????add_project_result.wait()??????#?會阻塞等待任務完成,?返回任務執行結果,很少使用;
????????add_project_result.get(timeout=1)???????#?獲取任務執行結果,可以設置等待時間,如果超時但任務未完成返回None;
????????add_project_result.result??????#?任務執行結果,未完成返回None;
????????add_project_result.state???????#?PENDING,?START,?SUCCESS,任務當前的狀態
????????add_project_result.status??????#?PENDING,?START,?SUCCESS,任務當前的狀態
????????add_project_result.successful??#?任務成功返回true
????????add_project_result.traceback??#?如果任務拋出了一個異常,可以獲取原始的回溯信息
? ? ?
????????項目名稱-kernel-view-api.py
api.py
解釋:
????????因為要滿足需求3,把最終異步耗時任務的真正結果給到前端,所以我們需要在異步任務里寫一個回調的操作。
?????????header = {"Content-Type": "application/json"}? ## 構造請求頭和數據類型
????????_json = {"status": sttaus.faild, "msg": u"失敗", "data": {}}? ## 失敗就返回給前端json類型失敗
????????_json = {"status": sttaus.success, "msg": u"成功", "data": {}}? ## 成功就返回給前端json類型成功
????????requests.post(callback_url, headers=header, data=json.dumps(_json)) ## 帶參回調請求
# -*- coding: utf-8 -*-import json, sysimport loggingimport requestsimport datetime,pymysqlimport os,subprocessfrom flask import render_template, Blueprint, request, g, abort, url_for, jsonify, session, redirect,Responsefrom kernel.utils import render_response, Retvalfrom datetime import timezonefrom kernel.config import * ## 導入config目錄下的celery配置bp = Blueprint("test", __name__) ## 藍圖自己定義,這里只是實例化log = logging.getLogger(__name__) ## 日志自己定義,這里只是實例化class status: ## 定義一些狀態碼 success = 0 warning = 1 pending = 2 faild = -1## 示例函數:一個添加信息函數,前端給我們json數據,后端接受之后去插入數據庫,完成操作并告訴前端@app.task ## celery添加項目任務def add_project(mysql_config, _data): try: ## 系統添加項目信息工單 projname = _data.get("projname") ## 項目名稱,必填 prodesc= _data.get("prodesc") ## 項目描述,必填 projctime = datetime.datetime.now() ## 項目發布時間 callback_url = _data.get("callback_url") ## 回調接口地址 except Exception as describe_form_except: log.error("解析表單數據出現異常,異常原因:%s" % describe_form_except) header = {"Content-Type": "application/json"} ## 回調接口請求頭 _json = {"status": status.faild, "msg": u"失敗", "data": {}} requests.post(callback_url, headers=header, data=json.dumps(_json)) try: # 獲取數據庫連接 conn = pymysql.connect(cmdb_config.server, cmdb_config.user, cmdb_config.password, database=cmdb_config.db) # 返回連接 cursor = conn.cursor() except Exception as connect_except: log.error("系統數據庫連接出現異常,異常原因:%s" % connect_except) _json = {"status": status.faild, "msg": u"失敗", "data": {}} requests.post(callback_url, headers=header, data=json.dumps(_json)) try: proj_sql = "insert into project_tb_project (projname,prodesc,projctime) VALUES ("{}","{}","{}");".format(projname, prodesc, projctime) cursor.execute(proj_sql) conn.commit() _json = {"status": status.success, "msg": u"成功", "data": {}} requests.post(callback_url, headers=header, data=json.dumps(_json)) ## 任務執行完成之后調用回調接口,返回任務執行成功結果 log.info("系統建項目工單執行成功,%s" % proj_sql) except Exception as do_add_project_except: _json = {"status": status.faild, "msg": u"失敗", "data": {}} requests.post(callback_url, headers=header, data=json.dumps(_json)) log.error("執行添加項目工單異常,異常原因:%s" % do_add_project_except) ## 任務執行完成之后調用回調接口,返回任務執行失敗結果
????????樓主用的最簡單,沒有在task里寫一些屬性,類似下邊的這種方式還可以給task添加一些屬性:
????????@app.task(name="test",bind=True,base=BaseTask)
? ? ? ?補充介紹下異步task有的一些屬性:
????????TASK的一般屬性:
????????Task.name:任務名稱;
????????Task.request:當前任務的信息;
????????Task.max_retries:設置重試的最大次數
????????Task.throws:預期錯誤類的可選元組,不應被視為實際錯誤,而是結果失敗;
????????Task.rate_limit:設置此任務類型的速率限制
????????Task.time_limit:此任務的硬限時(以秒為單位)。
????????Task.ignore_result:不存儲任務狀態。默認False;
????????Task.store_errors_even_if_ignored:如果True,即使任務配置為忽略結果,也會存儲錯誤。
????????Task.serializer:標識要使用的默認序列化方法的字符串。
????????Task.compression:標識要使用的默認壓縮方案的字符串。默認為task_compression設置。
????????Task.backend:指定該任務的結果存儲后端用于此任務。
????????Task.acks_late:如果設置True為此任務的消息將在任務執行后確認?,而不是在執行任務之前(默認行為),即默認任務執行之前就會發送確認;
????????Task.track_started:如果True任務在工作人員執行任務時將其狀態報告為“已啟動”。默認是False;
我們啟動celery來看下celery里在執行任務的過程中有什么變化:
(1)啟動項目:
樓主用的是gunicorn工具啟動,配置多線程:
gunicorn.conf
????????workers = 16? ?## 多線程配置
????????bind = "0.0.0.0:7777"
????????proc_name = "websocket(項目名稱)"
????????limit_request_field_size = 0
????????limit_request_line = 0
????????log_level = "error"
????????debug = True
????????chdir = "/data/websocket" ## 項目目錄
????????啟動命令:gunicorn -c? /項目目錄/gunicorn.conf kernel:app
(2)啟動celery:
????????cd 到項目目錄下,執行 celery -A kernel.views.api worker -l info ?
(3)使用postman調用接口:
????????可以看到直接先返回我們狀態碼2-等待狀態:
(4)從日志看異步任務執行過程:
????????1.會先在celery里出現一個異步任務,并生成一個異步任務的task-id號:
????????2.redis去查看是否已有task任務,task-id號是一致的:
????????用add_project_result保存異步任務執行結果的對象,最終的結果是在redis中,我們也可以去redis里去拿,redis保存的結果。
????????我們用的redis 2號數據庫,select 2 號數據庫,keys * 查看redis是否已有任務
????????任務最終的執行結果(celery日志里也可以看到,在redis里也可以看到,celery日志看的更直觀,succeded代表異步任務執行成功):
????????3. 查看項目日志,狀態碼為1,是回調接口打印出來的,代表返回給回調接口最終結果是成功。
? ? ? ? 4.最終去數據庫看下新添加記錄是否已有,這里就不截圖了,記錄插入成功,異步任務執行成功,也滿足了開始我們溝通的三個需求。
? ? ? ? 5.前端同學給你豎起了大拇指,直呼你牛!
? ? ? ? ??
?
??????????????????????????????????????????????????????????????????????????????????????????????????
?
?
? ? ? ? celery還可以用來做定時任務,感興趣的伙伴們可以去官網或者其他途徑去研究下,樓主第一次寫這么大的博客,有些地方我描述不清楚的或者您沒太看懂的可以私信我答疑解惑,我的微信zcw576020095,熱愛python,熱愛運維,一起加油!
????????
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/123089.html
摘要:基于網,分享項目的組網架構和部署。項目組網架構架構說明流項目訪問分為兩個流,通過分兩個端口暴露給外部使用數據流用戶訪問網站。通過進行配置,使用作為異步隊列來存儲任務,并將處理結果存儲在中。 基于Raindrop網,分享項目的組網架構和部署。 項目組網架構 showImg(https://cloud.githubusercontent.com/assets/7239657/1015704...
摘要:所以這就現實了在中使用的應用上下文。要引入請求上下文,需要考慮這兩個問題如何在中產生請求上下文。中有和可以產生請求上下文。具體的思路還是在中重載類,通過,在的上下文環境下執行。將他們傳入,生成偽造的請求上下文可以覆蓋大多數的使用情況。 其實我只是想把郵件發送這個動作移到Celery中執行。既然用到了Celery,那么每次發郵件都單獨開一個線程似乎有點多余,異步任務還是交給Celery吧...
摘要:基于的爬蟲分布式爬蟲管理平臺,支持多種編程語言以及多種爬蟲框架。后臺程序會自動發現這些爬蟲項目并儲存到數據庫中。每一個節點需要啟動應用來支持爬蟲部署。任務將以環境變量的形式存在于爬蟲任務運行的進程中,并以此來關聯抓取數據。 Crawlab 基于Celery的爬蟲分布式爬蟲管理平臺,支持多種編程語言以及多種爬蟲框架。 Github: https://github.com/tikazyq/...
摘要:我們將窗口切換到的啟動窗口,會看到多了兩條日志這說明任務已經被調度并執行成功。本文標題為異步任務神器簡明筆記本文鏈接為參考資料使用之美分布式任務隊列的介紹思誠之道異步任務神器簡明筆記 Celery 在程序的運行過程中,我們經常會碰到一些耗時耗資源的操作,為了避免它們阻塞主程序的運行,我們經常會采用多線程或異步任務。比如,在 Web 開發中,對新用戶的注冊,我們通常會給他發一封激活郵件,...
摘要:介紹應用舉例是一個基于開發的分布式異步消息任務隊列,通過它可以輕松的實現任務的異步處理,如果你的業務場景中需要用到異步任務,就可以考慮使用你想對臺機器執行一條批量命令,可能會花很長時間,但你不想讓你的程序等著結果返回,? celery 1.celery介紹 1.1 celery應用舉例 Celery 是一個 基于python開發的分布式異步消息任務隊列,通過...
閱讀 1827·2021-11-11 16:55
閱讀 1452·2019-08-30 15:54
閱讀 769·2019-08-29 15:34
閱讀 2253·2019-08-29 13:11
閱讀 2908·2019-08-26 13:28
閱讀 1878·2019-08-26 10:49
閱讀 992·2019-08-26 10:40
閱讀 2552·2019-08-23 18:21