国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

tornado配合celery及rabbitmq實現web request異步非阻塞

番茄西紅柿 / 1373人閱讀

摘要:主要是為了實現系統之間的雙向解耦而實現的。問題及優化隊列過長問題使用上述方案的異步非阻塞可能會依賴于的任務隊列長度,若隊列中的任務過多,則可能導致長時間等待,降低效率。

Tornado和Celery介紹 1.Tornado

</>復制代碼

  1. Tornado是一個用python編寫的一個強大的、可擴展的異步HTTP服務器,同時也是一個web開發框架。tornado是一個非阻塞式web服務器,其速度相當快。得利于其非阻塞的方式和對 epoll的運用,tornado每秒可以處理數以千計的連接,這意味著對于實時web服務來說,tornado是一個理想的web框架。它在處理嚴峻的網絡流量時表現得足夠強健,但卻在創建和編寫時有著足夠的輕量級,并能夠被用在大量的應用和工具中。
    進一步了解和學習tornado可移步:tornado官方文檔
2.Celery

</>復制代碼

  1. Celery 是一個簡單、靈活且可靠的,處理大量消息的分布式系統,它是一個專注于實時處理的任務隊列, 同時也支持任務調度。Celery 中有兩個比較關鍵的概念:

Worker: worker 是一個獨立的進程,它持續監視隊列中是否有需要處理的任務;

Broker: broker 也被稱為中間人或者協調者,broker 負責協調客戶端和 worker 的溝通??蛻舳讼?隊列添加消息,broker 負責把消息派發給 worker。

3.RabbitMQ

</>復制代碼

  1. RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

    RabbitMQ主要是為了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者無法快速消費,那么需要一個中間層。保存這個數據。

  2. 例如一個日志系統,很容易使用RabbitMQ簡化工作量,一個Consumer可以進行消息的正常處理,另一個Consumer負責對消息進行日志記錄,只要在程序中指定兩個Consumer所監聽的queue以相同的方式綁定到同一exchange即可,剩下的消息分發工作由RabbitMQ完成。

一般情況下,一個工具庫或者一個框架都是獨立的,有自己的feature或者功能點,可能依賴其他的庫,但絕不依賴于其他服務。但是celery是一個特例,如果celery沒有broker這個服務,那就完全不能用了。celery 支持多種 broker, 但主要以 RabbitMQ 和 Redis 為主,其他都是試驗性的,雖然也可以使用, 但是沒有專門的維護者。官方推薦使用rabbitmq作為生產環境下的broker,redis雖然也在官方指名的broker之列,但是實際使用上有可能還會出現以下莫名其妙的問題。

Celery的配置和使用方法詳見:官方文檔

從Tornado的異步講起 tornado的同步阻塞

用tornado進行web開發的過程中(實際上用任何語言或者框架開發都會遇到),開發者可能會發現有時候tornado的響應會變慢,追根溯源會發現原因之一就是因為該請求被其他請求阻塞了。這就有問題了?。。?!tornado不是標榜自己是異步Http Web Server嗎?不是號稱自己解決了C10K問題了嗎?這是欺騙消費者啊?。?!
但是,深入了解tornado之后才發現,人家說的異步非阻塞是有條件的,只有按照它說的來,才能實現真正的異步非阻塞。。。
我們先來看一個小例子:

</>復制代碼

  1. #!/bin/env python
  2. import tornado.httpserver
  3. import tornado.ioloop
  4. import tornado.options
  5. import tornado.web
  6. import tornado.httpclient
  7. import torndb
  8. import time
  9. from tornado.options import define, options
  10. define("port", default=8000, help="run on the given port", type=int)
  11. db = torndb.Connection("127.0.0.1:3306", "user_db", "username", "passwd")
  12. class MysqlHandler(tornado.web.RequestHandler):
  13. def get(self, flag):
  14. self.write(db.query("select * from table where flag=%s", flag))
  15. class NowHandler(tornado.web.RequestHandler):
  16. def get(self):
  17. self.write("i want you, right now!")
  18. if __name__ == "__main__":
  19. tornado.options.parse_command_line()
  20. app = tornado.web.Application(handlers=[
  21. (r"/mysql_query/(d+)", MysqlHandler),
  22. (r"/i_want_you_now", NowHandler)])
  23. http_server = tornado.httpserver.HTTPServer(app)
  24. http_server.listen(options.port)
  25. tornado.ioloop.IOLoop.instance().start()

