摘要:所以這就現(xiàn)實(shí)了在中使用的應(yīng)用上下文。要引入請(qǐng)求上下文,需要考慮這兩個(gè)問題如何在中產(chǎn)生請(qǐng)求上下文。中有和可以產(chǎn)生請(qǐng)求上下文。具體的思路還是在中重載類,通過(guò),在的上下文環(huán)境下執(zhí)行。將他們傳入,生成偽造的請(qǐng)求上下文可以覆蓋大多數(shù)的使用情況。
其實(shí)我只是想把郵件發(fā)送這個(gè)動(dòng)作移到Celery中執(zhí)行。
既然用到了Celery,那么每次發(fā)郵件都多帶帶開一個(gè)線程似乎有點(diǎn)多余,異步任務(wù)還是交給Celery吧。
Celery和Flask一起使用并沒有什么不和諧的地方,都可以不用定制的Flask擴(kuò)展,按照網(wǎng)上隨處可見的示例也很簡(jiǎn)單:
from flask import Flask from celery import Celery app = Flask(__name__) app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0" app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379/0" celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"]) celery.conf.update(app.config) @celery.task def send_email(): ....
然而,稍微上點(diǎn)規(guī)模的Flask應(yīng)用都會(huì)使用Factory模式(中文叫工廠函數(shù),我聽著特別扭),即只有在創(chuàng)建Flask實(shí)例時(shí),才會(huì)初始化各種擴(kuò)展,這樣可以動(dòng)態(tài)的修改擴(kuò)展程序的配置。比如你有一套線上部署的配置和一套本地開發(fā)測(cè)試的配置,希望通過(guò)不同的啟動(dòng)入口,就使用不同的配置。
使用Factory模式的話,上面的代碼大概要修改成這個(gè)樣:
from flask import Flask from celery import Celery app = Flask(__name__) celery = Celery() def create_app(config_name): app.config.from_object(config[config_name]) celery.conf.update(app.config)
通過(guò)config_name,來(lái)動(dòng)態(tài)調(diào)整celery的配置。然而,這樣子是不行的!
Celery的__init__()函數(shù)會(huì)調(diào)用celery._state._register_app()直接就通過(guò)傳入的配置生成了Celery實(shí)例,上面的代碼中,celery = Celery()直接使用默認(rèn)的amqp作為了broker,隨后通過(guò)celery.conf.update(app.config)是更改不了broker的。這也就是為什么網(wǎng)上的示例代碼中,在定義Celery實(shí)例時(shí),就傳入了broker=app.config["CELERY_BROKER_URL"],而不是之后通過(guò)celery.conf.update(app.config)傳入。當(dāng)你的多套配置文件中,broker設(shè)置的不同時(shí),就悲劇了。
當(dāng)然不用自己造輪子,F(xiàn)lask-Celery-Helper就是解決以上問題的FLask擴(kuò)展。
看看它的__init__()函數(shù):
def __init__(self, app=None): """If app argument provided then initialize celery using application config values. If no app argument provided you should do initialization later with init_app method. :param app: Flask application instance. """ self.original_register_app = _state._register_app # Backup Celery app registration function. _state._register_app = lambda _: None # Upon Celery app registration attempt, do nothing. super(Celery, self).__init__() if app is not None: self.init_app(app)
將_state._register_app函數(shù)備份,再置為空。這樣__init__()就不會(huì)創(chuàng)建Celery實(shí)例了。但如果指定了app,那么進(jìn)入init_app,嗯,大多數(shù)Flask擴(kuò)展都有這個(gè)函數(shù),用來(lái)動(dòng)態(tài)生成擴(kuò)展實(shí)例。
def init_app(self, app): """Actual method to read celery settings from app configuration and initialize the celery instance. :param app: Flask application instance. """ _state._register_app = self.original_register_app # Restore Celery app registration function. if not hasattr(app, "extensions"): app.extensions = dict() if "celery" in app.extensions: raise ValueError("Already registered extension CELERY.") app.extensions["celery"] = _CeleryState(self, app) # Instantiate celery and read config. super(Celery, self).__init__(app.import_name, broker=app.config["CELERY_BROKER_URL"]) ...
將_state._register_app函數(shù)還原,再執(zhí)行Celery原本的__init__。這樣就達(dá)到動(dòng)態(tài)生成實(shí)例的目的了。接著往下看:
task_base = self.Task # Add Flask app context to celery instance. class ContextTask(task_base): """Celery instance wrapped within the Flask app context.""" def __call__(self, *_args, **_kwargs): with app.app_context(): return task_base.__call__(self, *_args, **_kwargs) setattr(ContextTask, "abstract", True) setattr(self, "Task", ContextTask)
這里重載了celery.Task類,通過(guò)with app.app_context():,在app.app_context()的上下文環(huán)境下執(zhí)行Task。對(duì)于一個(gè)已生成的Flask實(shí)例,應(yīng)用上下文不會(huì)隨便改變。所以這就現(xiàn)實(shí)了在Celery中使用Flask的應(yīng)用上下文。
下面是官方的示例代碼:
# extensions.py from flask_celery import Celery celery = Celery() # application.py from flask import Flask from extensions import celery def create_app(): app = Flask(__name__) app.config["CELERY_IMPORTS"] = ("tasks.add_together", ) app.config["CELERY_BROKER_URL"] = "redis://localhost" app.config["CELERY_RESULT_BACKEND"] = "redis://localhost" celery.init_app(app) return app # tasks.py from extensions import celery @celery.task() def add_together(a, b): return a + b # manage.py from application import create_app app = create_app() app.run()
跟普通的Flask擴(kuò)展一樣了。
Celery中使用Flask上下文在Flask的view函數(shù)中調(diào)用task.delay()時(shí),這個(gè)task相當(dāng)于一個(gè)離線的異步任務(wù),它對(duì)Flask的應(yīng)用上下文和請(qǐng)求上下文一無(wú)所知。但是這都可能是異步任務(wù)需要用到的。比如發(fā)送郵件要用到的render_template和url_for就分別要用到應(yīng)用上下文和請(qǐng)求上下文。不在celery中引入它們的話,就是Running code outside of a request。
引入應(yīng)用上下文的工作Flask-Celery-Helper已經(jīng)幫我們做好了,在Flask的文檔中也有相關(guān)介紹。實(shí)現(xiàn)方法和上面Flask-Celery-Helper的一樣。然而,不管是Flask-Celery-Helper還是Flask文檔,都沒有提及如何在Celery中使用請(qǐng)求上下文。
要引入請(qǐng)求上下文,需要考慮這兩個(gè)問題:
如何在Celery中產(chǎn)生請(qǐng)求上下文。Flask中有request_context和test_request_context可以產(chǎn)生請(qǐng)求上下文。區(qū)別是request_context需要WSGI環(huán)境變量environ,而test_request_context根據(jù)傳入的參數(shù)生成請(qǐng)求上下文。我沒有找到如何在Celery中獲取到WSGI環(huán)境變量的方法,所以只能自己傳入相關(guān)參數(shù)生成請(qǐng)求上下文了。
請(qǐng)求上下文是隨HTTP請(qǐng)求產(chǎn)生的,要獲取請(qǐng)求上下文,就必須在view函數(shù)中處理,view函數(shù)通過(guò)task.delay()發(fā)送Celery任務(wù)。所以需要重載task.delay(),以獲取請(qǐng)求上下文。
具體的思路還是在init_app中重載celery.Task類,通過(guò)with app.test_request_context():,在app.test_request_context()的上下文環(huán)境下執(zhí)行Task。
首先獲取request,從中整理出test_request_context()需要的參數(shù)。根據(jù)test_request_context的函數(shù)注釋,它需要的參數(shù)和werkzeug.test.EnvironBuilder類的參數(shù)一樣。
CONTEXT_ARG_NAME = "_flask_request_context" def _include_request_context(self, kwargs): """Includes all the information about current Flask request context as an additional argument to the task. """ if not has_request_context(): return # keys correspond to arguments of :meth:`Flask.test_request_context` context = { "path": request.path, "base_url": request.url_root, "method": request.method, "headers": dict(request.headers), "data": request.form } if "?" in request.url: context["query_string"] = request.url[(request.url.find("?") + 1):] kwargs[self.CONTEXT_ARG_NAME] = context
_include_request_context函數(shù)從request中提取path,base_url,method,headers,data,query_string。將他們傳入test_request_context,生成偽造的請(qǐng)求上下文可以覆蓋大多數(shù)的使用情況。
Celery通過(guò)apply_async,apply,retry調(diào)用異步任務(wù)(delay是apply_async的簡(jiǎn)化方法)。這里需要重載它們,讓這些函數(shù)獲取request:
def apply_async(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).apply_async(args, kwargs, **rest) def apply(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).apply(args, kwargs, **rest) def retry(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).retry(args, kwargs, **rest)
最后重載celery.Task的__call__方法:
def __call__(self, *args, **kwargs): """Execute task code with given arguments.""" call = lambda: super(ContextTask, self).__call__(*args, **kwargs) context = kwargs.pop(self.CONTEXT_ARG_NAME, None) if context is None or has_request_context(): return call() with app.test_request_context(**context): result = call() # process a fake "Response" so that # ``@after_request`` hooks are executed app.process_response(make_response(result or "")) return result
context是我們從request中獲取的參數(shù),將它傳給test_request_context,偽造請(qǐng)求上下文,并在這個(gè)上下文環(huán)境中執(zhí)行task。既然偽造了請(qǐng)求,那也得為這個(gè)假請(qǐng)求生成響應(yīng),萬(wàn)一你定義了after_request這個(gè)在響應(yīng)后執(zhí)行的鉤子呢?通過(guò)process_response就可以激活after_request。
注意這里并沒有傳入應(yīng)用上下文,因?yàn)镕lask在創(chuàng)建請(qǐng)求上下文時(shí),會(huì)判斷應(yīng)用上下文是否為空,為空就先創(chuàng)建應(yīng)用上下文,再創(chuàng)建請(qǐng)求上下文。
完整代碼在這里。
celery = CeleryWithContext()創(chuàng)建的Celery實(shí)例就可以給各種task使用了。
另外創(chuàng)建一個(gè)celery_worker.py文件,生成一個(gè)Flask實(shí)例,供Celery的worker使用。
# celery_worker.py #!/usr/bin/env python from app import create_app from app.extensions import celery app = create_app()
啟動(dòng)worker:celery -A celery_worker.celery worker -l info
這下就可以使用Celery發(fā)郵件了。唉,還真是麻煩。
http://xion.io/post/code/celery-include-flask-request-context.html
博客地址
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/38572.html
摘要:目的曾經(jīng)想向前臺(tái)實(shí)時(shí)返回任務(wù)的狀態(tài)監(jiān)控,也查看了很多博客,但是好多也沒能如愿,因此基于網(wǎng)上已有的博客已經(jīng)自己的嘗試,寫了一個(gè)小的,實(shí)現(xiàn)前臺(tái)實(shí)時(shí)獲取后臺(tái)傳輸?shù)娜蝿?wù)狀態(tài)。實(shí)現(xiàn)仿照其他例子實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的后臺(tái)任務(wù)監(jiān)控。 1. 目的曾經(jīng)想向前臺(tái)實(shí)時(shí)返回Celery任務(wù)的狀態(tài)監(jiān)控,也查看了很多博客,但是好多也沒能如愿,因此基于網(wǎng)上已有的博客已經(jīng)自己的嘗試,寫了一個(gè)小的demo,實(shí)現(xiàn)前臺(tái)實(shí)時(shí)獲取后...
摘要:使用異步框架,例如等等,裝飾異步任務(wù)。它是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列,同時(shí)也支持任務(wù)調(diào)度。不存儲(chǔ)任務(wù)狀態(tài)。標(biāo)識(shí)要使用的默認(rèn)序列化方法的字符串。指定該任務(wù)的結(jié)果存儲(chǔ)后端用于此任務(wù)。 概述: ????????我們考慮一個(gè)場(chǎng)景,公司有一個(gè)需求,現(xiàn)在需要做一套web系統(tǒng),而這套系統(tǒng)某些功能需要使用...
摘要:基于網(wǎng),分享項(xiàng)目的組網(wǎng)架構(gòu)和部署。項(xiàng)目組網(wǎng)架構(gòu)架構(gòu)說(shuō)明流項(xiàng)目訪問分為兩個(gè)流,通過(guò)分兩個(gè)端口暴露給外部使用數(shù)據(jù)流用戶訪問網(wǎng)站。通過(guò)進(jìn)行配置,使用作為異步隊(duì)列來(lái)存儲(chǔ)任務(wù),并將處理結(jié)果存儲(chǔ)在中。 基于Raindrop網(wǎng),分享項(xiàng)目的組網(wǎng)架構(gòu)和部署。 項(xiàng)目組網(wǎng)架構(gòu) showImg(https://cloud.githubusercontent.com/assets/7239657/1015704...
摘要:解決辦法如下測(cè)試表格我們從引入,首先對(duì)文件名進(jìn)行編碼,然后中作為的參數(shù),這時(shí)候能成功下載文件,但是文件名是編碼后的名字,要解碼的話,我們需要在里面聲明編碼格式,即這樣的話,對(duì)文件名進(jìn)行解碼,我們的文件名就是中文了。 在寫 flask 后端的時(shí)候,特別是在做數(shù)據(jù)相關(guān)的操作的時(shí)候,產(chǎn)品往往需要我們做一個(gè)導(dǎo)出數(shù)據(jù)的需求,一般都是導(dǎo)出 excel 格式的文件。 那在 flask 上,如何實(shí)現(xiàn)請(qǐng)...
摘要:本文將介紹如何使用和抓取主流的技術(shù)博客文章,然后用搭建一個(gè)小型的技術(shù)文章聚合平臺(tái)。是谷歌開源的基于和的自動(dòng)化測(cè)試工具,可以很方便的讓程序模擬用戶的操作,對(duì)瀏覽器進(jìn)行程序化控制。相對(duì)于,是新的開源項(xiàng)目,而且是谷歌開發(fā),可以使用很多新的特性。 背景 說(shuō)到爬蟲,大多數(shù)程序員想到的是scrapy這樣受人歡迎的框架。scrapy的確不錯(cuò),而且有很強(qiáng)大的生態(tài)圈,有g(shù)erapy等優(yōu)秀的可視化界面。但...
閱讀 3734·2021-10-15 09:42
閱讀 2594·2021-09-03 10:50
閱讀 1628·2021-09-03 10:28
閱讀 1788·2019-08-30 15:54
閱讀 2510·2019-08-30 12:46
閱讀 401·2019-08-30 11:06
閱讀 2818·2019-08-30 10:54
閱讀 521·2019-08-29 12:59