摘要:一旦有事件產生可能是一次出現好多個事件,就會按照優先級依次調用每個事件的回調函數。注意,是有超時的,所以一些無法以文件描述符的形式存在的事件也可以有機會被觸發。
這一篇主要想跟大家分享一下 Gevent 實現的基礎邏輯,也是有同學對這個很感興趣,所以貼出來跟大家一起分享一下。
Greenlet我們知道 Gevent 是基于 Greenlet 實現的,greenlet 有的時候也被叫做微線程或者協程。其實 Greenlet 本身非常簡單,其自身實現的功能也非常直接。區別于常規的編程思路——順序執行、調用進棧、返回出棧—— Greenlet 提供了一種在不同的調用棧之間自由跳躍的功能。從一個簡單的例子來看一下吧(摘自官方文檔):
from greenlet import greenlet def test1(): print 12 gr2.switch() print 34 def test2(): print 56 gr1.switch() print 78 gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch()
這里,每一個 greenlet 就是一個調用棧——您可以把他想象成一個線程,只不過真正的線程可以并行執行,而同一時刻只能有一個 greenlet 在執行(同一線程里)。正如例子中最后三句話,我們創建了 gr1 和 gr2 兩個不同的調用棧空間,入口函數分別是 test1 和 test2;這最后一句 gr1.switch() 得多解釋一點。
因為除了 gr1 和 gr2,我們還有一個棧空間,也就是所有 Python 程序都得有的默認的棧空間——我們暫且稱之為 main,而這一句 gr1.switch() 恰恰實現了從 main 到 gr1 的跳躍,也就是從當前的棧跳到指定的棧。這時,就猶如常規調用 test1() 一樣,gr1.switch() 的調用暫時不會返回結果,程序會跳轉到 test1 繼續執行;只不過區別于普通函數調用時 test1() 會向當前棧壓棧,而 gr1.switch() 則會將當前棧存檔,替換成 gr1 的棧。如圖所示:
對于這種棧的切換,我們有時也稱之為執行權的轉移,或者說 main 交出了執行權,同時 gr1 獲得了執行權。Greenlet 在底層是用匯編實現的這樣的切換:把當前的棧(main)相關的寄存器啊什么的保存到內存里,然后把原本保存在內存里的 gr1 的相關信息恢復到寄存器里。這種操作速度非常快,比操作系統對多進程調度的上下文切換還要快。代碼在這里,有興趣的同學可以一起研究一下(其中 switch_x32_unix.h 是我寫的哈哈)。
回到前面的例子,最后一句 gr1.switch() 調用將執行點跳到了 gr1 的第一句,于是輸出了 12。隨后順序執行到 gr2.switch(),繼而跳轉到 gr2 的第一句,于是輸出了 56。接著又是 gr1.switch(),跳回到 gr1,從之前跳出的地方繼續——對 gr1 而言就是 gr2.switch() 的調用返回了結果 None,然后輸出 34。
這個時候 test1 執行到頭了,gr1 的棧里面空了。Greenlet 設計了 parent greenlet 的概念,就是說,當一個 greenlet 的入口函數執行完之后,會自動切換回其 parent。默認情況下,greenlet 的 parent 就是創建該 greenlet 時所在的那個棧,前面的例子中,gr1 和 gr2 都是在 main 里被創建的,所以他們倆的 parent 都是 main。所以當 gr1 結束的時候,會回到 main 的最后一句,接著 main 結束了,所以整個程序也就結束了——78 從來沒有被執行到過。另外,greenlet 的 parent 也可以手工設置。
簡單來看,greenlet 只是為 Python 語言增加了創建多條執行序列的功能,而且多條執行序列之間的切換還必須得手動顯式調用 switch() 才行;這些都跟異步 I/O 沒有必然關系。
gevent.sleep接著來看 Gevent。最簡單的一個 Gevent 示例就是這樣的了:
import gevent gevent.sleep(1)
貌似非常簡單的一個 sleep,卻包含了 Gevent 的關鍵結構,讓我們仔細看一下 sleep 的實現吧。代碼在 gevent/hub.py:
def sleep(seconds=0): hub = get_hub() loop = hub.loop hub.wait(loop.timer(seconds))
這里我把一些當前用不著的代碼做了一些清理,只留下了三句關鍵的代碼,其中就有 Gevent 的兩個關鍵的部件——hub 和 loop。loop 是 Gevent 的核心部件,也就是主循環核心,默認是用 Cython 寫的 libev 的包裝(所以性能杠杠滴),稍后會在詳細提到它。hub 則是一個 greenlet,里面跑著 loop。
hub 是一個單例,從 get_hub() 的源碼就可以看出來:
import _thread _threadlocal = _thread._local() def get_hub(*args, **kwargs): global _threadlocal try: return _threadlocal.hub except AttributeError: hubtype = get_hub_class() hub = _threadlocal.hub = hubtype(*args, **kwargs) return hub
所以第一次執行 get_hub() 的時候,就會創建一個 hub 實例:
class Hub(greenlet): loop_class = config("gevent.core.loop", "GEVENT_LOOP") def __init__(self): greenlet.__init__(self) loop_class = _import(self.loop_class) self.loop = loop_class()
同樣這是一段精簡了的代碼,反映了一個 hub 的關鍵屬性——loop。loop 實例隨著 hub 實例的創建而創建,默認的 loop 就是 gevent/core.ppyx 里的 class loop,也可以通過環境變量 GEVENT_LOOP 來自定義。
值得注意的是,截止到 hub = get_hub() 和 loop = hub.loop,我們都只是創建了 hub 和 loop,并沒有真正開始跑我們的主循環。稍安勿躁,第三句就要開始了。
loop 有一堆接口,對應著底層 libev 的各個功能,詳見此處。我們這里用到的是 timer(seconds),該函數返回的是一個 watcher 對象,對應著底層 libev 的 watcher 概念。我們大概能猜到,這個 watcher 對象會在幾秒鐘之后做一些什么事情,但是具體怎么做,讓我們一起看看 hub.wait() 的實現吧。
def wait(self, watcher): waiter = Waiter() watcher.start(waiter.switch) waiter.get()
代碼也不長,不過能看到 watcher 的接口 watcher.start(method),也就是說,當給定的幾秒鐘過了之后,會調用這里給的函數,也就是 waiter.switch。讓我們再看一下這里用到的 Waiter,都是在同一個文件 hub.py 里面:
from greenlet import getcurrent class Waiter(object): def __init__(self): self.hub = get_hub() self.greenlet = None def switch(self): assert getcurrent() is self.hub self.greenlet.switch() def get(self): assert self.greenlet is None self.greenlet = getcurrent() try: self.hub.switch() finally: self.greenlet = None
這里同樣刪掉了大量干擾因素。根據前面 wait() 的定義,我們會先創建一個 waiter,然后調用其 get(),隨后幾秒鐘之后 loop 會調用其 switch()。一個個看。
get() 一上來會保證自己不會被同時調用到(assert),接著就去獲取了當前的 greenlet,也就是調用 get() 時所處的棧,一直往前找,找到 sleep(1),所以 getcurrent() 的結果是 main。Waiter 隨后將 main 保存在了 self.greenlet 引用中。
下面的一句話是重中之重了,self.hub.switch()!由不管任何上下文中,直接往 hub 里跳。由于這是第一次跳進 hub 里,所以此時 loop 就開始運轉了。
正巧,我們之前已經通過 loop.timer(1) 和 watcher.start(waiter.switch),在 loop 里注冊了說,1 秒鐘之后去調用 waiter.switch,loop 一旦跑起來就會嚴格執行之前注冊的命令。所以呢,一秒鐘之后,我們在 hub 的棧中,調用到了 Waiter.switch()。
在 switch() 里,程序一上來就要驗證當前上下文必須得是 hub,翻閱一下前面的代碼,這個是必然的。最后,跳到 self.greenlet!還記得它被設置成什么了嗎?——main。于是乎,我們就回到了最初的代碼里,gevent.sleep(1) 在經過了 1 秒鐘的等待之后終于返回了。
回頭看一下這個過程,其實也很簡單的:當我們需要等待一個事件發生時——比如需要等待 1 秒鐘的計時器事件,我們就把當前的執行棧跟這個事件做一個綁定(watcher.start(waiter.switch)),然后把執行權交給 hub;hub 則會在事件發生后,根據注冊的記錄盡快回到原來的斷點繼續執行。
異步hub 一旦拿到執行權,就可以做很多事情了,比如切換到別的 greenlet 去執行一些其他的任務,直到這些 greenlet 又主動把執行權交回給 hub。宏觀的來看,就是這樣的:一個 hub,好多個其他的任務 greenlet(其中沒準就包括 main),hub 負責總調度,去依次調用各個任務 greenlet;任務 greenlet 則在執行至下一次斷點時,主動切換回 hub。這樣一來,許多個任務 greenlet 就可以看似并行地同步運行了,這種任務調度方式叫做協作式的任務調度(cooperative scheduling)。
舉個例子:
import gevent def beep(interval): while True: print("Beep %s" % interval) gevent.sleep(interval) for i in range(10): gevent.spawn(beep, i) beep(20)
例子里我們總共創建了 10 個 greenlet,每一個都會按照不同頻率輸出“蜂鳴”;最后一句的 beep(20) 又讓 main greenlet 也不斷地蜂鳴。算上 hub,這個例子一共會有 12 個不同的 greenlet 在協作式地運行。
I/OGevent 最主要的功能當然是異步 I/O 了。其實,I/O 跟前面 sleep 的例子沒什么本質的區別,只不過 sleep 用的 watcher 是 timer,而 I/O 用到的 watcher 是 io。比如說 wait_read(fileno) 是這樣的:
def wait_read(fileno): hub = get_hub() io = hub.loop.io(fileno, 1) return hub.wait(io)
沒什么太大區別吧,原理其實都是一樣的。基于這個,我們就可以搞異步 socket 了。socket 的接口較為復雜,這里提取一些標志性的代碼一起讀一下吧:
class socket(object): def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0): self._sock = _realsocket(family, type, proto) # 創建底層的 socket self._sock.setblocking(0) # 將其設置為非阻塞的 fileno = self._sock.fileno() # 獲得其文件描述符 self.hub = get_hub() # 自己留一份 hub 的引用,省的每次再現取 io = self.hub.loop.io # 快捷方式 self._read_event = io(fileno, 1) # socket 的讀取事件 self._write_event = io(fileno, 2) # socket 的寫入事件 def _wait(self, watcher): assert watcher.callback is None # 一個 socket 只能被一個 greenlet 用 self.hub.wait(watcher) # 見之前的例子,等待一個事件發生 def recv(self, *args): sock = self._sock while True: try: return sock.recv(*args) # 異步接收,要么立即成功,要么立即失敗 except error as ex: if ex.args[0] != EWOULDBLOCK: # 如果失敗的話,除了是異步等待的情況, raise # 其他情況都報錯 self._wait(self._read_event) # 等待 socket 有數據可讀libev
最后提一點關于 libev 的東西,因為有同學也問到 Gevent 底層的調度方式。簡單來說,libev 是依賴操作系統底層的異步 I/O 接口實現的,Linux 用的是 epoll,FreeBSD 則是 kqueue。Python 代碼里,socket 會創建一堆 io watcher,對應底層則是將一堆文件描述符添加到一個——比如—— epoll 的句柄里。當切換到 hub 之后,libev 會調用底層的 epoll_wait 來等待這些 socket 中可能出現的事件。一旦有事件產生(可能是一次出現好多個事件),libev 就會按照優先級依次調用每個事件的回調函數。注意,epoll_wait 是有超時的,所以一些無法以文件描述符的形式存在的事件也可以有機會被觸發。關于 libev 網上還有很多資料,有興趣大家可以自行查閱。
Gevent 的性能調優Gevent 不是銀彈,不能無限制地創建 greenlet。正如多線程編程一樣,用 gevent 寫服務器也應該創建一個“微線程池”,超過池子大小的 spawn 應該被阻塞并且開始排隊。只有這樣,才能保證同時運行的 greenlet 數量不至于多到顯著增加異步等待的恢復時間,從而保證每個任務的響應速度。其實,當池子的大小增加到一定程度之后,CPU 使用量的增速會放緩甚至變為 0,這時繼續增加池子大小只能導致回調函數開始排隊,不能真正增加吞吐量。正確的做法是增加硬件或者優化代碼(提高算法效率、減少無謂調用等)。
關于 pool 的大小,我覺得是可以算出來的:
1、在壓力較小、pool 資源充足的情況下,測得單個請求平均處理總時間,記作 Ta
2、根據系統需求,估計一下能接受的最慢的請求處理時間,記作 Tm
3、設 Ta 中有 Ts 的時間,執行權是不屬于當前處理中的 greenlet 的,比如正在進行異步的數據庫訪問或是調用遠端 API 等后端訪問
4、在常規壓力下,通過測量后端訪問請求處理的平均時間,根據代碼實際調用情況測算出 Ts
5、pool 的大小 = (Tm / (Ta - Ts)) * 150%,這里的 150% 是個 buffer 值,拍腦門拍出來的
比如理想情況下平均每個請求處理需要 20ms,其中平均有 15ms 是花在數據庫訪問上(假設數據庫性能較為穩定,能夠線性 scale)。如果最大能容忍的請求處理時間是 500ms 的話,那池子大小應該設置成 (500 / (20 - 15)) * 150% = 150,也就意味著單進程最大并發量是 150。
從這個算法也可以看出,花在 Python 端的 CPU 時間越少,系統并發量就越高,而花在后端訪問上的時間長短對并發影響不是很大——當然了,依然得假設數據庫等后端可以線性 scale。
下面是我之前在 Amazon EC2 m1.small 機器上的部分測試結果,對比了同步多進程和 Gevent 在處理包含異步 PostgreSQL 和 Redis 訪問的請求時的性能:
Log Format (per actor) handling time for 500 requests / Time receiving 500 responses - time per handling / time per request - raw handling rate / request per second
8 actors, 128 testers: 798 rps on client
1230.08 ms / 5649.88 ms - 2.46 ms / 11.30 ms - 406.48 rps / 88.50 rps 1707.71 ms / 5938.53 ms - 3.42 ms / 11.88 ms - 292.79 rps / 84.20 rps 2219.12 ms / 6324.48 ms - 4.44 ms / 12.65 ms - 225.31 rps / 79.06 rps 1446.94 ms / 5491.89 ms - 2.89 ms / 10.98 ms - 345.56 rps / 91.04 rps 1064.61 ms / 5189.07 ms - 2.13 ms / 10.38 ms - 469.66 rps / 96.36 rps 2099.23 ms / 5844.37 ms - 4.20 ms / 11.69 ms - 238.18 rps / 85.55 rps
1 async actor with 8 concurrency limit, 128 testers: 1031 rps on client
3995.44 ms / 560.62 ms - 7.99 ms / 1.12 ms - 125.14 rps / 891.87 rps 4369.57 ms / 575.34 ms - 8.74 ms / 1.15 ms - 114.43 rps / 869.06 rps 4388.47 ms / 590.63 ms - 8.78 ms / 1.18 ms - 113.93 rps / 846.55 rps 4439.61 ms / 579.39 ms - 8.88 ms / 1.16 ms - 112.62 rps / 862.97 rps 3866.82 ms / 574.92 ms - 7.73 ms / 1.15 ms - 129.31 rps / 869.69 rps
1 async actor with no concurrency limit, 128 testers: 987 rps on client
38191.16 ms / 551.76 ms - 76.38 ms / 1.10 ms - 13.09 rps / 906.20 rps 34354.80 ms / 564.43 ms - 68.71 ms / 1.13 ms - 14.55 rps / 885.84 rps 40397.18 ms / 543.23 ms - 80.79 ms / 1.09 ms - 12.38 rps / 920.42 rps 45406.02 ms / 490.45 ms - 90.81 ms / 0.98 ms - 11.01 rps / 1019.48 rps 37106.92 ms / 581.95 ms - 74.21 ms / 1.16 ms - 13.47 rps / 859.18 rps
能看出來,同樣是 8 的并發限制,同步比異步處理快兩三倍(但是 load balance 拉低了同步的優勢),吞吐量上雖比不上異步,但也不差。在去掉并發限制之后,吞吐量變化不大,但處理時間翻了 10 倍(因為大量 callback 開始排隊,無法及時被調用到),且不穩定。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/37306.html
摘要:背景手里有一個項目,代碼按照前端代碼庫后端代碼庫分別在上,分散帶來的結果是,不容易持續集成,比如你可能需要很多的去保證一個項目的正常運作,但是這個項目也不是特別大,所以嘗試將代碼融合,于此同時將代碼化,用于持續部署。 背景 手里有一個web項目,代碼按照前端代碼庫、后端代碼庫分別在GitHub上,分散帶來的結果是,不容易持續集成,比如你可能需要很多的job去保證一個項目的正常運作,但是...
摘要:背景手里有一個項目,代碼按照前端代碼庫后端代碼庫分別在上,分散帶來的結果是,不容易持續集成,比如你可能需要很多的去保證一個項目的正常運作,但是這個項目也不是特別大,所以嘗試將代碼融合,于此同時將代碼化,用于持續部署。 背景 手里有一個web項目,代碼按照前端代碼庫、后端代碼庫分別在GitHub上,分散帶來的結果是,不容易持續集成,比如你可能需要很多的job去保證一個項目的正常運作,但是...
摘要:背景手里有一個項目,代碼按照前端代碼庫后端代碼庫分別在上,分散帶來的結果是,不容易持續集成,比如你可能需要很多的去保證一個項目的正常運作,但是這個項目也不是特別大,所以嘗試將代碼融合,于此同時將代碼化,用于持續部署。 背景 手里有一個web項目,代碼按照前端代碼庫、后端代碼庫分別在GitHub上,分散帶來的結果是,不容易持續集成,比如你可能需要很多的job去保證一個項目的正常運作,但是...
閱讀 3642·2021-11-23 09:51
閱讀 1990·2021-11-16 11:42
閱讀 3234·2021-11-08 13:20
閱讀 1097·2019-08-30 15:55
閱讀 2205·2019-08-30 10:59
閱讀 1239·2019-08-29 14:04
閱讀 1018·2019-08-29 12:41
閱讀 2006·2019-08-26 12:22