摘要:當一個客戶端連接時,它將返回一個新的對象,對象中有表示當前連接的和一個由主機端口號組成的連接的元組,更多關于元組值的內容可以查看地址族一節中的詳情這里必須要明白我們通過調用方法擁有了一個新的對象。
博客原文: https://keelii.com/2018/09/24/socket-programming-in-python/
說明本書翻譯自 realpython 網站上的文章教程 Socket Programming in Python (Guide),由于原文比較長,所以整理成了 Gitbook 方便閱讀
原作者Nathan Jennings 是 Real Python 教程團隊的一員,他在很早之前就使用 C 語言開始了自己的編程生涯,但是最終發現了 Python,從 Web 應用和網絡數據收集到網絡安全,他喜歡任何 Pythonic 的東西譯者注
—— realpython
譯者 是一名前端工程師,平常會寫很多的 JavaScript。但是當我使用 JavaScript 很長一段時間后,會對一些 語言無關 的編程概念感興趣,比如:網絡/socket 編程、異步/并發、線/進程通信等。然而恰好這些內容在 JavasScript 領域很少見
因為一直從事 Web 開發,所以我認為理解了網絡通信及其 socket 編程就理解了 Web 開發的某些本質。過程中我發現 Python 社區有很多我喜歡的內容,并且很多都是高質量的公開發布且開源的。
最近我發現了這篇文章,系統地從底層網絡通信講到了應用層協議及其 C/S 架構的應用程序,由淺入深。雖然代碼、API 使用了 Python,但是底層原因都是相通的。非常值得一讀,推薦給大家
另外,由于本人水平所限,翻譯的內容難免出現偏差,如果你在閱讀的過程中發現問題,請毫不猶豫的提醒我或者開新 PR。或者有什么不理解的地方也可以開 issue 討論
授權本文(翻譯版)通過了 realpython 官方授權,原文版權歸其所有,任何轉載請聯系他們
開始網絡中的 Socket 和 Socket API 是用來跨網絡的消息傳送的,它提供了 進程間通信(IPC) 的一種形式。網絡可以是邏輯的、本地的電腦網絡,或者是可以物理連接到外網的網絡,并且可以連接到其它網絡。英特網就是一個明顯的例子,就是那個你通過 ISP 連接到的網絡
本篇教程有三個不同的迭代階段,來展示如何使用 Python 構建一個 Socket 服務器和客戶端
我們將以一個簡單的 Socket 服務器和客戶端程序來開始本教程
當你看完 API 了解例子是怎么運行起來以后,我們將會看到一個具有同時處理多個連接能力的例子的改進版
最后,我們將會開發出一個更加完善且具有完整的自定義頭信息和內容的 Socket 應用
教程結束后,你將學會如何使用 Python 中的 socket 模塊 來寫一個自己的客戶端/服務器應用。以及向你展示如何在你的應用中使用自定義類在不同的端之間發送消息和數據
所有的例子程序都使用 Python 3.6 編寫,你可以在 Github 上找到 源代碼
網絡和 Socket 是個很大的話題。網上已經有了關于它們的字面解釋,如果你還不是很了解 Socket 和網絡。當你你讀到那些解釋的時候會感到不知所措,這是非常正常的。因為我也是這樣過來的
盡管如此也不要氣餒。 我已經為你寫了這個教程。 就像學習 Python 一樣,我們可以一次學習一點。用你的瀏覽器保存本頁面到書簽,以便你學習下一部分時能找到
讓我們開始吧!
背景Socket 有一段很長的歷史,最初是在 1971 年被用于 ARPANET,隨后就成了 1983 年發布的 Berkeley Software Distribution (BSD) 操作系統的 API,并且被命名為 Berkeleysocket
當互聯網在 20 世紀 90 年代隨萬維網興起時,網絡編程也火了起來。Web 服務和瀏覽器并不是唯一使用新的連接網絡和 Socket 的應用程序。各種類型不同規模的客戶端/服務器應用都廣泛地使用著它們
時至今日,盡管 Socket API 使用的底層協議已經進化了很多年,也出現了許多新的協議,但是底層的 API 仍然保持不變
Socket 應用最常見的類型就是 客戶端/服務器 應用,服務器用來等待客戶端的鏈接。我們教程中涉及到的就是這類應用。更明確地說,我們將看到用于 InternetSocket 的 Socket API,有時稱為 Berkeley 或 BSD Socket。當然也有 Unix domain sockets —— 一種用于 同一主機 進程間的通信
Socket API 概覽Python 的 socket 模塊提供了使用 Berkeley sockets API 的接口。這將會在我們這個教程里使用和討論到
主要的用到的 Socket API 函數和方法有下面這些:
socket()
bind()
listen()
accept()
connect()
connect_ex()
send()
recv()
close()
Python 提供了和 C 語言一致且方便的 API。我們將在下面一節中用到它們
作為標準庫的一部分,Python 也有一些類可以讓我們方便的調用這些底層 Socket 函數。盡管這個教程中并沒有涉及這部分內容,你也可以通過socketserver 模塊 中找到文檔。當然還有很多實現了高層網絡協議(比如:HTTP, SMTP)的的模塊,可以在下面的鏈接中查到 Internet Protocols and Support
TCP Sockets就如你馬上要看到的,我們將使用 socket.socket() 創建一個類型為 socket.SOCK_STREAM 的 socket 對象,默認將使用 Transmission Control Protocol(TCP) 協議,這基本上就是你想使用的默認值
為什么應該使用 TCP 協議?
可靠的:網絡傳輸中丟失的數據包會被檢測到并重新發送
有序傳送:數據按發送者寫入的順序被讀取
相反,使用 socket.SOCK_DGRAM 創建的 用戶數據報協議(UDP) Socket 是 不可靠 的,而且數據的讀取寫發送可以是 無序的
為什么這個很重要?網絡總是會盡最大的努力去傳輸完整數據(往往不盡人意)。沒法保證你的數據一定被送到目的地或者一定能接收到別人發送給你的數據
網絡設備(比如:路由器、交換機)都有帶寬限制,或者系統本身的極限。它們也有 CPU、內存、總線和接口包緩沖區,就像我們的客戶端和服務器。TCP 消除了你對于丟包、亂序以及其它網絡通信中通常出現的問題的顧慮
下面的示意圖中,我們將看到 Socket API 的調用順序和 TCP 的數據流:
左邊表示服務器,右邊則是客戶端
左上方開始,注意服務器創建「監聽」Socket 的 API 調用:
socket()
bind()
listen()
accept()
「監聽」Socket 做的事情就像它的名字一樣。它會監聽客戶端的連接,當一個客戶端連接進來的時候,服務器將調用 accept() 來「接受」或者「完成」此連接
客戶端調用 connect() 方法來建立與服務器的鏈接,并開始三次握手。握手很重要是因為它保證了網絡的通信的雙方可以到達,也就是說客戶端可以正常連接到服務器,反之亦然
上圖中間部分往返部分表示客戶端和服務器的數據交換過程,調用了 send() 和 recv()方法
下面部分,客戶端和服務器調用 close() 方法來關閉各自的 socket
打印客戶端和服務端你現在已經了解了基本的 socket API 以及客戶端和服務器是如何通信的,讓我們來創建一個客戶端和服務器。我們將會以一個簡單的實現開始。服務器將打印客戶端發送回來的內容
打印程序服務端下面就是服務器代碼,echo-server.py:
#!/usr/bin/env python3 import socket HOST = "127.0.0.1" # 標準的回環地址 (localhost) PORT = 65432 # 監聽的端口 (非系統級的端口: 大于 1023) with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((HOST, PORT)) s.listen() conn, addr = s.accept() with conn: print("Connected by", addr) while True: data = conn.recv(1024) if not data: break conn.sendall(data)
注意:上面的代碼你可能還沒法完全理解,但是不用擔心。這幾行代碼做了很多事情,這
只是一個起點,幫你看見這個簡單的服務器是如何運行的
教程后面有引用部分,里面有很多額外的引用資源鏈接,這個教程中我將把鏈接放在那兒
讓我們一起來看一下 API 調用以及發生了什么
socket.socket() 創建了一個 socket 對象,并且支持 context manager type,你可以使用 with 語句,這樣你就不用再手動調用 s.close() 來關閉 socket 了
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: pass # Use the socket object without calling s.close().
調用 socket() 時傳入的 socket 地址族參數 socket.AF_INET 表示因特網 IPv4 地址族,SOCK_STREAM 表示使用 TCP 的 socket 類型,協議將被用來在網絡中傳輸消息
bind() 用來關聯 socket 到指定的網絡接口(IP 地址)和端口號:
HOST = "127.0.0.1" PORT = 65432 # ... s.bind((HOST, PORT))
bind() 方法的入參取決于 socket 的地址族,在這個例子中我們使用了 socket.AF_INET (IPv4),它將返回兩個元素的元組:(host, port)
host 可以是主機名稱、IP 地址、空字符串,如果使用 IP 地址,host 就應該是 IPv4 格式的字符串,127.0.0.1 是標準的 IPv4 回環地址,只有主機上的進程可以連接到服務器,如果你傳了空字符串,服務器將接受本機所有可用的 IPv4 地址
端口號應該是 1-65535 之間的整數(0是保留的),這個整數就是用來接受客戶端鏈接的 TCP 端口號,如果端口號小于 1024,有的操作系統會要求管理員權限
使用 bind() 傳參為主機名稱的時候需要注意:
如果你在 host 部分 主機名稱 作為 IPv4/v6 socket 的地址,程序可能會產生非確
定性的行為,因為 Python 會使用 DNS 解析后的 第一個 地址,根據 DNS 解析的結
果或者 host 配置 socket 地址將會以不同方式解析為實際的 IPv4/v6 地址。如果想得
到確定的結果傳入的 host 參數建議使用數字格式的地址 引用
我稍后將在 使用主機名 部分討論這個問題,但是現在也值得一提。目前來說你只需要知道當使用主機名時,你將會因為 DNS 解析的原因得到不同的結果
可能是任何地址。比如第一次運行程序時是 10.1.2.3,第二次是 192.168.0.1,第三次是 172.16.7.8 等等
繼續看上面的服務器代碼示例,listen() 方法調用使服務器可以接受連接請求,這使它成為一個「監聽中」的 socket
s.listen() conn, addr = s.accept()
listen() 方法有一個 backlog 參數。它指定在拒絕新的連接之前系統將允許使用的 未接受的連接 數量。從 Python 3.5 開始,這是可選參數。如果不指定,Python 將取一個默認值
如果你的服務器需要同時接收很多連接請求,增加 backlog 參數的值可以加大等待鏈接請求隊列的長度,最大長度取決于操作系統。比如在 Linux 下,參考 /proc/sys/net/core/somaxconn
accept() 方法阻塞并等待傳入連接。當一個客戶端連接時,它將返回一個新的 socket 對象,對象中有表示當前連接的 conn 和一個由主機、端口號組成的 IPv4/v6 連接的元組,更多關于元組值的內容可以查看 socket 地址族 一節中的詳情
這里必須要明白我們通過調用 accept() 方法擁有了一個新的 socket 對象。這非常重要,因為你將用這個 socket 對象和客戶端進行通信。和監聽一個 socket 不同的是后者只用來授受新的連接請求
conn, addr = s.accept() with conn: print("Connected by", addr) while True: data = conn.recv(1024) if not data: break conn.sendall(data)
從 accept() 獲取客戶端 socket 連接對象 conn 后,使用一個無限 while 循環來阻塞調用 conn.recv(),無論客戶端傳過來什么數據都會使用 conn.sendall() 打印出來
如果 conn.recv() 方法返回一個空 byte 對象(b""),然后客戶端關閉連接,循環結束,with 語句和 conn 一起使用時,通信結束的時候會自動關閉 socket 鏈接
打印程序客戶端現在我們來看下客戶端的程序, echo-client.py:
#!/usr/bin/env python3 import socket HOST = "127.0.0.1" # 服務器的主機名或者 IP 地址 PORT = 65432 # 服務器使用的端口 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.connect((HOST, PORT)) s.sendall(b"Hello, world") data = s.recv(1024) print("Received", repr(data))
與服務器程序相比,客戶端程序簡單很多。它創建了一個 socket 對象,連接到服務器并且調用 s.sendall() 方法發送消息,然后再調用 s.recv() 方法讀取服務器返回的內容并打印出來
運行打印程序的客戶端和服務端讓我們運行打印程序的客戶端和服務端,觀察他們的表現,看看發生了什么事情
如果你在運行示例代碼時遇到了問題,可以閱讀 如何使用 Python 開發命令行命令,如果
你使用的是 windows 操作系統,請查看 Python Windows FAQ
打開命令行程序,進入你的代碼所在的目錄,運行打印程序的服務端:
$ ./echo-server.py
你的命令行將被掛起,因為程序有一個阻塞調用
conn, addr = s.accept()
它將等待客戶端的連接,現在再打開一個命令行窗口運行打印程序的客戶端:
$ ./echo-client.py Received b"Hello, world"
在服務端的窗口你將看見:
$ ./echo-server.py Connected by ("127.0.0.1", 64623)
上面的輸出中,服務端打印出了 s.accept() 返回的 addr 元組,這就是客戶端的 IP 地址和 TCP 端口號。示例中的端口號是 64623 這很可能是和你機器上運行的結果不同
查看 socket 狀態想查找你主機上 socket 的當前狀態,可以使用 netstat 命令。這個命令在 macOS, Window, Linux 系統上默認可用
下面這個就是啟動服務后 netstat 命令的輸出結果:
$ netstat -an Active Internet connections (including servers) Proto Recv-Q Send-Q Local Address Foreign Address (state) tcp4 0 0 127.0.0.1.65432 *.* LISTEN
注意本地地址是 127.0.0.1.65432,如果 echo-server.py 文件中 HOST 設置成空字符串 "" 的話,netstat 命令將顯示如下:
$ netstat -an Active Internet connections (including servers) Proto Recv-Q Send-Q Local Address Foreign Address (state) tcp4 0 0 *.65432 *.* LISTEN
本地地址是 *.65432,這表示所有主機支持的 IP 地址族都可以接受傳入連接,在我們的例子里面調用 socket() 時傳入的參數 socket.AF_INET 表示使用了 IPv4 的 TCP socket,你可以在輸出結果中的 Proto 列中看到(tcp4)
上面的輸出是我截取的只顯示了咱們的打印程序服務端進程,你可能會看到更多輸出,具體取決于你運行的系統。需要注意的是 Proto, Local Address 和 state 列。分別表示 TCP socket 類型、本地地址端口、當前狀態
另外一個查看這些信息的方法是使用 lsof 命令,這個命令在 macOS 上是默認安裝的,Linux 上需要你手動安裝
$ lsof -i -n COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME Python 67982 nathan 3u IPv4 0xecf272 0t0 TCP *:65432 (LISTEN)
isof 命令使用 -i 參數可以查看打開的 socket 連接的 COMMAND, PID(process id) 和 USER(user id),上面的輸出就是打印程序服務端
netstat 和 isof 命令有許多可用的參數,這取決于你使用的操作系統。可以使用 man page 來查看他們的使用文檔,這些文檔絕對值得花一點時間去了解,你將受益匪淺,macOS 和 Linux 中使用命令 man netstat 或者 man lsof 命令,windows 下使用 netstat /? 來查看幫助文檔
一個通常會犯的錯誤是在沒有監聽 socket 端口的情況下嘗試連接:
$ ./echo-client.py Traceback (most recent call last): File "./echo-client.py", line 9, ins.connect((HOST, PORT)) ConnectionRefusedError: [Errno 61] Connection refused
也可能是端口號出錯、服務端沒啟動或者有防火墻阻止了連接,這些原因可能很難記住,或許你也會碰到 Connection timed out 的錯誤,記得給你的防火墻添加允許我們使用的端口規則
引用部分有一些常見的 錯誤
通信的流程分解讓我們再仔細的觀察下客戶端是如何與服務端進行通信的:
當使用回環地址時,數據將不會接觸到外部網絡,上圖中,回環地址包含在了 host 里面。這就是回環地址的本質,連接數據傳輸是從本地到主機,這就是為什么你會聽到有回環地址或者 127.0.0.1、::1 的 IP 地址和表示本地主機
應用程序使用回環地址來與主機上的其它進程通信,這使得它與外部網絡安全隔離。由于它是內部的,只能從主機內訪問,所以它不會被暴露出去
如果你的應用程序服務器使用自己的專用數據庫(非公用的),則可以配置服務器僅監聽回環地址,這樣的話網絡上的其它主機就無法連接到你的數據庫
如果你的應用程序中使用的 IP 地址不是 127.0.0.1 或者 ::1,那就可能會綁定到連接到外部網絡的以太網上。這就是你通往 localhost 王國之外的其他主機的大門
這里需要小心,并且可能讓你感到難受甚至懷疑全世界。在你探索 localhost 的安全限制之前,確認讀過 使用主機名 一節。 一個安全注意事項是 **不要使用主機名,要使用
IP 地址**
打印程序的服務端肯定有它自己的一些局限。這個程序只能服務于一個客戶端然后結束。打印程序的客戶端也有它自己的局限,但是還有一個問題,如果客戶端調用了下面的方法s.recv() 方法將返回 b"Hello, world" 中的一個字節 b"H"
data = s.recv(1024)
1024 是緩沖區數據大小限制最大值參數 bufsize,并不是說 recv() 方法只返回 1024個字節的內容
send() 方法也是這個原理,它返回發送內容的字節數,結果可能小于傳入的發送內容,你得處理這處情況,按需多次調用 send() 方法來發送完整的數據
應用程序負責檢查是否已發送所有數據;如果僅傳輸了一些數據,則應用程序需要嘗試傳
遞剩余數據 引用
我們可以使用 sendall() 方法來回避這個過程
和 send() 方法不一樣的是,sendall() 方法會一直發送字節,只到所有的數據傳輸完成
或者中途出現錯誤。成功的話會返回 None 引用
到目前為止,我們有兩個問題:
如何同時處理多個連接請求
我們需要一直調用 send() 或者 recv() 直到所有數據傳輸完成
應該怎么做呢,有很多方式可以實現并發。最近,有一個非常流程的庫叫做 Asynchronous I/O 可以實現,asyncio 庫在 Python 3.4 后默認添加到了標準庫里面。傳統的方法是使用線程
并發的問題是很難做到正確,有許多細微之處需要考慮和防范。可能其中一個細節的問題都會導致整個程序崩潰
我說這些并不是想嚇跑你或者讓你遠離學習和使用并發編程。如果你想讓程序支持大規模使用,使用多處理器、多核是很有必要的。然而在這個教程中我們將使用比線程更傳統的方法使得邏輯更容易推理。我們將使用一個非常古老的系統調用:select()
select() 允許你檢查多個 socket 的 I/O 完成情況,所以你可以使用它來檢測哪個 socket I/O 是就緒狀態從而執行讀取或寫入操作,但是這是 Python,總會有更多其它的選擇,我們將使用標準庫中的selectors 模塊,所以我們使用了最有效的實現,不用在意你使用的操作系統:
這個模塊提供了高層且高效的 I/O 多路復用,基于原始的 select 模塊構建,推薦用
戶使用這個模塊,除非他們需要精確到操作系統層面的使用控制 [引用
](https://docs.python.org/3/lib...
盡管如此,使用 select() 也無法并發執行。這取決于您的工作負載,這種實現仍然會很快。這也取決于你的應用程序對連接所做的具體事情或者它需要支持的客戶端數量
asyncio 使用單線程來處理多任務,使用事件循環來管理任務。通過使用 select(),我們可以創建自己的事件循環,更簡單且同步化。當使用多線程時,即使要處理并發的情況,我們也不得不面臨使用 CPython 或者 PyPy 中的「全局解析器鎖 GIL」,這有效地限制了我們可以并行完成的工作量
說這些是為了解析為什么使用 select() 可能是個更好的選擇,不要覺得你必須使用 asyncio、線程或最新的異步庫。通常,在網絡應用程序中,你的應用程序就是 I/O 綁定:它可以在本地網絡上,網絡另一端的端,磁盤上等待
如果你從客戶端收到啟動 CPU 綁定工作的請求,查看 concurrent.futures模塊,它包含一個 ProcessPoolExecutor 類,用來異步執行進程池中的調用
如果你使用多進程,你的 Python 代碼將被操作系統并行地在不同處理器或者核心上調度運行,并且沒有全局解析器鎖。你可以通過
Python 大會上的演講 John Reese - Thinking Outside the GIL with AsyncIO and Multiprocessing - PyCon 2018 來了解更多的想法
在下一節中,我們將介紹解決這些問題的服務器和客戶端的示例。他們使用 select() 來同時處理多連接請求,按需多次調用 send() 和 recv()
多連接的客戶端和服務端下面兩節中,我們將使用 selectors 模塊中的 selector 對象來創建一個可以同時處理多個請求的客戶端和服務端
多連接的服務端首頁,我們來看眼多連接服務端程序的代碼,multiconn-server.py。這是開始建立監聽 socket 部分
import selectors sel = selectors.DefaultSelector() # ... lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) lsock.bind((host, port)) lsock.listen() print("listening on", (host, port)) lsock.setblocking(False) sel.register(lsock, selectors.EVENT_READ, data=None)
這個程序和之前打印程序服務端最大的不同是使用了 lsock.setblocking(False) 配置 socket 為非阻塞模式,這個 socket 的調用將不在是阻塞的。當它和 sel.select() 一起使用的時候(下面會提到),我們就可以等待 socket 就緒事件,然后執行讀寫操作
sel.register() 使用 sel.select() 為你感興趣的事件注冊 socket 監控,對于監聽 socket,我們希望使用 selectors.EVENT_READ 讀取到事件
data 用來存儲任何你 socket 中想存的數據,當 select() 返回的時候它也會返回。我們將使用 data 來跟蹤 socket 上發送或者接收的東西
下面就是事件循環:
import selectors sel = selectors.DefaultSelector() # ... while True: events = sel.select(timeout=None) for key, mask in events: if key.data is None: accept_wrapper(key.fileobj) else: service_connection(key, mask)
sel.select(timeout=None) 調用會阻塞直到 socket I/O 就緒。它返回一個(key, events) 元組,每個 socket 一個。key 就是一個包含 fileobj 屬性的具名元組。key.fileobj 是一個 socket 對象,mask 表示一個操作就緒的事件掩碼
如果 key.data 為空,我們就可以知道它來自于監聽 socket,我們需要調用 accept() 方法來授受連接請求。我們將使用一個 accept() 包裝函數來獲取新的 socket 對象并注冊到 selector 上,我們馬上就會看到
如果 key.data 不為空,我們就可以知道它是一個被接受的客戶端 socket,我們需要為它服務,接著 service_connection() 會傳入 key 和 mask 參數并調用,這包含了所有我們需要在 socket 上操作的東西
讓我們一起來看看 accept_wrapper() 方法做了什么:
def accept_wrapper(sock): conn, addr = sock.accept() # Should be ready to read print("accepted connection from", addr) conn.setblocking(False) data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"") events = selectors.EVENT_READ | selectors.EVENT_WRITE sel.register(conn, events, data=data)
由于監聽 socket 被注冊到了 selectors.EVENT_READ 上,它現在就能被讀取,我們調用 sock.accept() 后立即再立即調 conn.setblocking(False) 來讓 socket 進入非阻塞模式
請記住,這是這個版本服務器程序的主要目標,因為我們不希望它被阻塞。如果被阻塞,那么整個服務器在返回前都處于掛起狀態。這意味著其它 socket 處于等待狀態,這是一種 非常嚴重的 誰都不想見到的服務被掛起的狀態
接著我們使用了 types.SimpleNamespace 類創建了一個對象用來保存我們想要的 socket 和數據,由于我們得知道客戶端連接什么時候可以寫入或者讀取,下面兩個事件都會被用到:
events = selectors.EVENT_READ | selectors.EVENT_WRITE
事件掩碼、socket 和數據對象都會被傳入 sel.register()
現在讓我們來看下,當客戶端 socket 就緒的時候連接請求是如何使用 service_connection() 來處理的
def service_connection(key, mask): sock = key.fileobj data = key.data if mask & selectors.EVENT_READ: recv_data = sock.recv(1024) # Should be ready to read if recv_data: data.outb += recv_data else: print("closing connection to", data.addr) sel.unregister(sock) sock.close() if mask & selectors.EVENT_WRITE: if data.outb: print("echoing", repr(data.outb), "to", data.addr) sent = sock.send(data.outb) # Should be ready to write data.outb = data.outb[sent:]
這就是多連接服務端的核心部分,key 就是從調用 select() 方法返回的一個具名元組,它包含了 socket 對象「fileobj」和數據對象。mask 包含了就緒的事件
如果 socket 就緒而且可以被讀取, mask & selectors.EVENT_READ 就為真,sock.recv() 會被調用。所有讀取到的數據都會被追加到 data.outb 里面。隨后被發送出去
注意 else: 語句,如果沒有收到任何數據:
if recv_data: data.outb += recv_data else: print("closing connection to", data.addr) sel.unregister(sock) sock.close()
這表示客戶端關閉了它的 socket 連接,這時服務端也應該關閉自己的連接。不過別忘了先調用 sel.unregister() 來撤銷 select() 的監控
當 socket 就緒而且可以被讀取的時候,對于正常的 socket 應該一直是這種狀態,任何接收并被 data.outb 存儲的數據都將使用 sock.send() 方法打印出來。發送出去的字節隨后就會被從緩沖中刪除
data.outb = data.outb[sent:]多連接的客戶端
現在讓我們一起來看看多連接的客戶端程序,multiconn-client.py,它和服務端很相似,不一樣的是它沒有監聽連接請求,它以調用 start_connections() 開始初始化連接:
messages = [b"Message 1 from client.", b"Message 2 from client."] def start_connections(host, port, num_conns): server_addr = (host, port) for i in range(0, num_conns): connid = i + 1 print("starting connection", connid, "to", server_addr) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setblocking(False) sock.connect_ex(server_addr) events = selectors.EVENT_READ | selectors.EVENT_WRITE data = types.SimpleNamespace(connid=connid, msg_total=sum(len(m) for m in messages), recv_total=0, messages=list(messages), outb=b"") sel.register(sock, events, data=data)
num_conns 參數是從命令行讀取的,表示為服務器建立多少個鏈接。就像服務端程序一樣,每個 socket 都設置成了非阻塞模式
由于 connect() 方法會立即觸發一個 BlockingIOError 異常,所以我們使用 connect_ex() 方法取代它。connect_ex() 會返回一個錯誤指示 errno.EINPROGRESS,不像 connect() 方法直接在進程中返回異常。一旦連接結束,socket 就可以進行讀寫并且通過 select() 方法返回
socket 建立完成后,我們將使用 types.SimpleNamespace 類創建想會傳送的數據。由于每個連接請求都會調用 socket.send(),發送到服務端的消息得使用 list(messages) 方法轉換成列表結構。所有你想了解的東西,包括客戶端將要發送的、已發送的、已接收的消息以及消息的總字節數都存儲在 data 對象中
讓我們再來看看 service_connection()。基本上和服務端一樣:
def service_connection(key, mask): sock = key.fileobj data = key.data if mask & selectors.EVENT_READ: recv_data = sock.recv(1024) # Should be ready to read if recv_data: print("received", repr(recv_data), "from connection", data.connid) data.recv_total += len(recv_data) if not recv_data or data.recv_total == data.msg_total: print("closing connection", data.connid) sel.unregister(sock) sock.close() if mask & selectors.EVENT_WRITE: if not data.outb and data.messages: data.outb = data.messages.pop(0) if data.outb: print("sending", repr(data.outb), "to connection", data.connid) sent = sock.send(data.outb) # Should be ready to write data.outb = data.outb[sent:]
有一個不同的地方,客戶端會跟蹤從服務器接收的字節數,根據結果來決定是否關閉 socket 連接,服務端檢測到客戶端關閉則會同樣的關閉服務端的連接
運行多連接的客戶端和服務端現在讓我們把 multiconn-server.py 和 multiconn-client.py 兩個程序跑起來。他們都使用了命令行參數,如果不指定參數可以看到參數調用的方法:
服務端程序,傳入主機和端口號
$ ./multiconn-server.py usage: ./multiconn-server.py
客戶端程序,傳入啟動服務端程序時同樣的主機和端口號以及連接數量
$ ./multiconn-client.py usage: ./multiconn-client.py
下面就是服務端程序運行起來在 65432 端口上監聽回環地址的輸出:
$ ./multiconn-server.py 127.0.0.1 65432 listening on ("127.0.0.1", 65432) accepted connection from ("127.0.0.1", 61354) accepted connection from ("127.0.0.1", 61355) echoing b"Message 1 from client.Message 2 from client." to ("127.0.0.1", 61354) echoing b"Message 1 from client.Message 2 from client." to ("127.0.0.1", 61355) closing connection to ("127.0.0.1", 61354) closing connection to ("127.0.0.1", 61355)
下面是客戶端,它創建了兩個連接請求到上面的服務端:
$ ./multiconn-client.py 127.0.0.1 65432 2 starting connection 1 to ("127.0.0.1", 65432) starting connection 2 to ("127.0.0.1", 65432) sending b"Message 1 from client." to connection 1 sending b"Message 2 from client." to connection 1 sending b"Message 1 from client." to connection 2 sending b"Message 2 from client." to connection 2 received b"Message 1 from client.Message 2 from client." from connection 1 closing connection 1 received b"Message 1 from client.Message 2 from client." from connection 2 closing connection 2應用程序客戶端和服務端
多連接的客戶端和服務端程序版本與最早的原始版本相比肯定有了很大的改善,但是讓我們再進一步地解決上面「多連接」版本中的不足,然后完成最終版的實現:客戶端/服務器應用程序
我們希望有個客戶端和服務端在不影響其它連接的情況下做好錯誤處理,顯然,如果沒有發生異常,我們的客戶端和服務端不能崩潰的一團糟。這也是到現在為止我們還沒討論的東西,我故意沒有引入錯誤處理機制因為這樣可以使之前的程序容易理解
現在你對基本的 API,非阻塞 socket、select() 等概念已經有所了解了。我們可以繼續添加一些錯誤處理同時討論下「房間里面的大象」的問題,我把一些東西隱藏在了幕后。你應該還記得,我在介紹中討論到的自定義類
首先,讓我們先解決錯誤:
所有的錯誤都會觸發異常,像無效參數類型和內存不足的常見異常可以被拋出;從 Python
3.3 開始,與 socket 或地址語義相關的錯誤會引發 OSError 或其子類之一的異常 引用
我們需要捕獲 OSError 異常。另外一個我沒提及的的問題是延遲,你將在文檔的很多地方看見關于延遲的討論,延遲會發生而且屬于「正常」錯誤。主機或者路由器重啟、交換機端口出錯、電纜出問題或者被拔出,你應該在你的代碼中處理好各種各樣的錯誤
剛才說的「房間里面的大象」問題是怎么回事呢。就像 socket.SOCK_STREAM 這個參數的字面意思一樣,當使用 TCP 連接時,你會從一個連續的字節流讀取的數據,好比從磁盤上讀取數據,不同的是你是從網絡讀取字節流
然而,和使用 f.seek() 讀文件不同,換句話說,沒法定位 socket 的數據流的位置,如果可以像文件一樣定位數據流的位置(使用下標),那你就可以隨意的讀取你想要的數據
當字節流入你的 socket 時,會需要有不同的網絡緩沖區,如果想讀取他們就必須先保存到其它地方,使用 recv() 方法持續的從 socket 上讀取可用的字節流
相當于你從 socket 中讀取的是一塊一塊的數據,你必須使用 recv() 方法不斷的從緩沖區中讀取數據,直到你的應用確定讀取到了足夠的數據
什么時候算「足夠」這取決于你的定義,就 TCP socket 而言,它只通過網絡發送或接收原始字節。它并不了解這些原始字節的含義
這可以讓我們定義一個應用層協議,什么是應用層協議?簡單來說,你的應用會發送或者接收消息,這些消息其實就是你的應用程序的協議
換句話說,這些消息的長度、格式可以定義應用程序的語義和行為,這和我們之前說的從socket 中讀取字節部分內容相關,當你使用 recv() 來讀取字節的時候,你需要知道讀的字節數,并且決定什么時候算讀取完成
這些都是怎么完成的呢?一個方法是只讀取固定長度的消息,如果它們的長度總是一樣的話,這樣做很容易。當你收到固定長度字節消息的時候,就能確定它是個完整的消息
然而,如果你使用定長模式來發送比較短的消息會比較低效,因為你還得處理填充剩余的部分,此外,你還得處理數據不適合放在一個定長消息里面的情況
在這個教程里面,我們將使用一個通用的方案,很多協議都會用到它,包括 HTTP。我們將在每條消息前面追加一個頭信息,頭信息中包括消息的長度和其它我們需要的字段。這樣做的話我們只需要追蹤頭信息,當我們讀到頭信息時,就可以查到消息的長度并且讀出所有字節然后消費它
我們將通過使用一個自定義類來實現接收文本/二進制數據。你可以在此基礎上做出改進或者通過繼承這個類來擴展你的應用程序。重要的是你將看到一個例子實現它的過程
我將會提到一些關于 socket 和字節相關的東西,就像之前討論過的。當你通過 socket 來發送或者接收數據時,其實你發送或者接收到的是原始字節
如果你收到數據并且想讓它在一個多字節解釋的上下文中使用,比如說 4-byte 的整形,你需要考慮它可能是一種不是你機器 CPU 本機的格式。客戶端或者服務器的另外一頭可能是另外一種使用了不同的字節序列的 CPU,這樣的話,你就得把它們轉換成你主機的本地字節序列來使用
上面所說的字節順序就是 CPU 的 字節序,在引用部分的字節序 一節可以查看更多。我們將會利用 Unicode 字符集的優點來規避這個問題,并使用UTF-8 的方式編碼,由于 UTF-8 使用了 8字節 編碼方式,所以就不會有字節序列的問題
你可以查看 Python 關于編碼與 Unicode 的 文檔,注意我們只會編碼消息的頭部。我們將使用嚴格的類型,發送的消息編碼格式會在頭信息中定義。這將讓我們可以傳輸我們覺得有用的任意類型/格式數據
你可以通過調用 sys.byteorder 來決定你的機器的字節序列,比如在我的英特爾筆記本上,運行下面的代碼就可以:
$ python3 -c "import sys; print(repr(sys.byteorder))" "little"
如果我把這段代碼跑在可以模擬大字節序 CPU「PowerPC」的虛擬機上的話,應該是下面的結果:
$ python3 -c "import sys; print(repr(sys.byteorder))" "big"
在我們的例子程序中,應用層的協議定義了使用 UTF-8 方式編碼的 Unicode 字符。對于真正傳輸消息來說,如果需要的話你還是得手動交換字節序列
這取決于你的應用,是否需要它來處理不同終端間的多字節二進制數據,你可以通過添加額外的頭信息來讓你的客戶端或者服務端支持二進制,像 HTTP 一樣,把頭信息做為參數傳進去
不用擔心自己還沒搞懂上面的東西,下面一節我們看到是如果實現的
應用的協議頭讓我們來定義一個完整的協議頭:
可變長度的文本
基于 UTF-8 編碼的 Unicode 字符集
使用 JSON 序列化的一個 Python 字典
其中必須具有的頭應該有以下幾個:
名稱 | 描述 |
---|---|
byteorder | 機器的字節序列(uses sys.byteorder),應用程序可能用不上 |
content-length | 內容的字節長度 |
content-type | 內容的類型,比如 text/json 或者 binary/my-binary-type |
content-encoding | 內容的編碼類型,比如 utf-8 編碼的 Unicode 文本,二進制數據 |
這些頭信息告訴接收者消息數據,這樣的話你就可以通過提供給接收者足夠的信息讓他接收到數據的時候正確的解碼的方式向它發送任何數據,由于頭信息是字典格式,你可以隨意向頭信息中添加鍵值對
發送應用程序消息不過還有一個問題,由于我們使用了變長的頭信息,雖然方便擴展但是當你使用 recv() 方法讀取消息的時候怎么知道頭信息的長度呢
我們前面講到過使用 recv() 接收數據和如何確定是否接收完成,我說過定長的頭可能會很低效,的確如此。但是我們將使用一個比較小的 2 字節定長的頭信息前綴來表示頭信息的長度
你可以認為這是一種混合的發送消息的實現方法,我們通過發送頭信息長度來引導接收者,方便他們解析消息體
為了給你更好地解釋消息格式,讓我們來看看消息的全貌:
消息以 2字節的固定長度的頭開始,這兩個字節是整型的網絡字節序列,表示下面的變長 JSON 頭信息的長度,當我們從 recv() 方法讀取到 2 個字節時就知道它表示的是頭信息長度的整形數字,然后在解碼 JSON 頭之前讀取這么多的字節
JSON 頭包含了頭信息的字典。其中一個就是 content-length,這表示消息內容的數量(不是JSON頭),當我們使用 recv() 方法讀取到了 content-length 個字節的數據時,就表示接收完成并且讀取到了完整的消息
應用程序類最后讓我們來看下成果,我們使用了一個消息類。來看看它是如何在 socket 發生讀寫事件時與 select() 配合使用的
對于這個示例應用程序而言,我必須想出客戶端和服務器將使用什么類型的消息,從這一點來講這遠遠超過了最早時候我們寫的那個玩具一樣的打印程序
為了保證程序簡單而且仍然能夠演示出它是如何在一個真正的程序中工作的,我創建了一個應用程序協議用來實現基本的搜索功能。客戶端發送一個搜索請求,服務器做一次匹配的查找,如果客戶端的請求沒法被識別成搜索請求,服務器就會假定這個是二進制請求,對應的返回二進制響應
跟著下面一節,運行示例、用代碼做實驗后你將會知道他是如何工作的,然后你就可以以這個消息類為起點把他修改成適合自己使用的
就像我們之前討論的,你將在下面看到,處理 socket 時需要保存狀態。通過使用類,我們可以將所有的狀態、數據和代碼打包到一個地方。當連接開始或者接受的時候消息類就會為每個 socket 創建一個實例
類中的很多包裝方法、工具方法在客戶端和服務端上都是差不多的。它們以下劃線開頭,就像 Message._json_encode() 一樣,這些方法通過類使用起來很簡單。這使得它們在其它方法中調用時更短,而且符合 DRY 原則
消息類的服務端程序本質上和客戶端一樣。不同的是客戶端初始化連接并發送請求消息,隨后要處理服務端返回的內容。而服務端則是等待連接請求,處理客戶端的請求消息,隨后發送響應消息
看起來就像這樣:
步驟 | 端 | 動作/消息內容 |
---|---|---|
1 | 客戶端 | 發送帶有請求內容的消息 |
2 | 服務端 | 接收并處理請求消息 |
3 | 服務端 | 發送有響應內容的消息 |
4 | 客戶端 | 接收并處理響應消息 |
下面是代碼的結構:
應用程序 | 文件 | 代碼 |
---|---|---|
服務端 | app-server.py | 服務端主程序 |
服務端 | libserver.py | 服務端消息類 |
客戶端 | app-client.py | 客戶端主程序 |
客戶端 | libclient.py | 客戶端消息類 |
我想通過首先提到它的設計方面來討論 Message 類的工作方式,不過這對我來說并不是立馬就能解釋清楚的,只有在重構它至少五次之后我才能達到它目前的狀態。為什么呢?因為要管理狀態
當消息對象創建的時候,它就被一個使用 selector.register() 事件監控起來的 socket 關聯起來了
message = libserver.Message(sel, conn, addr) sel.register(conn, selectors.EVENT_READ, data=message)
注意,這一節中的一些代碼來自服務端主程序與消息類,但是這部分內容的討論在客戶端
也是一樣的,我將在他們之間存在不同點的時候來解釋客戶端的版本
當 socket 上的事件就緒的時候,它就會被 selector.select() 方法返回。對過 key 對象的 data 屬性獲取到 message 的引用,然后在消息用調用一個方法:
while True: events = sel.select(timeout=None) for key, mask in events: # ... message = key.data message.process_events(mask)
觀察上面的事件循環,可以看見 sel.select() 位于「司機位置」,它是阻塞的,在循環的上面等待。當 socket 上的讀寫事件就緒時,它就會為其服務。這表示間接的它也要負責調用 process_events() 方法。這就是我說 process_events() 方法是入口的原因
讓我們來看下 process_events() 方法做了什么
def process_events(self, mask): if mask & selectors.EVENT_READ: self.read() if mask & selectors.EVENT_WRITE: self.write()
這樣做很好,因為 process_events() 方法很簡潔,它只可以做兩件事情:調用 read() 和 write() 方法
這又把我們帶回了狀態管理的問題。在幾次重構后,我決定如果別的方法依賴于狀態變量里面的某個確定值,那么它們就只應該從 read() 和 write() 方法中調用,這將使處理socket 事件的邏輯盡量的簡單
可能說起來很簡單,但是經歷了前面幾次類的迭代:混合了一些方法,檢查當前狀態、依賴于其它值、在 read() 或者 write() 方法外面調用處理數據的方法,最后這證明了這樣管理起來很復雜
當然,你肯定需要把類按你自己的需求修改使它能夠符合你的預期,但是我建議你盡可能把狀態檢查、依賴狀態的調用的邏輯放在 read() 和 write() 方法里面
讓我們來看看 read() 方法,這是服務端版本,但是客戶端也是一樣的。不同之處在于方法名稱,一個(客戶端)是 process_response() 另一個(服務端)是 process_request()
def read(self): self._read() if self._jsonheader_len is None: self.process_protoheader() if self._jsonheader_len is not None: if self.jsonheader is None: self.process_jsonheader() if self.jsonheader: if self.request is None: self.process_request()
_read() 方法首頁被調用,然后調用 socket.recv() 從 socket 讀取數據并存入到接收緩沖區
記住,當調用 socket.recv() 方法時,組成消息的所有數據并沒有一次性全部到達。socket.recv() 方法可能需要調用很多次,這就是為什么在調用相關方法處理數據前每次都要檢查狀態
當一個方法開始處理消息時,首頁要檢查的就是接受緩沖區保存了足夠的多讀取的數據,如果確定,它們將繼續處理各自的數據,然后把數據存到其它流程可能會用到的變量上,并且清空自己的緩沖區。由于一個消息有三個組件,所以會有三個狀態檢查和處理方法的調用:
Message Component | Method | Output |
---|---|---|
Fixed-length header | process_protoheader() | self._jsonheader_len |
JSON header | process_jsonheader() | self.jsonheader |
Content | process_request() | self.request |
接下來,讓我們一起看看 write() 方法,這是服務端的版本:
def write(self): if self.request: if not self.response_created: self.create_response() self._write()
write() 方法會首先檢測是否有請求,如果有而且響應還沒被創建的話 create_response() 方法就會被調用,它會設置狀態變量 response_created,然后為發送緩沖區寫入響應
如果發送緩沖區有數據,write() 方法會調用 socket.send() 方法
記住,當 socket.send() 被調用時,所有發送緩沖區的數據可能還沒進入到發送隊列,socket 的網絡緩沖區可能滿了,socket.send() 可能需要重新調用,這就是為什么需要檢查狀態的原因,create_response() 應該只被調用一次,但是 _write() 方法需要調用多次
客戶端的 write() 版大體與服務端一致:
def write(self): if not self._request_queued: self.queue_request() self._write() if self._request_queued: if not self._send_buffer: # Set selector to listen for read events, we"re done writing. self._set_selector_events_mask("r")
因為客戶端首頁初始化了一個連接請求到服務端,狀態變量_request_queued被檢查。如果請求還沒加入到隊列,就調用 queue_request() 方法創建一個請求寫入到發送緩沖區中,同時也會使用變量 _request_queued 記錄狀態值防止多次調用
就像服務端一樣,如果發送緩沖區有數據 _write() 方法會調用 socket.send() 方法
需要注意客戶端版本的 write() 方法與服務端不同之處在于最后的請求是否加入到隊列中的檢查,這個我們將在客戶端主程序中詳細解釋,原因是要告訴 selector.select()停止監控 socket 的寫入事件而且我們只對讀取事件感興趣,沒有辦法通知套接字是可寫的
我將在這一節中留下一個懸念,這一節的主要目的是解釋 selector.select() 方法是如何通過 process_events() 方法調用消息類以及它是如何工作的
這一點很重要,因為 process_events() 方法在連接的生命周期中將被調用很多次,因此,要確保那些只能被調用一次的方法正常工作,這些方法中要么需要檢查自己的狀態變量,要么需要檢查調用者的方法中的狀態變量
服務端主程序在服務端主程序 app-server.py 中,主機、端口參數是通過命令行傳遞給程序的:
$ ./app-server.py usage: ./app-server.py
例如需求監聽本地回環地址上面的 65432 端口,需要執行:
$ ./app-server.py 127.0.0.1 65432 listening on ("127.0.0.1", 65432)
創建完 socket 后,一個傳入參數 socket.SO_REUSEADDR 的方法 `to
socket.setsockopt()` 將被調用
# Avoid bind() exception: OSError: [Errno 48] Address already in use lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
設置這個參數是為了避免 端口被占用 的錯誤發生,如果當前程序使用的端口和之前的程序使用的一樣,你就會發現連接處于 TIME_WAIT 狀態
比如說,如果服務器主動關閉連接,服務器會保持為大概兩分鐘的 TIME_WAIT 狀態,具體時長取決于你的操作系統。如果你想在兩分鐘內再開啟一個服務,你將得到一個OSError 表示 端口被戰勝,這樣做是為了確保一些在途的數據包正確的被處理
事件循環會捕捉所有錯誤,以保證服務器正常運行:
while True: events = sel.select(timeout=None) for key, mask in events: if key.data is None: accept_wrapper(key.fileobj) else: message = key.data try: message.process_events(mask) except Exception: print("main: error: exception for", f"{message.addr}: {traceback.format_exc()}") message.close()
當服務器接受到一個客戶端連接時,消息對象就會被創建:
def accept_wrapper(sock): conn, addr = sock.accept() # Should be ready to read print("accepted connection from", addr) conn.setblocking(False) message = libserver.Message(sel, conn, addr) sel.register(conn, selectors.EVENT_READ, data=message)
消息對象會通過 sel.register() 方法關聯到 socket 上,而且它初始化就被設置成了只監控讀事件。當請求被讀取時,我們將通過監聽到的寫事件修改它
在服務器端采用這種方法的一個優點是,大多數情況下,當 socket 正常并且沒有網絡問題時,它始終是可寫的
如果我們告訴 sel.register() 方法監控 EVENT_WRITE 寫入事件,事件循環將會立即喚醒并通知我們這種情況,然而此時 socket 并不用喚醒調用 send() 方法。由于請求還沒被處理,所以不需要發回響應。這將消耗并浪費寶貴的 CPU 周期
服務端消息類在消息切入點一節中,當通過 process_events() 知道 socket 事件就緒時我們可以看到消息對象是如何發出動作的。現在讓我們來看看當數據在 socket 上被讀取是會發生些什么,以及為服務器就緒的消息的組件片段發生了什么
libserver.py 文件中的服務端消息類,可以在 Github 上找到 源代碼
這些方法按照消息處理順序出現在類中
當服務器讀取到至少兩個字節時,定長頭的邏輯就可以開始了
def process_protoheader(self): hdrlen = 2 if len(self._recv_buffer) >= hdrlen: self._jsonheader_len = struct.unpack(">H", self._recv_buffer[:hdrlen])[0] self._recv_buffer = self._recv_buffer[hdrlen:]
網絡字節序列中的定長整型兩字節包含了 JSON 頭的長度,struct.unpack() 方法用來讀取并解碼,然后保存在 self._jsonheader_len 中,當這部分消息被處理完成后,就要調用 process_protoheader() 方法來刪除接收緩沖區中處理過的消息
就像上面的定長頭的邏輯一樣,當接收緩沖區有足夠的 JSON 頭數據時,它也需要被處理:
def process_jsonheader(self): hdrlen = self._jsonheader_len if len(self._recv_buffer) >= hdrlen: self.jsonheader = self._json_decode(self._recv_buffer[:hdrlen], "utf-8") self._recv_buffer = self._recv_buffer[hdrlen:] for reqhdr in ("byteorder", "content-length", "content-type", "content-encoding"): if reqhdr not in self.jsonheader: raise ValueError(f"Missing required header "{reqhdr}".")
self._json_decode() 方法用來解碼并反序列化 JSON 頭成一個字典。由于我們定義的 JSON 頭是 utf-8 格式的,所以解碼方法調用時我們寫死了這個參數,結果將被存放在 self.jsonheader 中,process_jsonheader 方法做完他應該做的事情后,同樣需要刪除接收緩沖區中處理過的消息
接下來就是真正的消息內容,當接收緩沖區有 JSON 頭中定義的 content-length 值的數量個字節時,請求就應該被處理了:
def process_request(self): content_len = self.jsonheader["content-length"] if not len(self._recv_buffer) >= content_len: return data = self._recv_buffer[:content_len] self._recv_buffer = self._recv_buffer[content_len:] if self.jsonheader["content-type"] == "text/json": encoding = self.jsonheader["content-encoding"] self.request = self._json_decode(data, encoding) print("received request", repr(self.request), "from", self.addr) else: # Binary or unknown content-type self.request = data print(f"received {self.jsonheader["content-type"]} request from", self.addr) # Set selector to listen for write events, we"re done reading. self._set_selector_events_mask("w")
把消息保存到 data 變量中后,process_request() 又會刪除接收緩沖區中處理過的數據。接著,如果 content type 是 JSON 的話,它將解碼并反序列化數據。否則(在我們的例子中)數據將被視 做二進制數據并打印出來
最后 process_request() 方法會修改 selector 為只監控寫入事件。在服務端的程序 app-server.py 中,socket 初始化被設置成僅監控讀事件。現在請求已經被全部處理完了,我們對讀取事件就不感興趣了
現在就可以創建一個響應寫入到 socket 中。當 socket 可寫時 create_response() 將被從 write() 方法中調用:
def create_response(self): if self.jsonheader["content-type"] == "text/json": response = self._create_response_json_content() else: # Binary or unknown content-type response = self._create_response_binary_content() message = self._create_message(**response) self.response_created = True self._send_buffer += message
響應會根據不同的 content type 的不同而調用不同的方法創建。在這個例子中,當 action == "search" 的時候會執行一個簡單的字典查找。你可以在這個地方添加你自己的處理方法并調用
一個不好處理的問題是響應寫入完成時如何關閉連接,我會在 _write() 方法中調用 close()
def _write(self): if self._send_buffer: print("sending", repr(self._send_buffer), "to", self.addr) try: # Should be ready to write sent = self.sock.send(self._send_buffer) except BlockingIOError: # Resource temporarily unavailable (errno EWOULDBLOCK) pass else: self._send_buffer = self._send_buffer[sent:] # Close when the buffer is drained. The response has been sent. if sent and not self._send_buffer: self.close()
雖然close() 方法的調用有點隱蔽,但是我認為這是一種權衡。因為消息類一個連接只處理一條消息。寫入響應后,服務器無需執行任何操作。它的任務就完成了
客戶端主程序客戶端主程序 app-client.py 中,參數從命令行中讀取,用來創建請求并連接到服務端
$ ./app-client.py usage: ./app-client.py
來個示例演示一下:
$ ./app-client.py 127.0.0.1 65432 search needle
當從命令行參數創建完一個字典來表示請求后,主機、端口、請求字典一起被傳給 start_connection()
def start_connection(host, port, request): addr = (host, port) print("starting connection to", addr) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setblocking(False) sock.connect_ex(addr) events = selectors.EVENT_READ | selectors.EVENT_WRITE message = libclient.Message(sel, sock, addr, request) sel.register(sock, events, data=message)
對服務器的 socket 連接被創建,消息對象被傳入請求字典并創建
和服務端一樣,消息對象在 sel.register() 方法中被關聯到 socket 上。然而,客戶端不同的是,socket 初始化的時候會監控讀寫事件,一旦請求被寫入,我們將會修改為只監控讀取事件
這種實現和服務端一樣有好處:不浪費 CPU 生命周期。請求發送完成后,我們就不關注寫入事件了,所以不用保持狀態等待處理
客戶端消息類在 消息入口點 一節中,我們看到過,當 socket 使用準備就緒時,消息對象是如何調用具體動作的。現在讓我們來看看 socket 上的數據是如何被讀寫的,以及消息準備好被加工的時候發生了什么
客戶端消息類在 libclient.py 文件中,可以在 Github 上找到 源代碼
這些方法按照消息處理順序出現在類中
客戶端的第一個任務就是讓請求入隊列:
def queue_request(self): content = self.request["content"] content_type = self.request["type"] content_encoding = self.request["encoding"] if content_type == "text/json": req = { "content_bytes": self._json_encode(content, content_encoding), "content_type": content_type, "content_encoding": content_encoding } else: req = { "content_bytes": content, "content_type": content_type, "content_encoding": content_encoding } message = self._create_message(**req) self._send_buffer += message self._request_queued = True
用來創建請求的字典,取決于客戶端程序 app-client.py 中傳入的命令行參數,當消息對象創建的時候,請求字典被當做參數傳入
請求消息被創建并追加到發送緩沖區中,消息將被 _write() 方法發送,狀態參數 self._request_queued 被設置,這使 queue_request() 方法不會被重復調用
請求發送完成后,客戶端就等待服務器的響應
客戶端讀取和處理消息的方法和服務端一致,由于響應數據是從 socket 上讀取的,所以處理 header 的
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/42513.html
摘要:個高級多線程面試題及回答后端掘金在任何面試當中多線程和并發方面的問題都是必不可少的一部分。目前在生產環基于的技術問答網站系統實現后端掘金這一篇博客將詳細介紹一個基于的問答網站的實現,有詳細的代碼。 15 個高級 Java 多線程面試題及回答 - 后端 - 掘金在任何Java面試當中多線程和并發方面的問題都是必不可少的一部分。如果你想獲得任何股票投資銀行的前臺資訊職位,那么你應該準備很多...
摘要:個高級多線程面試題及回答后端掘金在任何面試當中多線程和并發方面的問題都是必不可少的一部分。目前在生產環基于的技術問答網站系統實現后端掘金這一篇博客將詳細介紹一個基于的問答網站的實現,有詳細的代碼。 15 個高級 Java 多線程面試題及回答 - 后端 - 掘金在任何Java面試當中多線程和并發方面的問題都是必不可少的一部分。如果你想獲得任何股票投資銀行的前臺資訊職位,那么你應該準備很多...
摘要:做一個搬運工,希望自己能努力學習,也希望大神們的東西能讓更多的人看到不斷更新更新日志新增了網絡安全分類,整理了排版布局新增了的鏈接,將一些雜七雜八的東西弄到了一篇新文章上了,叫做積累與雜貨鋪一以及相關教程的規范與相關中文學習大本營中文文檔簡 做一個搬運工,希望自己能努力學習,也希望大神們的東西能讓更多的人看到 不斷更新 更新日志:2017.10.13 新增了網絡安全分類,整理了排版布局...
閱讀 2858·2021-11-22 13:54
閱讀 3522·2021-11-16 11:44
閱讀 1370·2021-09-07 10:19
閱讀 1470·2019-08-29 17:30
閱讀 3196·2019-08-29 11:33
閱讀 3543·2019-08-26 12:18
閱讀 2886·2019-08-26 11:53
閱讀 1336·2019-08-26 10:47