摘要:是并發(fā)的一種方式。并不能帶來真正的并行。可交給執(zhí)行的任務(wù),稱為協(xié)程。輸出等待三秒鐘程序退出現(xiàn)在改用輸出等待三秒鐘程序沒有退出三秒鐘過后,結(jié)束,但是程序并不會(huì)退出。但是如果關(guān)閉了,就不能再運(yùn)行了此處異常建議調(diào)用,以徹底清理對象防止誤用。
所謂「異步 IO」,就是你發(fā)起一個(gè) IO 操作,卻不用等它結(jié)束,你可以繼續(xù)做其他事情,當(dāng)它結(jié)束時(shí),你會(huì)得到通知。
Asyncio 是并發(fā)(concurrency)的一種方式。對 Python 來說,并發(fā)還可以通過線程(threading)和多進(jìn)程(multiprocessing)來實(shí)現(xiàn)。
Asyncio 并不能帶來真正的并行(parallelism)。當(dāng)然,因?yàn)?GIL(全局解釋器鎖)的存在,Python 的多線程也不能帶來真正的并行。
可交給 asyncio 執(zhí)行的任務(wù),稱為協(xié)程(coroutine)。一個(gè)協(xié)程可以放棄執(zhí)行,把機(jī)會(huì)讓給其它協(xié)程(即 yield from 或 await)。`
定義協(xié)程協(xié)程的定義,需要使用 async def 語句。
async def do_some_work(x): pass
do_some_work 便是一個(gè)協(xié)程。
準(zhǔn)確來說,do_some_work 是一個(gè)協(xié)程函數(shù),可以通過 asyncio.iscoroutinefunction 來驗(yàn)證:
print(asyncio.iscoroutinefunction(do_some_work)) # True
這個(gè)協(xié)程什么都沒做,我們讓它睡眠幾秒,以模擬實(shí)際的工作量 :
async def do_some_work(x): print("Waiting " + str(x)) await asyncio.sleep(x)
在解釋 await 之前,有必要說明一下協(xié)程可以做哪些事。協(xié)程可以:
* 等待一個(gè) future 結(jié)束 * 等待另一個(gè)協(xié)程(產(chǎn)生一個(gè)結(jié)果,或引發(fā)一個(gè)異常) * 產(chǎn)生一個(gè)結(jié)果給正在等它的協(xié)程 * 引發(fā)一個(gè)異常給正在等它的協(xié)程
asyncio.sleep 也是一個(gè)協(xié)程,所以 await asyncio.sleep(x) 就是等待另一個(gè)協(xié)程。可參見 asyncio.sleep 的文檔:
sleep(delay, result=None, *, loop=None) Coroutine that completes after a given time (in seconds).運(yùn)行協(xié)程
調(diào)用協(xié)程函數(shù),協(xié)程并不會(huì)開始運(yùn)行,只是返回一個(gè)協(xié)程對象,可以通過 asyncio.iscoroutine 來驗(yàn)證:
print(asyncio.iscoroutine(do_some_work(3))) # True
此處還會(huì)引發(fā)一條警告:
async1.py:16: RuntimeWarning: coroutine "do_some_work" was never awaited print(asyncio.iscoroutine(do_some_work(3)))
要讓這個(gè)協(xié)程對象運(yùn)行的話,有兩種方式:
* 在另一個(gè)已經(jīng)運(yùn)行的協(xié)程中用 `await` 等待它 * 通過 `ensure_future` 函數(shù)計(jì)劃它的執(zhí)行
簡單來說,只有 loop 運(yùn)行了,協(xié)程才可能運(yùn)行。
下面先拿到當(dāng)前線程缺省的 loop ,然后把協(xié)程對象交給 loop.run_until_complete,協(xié)程對象隨后會(huì)在 loop 里得到運(yùn)行。
loop = asyncio.get_event_loop() loop.run_until_complete(do_some_work(3))
run_until_complete 是一個(gè)阻塞(blocking)調(diào)用,直到協(xié)程運(yùn)行結(jié)束,它才返回。這一點(diǎn)從函數(shù)名不難看出。
run_until_complete 的參數(shù)是一個(gè) future,但是我們這里傳給它的卻是協(xié)程對象,之所以能這樣,是因?yàn)樗趦?nèi)部做了檢查,通過 ensure_future 函數(shù)把協(xié)程對象包裝(wrap)成了 future。所以,我們可以寫得更明顯一些:
loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))
完整代碼:
import asyncio async def do_some_work(x): print("Waiting " + str(x)) await asyncio.sleep(x) loop = asyncio.get_event_loop() loop.run_until_complete(do_some_work(3))
運(yùn)行結(jié)果:
Waiting 3 <三秒鐘后程序結(jié)束>回調(diào)
假如協(xié)程是一個(gè) IO 的讀操作,等它讀完數(shù)據(jù)后,我們希望得到通知,以便下一步數(shù)據(jù)的處理。這一需求可以通過往 future 添加回調(diào)來實(shí)現(xiàn)。
def done_callback(futu): print("Done") futu = asyncio.ensure_future(do_some_work(3)) futu.add_done_callback(done_callback) loop.run_until_complete(futu)多個(gè)協(xié)程
實(shí)際項(xiàng)目中,往往有多個(gè)協(xié)程,同時(shí)在一個(gè) loop 里運(yùn)行。為了把多個(gè)協(xié)程交給 loop,需要借助 asyncio.gather 函數(shù)。
loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))
或者先把協(xié)程存在列表里:
coros = [do_some_work(1), do_some_work(3)] loop.run_until_complete(asyncio.gather(*coros))
運(yùn)行結(jié)果:
Waiting 3 Waiting 1 <等待三秒鐘> Done
這兩個(gè)協(xié)程是并發(fā)運(yùn)行的,所以等待的時(shí)間不是 1 + 3 = 4 秒,而是以耗時(shí)較長的那個(gè)協(xié)程為準(zhǔn)。
參考函數(shù) gather 的文檔:
gather(*coros_or_futures, loop=None, return_exceptions=False)
Return a future aggregating results from the given coroutines or futures.
發(fā)現(xiàn)也可以傳 futures 給它:
futus = [asyncio.ensure_future(do_some_work(1)), asyncio.ensure_future(do_some_work(3))] loop.run_until_complete(asyncio.gather(*futus))
gather 起聚合的作用,把多個(gè) futures 包裝成單個(gè) future,因?yàn)?loop.run_until_complete 只接受單個(gè) future。
run_until_complete 和 run_forever我們一直通過 run_until_complete 來運(yùn)行 loop ,等到 future 完成,run_until_complete 也就返回了。
async def do_some_work(x): print("Waiting " + str(x)) await asyncio.sleep(x) print("Done") loop = asyncio.get_event_loop() coro = do_some_work(3) loop.run_until_complete(coro)
輸出:
Waiting 3 <等待三秒鐘> Done <程序退出>
現(xiàn)在改用 run_forever:
async def do_some_work(x): print("Waiting " + str(x)) await asyncio.sleep(x) print("Done") loop = asyncio.get_event_loop() coro = do_some_work(3) asyncio.ensure_future(coro) loop.run_forever()
輸出:
Waiting 3 <等待三秒鐘> Done <程序沒有退出>
三秒鐘過后,future 結(jié)束,但是程序并不會(huì)退出。run_forever 會(huì)一直運(yùn)行,直到 stop 被調(diào)用,但是你不能像下面這樣調(diào) stop:
loop.run_forever() loop.stop()
run_forever 不返回,stop 永遠(yuǎn)也不會(huì)被調(diào)用。所以,只能在協(xié)程中調(diào) stop:
async def do_some_work(loop, x): print("Waiting " + str(x)) await asyncio.sleep(x) print("Done") loop.stop()
這樣并非沒有問題,假如有多個(gè)協(xié)程在 loop 里運(yùn)行:
asyncio.ensure_future(do_some_work(loop, 1)) asyncio.ensure_future(do_some_work(loop, 3)) loop.run_forever()
第二個(gè)協(xié)程沒結(jié)束,loop 就停止了——被先結(jié)束的那個(gè)協(xié)程給停掉的。
要解決這個(gè)問題,可以用 gather 把多個(gè)協(xié)程合并成一個(gè) future,并添加回調(diào),然后在回調(diào)里再去停止 loop。
async def do_some_work(loop, x): print("Waiting " + str(x)) await asyncio.sleep(x) print("Done") def done_callback(loop, futu): loop.stop() loop = asyncio.get_event_loop() futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3)) futus.add_done_callback(functools.partial(done_callback, loop)) loop.run_forever()
其實(shí)這基本上就是 run_until_complete 的實(shí)現(xiàn)了,run_until_complete 在內(nèi)部也是調(diào)用 run_forever。
Close Loop?以上示例都沒有調(diào)用 loop.close,好像也沒有什么問題。所以到底要不要調(diào) loop.close 呢?
簡單來說,loop 只要不關(guān)閉,就還可以再運(yùn)行。:
loop.run_until_complete(do_some_work(loop, 1)) loop.run_until_complete(do_some_work(loop, 3)) loop.close()
但是如果關(guān)閉了,就不能再運(yùn)行了:
loop.run_until_complete(do_some_work(loop, 1)) loop.close() loop.run_until_complete(do_some_work(loop, 3)) # 此處異常
建議調(diào)用 loop.close,以徹底清理 loop 對象防止誤用。
gather vs. waitasyncio.gather 和 asyncio.wait 功能相似。
coros = [do_some_work(loop, 1), do_some_work(loop, 3)] loop.run_until_complete(asyncio.wait(coros))
具體差別可請參見 StackOverflow 的討論:Asyncio.gather vs asyncio.wait。
TimerC++ Boost.Asio 提供了 IO 對象 timer,但是 Python 并沒有原生支持 timer,不過可以用 asyncio.sleep 模擬。
async def timer(x, cb): futu = asyncio.ensure_future(asyncio.sleep(x)) futu.add_done_callback(cb) await futu t = timer(3, lambda futu: print("Done")) loop.run_until_complete(t)
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/38529.html
摘要:當(dāng)被調(diào)用時(shí),表示已經(jīng)斷開連接。第三版去掉第三版的目的是去掉。協(xié)程保持不變,但是已被剔除不再需要請求發(fā)送之后,繼續(xù)異步等待數(shù)據(jù)的接收,即。的作用是結(jié)束那個(gè)導(dǎo)致等待的,這樣也就可以結(jié)束了結(jié)束,以便結(jié)束。 關(guān)于 Asyncio 的其他文章: Python 的異步 IO:Asyncio 簡介 Python 的異步 IO:Aiohttp Client 代碼分析 如果不知道 Asyncio 是...
摘要:具有以下基本同步原語子進(jìn)程提供了通過創(chuàng)建和管理子進(jìn)程的。雖然隊(duì)列不是線程安全的,但它們被設(shè)計(jì)為專門用于代碼。表示異步操作的最終結(jié)果。 Python的asyncio是使用 async/await 語法編寫并發(fā)代碼的標(biāo)準(zhǔn)庫。通過上一節(jié)的講解,我們了解了它不斷變化的發(fā)展歷史。到了Python最新穩(wěn)定版 3.7 這個(gè)版本,asyncio又做了比較大的調(diào)整,把這個(gè)庫的API分為了 高層級API和...
摘要:并發(fā)的方式有多種,多線程,多進(jìn)程,異步等。多線程和多進(jìn)程之間的場景切換和通訊代價(jià)很高,不適合密集型的場景關(guān)于多線程和多進(jìn)程的特點(diǎn)已經(jīng)超出本文討論的范疇,有興趣的同學(xué)可以自行搜索深入理解。 編程中,我們經(jīng)常會(huì)遇到并發(fā)這個(gè)概念,目的是讓軟件能充分利用硬件資源,提高性能。并發(fā)的方式有多種,多線程,多進(jìn)程,異步IO等。多線程和多進(jìn)程更多應(yīng)用于CPU密集型的場景,比如科學(xué)計(jì)算的時(shí)間都耗費(fèi)在CPU...
摘要:創(chuàng)建第一個(gè)協(xié)程推薦使用語法來聲明協(xié)程,來編寫異步應(yīng)用程序。協(xié)程兩個(gè)緊密相關(guān)的概念是協(xié)程函數(shù)通過定義的函數(shù)協(xié)程對象調(diào)用協(xié)程函數(shù)返回的對象。它是一個(gè)低層級的可等待對象,表示一個(gè)異步操作的最終結(jié)果。 我們講以Python 3.7 上的asyncio為例講解如何使用Python的異步IO。 showImg(https://segmentfault.com/img/remote/14600000...
摘要:所以與多線程相比,線程的數(shù)量越多,協(xié)程性能的優(yōu)勢越明顯。值得一提的是,在此過程中,只有一個(gè)線程在執(zhí)行,因此這與多線程的概念是不一樣的。 真正有知識(shí)的人的成長過程,就像麥穗的成長過程:麥穗空的時(shí)候,麥子長得很快,麥穗驕傲地高高昂起,但是,麥穗成熟飽滿時(shí),它們開始謙虛,垂下麥芒。 ——蒙田《蒙田隨筆全集》 上篇論述了關(guān)于python多線程是否是雞肋的問題,得到了一些網(wǎng)友的認(rèn)可,當(dāng)然也有...
閱讀 711·2021-11-16 11:44
閱讀 3541·2019-08-26 12:13
閱讀 3236·2019-08-26 10:46
閱讀 2352·2019-08-23 12:37
閱讀 1180·2019-08-22 18:30
閱讀 2526·2019-08-22 17:30
閱讀 1835·2019-08-22 17:26
閱讀 2284·2019-08-22 16:20