摘要:標準庫中所有阻塞型函數都會釋放,允許其他線程運行。如果調用引發異常,那么當從迭代器檢索其值時,將引發異常??偨Y自版就支持線程了,只不過是使用線程的最新方式。類封裝了模塊的組件,使使用線程變得更加方便。下一篇筆記應該是使用處理并發。
作為Python程序員,平時很少使用并發編程,偶爾使用也只需要派生出一批獨立的線程,然后放到隊列中,批量執行。所以,不夸張的說,雖然我知道線程、進程、并行、并發的概念,但每次使用的時候可能還需要再打開文檔回顧一下。
現在這一篇還是 《流暢的python》讀書筆記,譯者在這里把future 翻譯為“期物”,我覺得不太合適,既然future不能找到一個合適的詞匯,暫時還是直接使用 future 吧。
concurrent.futuresfuture 是一種對象,表示異步執行的操作。這個概念是 concurrent.futures模塊和asyncio包的基礎。
concurrent.futures 模塊是Python3.2 引入的,對于Python2x 版本,Python2.5 以上的版本可以安裝 futures 包來使用這個模塊。
從Python3.4起,標準庫中有兩個為Future的類:concurrent.futures.Future 和 asyncio.Future。這兩個類作用相同:兩個Future類的實例都表示可能已經完成或未完成的延遲計算。
Future 封裝待完成的操作,可放入隊列,完成的狀態可以查詢,得到結果(或拋出異常)后可以獲取結果(或異常)。
我們知道,如果程序中包含I/O操作,程序會有很高的延遲,CPU會處于等待狀態,這時如果我們不使用并發會浪費很多時間。
示例我們先舉個例子:
下邊是有兩段代碼,主要功能都是從網上下載人口前20的國際的國旗:
第一段代碼(flagss.py)是依序下載:下載完一個圖片后保存到硬盤,然后請求下一張圖片;
第二段代碼(flagss_threadpool.py)使用 concurrent.futures 模塊,批量下載10張圖片。
運行分別運行兩段代碼3次,結果如下:
images.py 的結果如下
$ python flags.py BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 20 flags downloaded in 6.18s $ python flags.py BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 20 flags downloaded in 5.67s $ python flags.py BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 20 flags downloaded in 6.55s
可以看到,依次下載10張圖片,平均需要6秒
flags_threadpool.py 的結果如下:
$ python flags_threadpool.py NG EG VN BR JP FR DE CN TR BD PK MX PH US RU IN ET CD ID IR 20 flags downloaded in 2.12s $ python flags_threadpool.py BR IN DE FR TR RU EG NG JP CN ID ET PK MX PH US IR CD VN BD 20 flags downloaded in 2.23s $ python flags_threadpool.py CN BR DE ID NG RU TR IN MX US IR BD VN CD PH EG FR JP ET PK 20 flags downloaded in 1.18s
使用 concurrent.futures 后,下載10張圖片平均需要2秒
通過上邊的結果我們發現使用 concurrent.futures 后,下載效率大幅提升。
下邊我們來看下這兩段代碼。
同步執行的代碼flags.py:
#! -*- coding: utf-8 -*- import os import time import sys import requests # <1> POP20_CC = ("CN IN US ID BR PK NG BD RU JP " "MX PH VN ET EG DE IR TR CD FR").split() # <2> BASE_URL = "http://flupy.org/data/flags" # <3> DEST_DIR = "images/" # <4> # 保存圖片 def save_flag(img, filename): # <5> path = os.path.join(DEST_DIR, filename) with open(path, "wb") as fp: fp.write(img) # 下載圖片 def get_flag(cc): # <6> url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) # 這里我們使用 requests 包,需要先通過pypi安裝 resp = requests.get(url) return resp.content # 顯示一個字符串,然后刷新sys.stdout,目的是在一行消息中看到進度 def show(text): # <7> print(text, end=" ") sys.stdout.flush() def download_many(cc_list): # <8> for cc in sorted(cc_list): # <9> image = get_flag(cc) show(cc) save_flag(image, cc.lower() + ".gif") return len(cc_list) def main(download_many): # <10> t0 = time.time() count = download_many(POP20_CC) elapsed = time.time() - t0 msg = " {} flags downloaded in {:.2f}s" print(msg.format(count, elapsed)) if __name__ == "__main__": main(download_many) # <11>
使用 concurrent.future 并發的代碼 flags_threadpool.py
#! -*- coding: utf-8 -*- from concurrent import futures from flags import save_flag, get_flag, show, main # 設定ThreadPoolExecutor 類最多使用幾個線程 MAX_WORKERS = 20 # 下載一個圖片 def download_one(cc): image = get_flag(cc) show(cc) save_flag(image, cc.lower() + ".gif") return cc def download_many(cc_list): # 設定工作的線程數量,使用約需的最大值與要處理的數量直接較小的那個值,以免創建多余的線程 workers = min(MAX_WORKERS, len(cc_list)) # <4> # 使用工作的線程數實例化ThreadPoolExecutor類; # executor.__exit__方法會調用executor.shutdown(wait=True)方法, # 它會在所有線程都執行完畢前阻塞線程 with futures.ThreadPoolExecutor(workers) as executor: # <5> # map 與內置map方法類似,不過download_one 函數會在多個線程中并發調用; # map 方法返回一個生成器,因此可以迭代, # 迭代器的__next__方法調用各個Future 的 result 方法 res = executor.map(download_one, sorted(cc_list)) # 返回獲取的結果數量;如果有現成拋出異常,會在這里拋出 # 這與隱式調用next() 函數從迭代器中獲取相應的返回值一樣。 return len(list(res)) # <7> return len(results) if __name__ == "__main__": main(download_many)
上邊的代碼,我們對 concurrent.futures 的使用有了大致的了解。但 future 在哪里呢,我們并沒有看到。
Future 是 concurrent.futures 模塊和 asyncio 包的重要組件。從Python3.4起,標準庫中有兩個為Future的類:concurrent.futures.Future 和 asyncio.Future。這兩個Future作用相同。
Future 封裝待完成的操作,可放入隊列,完成的狀態可以查詢,得到結果(或拋出異常)后可以獲取結果(或異常)。
Future 表示終將發生的事情,而確定某件事情會發生的唯一方式是執行的時間已經排定。因此只有把某件事交給 concurrent.futures.Executor 子類處理時,才會創建 concurrent.futures.Future 實例。
例如,調用Executor.submit() 方法的參數是一個可調用的對象,調用這個方法后會為傳入的可調用對象排期,并返回一個Future。
Future 有三個重要的方法:
.done() 返回布爾值,表示Future 是否已經執行
.add_done_callback() 這個方法只有一個參數,類型是可調用對象,Future運行結束后會回調這個對象。
.result() 如果 Future 運行結束后調用result(), 會返回可調用對象的結果或者拋出執行可調用對象時拋出的異常,如果是 Future 沒有運行結束時調用 f.result()方法,這時會阻塞調用方所在的線程,直到有結果返回。此時result 方法還可以接收 timeout 參數,如果在指定的時間內 Future 沒有運行完畢,會拋出 TimeoutError 異常。
asyncio.Future.result 方法不支持設定超時時間,如果想獲取 Future 的結果,可以使用 yield from 結構
為了加深對 Future 的理解,現在我們修改下 flags_threadpool.py download_many 函數。
def download_many(cc_list): cc_list = cc_list[:5] with futures.ThreadPoolExecutor(max_workers=3) as executor: to_do = [] # 用于創建并排定 future for cc in sorted(cc_list): # submit 方法排定可調用對象的執行時間然后返回一個future,表示這個待執行的操作 future = executor.submit(download_one, cc) to_do.append(future) msg = "Scheduled for {}: {}" print(msg.format(cc, future)) results = [] # 用于獲取future 結果 # as_completed 接收一個future 列表,返回值是一個迭代器,在運行結束后產出future for future in futures.as_completed(to_do): res = future.result() msg = "{} result: {!r}" print(msg.format(future, res)) results.append(res) return len(results)
現在執行代碼,運行結果如下:
Scheduled for BR:Scheduled for CN: Scheduled for ID: Scheduled for IN: Scheduled for US: BR result: "BR" IN result: "IN" CN result: "CN" ID result: "ID" US result: "US" 5 flags downloaded in 1.47s
從結果可以看到,future 的 repr() 方法會顯示狀態,前三個 是running 是因為我們設定了三個進程,所以后兩個是pendding 狀態。如果將max_workers參數設置為5,結果就會全都是 running。
雖然,使用 future 的腳步比第一個腳本的執行速度快了很多,但由于受GIL的限制,下載并不是并行的。
GIL(Global Interpreter Lock)和阻塞型I/OCPython 解釋器本身不是線程安全的,因此解釋器被一個全局解釋器鎖保護著,它確保任何時候都只有一個Python線程執行。
然而,Python標準庫中所有執行阻塞型I/O操作的函數,在等待系統返回結果時都會釋放GIL。這意味著I/O密集型Python程序能從中受益:一個Python線程等待網絡響應時,阻塞型I/O函數會釋放GIL,再運行一個線程。
Python 標準庫中所有阻塞型I/O函數都會釋放GIL,允許其他線程運行。time.sleep()函數也會釋放GIL。
那么如何在CPU密集型作業中使用 concurrent.futures 模塊繞開GIL呢?
答案是 使用 ProcessPoolExecutor 類。
使用這個模塊可以在做CPU密集型工作是繞開GIL,利用所有可用核心。
ThreadPoolExecutor 和 ProcessPoolExecutor 都實現了通用的 Executor 接口,所以,我們可以輕松的將基于線程的方案改為使用進程的方案。
比如下邊這樣:
def download_many(cc_list): workers = min(MAX_WORKERS, len(cc_list)) with futures.ThreadPoolExecutor(workers) as executor: pass # 改成 def download_many(cc_list): with futures.ProcessPoolExecutor() as executor: pass
需要注意的是,ThreadPoolExecutor 需要指定 max_workers 參數,
而 ProcessPoolExecutor 的這個參數是可選的默認值是 os.cup_count()(計算機cpu核心數)。
ProcessPoolExecutor 的價值主要體現在CPU密集型作業上。
使用Python處理CPU密集型工作,應該試試PyPy,會有更高的執行速度。
現在我們回到開始的代碼,看下 Executor.map 函數。
文檔中對map函數的介紹如下。
map(func, *iterables, timeout=None, chunksize=1)
等同于 map(func, *iterables),不同的是 func 是異步執行的,并且可以同時進行對 func 的多個調用。如果調用 __next__(),則返回的迭代器提出 concurrent.futures.TimeoutError,并且在從 Executor.map() 的原始調用起的 timeout 秒之后結果不可用。 timeout 可以是int或float。如果未指定 timeout 或 None,則等待時間沒有限制。如果調用引發異常,那么當從迭代器檢索其值時,將引發異常。當使用 ProcessPoolExecutor 時,此方法將 iterables 分成多個塊,它作為多帶帶的任務提交到進程池。這些塊的(近似)大小可以通過將 chunksize 設置為正整數來指定。對于非常長的迭代,與默認大小1相比,使用大值 chunksize 可以顯著提高性能。使用 ThreadPoolExecutor,chunksize 沒有效果。
在 3.5 版更改: 添加了 chunksize 參數。
Executor.map 還有個特性比較有用,那就是這個函數返回結果的順序于調用開始的順序是一致的。如果第一個調用稱其結果用時10秒,其他調用只用1秒,代碼會阻塞10秒,獲取map方法返回的生成器產出的第一個結果。
如果不是獲取到所有結果再處理,通常會使用 Executor.submit + Executor.as_completed 組合使用的方案。
Executor.submit + Executor.as_completed 這個組合更靈活,因為submit方法能處理不同的可調用對象和參數,而executor.map 只能處理參數不同的同一個可調用對象。此外,傳給futures.as_completed 函數的期物集合可以來自不同的 Executor 實例。
future 的異常處理futures 有三個異常類:
exception concurrent.futures.CancelledError 在future取消時引發。
exception concurrent.futures.TimeoutError 在future操作超過給定超時時觸發。
exception concurrent.futures.process.BrokenProcessPool
從 RuntimeError 派生,當 ProcessPoolExecutor 的一個工人以非干凈方式終止(例如,如果它從外部被殺死)時,引發此異常類。
我們先看一下,future.result() 出現異常的處理情況。代碼改動如下:
# 將第一個 CN 改為CN1 也可以是其它任意錯誤代碼 POP20_CC = ("CN1 IN US ID BR PK NG BD RU JP " "MX PH VN ET EG DE IR TR CD FR").split() def get_flag(cc): # <6> url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower()) resp = requests.get(url) if resp.status_code != 200: # <1> resp.raise_for_status() # 如果不是200 拋出異常 return resp.content def download_one(cc): try: image = get_flag(cc) # 捕獲 requests.exceptions.HTTPError except requests.exceptions.HTTPError as exc: # # 如果有異常 直接拋出 raise else: save_flag(image, cc.lower() + ".gif") return cc
現在執行代碼,會發現 download_one 中的異常傳遞到了download_many 中,并且導致拋出了異常,未執行完的其它future 也都中斷。
為了能保證其它沒有錯誤的future 可以正常執行,這里我們需要對future.result() 做異常處理。
改動結果如下:
def download_many(cc_list): cc_list = cc_list[:5] with futures.ThreadPoolExecutor(max_workers=20) as executor: to_do_map = {} for cc in sorted(cc_list): future = executor.submit(download_one, cc) to_do_map[future] = cc msg = "Scheduled for {}: {}" print(msg.format(cc, future)) results = [] for future in futures.as_completed(to_do_map): try: res = future.result() except requests.exceptions.HTTPError as exc: # 處理可能出現的異常 error_msg = "{} result {}".format(cc, exc) else: error_msg = "" if error_msg: cc = to_do_map[future] # <16> print("*** Error for {}: {}".format(cc, error_msg)) else: msg = "{} result: {!r}" print(msg.format(future, res)) results.append(res) return len(results)
這里我們用到了一個對 futures.as_completed 函數特別有用的慣用法:構建一個字典,把各個future映射到其他數據(future運行結束后可能用的)上。這樣,雖然 future生成的順序雖然已經亂了,依然便于使用結果做后續處理。
一篇寫完了沒有總結總感覺少點什么,所以。
總結Python 自 0.9.8 版就支持線程了,concurrent.futures 只不過是使用線程的最新方式。
futures.ThreadPoolExecutor 類封裝了 threading 模塊的組件,使使用線程變得更加方便。
順便再推薦一下 《流暢的python》,絕對值得一下。
下一篇筆記應該是使用 asyncio 處理并發。
最后,感謝女朋友支持。
>歡迎關注 | >請我喝芬達 |
---|---|
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/38632.html
摘要:本文重點掌握異步編程的相關概念了解期物的概念意義和使用方法了解中的阻塞型函數釋放的特點。一異步編程相關概念阻塞程序未得到所需計算資源時被掛起的狀態。 導語:本文章記錄了本人在學習Python基礎之控制流程篇的重點知識及個人心得,打算入門Python的朋友們可以來一起學習并交流。 本文重點: 1、掌握異步編程的相關概念;2、了解期物future的概念、意義和使用方法;3、了解Python...
摘要:在中由于歷史原因使得中多線程的效果非常不理想使得任何時刻只能利用一個核并且它的調度算法簡單粗暴多線程中讓每個線程運行一段時間然后強行掛起該線程繼而去運行其他線程如此周而復始直到所有線程結束這使得無法有效利用計算機系統中的局部性頻繁的線程切換 GIL 在Python中,由于歷史原因(GIL),使得Python中多線程的效果非常不理想.GIL使得任何時刻Python只能利用一個CPU核,...
摘要:是之后引入的標準庫的,這個包使用事件循環驅動的協程實現并發。沒有能從外部終止線程,因為線程隨時可能被中斷。上一篇并發使用處理并發我們介紹過的,在中,只是調度執行某物的結果。 asyncio asyncio 是Python3.4 之后引入的標準庫的,這個包使用事件循環驅動的協程實現并發。asyncio 包在引入標準庫之前代號 Tulip(郁金香),所以在網上搜索資料時,會經??吹竭@種花的...
摘要:和類是高級類,大部分情況下只要學會使用即可,無需關注其實現細節。類與類十分相似,只不過一個是處理進程,一個是處理線程,可根據實際需要選擇。示例運行結果不同機器運行結果可能不同。 concurrent.futures模塊 該模塊主要特色在于ThreadPoolExecutor 和 ProcessPoolExecutor 類,這兩個類都繼承自concurrent.futures._base...
摘要:具有以下基本同步原語子進程提供了通過創建和管理子進程的。雖然隊列不是線程安全的,但它們被設計為專門用于代碼。表示異步操作的最終結果。 Python的asyncio是使用 async/await 語法編寫并發代碼的標準庫。通過上一節的講解,我們了解了它不斷變化的發展歷史。到了Python最新穩定版 3.7 這個版本,asyncio又做了比較大的調整,把這個庫的API分為了 高層級API和...
閱讀 2106·2021-11-05 09:42
閱讀 2851·2021-09-23 11:21
閱讀 2841·2019-08-30 14:00
閱讀 3314·2019-08-30 13:15
閱讀 465·2019-08-29 17:18
閱讀 3547·2019-08-29 16:29
閱讀 2749·2019-08-29 14:06
閱讀 2794·2019-08-23 14:41