摘要:是之后引入的標準庫的,這個包使用事件循環驅動的協程實現并發。沒有能從外部終止線程,因為線程隨時可能被中斷。上一篇并發使用處理并發我們介紹過的,在中,只是調度執行某物的結果。
asyncio
asyncio 是Python3.4 之后引入的標準庫的,這個包使用事件循環驅動的協程實現并發。
asyncio 包在引入標準庫之前代號 “Tulip”(郁金香),所以在網上搜索資料時,會經??吹竭@種花的名字。
wiki 上說:事件循環是”一種等待程序分配事件或者消息的編程架構“。基本上來說事件循環就是:”當A發生時,執行B"?;蛘哂米詈唵蔚睦觼斫忉屵@一概念就是每個瀏覽器中都存在的JavaScript事件循環。當你點擊了某個東西(“當A發生時”),這一點擊動作會發送給JavaScript的事件循環,并檢查是否存在注冊過的onclick 回調來處理這一點擊(執行B)。只要有注冊過的回調函數就會伴隨點擊動作的細節信息被執行。事件循環被認為是一種虛幻是因為它不停的手機事件并通過循環來發如何應對這些事件。
對 Python 來說,用來提供事件循環的 asyncio 被加入標準庫中。asyncio 重點解決網絡服務中的問題,事件循環在這里將來自套接字(socket)的 I/O 已經準備好讀和/或寫作為“當A發生時”(通過selectors模塊)。除了 GUI 和 I/O,事件循環也經常用于在別的線程或子進程中執行代碼,并將事件循環作為調節機制(例如,合作式多任務)。如果你恰好理解 Python 的 GIL,事件循環對于需要釋放 GIL 的地方很有用。
線程與協程我們先看兩斷代碼,分別用 threading 模塊和asyncio 包實現的一段代碼。
# sinner_thread.py import threading import itertools import time import sys class Signal: # 這個類定義一個可變對象,用于從外部控制線程 go = True def spin(msg, signal): # 這個函數會在多帶帶的線程中運行,signal 參數是前邊定義的Signal類的實例 write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle("|/-"): # itertools.cycle 函數從指定的序列中反復不斷地生成元素 status = char + " " + msg write(status) flush() write("x08" * len(status)) # 使用退格符把光標移回行首 time.sleep(.1) # 每 0.1 秒刷新一次 if not signal.go: # 如果 go屬性不是 True,退出循環 break write(" " * len(status) + "x08" * len(status)) # 使用空格清除狀態消息,把光標移回開頭 def slow_function(): # 模擬耗時操作 # 假裝等待I/O一段時間 time.sleep(3) # 調用sleep 會阻塞主線程,這么做事為了釋放GIL,創建從屬線程 return 42 def supervisor(): # 這個函數設置從屬線程,顯示線程對象,運行耗時計算,最后殺死進程 signal = Signal() spinner = threading.Thread(target=spin, args=("thinking!", signal)) print("spinner object:", spinner) # 顯示線程對象 輸出 spinner object:spinner.start() # 啟動從屬進程 result = slow_function() # 運行slow_function 行數,阻塞主線程。同時叢書線程以動畫形式旋轉指針 signal.go = False spinner.join() # 等待spinner 線程結束 return result def main(): result = supervisor() print("Answer", result) if __name__ == "__main__": main()
執行一下,結果大致是這個樣子:
這是一個動圖,“thinking" 前的 線是會動的(為了錄屏,我把sleep 的時間調大了)
python 并沒有提供終止線程的API,所以若想關閉線程,必須給線程發送消息。這里我們使用signal.go 屬性:在主線程中把它設置為False后,spinner 線程會接收到,然后退出
現在我們再看下使用 asyncio 包的版本:
# spinner_asyncio.py # 通過協程以動畫的形式顯示文本式旋轉指針 import asyncio import itertools import sys @asyncio.coroutine # 打算交給asyncio 處理的協程要使用 @asyncio.coroutine 裝飾 def spin(msg): write, flush = sys.stdout.write, sys.stdout.flush for char in itertools.cycle("|/-"): # itertools.cycle 函數從指定的序列中反復不斷地生成元素 status = char + " " + msg write(status) flush() write("x08" * len(status)) # 使用退格符把光標移回行首 try: yield from asyncio.sleep(0.1) # 使用 yield from asyncio.sleep(0.1) 代替 time.sleep(.1), 這樣的休眠不會阻塞事件循環 except asyncio.CancelledError: # 如果 spin 函數蘇醒后拋出 asyncio.CancelledError 異常,其原因是發出了取消請求 break write(" " * len(status) + "x08" * len(status)) # 使用空格清除狀態消息,把光標移回開頭 @asyncio.coroutine def slow_function(): # 5 現在此函數是協程,使用休眠假裝進行I/O 操作時,使用 yield from 繼續執行事件循環 # 假裝等待I/O一段時間 yield from asyncio.sleep(3) # 此表達式把控制權交給主循環,在休眠結束后回復這個協程 return 42 @asyncio.coroutine def supervisor(): #這個函數也是協程,因此可以使用 yield from 驅動 slow_function spinner = asyncio.async(spin("thinking!")) # asyncio.async() 函數排定協程的運行時間,使用一個 Task 對象包裝spin 協程,并立即返回 print("spinner object:", spinner) # Task 對象,輸出類似 spinner object:> # 驅動slow_function() 函數,結束后,獲取返回值。同事事件循環繼續運行, # 因為slow_function 函數最后使用yield from asyncio.sleep(3) 表達式把控制權交給主循環 result = yield from slow_function() # Task 對象可以取消;取消后會在協程當前暫停的yield處拋出 asyncio.CancelledError 異常 # 協程可以捕獲這個異常,也可以延遲取消,甚至拒絕取消 spinner.cancel() return result def main(): loop = asyncio.get_event_loop() # 獲取事件循環引用 # 驅動supervisor 協程,讓它運行完畢;這個協程的返回值是這次調用的返回值 result = loop.run_until_complete(supervisor()) loop.close() print("Answer", result) if __name__ == "__main__": main()
除非想阻塞主線程,從而凍結事件循環或整個應用,否則不要再 asyncio 協程中使用 time.sleep().
如果協程需要在一段時間內什么都不做,應該使用 yield from asyncio.sleep(DELAY)
使用 @asyncio.coroutine 裝飾器不是強制要求,但建議這么做因為這樣能在代碼中突顯協程,如果還沒從中產出值,協程就把垃圾回收了(意味著操作未完成,可能有缺陷),可以發出警告。這個裝飾器不會預激協程。
這兩段代碼的執行結果基本相同,現在我們看一下兩段代碼的核心代碼 supervisor 主要區別:
asyncio.Task 對象差不多與 threading.Thread 對象等效(Task 對象像是實現寫作時多任務的庫中的綠色線程
Task 對象用于驅動協程,Thread 對象用于調用可調用的對象
Task 對象不由自己動手實例化,而是通過把協程傳給 asyncio.async(...) 函數或 loop.create_task(...) 方法獲取
獲取的Task 對象已經排定了運行時間;Thread 實例必須調用start方法,明確告知它運行
在線程版supervisor函數中,slow_function 是普通的函數,由線程直接調用,而異步版的slow_function 函數是協程,由yield from 驅動。
沒有API能從外部終止線程,因為線程隨時可能被中斷。而如果想終止任務,可以使用Task.cancel() 實例方法,在協程內部拋出CancelledError 異常。協程可以在暫停的yield 處捕獲這個異常,處理終止請求
supervisor 協程必須在main 函數中由loop.run_until_complete 方法執行。
asyncio.Future:故意不阻塞協程和線程相比關鍵的一個優點是,
線程必須記住保留鎖,去保護程序中的重要部分,防止多步操作再執行的過程中中斷,防止山水處于于曉狀態
協程默認會做好保護,我們必須顯式產出(使用yield 或 yield from 交出控制權)才能讓程序的余下部分運行。
asynci.Future 類與 concurrent.futures.Future 類的接口基本一致,不過實現方式不同,不可互換。
上一篇[python并發 1:使用 futures 處理并發]()我們介紹過 concurrent.futures.Future 的 future,在 concurrent.futures.Future 中,future只是調度執行某物的結果。在 asyncio 包中,BaseEventLoop.create_task(...) 方法接收一個協程,排定它的運行時間,然后返回一個asyncio.Task 實例(也是asyncio.Future 類的實例,因為 Task 是 Future 的子類,用于包裝協程。(在 concurrent.futures.Future 中,類似的操作是Executor.submit(...))。
與concurrent.futures.Future 類似,asyncio.Future 類也提供了
.done() 返回布爾值,表示Future 是否已經執行
.add_done_callback() 這個方法只有一個參數,類型是可調用對象,Future運行結束后會回調這個對象。
.result() 這個方法沒有參數,因此不能指定超時時間。 如果調用 .result() 方法時期還沒有運行完畢,會拋出 asyncio.InvalidStateError 異常。
對應的 concurrent.futures.Future 類中的 Future 運行結束后調用result(), 會返回可調用對象的結果或者拋出執行可調用對象時拋出的異常,如果是 Future 沒有運行結束時調用 f.result()方法,這時會阻塞調用方所在的線程,直到有結果返回。此時result 方法還可以接收 timeout 參數,如果在指定的時間內 Future 沒有運行完畢,會拋出 TimeoutError 異常。
我們使用asyncio.Future 時, 通常使用yield from,從中獲取結果,而不是使用 result()方法 yield from 表達式在暫停的協程中生成返回值,回復執行過程。
asyncio.Future 類的目的是與 yield from 一起使用,所以通常不需要使用以下方法:
不需調用 my_future.add_down_callback(...), 因為可以直接把想在 future 運行結束后的操作放在協程中 yield from my_future 表達式的后邊。(因為協程可以暫停和恢復函數)
無需調用 my_future.result(), 因為 yield from 產生的結果就是(result = yield from my_future)
在 asyncio 包中,可以使用yield from 從asyncio.Future 對象中產出結果。這也就意味著我們可以這么寫:
res = yield from foo() # foo 可以是協程函數,也可以是返回 Future 或 task 實例的普通函數asyncio.async(...)* 函數
asyncio.async(coro_or_future, *, loop=None)
這個函數統一了協程和Future: 第一個參數可以是二者中的任意一個。如果是Future 或者 Task 對象,就直接返回,如果是協程,那么async 函數會自動調用 loop.create_task(...) 方法創建 Task 對象。 loop 參數是可選的,用于傳入事件循環; 如果沒有傳入,那么async函數會通過調用asyncio.get_event_loop() 函數獲取循環對象。
BaseEventLoop.create_task(coro)這個方法排定協程的執行時間,返回一個 asyncio.Task 對象。如果在自定義的BaseEventLoop 子類上調用,返回的對象可能是外部庫中與Task類兼容的某個類的實例。
BaseEventLoop.create_task() 方法只在Python3.4.2 及以上版本可用。 Python3.3 只能使用 asyncio.async(...)函數。
如果想在Python控制臺或者小型測試腳本中實驗future和協程,可以使用下面的片段:
import asyncio def run_sync(coro_or_future): loop = asyncio.get_event_loop() return loop.run_until_complete(coro_or_future) a = run_sync(some_coroutine())使用asyncio 和 aiohttp 包下載
現在,我們了解了asyncio 的基礎知識,是時候使用asyncio 來重寫我們 上一篇 [python并發 1:使用 futures 處理并發]() 下載國旗的腳本了。
先看一下代碼:
import asyncio import aiohttp # 需要pip install aiohttp from flags import save_flag, show, main, BASE_URL @asyncio.coroutine # 我們知道,協程應該使用 asyncio.coroutine 裝飾 def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) # 阻塞的操作通過協程實現,客戶代碼通過yield from 把指責委托給協程,以便異步操作 resp = yield from aiohttp.request("GET", url) # 讀取也是異步操作 image = yield from resp.read() return image @asyncio.coroutine def download_one(cc): # 這個函數也必須是協程,因為用到了yield from image = yield from get_flag(cc) show(cc) save_flag(image, cc.lower() + ".gif") return cc def download_many(cc_list): loop = asyncio.get_event_loop() # 獲取事件序號底層實現的引用 to_do = [download_one(cc) for cc in sorted(cc_list)] # 調用download_one 獲取各個國旗,構建一個生成器對象列表 # 雖然函數名稱是wait 但它不是阻塞型函數,wait 是一個協程,等傳給他的所有協程運行完畢后結束 wait_coro = asyncio.wait(to_do) res, _ = loop.run_until_complete(wait_coro) # 執行事件循環,知道wait_coro 運行結束;事件循環運行的過程中,這個腳本會在這里阻塞。 loop.close() # 關閉事件循環 return len(res) if __name__ == "__main__": main(download_many)
這段代碼的運行簡述如下:
在download_many 函數獲取一個事件循環,處理調用download_one 函數生成的幾個協程對象
asyncio 事件循環一次激活各個協程
客戶代碼中的協程(get_flag)使用 yield from 把指責委托給庫里的協程(aiohttp.request)時,控制權交還給事件循環,執行之前排定的協程
事件循環通過基于回調的底層API,在阻塞的操作執行完畢后獲得通知。
獲得通知后,主循環把結果發給暫停的協程
協程向前執行到下一個yield from 表達式,例如 get_flag 函數的yield from resp.read()。事件循環再次得到控制權,重復第4~6步,直到循環終止。
download_many 函數中,我們使用了 asyncio.wait(...) 函數,這個函數是一個協程,協程的參數是一個由future或者協程構成的可迭代對象;wait 會分別把各個協程包裝進一個Task對象。最終的結果是,wait 處理的所有對象都通過某種方式變成Future 類的實例。
wait 是協程函數,因此,返回的是一個協程或者生成器對象;waite_coro 變量中存儲的就是這種對象
loop.run_until_complete 方法的參數是一個future 或協程。如果是協程,run_until_complete 方法與 wait 函數一樣,把協程包裝進一個Task 對象中。這里 run_until_complete 方法把 wait_coro 包裝進一個Task 對象中,由yield from 驅動。wait_coro 運行結束后返回兩個參數,第一個參數是結束的future 第二個參數是未結束的future。
有兩個命名參數,timeout 和 return_when 如果設置了可能會返回未結束的future。
有一點你可能也注意到了,我們重寫了get_flags 函數,是因為之前用到的 requests 庫執行的是阻塞型I/O操作。為了使用 asyncio 包,我們必須把函數改成異步版。
小技巧如果你覺得 使用了協程后代碼難以理解,可以采用 Python之父(Guido van Rossum)的建議,假裝沒有yield from。
已上邊這段代碼為例:
@asyncio.coroutine def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = yield from aiohttp.request("GET", url) image = yield from resp.read() return image # 把yield form 去掉 def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = aiohttp.request("GET", url) image = resp.read() return image # 現在是不是清晰多了知識點
在asyncio 包的API中使用 yield from 時,有個細節要注意:
使用asyncio包時,我們編寫的異步代碼中包含由asyncio本身驅動的協程(委派生成器),而生成器最終把指責委托給asyncio包或者第三方庫中的協程。這種處理方式相當于架起了管道,讓asyncio事件循環驅動執行底層異步I/O的庫函數。
避免阻塞型調用我們先看一個圖,這個圖顯示了電腦從不同存儲介質中讀取數據的延遲情況:
通過這個圖,我們可以看到,阻塞型調用對于CPU來說是巨大的浪費。有什么辦法可以避免阻塞型調用中止整個應用程序么?
有兩種方法:
在多帶帶的線程中運行各個阻塞型操作
把每個阻塞型操作轉化成非阻塞的異步調用使用
當然我們推薦第二種方案,因為第一種方案中如果每個連接都使用一個線程,成本太高。
第二種我們可以使用把生成器當做協程使用的方式實現異步編程。對事件循環來說,調用回調與在暫停的協程上調用 .send() 方法效果差不多。各個暫停的協程消耗的內存比線程小的多。
現在,你應該能理解為什么 flags_asyncio.py 腳本比 flags.py 快的多了吧。
改進 asyncio 下載腳本因為flags.py 是依次同步下載,每次下載都要用幾十億個CPU周期等待結果。而在flags_asyncio.py中,在download_many 函數中調用loop.run_until_complete 方法時,事件循環驅動各個download_one 協程,運行到yield from 表達式出,那個表達式又驅動各個 get_flag 協程,運行到第一個yield from 表達式處,調用 aiohttp.request()函數。這些調用不會阻塞,因此在零點幾秒內所有請求都可以全部開始。
現在我們改進一下上邊的 flags_asyncio.py,在其中添加上異常處理,計數器
import asyncio import collections from collections import namedtuple from enum import Enum import aiohttp from aiohttp import web from flags import save_flag, show, main, BASE_URL DEFAULT_CONCUR_REQ = 5 MAX_CONCUR_REQ = 1000 Result = namedtuple("Result", "status data") HTTPStatus = Enum("Status", "ok not_found error") # 自定義異常用于包裝其他HTTP貨網絡異常,并獲取country_code,以便報告錯誤 class FetchError(Exception): def __init__(self, country_code): self.country_code = country_code @asyncio.coroutine def get_flag(cc): # 此協程有三種返回結果: # 1. 返回下載到的圖片 # 2. HTTP 響應為404 時,拋出web.HTTPNotFound 異常 # 3. 返回其他HTTP狀態碼時, 拋出aiohttp.HttpProcessingError url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = yield from aiohttp.request("GET", url) if resp.status == 200: image = yield from resp.read() return image elif resp.status == 404: raise web.HttpNotFound() else: raise aiohttp.HttpProcessionError( code=resp.status, message=resp.reason, headers=resp.headers ) @asyncio.coroutine def download_one(cc, semaphore): # semaphore 參數是 asyncio.Semaphore 類的實例 # Semaphore 類是同步裝置,用于限制并發請求 try: with (yield from semaphore): # 在yield from 表達式中把semaphore 當成上下文管理器使用,防止阻塞整個系統 # 如果semaphore 計數器的值是所允許的最大值,只有這個協程會阻塞 image = yield from get_flag(cc) # 退出with語句后 semaphore 計數器的值會遞減, # 解除阻塞可能在等待同一個semaphore對象的其他協程實例 except web.HTTPNotFound: status = HTTPStatus.not_found msg = "not found" except Exception as exc: raise FetchError(cc) from exc else: save_flag(image, cc.lower() + ".gif") status = HTTPStatus.ok msg = "ok" return Result(status, cc) @asyncio.coroutine def downloader_coro(cc_list): counter = collections.Counter() # 創建一個 asyncio.Semaphore 實例,最多允許激活MAX_CONCUR_REQ個使用這個計數器的協程 semaphore = asyncio.Semaphore(MAX_CONCUR_REQ) # 多次調用 download_one 協程,創建一個協程對象列表 to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)] # 獲取一個迭代器,這個迭代器會在future運行結束后返回future to_do_iter = asyncio.as_completed(to_do) for future in to_do_iter: # 迭代允許結束的 future try: res = yield from future # 獲取asyncio.Future 對象的結果(也可以調用future.result) except FetchError as exc: # 拋出的異常都包裝在FetchError 對象里 country_code = exc.country_code try: # 嘗試從原來的異常 (__cause__)中獲取錯誤消息 error_msg = exc.__cause__.args[0] except IndexError: # 如果在原來的異常中找不到錯誤消息,使用所連接異常的類名作為錯誤消息 error_msg = exc.__cause__.__class__.__name__ if error_msg: msg = "*** Error for {}: {}" print(msg.format(country_code, error_msg)) status = HTTPStatus.error else: status = res.status counter[status] += 1 return counter def download_many(cc_list): loop = asyncio.get_event_loop() coro = downloader_coro(cc_list) counts = loop.run_until_complete(coro) loop.close() return counts if __name__ == "__main__": main(download_many)
由于協程發起的請求速度較快,為了防止向服務器發起太多的并發請求,使服務器過載,我們在download_coro 函數中創建一個asyncio.Semaphore 實例,然后把它傳給download_one 函數。
Semaphore 對象維護著一個內部計數器,若在對象上調用 .acquire() 協程方法,計數器則遞減;若在對象上調用 .release() 協程方法,計數器則遞增。計數器的值是在初始化的時候設定。
如果計數器大于0,那么調用 .acquire() 方法不會阻塞,如果計數器為0, .acquire() 方法會阻塞調用這個方法的協程,直到其他協程在同一個 Semaphore 對象上調用 .release() 方法,讓計數器遞增。
在上邊的代碼中,我們并沒有手動調用 .acquire() 或 .release() 方法,而是在 download_one 函數中 把 semaphore 當做上下文管理器使用:
with (yield from semaphore): image = yield from get_flag(cc)
這段代碼保證,任何時候都不會有超過 MAX_CONCUR_REQ 個 get_flag 協程啟動。
使用 asyncio.as_completed 函數因為要使用 yield from 獲取 asyncio.as_completed 函數產出的future的結果,所以 as_completed 函數秩序在協程中調用。由于 download_many 要作為參數傳給非協程的main 函數,我已我們添加了一個新的 downloader_coro 協程,讓download_many 函數只用于設置事件循環。
使用Executor 對象,防止阻塞事件循環現在我們回去看下上邊關于電腦從不同存儲介質讀取數據的延遲情況圖,有一個實時需要注意,那就是訪問本地文件系統也會阻塞。
上邊的代碼中,save_flag 函數阻塞了客戶代碼與 asyncio 事件循環公用的唯一線程,因此保存文件時,整個應用程序都會暫停。為了避免這個問題,可以使用事件循環對象的 run_in_executor 方法。
asyncio 的事件循環在后臺維護著一個ThreadPoolExecutor 對象,我們可以調用 run_in_executor 方法,把可調用的對象發給它執行。
下邊是我們改動后的代碼:
@asyncio.coroutine def download_one(cc, semaphore): try: with (yield from semaphore): image = yield from get_flag(cc) except web.HTTPNotFound: status = HTTPStatus.not_found msg = "not found" except Exception as exc: raise FetchError(cc) from exc else: # 這里是改動部分 loop = asyncio.get_event_loop() # 獲取事件循環的引用 loop.run_in_executor(None, save_flag, image, cc.lower() + ".gif") status = HTTPStatus.ok msg = "ok" return Result(status, cc)
run_in_executor 方法的第一個參數是Executor 實例;如果設為None,使用事件循環的默認 ThreadPoolExecutor 實例。
從回調到future到協程在接觸協程之前,我們可能對回調有一定的認識,那么和回調相比,協程有什么改進呢?
python中的回調代碼樣式:
def stage1(response1): request2 = step1(response1) api_call2(request2, stage2) def stage2(response2): request3 = step3(response3) api_call3(request3, stage3) def stage3(response3): step3(response3) api_call1(request1, stage1)
上邊的代碼的缺陷:
容易出現回調地獄
代碼難以閱讀
在這個問題上,協程能發揮很大的作用。如果換成協程和yield from 結果做的異步代碼,代碼示例如下:
@asyncio.coroutine def three_stages(request1): response1 = yield from api_call1(request1) request2 = step1(response1) response2 = yield from api_call2(requests) request3 = step2(response2) response3 = yield from api_call3(requests) step3(response3) loop.create_task(three_stages(request1)
和之前的代碼相比,這個代碼就容易理解多了。如果異步調用 api_call1,api_call2,api_call3 會拋出異常,那么可以把相應的 yield from 表達式放在 try/except 塊中處理異常。
使用協程必須習慣 yield from 表達式,并且協程不能直接調用,必須顯式的排定協程的執行時間,或在其他排定了執行時間的協程中使用yield from 表達式吧它激活。如果不使用 loop.create_task(three_stages(request1)),那么什么都不會發生。
下面我們用一個實際的例子來演示一下:
每次下載發起多次請求我們修改一下上邊下載國旗的代碼,使在下載國旗的同時還可以獲取國家名稱在保存圖片的時候使用。
我們使用協程和yield from 解決這個問題:
@asyncio.coroutine def http_get(url): resp = yield from aiohttp.request("GET", url) if resp.status == 200: ctype = resp.headers.get("Content-type", "").lower() if "json" in ctype or url.endswith("json"): data = yield from resp.json() else: data = yield from resp.read() return data elif resp.status == 404: raise web.HttpNotFound() else: raise aiohttp.HttpProcessionError( code=resp.status, message=resp.reason, headers=resp.headers) @asyncio.coroutine def get_country(cc): url = "{}/{cc}/metadata.json".format(BASE_URL, cc=cc.lower()) metadata = yield from http_get(url) return metadata["country"] @asyncio.coroutine def get_flag(cc): url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) return (yield from http_get(url)) @asyncio.coroutine def download_one(cc, semaphore): try: with (yield from semaphore): image = yield from get_flag(cc) with (yield from semaphore): country = yield from get_country(cc) except web.HTTPNotFound: status = HTTPStatus.not_found msg = "not found" except Exception as exc: raise FetchError(cc) from exc else: country = country.replace(" ", "_") filename = "{}--{}.gif".format(country, cc) print(filename) loop = asyncio.get_event_loop() loop.run_in_executor(None, save_flag, image, filename) status = HTTPStatus.ok msg = "ok" return Result(status, cc)
在這段代碼中,我們在download_one 函數中分別在 semaphore 控制的兩個with 塊中調用get_flag 和 get_country,是為了節約時間。
get_flag 的return 語句在外層加上括號,是因為() 的運算符優先級高,會先執行括號內的yield from 語句 返回的結果。如果不加 會報句法錯誤
加() ,相當于
image = yield from http_get(url) return image
如果不加(),那么程序會在 yield from 處中斷,交出控制權,這時使用return 會報句法錯誤。
總結這一篇我們討論了:
對比了一個多線程程序和asyncio版,說明了多線程和異步任務之間的關系
比較了 asyncio.Future 類 和 concurrent.futures.Future 類的區別
如何使用異步編程管理網絡應用中的高并發
在異步編程中,與回調相比,協程顯著提升性能的方式
下一篇,我們將介紹如何使用asyncio包編寫服務器
參考鏈接class asyncio.Semaphore
asyncio — Asynchronous I/O, event loop, coroutines and tasks
【譯】 Python 3.5 協程究竟是個啥
PEP 0492 Coroutines with async and await syntax
Python 之 asyncio
我所不能理解的Python中的Asyncio模塊
最后,感謝女朋友支持
>歡迎關注 | >請我喝芬達 |
---|---|
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/38646.html
摘要:并發用于制定方案,用來解決可能但未必并行的問題。在協程中使用需要注意兩點使用鏈接的多個協程最終必須由不是協程的調用方驅動,調用方顯式或隱式在最外層委派生成器上調用函數或方法。對象可以取消取消后會在協程當前暫停的處拋出異常。 導語:本文章記錄了本人在學習Python基礎之控制流程篇的重點知識及個人心得,打算入門Python的朋友們可以來一起學習并交流。 本文重點: 1、了解asyncio...
摘要:使用進行并發編程篇三掘金這是使用進行并發編程系列的最后一篇。所以我考慮啟用一個本地使用進行并發編程篇二掘金我們今天繼續深入學習。 使用 Python 進行并發編程 - asyncio 篇 (三) - 掘金 這是「使用Python進行并發編程」系列的最后一篇。我特意地把它安排在了16年最后一天。 重新實驗上篇的效率對比的實現 在第一篇我們曾經對比并發執行的效率,但是請求的是httpb...
摘要:上一篇我們介紹了包,以及如何使用異步編程管理網絡應用中的高并發。倒排索引保存在本地一個名為的文件中。運行示例如下這個模塊沒有使用并發,主要作用是為使用包編寫的服務器提供支持。 asyncio 上一篇我們介紹了 asyncio 包,以及如何使用異步編程管理網絡應用中的高并發。在這一篇,我們主要介紹使用 asyncio 包編程的兩個例子。 async/await語法 我們先介紹下 asyn...
摘要:具有以下基本同步原語子進程提供了通過創建和管理子進程的。雖然隊列不是線程安全的,但它們被設計為專門用于代碼。表示異步操作的最終結果。 Python的asyncio是使用 async/await 語法編寫并發代碼的標準庫。通過上一節的講解,我們了解了它不斷變化的發展歷史。到了Python最新穩定版 3.7 這個版本,asyncio又做了比較大的調整,把這個庫的API分為了 高層級API和...
摘要:我們以請求網絡服務為例,來實際測試一下加入多線程之后的效果。所以,執行密集型操作時,多線程是有用的,對于密集型操作,則每次只能使用一個線程。說到這里,對于密集型,可以使用多線程或者多進程來提高效率。 為了提高系統密集型運算的效率,我們常常會使用到多個進程或者是多個線程,python中的Threading包實現了線程,multiprocessing 包則實現了多進程。而在3.2版本的py...
閱讀 1166·2021-11-11 16:55
閱讀 3052·2021-08-16 11:00
閱讀 2895·2019-08-30 15:56
閱讀 3435·2019-08-30 11:24
閱讀 3416·2019-08-30 11:05
閱讀 3531·2019-08-29 15:15
閱讀 2615·2019-08-26 13:57
閱讀 2554·2019-08-23 18:17