當我們先請求/mysql_query接口時再請求/i_want_you_now接口,會發現原來可以立刻返回的第二個請求卻被一直阻塞到第一個接口執行完之后才返回。為什么?因為大部分web框架都是使用的同步阻塞模型來處理請求的,tornado的默認模型也不例外。但是tornado可是一個異步http服務器啊,不會這么弱吧?而且不上場景下都有一些相當耗時的操作,這些操作就會阻塞其他一些普通的請求,應該怎么解決這個問題?

相信很多使用過tornado的人會想到@tornado.web.asynchronous這個裝飾器,但是這就是tornado官方雞賊的地方了?。?!裝飾器 web.asynchronous 只能用在verb函數之前(即get/post/delete等),并且需要搭配tornado異步客戶端使用,如httpclient.AsyncHTTPClient,或者,你需要異步執行的那個函數(操作)必須也是異步的。。。(我是怨念滿滿的粗體?。。。?/strong>,而且加上這個裝飾器后,開發者必須在異步回調函數里顯式調用 RequestHandler.finish 才會結束這次 HTTP 請求。(因為tornado默認在函數處理返回時會自動關閉客戶端的連接)

什么意思呢?就是說,tornado:老子只給你提供異步的入口,你要是真想異步操作,要不你就使用我提供的一些異步客戶端來搞,不然你就自己實現一個異步的操作。

以操作MongoDB為例,如果你的函數中含有調用mongo的調用(使用pymongo庫),那么這時候你加asynchronous這個裝飾器就沒有任何效果了,因為你的mongo調用本身是同步的,如果想做成異步非阻塞的效果,需要使用mongo出品的另一個python driver -- motor,這個driver支持異步操作mongo,這時候你再加asynchronous裝飾器并操作mongo就可以實現異步非阻塞的效果了。

異步非阻塞的實現

所以,如果要使用tornado的異步調用,第一,使用tornado內置的異步客戶端如httpclient.AsyncHTTPClient等;第二,可參考內置異步客戶端,借助tornado.ioloop.IOLoop封裝一個自己的異步客戶端,但開發成本并不小。

然而,天無絕人之路,還是有辦法可以用較低的成本實現tornado的異步非阻塞的,那就是借助celery項目。前面說了,它是一個分布式的實時處理消息隊列調度系統,tornado接到請求后,可以把所有的復雜業務邏輯處理、數據庫操作以及IO等各種耗時的同步任務交給celery,由這個任務隊列異步處理完后,再返回給tornado。這樣只要保證tornado和celery的交互是異步的,那么整個服務是完全異步的。至于如何保證tornado和celery之間的交互是異步的,可以借助tornado-celery這個適配器來實現。

celery配合rabbitmq的工作流程如下:

這里我們來使用這幾個組件重寫前面的同步阻塞的例子:

</>復制代碼

  1. #!/bin/env python
  2. import tornado.httpserver
  3. import tornado.ioloop
  4. import tornado.options
  5. import tornado.web
  6. import tornado.httpclient
  7. import time
  8. import tcelery, tasks
  9. from tornado.options import define, options
  10. tcelery.setup_nonblocking_producer()
  11. define("port", default=8000, help="run on the given port", type=int)
  12. class AsyncMysqlHandler(tornado.web.RequestHandler):
  13. @tornado.web.asynchronous
  14. @tornado.gen.coroutine
  15. def get(self, flag):
  16. res = yield tornado.gen.Task(tasks.query_mysql.apply_async, args=[flag])
  17. self.write(res.result)
  18. self.finish()
  19. class NowHandler(tornado.web.RequestHandler):
  20. def get(self):
  21. self.write("i want you, right now!")
  22. if __name__ == "__main__":
  23. tornado.options.parse_command_line()
  24. app = tornado.web.Application(handlers=[
  25. (r"/mysql_query/(d+)", AsyncMysqlHandler),
  26. (r"/i_want_you_now", NowHandler)])
  27. http_server = tornado.httpserver.HTTPServer(app)
  28. http_server.listen(options.port)
  29. tornado.ioloop.IOLoop.instance().start()

這里有個新的tornado.gen.coroutine裝飾器, coroutine是3.0之后新增的裝飾器.以前的辦法是用回調函數的方式進行異步調用,如果使用回調函數的方式,則代碼如下:

</>復制代碼

  1. #!/bin/env python
  2. import tornado.httpserver
  3. import tornado.ioloop
  4. import tornado.options
  5. import tornado.web
  6. import tornado.httpclient
  7. import time
  8. import tcelery, tasks
  9. from tornado.options import define, options
  10. tcelery.setup_nonblocking_producer()
  11. define("port", default=8000, help="run on the given port", type=int)
  12. class AsyncMysqlHandler(tornado.web.RequestHandler):
  13. @tornado.web.asynchronous
  14. def get(self, flag):
  15. tasks.query_mysql.apply_async(args=[flag], callback=self.on_result)
  16. def on_result(self, response):
  17. self.write(response.result)
  18. self.finish()
  19. class NowHandler(tornado.web.RequestHandler):
  20. def get(self):
  21. self.write("i want you, right now!")
  22. if __name__ == "__main__":
  23. tornado.options.parse_command_line()
  24. app = tornado.web.Application(handlers=[
  25. (r"/mysql_query/(d+)", AsyncMysqlHandler),
  26. (r"/i_want_you_now", NowHandler)])
  27. http_server = tornado.httpserver.HTTPServer(app)
  28. http_server.listen(options.port)
  29. tornado.ioloop.IOLoop.instance().start()

使用callback的話始終覺得會是的代碼結構變得比較混亂,試想如果有大量異步回調,每一個都寫一個回調函數的話,勢必導致項目代碼結構變得不那么清晰和優雅,畢竟回調這種反人類的寫法還是很多人不喜歡的,但也看個人喜好,不喜歡callback風格的可以使用yield來進行異步調用。

tasks.py集中放置開發者需要異步執行的函數。

</>復制代碼

  1. import time
  2. import torndb
  3. from celery import Celery
  4. db = torndb.Connection("127.0.0.1:3306", "user_db", "username", "passwd")
  5. app = Celery("tasks", broker="amqp://guest:guest@localhost:5672")
  6. app.conf.CELERY_RESULT_BACKEND = "amqp://guest:guest@localhost:5672"
  7. @app.task(name="task.query_users")
  8. def query_mysql(flag):
  9. return db.query("select * from table where flag=%s", flag)
  10. if __name__ == "__main__":
  11. app.start()

然后啟動celery worker監聽任務隊列(消費者會從任務隊列中取走一個個的task并執行):

</>復制代碼

  1. celery -A tasks worker --loglevel=info

自此,依靠這種架構,可以實現tornado處理請求的完全異步調用。

問題及優化 1.隊列過長問題

使用上述方案的異步非阻塞可能會依賴于celery的任務隊列長度,若隊列中的任務過多,則可能導致長時間等待,降低效率。
解決方案:

啟動多個celery worker監聽任務隊列,使用多進程并發消費任務隊列,celery命令可以通過-concurrency參數來指定用來執行任務而prefork的worker進程,如果所有的worker都在執行任務,那么新添加的任務必須要等待有一個正在執行的任務完成后才能被執行,默認的concurrency數量是機器上CPU的數量。另外,celery是支持好幾個并發模式的,有prefork,threading,協程(gevent,eventlet),prefork在celery的介紹是,默認是用了multiprocess來實現的;可以通過-p參數指定其他的并發模型,如gevent(需自己配置好gevent環境)。

建立多個任務queue,把大量的任務分發到不同的queue中,減輕單個queue時可能出現的任務數量過載。

2.水平擴展優化

前面說了celery是一個分布式系統,也就是說,基于celery的項目可無痛實現分布式擴展,前面寫的tornado和celery配合的demo,也可以實現獨立部署,即tornado server和celery server其實可以分開部署,即分布在不同的服務器上,celery server部署自己的tasks.py任務,并啟動celery worker監聽,然后在tornado server上添加以下代碼:

</>復制代碼

  1. from celery import Celery
  2. app = Celery(broker = "amqp://",)

并使用Celery的send_task函數調用任務:

</>復制代碼

  1. app.send_task("function_name", args=[param1, param2, param3...])

即可實現tornado和celery的完全解耦。

后續:

另外,了解到tornado.concurrent.futures(py3自帶這個庫,py2需多帶帶安裝)這個module可以實現自定義函數的異步化,目前還沒有深入了解這個東西,有時間去研究一下這個東西,有心得再分享一下這個module相關的知識。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/41978.html

相關文章

  • django開發-django和tornado的不同

    摘要:中常用的幾個框架有等,今天來總結一下和的不同。本文使用的環境是。文件可以加載路由信息和項目配置信息,文件負責啟動項目。以上就簡單的比較了和幾個方面的不同,它們各有優缺點,實際工作中可以根據不同的需求選擇不同的框架進行開發。 python中常用的幾個web框架有django, tornado, flask等,今天來總結一下django和tornado的不同。工作中django和torna...

    Reducto 評論0 收藏0
  • 分布式隊列神器 Celery

    摘要:是什么是一個由編寫的簡單靈活可靠的用來處理大量信息的分布式系統它同時提供操作和維護分布式系統所需的工具。專注于實時任務處理,支持任務調度。說白了,它是一個分布式隊列的管理工具,我們可以用提供的接口快速實現并管理一個分布式的任務隊列。 Celery 是什么? Celery 是一個由 Python 編寫的簡單、靈活、可靠的用來處理大量信息的分布式系統,它同時提供操作和維護分布式系統所需的工...

    趙春朋 評論0 收藏0
  • tornado6與python3.7 異步新姿勢

    摘要:這是我重新復習的原因放棄了之前自己實現的全面擁抱的這個改動是非常大的而且閱讀的源碼可以發現其中大部分函數都支持了類型檢驗和返回值提示值得閱讀 廢話不多說,直接上代碼 __auth__ = aleimu __doc__ = 學習tornado6.0+ 版本與python3.7+ import time import asyncio import tornado.gen import t...

    maxmin 評論0 收藏0
  • 記一次tornado QPS 優化

    摘要:初步分析提升可從兩方面入手,一個是增加并發數,其二是減少平均響應時間。大部分的時間花在系統與數據庫的交互上,到這,便有了一個優化的主題思路最大限度的降低平均響應時間。不要輕易否定一項公認的技術真理,要拿數據說話。 本文最早發表于個人博客:PylixmWiki 應項目的需求,我們使用tornado開發了一個api系統,系統開發完后,在8核16G的虛機上經過壓測qps只有200+。與我們當...

    Doyle 評論0 收藏0
  • Flask+Celery+Redis實現隊列化異步任務

    摘要:使用異步框架,例如等等,裝飾異步任務。它是一個專注于實時處理的任務隊列,同時也支持任務調度。不存儲任務狀態。標識要使用的默認序列化方法的字符串。指定該任務的結果存儲后端用于此任務。 概述: ????????我們考慮一個場景,公司有一個需求,現在需要做一套web系統,而這套系統某些功能需要使用...

    Ali_ 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<