摘要:當被調用時,表示已經斷開連接。第三版去掉第三版的目的是去掉。協程保持不變,但是已被剔除不再需要請求發送之后,繼續異步等待數據的接收,即。的作用是結束那個導致等待的,這樣也就可以結束了結束,以便結束。
關于 Asyncio 的其他文章:
Python 的異步 IO:Asyncio 簡介
Python 的異步 IO:Aiohttp Client 代碼分析
如果不知道 Asyncio 是什么,先看「Asyncio 簡介」那一篇。
一個簡單的 HTTP Server首先,為了便于測試,我們用 Python 內建的 http 模塊,運行一個簡單的 HTTP Server。
新建一個目錄,添加文件 index.html,內容為 Hello, World!(不是合法的 HTML 格式也沒有關系),然后運行如下命令(Ubuntu 請用 python3):
$ python -m http.server Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...
后面不同的 Client 實現,都會連接這個 Server:Host 為 localhost,Port 為 8000。
所有的示例代碼,import 語句一律從略。
import asyncio第一版
第一版改寫自 Python 官方文檔里的 例子。
Python 的例子是 Echo Client,我們稍微復雜一點,是 HTTP Client,都是 TCP。
class ClientProtocol(asyncio.Protocol): def __init__(self, loop): self.loop = loop def connection_made(self, transport): request = "GET / HTTP/1.1 Host: localhost " transport.write(request.encode()) def data_received(self, data): print(data.decode()) def connection_lost(self, exc): self.loop.stop() async def main(loop): await loop.create_connection( lambda: ClientProtocol(loop), "localhost", 8000) loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.run_forever()
TCP 連接由 loop.create_connection() 創建,后者需要一個 Protocol 工廠,即 lambda: ClientProtocol(loop)。
Protocol 提供了 connection_made(),data_received(), connection_lost() 等接口,這些接口就像回調函數一樣,會在恰當的時候被調用。
我們在 connection_made() 中,通過參數 transport 發送一個 HTTP GET 請求,隨后在 data_received() 里,將收到 HTTP 應答。
當 connection_lost() 被調用時,表示 Server 已經斷開連接。
運行結果:
HTTP/1.0 200 OK Server: SimpleHTTP/0.6 Python/3.6.3 Date: Mon, 04 Dec 2017 06:11:52 GMT Content-type: text/html Content-Length: 13 Last-Modified: Thu, 30 Nov 2017 05:37:31 GMT Hello, World!
這就是一個標準的 HTTP 應答,包含 Status Line,Headers 和 Body。
值得注意的是,loop 其實運行了兩遍:
loop.run_until_complete(main(loop)) # 第一遍 loop.run_forever() # 第二遍
如果沒有 run_forever(),在收到數據之前,loop 可能就結束了。協程 main() 只是創建好連接,隨后 run_until_complete() 自然也就無事可做而終。
加了 run_forever() 后,data_received() 等便有了被調用的機會。但是也有問題,loop 一直在跑,程序沒辦法結束,所以才在 connection_lost() 里主動停止 loop:
def connection_lost(self, exc): self.loop.stop()第二版:ClientSession
第一版在 connection_made() 中 hard code 了一個 HTTP GET 請求,靈活性較差,以后必然還有 POST 等其他 HTTP 方法需要支持,所以有必要新增一個 ClientSession 類,來抽象客戶端的會話。于是,HTTP 請求的發送,便從 connection_made() 挪到了 ClientSession.get()。
ClientSession 應該為每一個 HTTP 方法提供一個相應的方法,比如 post,put 等等,雖然我們只考慮 HTTP GET。
class ClientProtocol(asyncio.Protocol): def __init__(self, loop): self.loop = loop self.transport = None def connection_made(self, transport): self.transport = transport def data_received(self, data): print(data.decode()) def connection_lost(self, exc): self.loop.stop() class ClientSession: def __init__(self, loop): self._loop = loop async def get(self, url, host, port): transport, protocol = await self._loop.create_connection( lambda: ClientProtocol(loop), host, port) request = "GET {} HTTP/1.1 Host: {} ".format(url, host) transport.write(request.encode())
首先,ClientProtocol 新增了一個屬性 transport,是在 connection_made() 時保存下來的,這樣在 ClientSession 里才能通過它來發送請求。
第三版:去掉 run_forever()第三版的目的是:去掉 run_forever() 。
class ClientProtocol(asyncio.Protocol): def __init__(self, loop): self.loop = loop self.transport = None self._eof = False # 有沒有收到 EOF self._waiter = None # 用來等待接收數據的 future def connection_made(self, transport): self.transport = transport def data_received(self, data): print(data.decode()) def eof_received(self): self._eof = True self._wakeup_waiter() def connection_lost(self, exc): pass # 不再調用 self.loop.stop() async def wait_for_data(self): assert not self._eof assert not self._waiter self._waiter = self.loop.create_future() await self._waiter self._waiter = None def _wakeup_waiter(self): waiter = self._waiter if waiter: self._waiter = None waiter.set_result(None) class ClientSession: def __init__(self, loop): self._loop = loop async def get(self, url, host, port): transport, protocol = await self._loop.create_connection( lambda: ClientProtocol(loop), host, port) request = "GET {} HTTP/1.1 Host: {} ".format(url, host) transport.write(request.encode()) # 等待接收數據。 await protocol.wait_for_data()
協程 main() 保持不變,但是 loop.run_forever() 已被剔除:
loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) # 不再需要 loop.run_forever()
HTTP 請求發送之后,繼續異步等待(await)數據的接收,即 protocol.wait_for_data()。
這個等待動作,是通過往 loop 里新增一個 future 來實現的:
async def wait_for_data(self): # ... self._waiter = self.loop.create_future() await self._waiter self._waiter = None
self._waiter 就是這個導致等待的 future,它會保證 loop 一直運行,直到數據接收完畢。
eof_received() 被調用時,數據就接收完畢了(EOF 的意思不用多說了吧?)。
def eof_received(self): self._eof = True self._wakeup_waiter()
_wakeup_waiter() 的作用是結束那個導致等待的 future,這樣 loop 也就可以結束了:
def _wakeup_waiter(self): waiter = self._waiter if waiter: self._waiter = None # 結束 waiter future,以便 loop 結束。 waiter.set_result(None)第四版:Reader
在 data_received() 里直接輸出 HTTP 的應答結果,實在算不上什么完美的做法。
def data_received(self, data): print(data.decode())
為了解決這一問題,我們引入一個 Reader 類,用來緩存收到的數據,并提供「讀」的接口給用戶。
首先,Protocol 被簡化了,前一版引入的各種處理,都轉交給了 Reader。
class ClientProtocol(asyncio.Protocol): def __init__(self, loop, reader): self.loop = loop self.transport = None self._reader = reader def connection_made(self, transport): self.transport = transport def data_received(self, data): self._reader.feed(data) # 轉交給 Reader def eof_received(self): self._reader.feed_eof() # 轉交給 Reader def connection_lost(self, exc): pass
下面是 ClientSession.get() 基于 Reader 的實現:
class ClientSession: async def get(self, url, host, port): reader = Reader(self._loop) transport, protocol = await self._loop.create_connection( lambda: ClientProtocol(loop, reader), host, port) # 發送請求,代碼從略... data = await reader.read() print(data.decode())
Reader 本身是從上一版的 Protocol 抽取出來的,唯一不同的是,接收的數據被臨時放在了一個 bytearray 緩存里。
class Reader: def __init__(self, loop): self._loop = loop self._buffer = bytearray() # 緩存 self._eof = False self._waiter = None def feed(self, data): self._buffer.extend(data) self._wakeup_waiter() def feed_eof(self): self._eof = True self._wakeup_waiter() async def read(self): if not self._buffer and not self._eof: await self._wait_for_data() data = bytes(self._buffer) del self._buffer[:] return data async def _wait_for_data(self): assert not self._eof assert not self._waiter self._waiter = self._loop.create_future() await self._waiter self._waiter = None def _wakeup_waiter(self): waiter = self._waiter if waiter: self._waiter = None waiter.set_result(None)
稍微解釋一下 read(),比較重要的是開始的一句判斷:
# 如果緩存為空,并且 EOF 還沒收到,那就(繼續)等待接收數據。 if not self._buffer and not self._eof: # read() 會停在這個地方,直到 feed() 或 feed_eof() 被調用, # 也就是說有數據可讀了。 await self._wait_for_data()
接下來就是把緩存倒空:
data = bytes(self._buffer) del self._buffer[:]
運行一下,不難發現,ClientSession.get() 里讀數據的那一句是有問題的。
data = await reader.read()
收到的 data 并不是完整的 HTTP 應答,可能只包含了 HTTP 的 Headers,而沒有 Body。
一個 HTTP 應答,Server 端可能分多次發送過來。比如這個測試用的 Hello World Server,Headers 和 Body 就分了兩次發送,也就是說 data_received() 會被調用兩次。
之前我們在 eof_received() 里才喚醒 waiter(_wakeup_waiter()),現在在 data_received() 里就喚醒了,于是第一次數據收完, waiter 就結束了,loop 也便跟著結束。
為了讀到完整的 HTTP 應答,方法也很簡單,把 read() 放在循環里:
blocks = [] while True: block = await reader.read() if not block: break blocks.append(block) data = b"".join(blocks) print(data.decode())
每一次 read(),如果緩存為空,并且 EOF 還沒收到的話,就會再次創建 waiter,放到 loop 里,繼續等待接收數據。
這個循環顯然應該交給 Reader 處理,對 ClientSession 需保持透明。
class Reader: async def read(self): blocks = [] while True: block = await self._read() if not block: break blocks.append(block) data = b"".join(blocks) return data async def _read(self): if not self._buffer and not self._eof: await self._wait_for_data() data = bytes(self._buffer) del self._buffer[:] return data
最后,原來的 read() 重命名為 _read(),新的 read() 在循環中反復調用 _read(),直到無數據可讀。ClientSession 這邊直接調用新的 read() 即可。
第五版:Writer到目前為止,發送 HTTP 請求時,都是直接調用較為底層的 transport.write():
async def get(self, url, host, port): # ... transport.write(request.encode())
可以把它封裝在 Writer 中,與 Reader 的做法類似,但是 Writer 要簡單得多:
class Writer: def __init__(self, transport): self._transport = transport def write(self, data): self._transport.write(data)
然后在 ClientSession.get() 中創建 Writer:
async def get(self, url, host, port): reader = Reader(self._loop) transport, protocol = await self._loop.create_connection( lambda: ClientProtocol(loop, reader), host, port) writer = Writer(transport) request = "GET {} HTTP/1.1 Host: {} ".format(url, host) writer.write(request.encode()) # ...
對 ClientSession 來說,只需知道 Reader 和 Writer 就足夠了,所以不妨提供一個函數 open_connection(),直接返回 Reader 和 Writer。
async def open_connection(host, port, loop): reader = Reader(loop) protocol = ClientProtocol(loop, reader) transport, _ = await loop.create_connection(lambda: protocol, host, port) writer = Writer(transport) return reader, writer
然后 ClientSession 就可以簡化成這樣:
class ClientSession: async def get(self, url, host, port): reader, writer = await open_connection(host, port, self._loop) # ...第六版:Asyncio Streams
其實 Asyncio 已經提供了 Reader 和 Writer,詳見 官方文檔。
下面以 Asyncio Streams 實現 ClientSession.get():
class ClientSession: async def get(self, url, host, port): reader, writer = await asyncio.open_connection( host, port, loop=self._loop) request = "GET {} HTTP/1.1 Host: {} ".format(url, host) writer.write(request.encode()) data = await reader.read(-1) print(data.decode()) writer.close()
asyncio.open_connection() 就相當于我們的 open_connection()。Reader 和 Writer 也都類似,只是復雜了一些。
全文完
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/44528.html
摘要:具有以下基本同步原語子進程提供了通過創建和管理子進程的。雖然隊列不是線程安全的,但它們被設計為專門用于代碼。表示異步操作的最終結果。 Python的asyncio是使用 async/await 語法編寫并發代碼的標準庫。通過上一節的講解,我們了解了它不斷變化的發展歷史。到了Python最新穩定版 3.7 這個版本,asyncio又做了比較大的調整,把這個庫的API分為了 高層級API和...
摘要:的異步代碼分析是的一個框架,基于,所以叫。不可避免的,可讀性會比較差。想找教程的話,請移步官方教程,寫得還是挺不錯的。建議不要直接使用,而只把它當成的一個樣例。 Python 的異步 IO:Aiohttp Client 代碼分析 Aiohttp 是 Python 的一個 HTTP 框架,基于 asyncio,所以叫 Aiohttp。 我主要是看源碼,想理解它的設計,所以附上了類圖與時序...
摘要:并發的方式有多種,多線程,多進程,異步等。多線程和多進程之間的場景切換和通訊代價很高,不適合密集型的場景關于多線程和多進程的特點已經超出本文討論的范疇,有興趣的同學可以自行搜索深入理解。 編程中,我們經常會遇到并發這個概念,目的是讓軟件能充分利用硬件資源,提高性能。并發的方式有多種,多線程,多進程,異步IO等。多線程和多進程更多應用于CPU密集型的場景,比如科學計算的時間都耗費在CPU...
摘要:創建第一個協程推薦使用語法來聲明協程,來編寫異步應用程序。協程兩個緊密相關的概念是協程函數通過定義的函數協程對象調用協程函數返回的對象。它是一個低層級的可等待對象,表示一個異步操作的最終結果。 我們講以Python 3.7 上的asyncio為例講解如何使用Python的異步IO。 showImg(https://segmentfault.com/img/remote/14600000...
摘要:實例實例測試結果增加路由實例測試結果提供了一個方法,根據處理程序方法名生成。異常拋出異常要拋出異常,只需從異常模塊中提出相應的異常。 typora-copy-images-to: ipic [TOC] 快速開始 在安裝Sanic之前,讓我們一起來看看Python在支持異步的過程中,都經歷了哪些比較重大的更新。 首先是Python3.4版本引入了asyncio,這讓Python有了支...
閱讀 996·2023-04-25 14:45
閱讀 2773·2021-09-30 09:59
閱讀 3114·2021-09-22 15:48
閱讀 2425·2019-08-30 15:55
閱讀 3467·2019-08-30 15:44
閱讀 535·2019-08-29 14:07
閱讀 3412·2019-08-26 13:45
閱讀 536·2019-08-26 11:31