摘要:所以系統的吞吐能力取決于每個線程的操作耗時。另外過多的線程,也會帶來更多的開銷。其代表派是以及里的新秀。類似線程也有自己的棧。線程中斷的條件只有兩個,一個是拋異常,另外一個就是。
什么是協程(coroutine)
這東西其實有很多名詞,比如有的人喜歡稱為纖程(Fiber),或者綠色線程(GreenThread)。其實最直觀的解釋可以定義為線程的線程。有點拗口,但本質上就是這樣。
我們先回憶一下線程的定義,操作系統產生一個進程,進程再產生若干個線程并行的處理邏輯,線程的切換由操作系統負責調度。傳統語言C++ Java等線程其實與操作系統線程是1:1的關系,每個線程都有自己的Stack, Java在64位系統默認Stack大小是1024KB,所以指望一個進程開啟上萬個線程是不現實的。但是實際上我們也不會這么干,因為起這么多線程并不能充分的利用CPU,大部分線程處于等待狀態,CPU也沒有這么核讓線程使用。所以一般線程數目都是CPU的核數。
傳統的J2EE系統都是基于每個請求占用一個線程去完成完整的業務邏輯,(包括事務)。所以系統的吞吐能力取決于每個線程的操作耗時。如果遇到很耗時的I/O行為,則整個系統的吞吐立刻下降,比如JDBC是同步阻塞的,這也是為什么很多人都說數據庫是瓶頸的原因。這里的耗時其實是讓CPU一直在等待I/O返回,說白了線程根本沒有利用CPU去做運算,而是處于空轉狀態。暴殄天物啊。另外過多的線程,也會帶來更多的ContextSwitch開銷。
Java的JDK里有封裝很好的ThreadPool,可以用來管理大量的線程生命周期,但是本質上還是不能很好的解決線程數量的問題,以及線程空轉占用CPU資源的問題。
先階段行業里的比較流行的解決方案之一就是單線程加上異步回調。其代表派是node.js以及Java里的新秀Vert.x。他們的核心思想是一樣的,遇到需要進行I/O操作的地方,就直接讓出CPU資源,然后注冊一個回調函數,其他邏輯則繼續往下走,I/O結束后帶著結果向事件隊列里插入執行結果,然后由事件調度器調度回調函數,傳入結果。這時候執行的地方可能就不是你原來的代碼區塊了,具體表現在代碼層面上,你會發現你的局部變量全部丟失,畢竟相關的棧已經被覆蓋了,所以為了保存之前的棧上數據,你要么選擇帶著一起放入回調函數里,要么就不停的嵌套,從而引起反人類的Callback hell.
因此相關的Promise,CompletableFuture等技術都是為解決相關的問題而產生的。但是本質上還是不能解決業務邏輯的割裂。
說了這么多,終于可以提一下協程了,協程的本質上其實還是和上面的方法一樣,只不過他的核心點在于調度那塊由他來負責解決,遇到阻塞操作,立刻yield掉,并且記錄當前棧上的數據,阻塞完后立刻再找一個線程恢復棧并把阻塞的結果放到這個線程上去跑,這樣看上去好像跟寫同步代碼沒有任何差別,這整個流程可以稱為coroutine,而跑在由coroutine負責調度的線程稱為Fiber。比如Golang里的 go關鍵字其實就是負責開啟一個Fiber,讓func邏輯跑在上面。而這一切都是發生的用戶態上,沒有發生在內核態上,也就是說沒有ContextSwitch上的開銷。
既然我們的標題叫Java里的協程,自然我們會討論JVM上的實現,JVM上早期有kilim以及現在比較成熟的Quasar。而本文章會全部基于Quasar,因為kilim已經很久不更新了。
簡單的例子,用Java寫出Golang的味道上面已經說明了什么是Fiber,什么是coroutine。這里嘗試通過Quasar來實現類似于golang的coroutine以及channel。這里假設各位已經大致了解golang。
為了對比,這里先用golang實現一個對于10以內自然數分別求平方的例子,當然了可以直接單線程for循環就完事了,但是為了凸顯coroutine的高逼格,我們還是要稍微復雜化一點的。
func counter(out chan<- int) { for x := 0; x < 10; x++ { out <- x } close(out) } func squarer(out chan<- int, in <-chan int) { for v := range in { out <- v * v } close(out) } func printer(in <-chan int) { for v := range in { fmt.Println(v) } } func main() { //定義兩個int類型的channel naturals := make(chan int) squares := make(chan int) //產生兩個Fiber,用go關鍵字 go counter(naturals) go squarer(squares, naturals) //獲取計算結果 printer(squares) }
上面的例子,有點類似生產消費者模式,通過channel兩解耦兩邊的數據共享。大家可以將channel理解為Java里的SynchronousQueue。那傳統的基于線程模型的Java實現方式,想必大家都知道怎么做,這里就不啰嗦了,我直接上Quasar版的,幾乎可以原封不動的copy golang的代碼。
public class Example { private static void printer(Channelin) throws SuspendExecution, InterruptedException { Integer v; while ((v = in.receive()) != null) { System.out.println(v); } } public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution { //定義兩個Channel Channel naturals = Channels.newChannel(-1); Channel squares = Channels.newChannel(-1); //運行兩個Fiber實現. new Fiber(() -> { for (int i = 0; i < 10; i++) naturals.send(i); naturals.close(); }).start(); new Fiber(() -> { Integer v; while ((v = naturals.receive()) != null) squares.send(v * v); squares.close(); }).start(); printer(squares); } }
看起來Java似乎要啰嗦一點,沒辦法這是Java的風格,而且畢竟不是語言上支持coroutine,是通過第三方的庫。到后面我會考慮用其他JVM上的語言去實現,這樣會顯得更精簡一點。
說到這里各位肯定對Fiber很好奇了。也許你會表示懷疑Fiber是不是如上面所描述的那樣,下面我們嘗試用Quasar建立一百萬個Fiber,看看內存占用多少,我先嘗試了創建百萬個Thread。
for (int i = 0; i < 1_000_000; i++) { new Thread(() -> { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
很不幸,直接報Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread,這是情理之中的。下面是通過Quasar建立百萬個Fiber
public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution { int FiberNumber = 1_000_000; CountDownLatch latch = new CountDownLatch(1); AtomicInteger counter = new AtomicInteger(0); for (int i = 0; i < FiberNumber; i++) { new Fiber(() -> { counter.incrementAndGet(); if (counter.get() == FiberNumber) { System.out.println("done"); } Strand.sleep(1000000); }).start(); } latch.await(); }
我這里加了latch,阻止程序跑完就關閉,Strand.sleep其實跟Thread.sleep一樣,只是這里針對的是Fiber
最終控制臺是可以輸出done的,說明程序已經創建了百萬個Fiber,設置Sleep是為了讓Fiber一直運行,從而方便計算內存占用。官方宣稱一個空閑的Fiber大約占用400Byte,那這里應該是占用400MB堆內存,但是這里通過jmap -heap pid顯示大約占用了1000MB,也就是說一個Fiber占用1KB。
Quasar是怎么實現Fiber的其實Quasar實現的coroutine的方式與Golang很像,只不過一個是框架級別實現,一個是語言內置機制而已。
如果你熟悉了Golang的調度機制,那理解Quasar的調度機制就會簡單很多,因為兩者是差不多的。
Quasar里的Fiber其實是一個continuation,他可以被Quasar定義的scheduler調度,一個continuation記錄著運行實例的狀態,而且會被隨時中斷,并且也會隨后在他被中斷的地方恢復。Quasar其實是通過修改bytecode來達到這個目的,所以運行Quasar程序的時候,你需要先通過java-agent在運行時修改你的代碼,當然也可以在編譯期間這么干。golang的內置了自己的調度器,Quasar則默認使用ForkJoinPool這個JDK7以后才有的,具有work-stealing功能的線程池來當調度器。work-stealing非常重要,因為你不清楚哪個Fiber會先執行完,而work-stealing可以動態的從其他的等等隊列偷一個context過來,這樣可以最大化使用CPU資源。
那這里你會問了,Quasar怎么知道修改哪些字節碼呢,其實也很簡單,Quasar會通過java-agent在運行時掃描哪些方法是可以中斷的,同時會在方法被調用前和調度后的方法內插入一些continuation邏輯,如果你在方法上定義了@Suspendable注解,那Quasar會對調用該注解的方法做類似下面的事情。
這里假設你在方法f上定義了@Suspendable,同時去調用了有同樣注解的方法g,那么所有調用f的方法會插入一些字節碼,這些字節碼的邏輯就是記錄當前Fiber棧上的狀態,以便在未來可以動態的恢復。(Fiber類似線程也有自己的棧)。在suspendable方法鏈內Fiber的父類會調用Fiber.park,這樣會拋出SuspendExecution異常,從而來停止線程的運行,好讓Quasar的調度器執行調度。這里的SuspendExecution會被Fiber自己捕獲,業務層面上不應該捕獲到。如果Fiber被喚醒了(調度器層面會去調用Fiber.unpark),那么f會在被中斷的地方重新被調用(這里Fiber會知道自己在哪里被中斷),同時會把g的調用結果(g會return結果)插入到f的恢復點,這樣看上去就好像g的return是f的local variables了,從而避免了callback嵌套。
上面啰嗦了一大堆,其實簡單點講就是,想辦法讓運行中的線程棧停下來,好讓Quasar的調度器介入。JVM線程中斷的條件只有兩個,一個是拋異常,另外一個就是return。這里Quasar就是通過拋異常的方式來達到的,所以你會看到我上面的代碼會拋出SuspendExecution。但是如果你真捕獲到這個異常,那就說明有問題了,所以一般會這么寫。
@Suspendable public int f() { try { // do some stuff return g() * 2; } catch(SuspendExecution s) { //這里不應該捕獲到異常. throw new AssertionError(s); } }與Golang性能對比
在github上無意中發現一個有趣的benchmark,大致是測試各種語言在生成百萬actor/Fiber的開銷skynet。
大致的邏輯是先生成10個Fiber,每個Fiber再生成10個Fiber,直到生成1百萬個Fiber,然后每個Fiber做加法累積計算,并把結果發到channel里,這樣一直遞歸到根Fiber。后將最終結果發到channel。如果邏輯沒有錯的話結果應該是499999500000。我們搞個Quasar版的,來測試一下性能。
所有的測試都是基于我的Macbook Pro Retina 2013later。Quasar-0.7.5:JDK8,JDK 1.8.0_91,Golang 1.6
public class Skynet { private static final int RUNS = 4; private static final int BUFFER = 1000; // = 0 unbufferd, > 0 buffered ; < 0 unlimited static void skynet(Channelc, long num, int size, int div) throws SuspendExecution, InterruptedException { if (size == 1) { c.send(num); return; } Channel rc = newChannel(BUFFER); long sum = 0L; for (int i = 0; i < div; i++) { long subNum = num + i * (size / div); new Fiber(() -> skynet(rc, subNum, size / div, div)).start(); } for (int i = 0; i < div; i++) sum += rc.receive(); c.send(sum); } public static void main(String[] args) throws Exception { //這里跑4次,是為了讓JVM預熱好做優化,所以我們以最后一個結果為準。 for (int i = 0; i < RUNS; i++) { long start = System.nanoTime(); Channel c = newChannel(BUFFER); new Fiber(() -> skynet(c, 0, 1_000_000, 10)).start(); long result = c.receive(); long elapsed = (System.nanoTime() - start) / 1_000_000; System.out.println((i + 1) + ": " + result + " (" + elapsed + " ms)"); } } }
golang的代碼我就不貼了,大家可以從github上拿到,我這里直接貼出結果。
platform | time |
---|---|
Golang | 261ms |
Quasar | 612ms |
從Skynet測試中可以看出,Quasar的性能對比Golang還是有差距的,但是不應該達到兩倍多吧,經過向Quasar作者求證才得知這個測試并沒有測試出實際性能,只是測試調度開銷而已。
因為skynet方法內部幾乎沒有做任何事情,只是簡單的做了一個加法然后進一步的遞歸生成新的Fiber而已,相當于只是測試了Quasar生成并調度百萬Fiber所需要的時間而已。而Java里的加法操作開銷遠比生成Fiber的開銷要低,因此感覺整體性能不如golang(golang的coroutine是語言級別的)。
實際上我們在實際項目中生成的Fiber中不可能只做一下簡單的加法就退出,至少要花費1ms做一些簡單的事情吧,(Quasar里Fiber的調度差不多在us級別),所以我們考慮在skynet里加一些比較耗時的操作,比如隨機生成1000個整數并對其進行排序,這樣Fiber里算是有了相應的性能開銷,與調度的開銷相比,調度的開銷就可以忽略不計了。(大家可以把調度開銷想象成不定積分的常數)。
下面我分別為兩種語言了加了數組排序邏輯,并插在響應的Fiber里。
public class Skynet { private static Random random = new Random(); private static final int NUMBER_COUNT = 1000; private static final int RUNS = 4; private static final int BUFFER = 1000; // = 0 unbufferd, > 0 buffered ; < 0 unlimited private static void numberSort() { int[] nums = new int[NUMBER_COUNT]; for (int i = 0; i < NUMBER_COUNT; i++) nums[i] = random.nextInt(NUMBER_COUNT); Arrays.sort(nums); } static void skynet(Channelc, long num, int size, int div) throws SuspendExecution, InterruptedException { if (size == 1) { c.send(num); return; } //加入排序邏輯 numberSort(); Channel rc = newChannel(BUFFER); long sum = 0L; for (int i = 0; i < div; i++) { long subNum = num + i * (size / div); new Fiber(() -> skynet(rc, subNum, size / div, div)).start(); } for (int i = 0; i < div; i++) sum += rc.receive(); c.send(sum); } public static void main(String[] args) throws Exception { for (int i = 0; i < RUNS; i++) { long start = System.nanoTime(); Channel c = newChannel(BUFFER); new Fiber(() -> skynet(c, 0, 1_000_000, 10)).start(); long result = c.receive(); long elapsed = (System.nanoTime() - start) / 1_000_000; System.out.println((i + 1) + ": " + result + " (" + elapsed + " ms)"); } } }
const ( numberCount = 1000 loopCount = 1000000 ) //排序函數 func numberSort() { nums := make([]int, numberCount) for i := 0; i < numberCount; i++ { nums[i] = rand.Intn(numberCount) } sort.Ints(nums) } func skynet(c chan int, num int, size int, div int) { if size == 1 { c <- num return } //加了排序邏輯 numberSort() rc := make(chan int) var sum int for i := 0; i < div; i++ { subNum := num + i*(size/div) go skynet(rc, subNum, size/div, div) } for i := 0; i < div; i++ { sum += <-rc } c <- sum } func main() { c := make(chan int) start := time.Now() go skynet(c, 0, loopCount, 10) result := <-c took := time.Since(start) fmt.Printf("Result: %d in %d ms. ", result, took.Nanoseconds()/1e6) }
platform | time |
---|---|
Golang | 23615ms |
Quasar | 15448ms |
最后再進行一次測試,發現Java的性能優勢體現出來了。幾乎是golang的1.5倍,這也許是JVM/JDK經過多年優化的優勢。因為加了業務邏輯后,對比的就是各種庫以及編譯器對語言的優化了,協程調度開銷幾乎可以忽略不計。
為什么協程在Java里一直那么小眾其實早在JDK1的時代,Java的線程被稱為GreenThread,那個時候就已經有了Fiber,但是當時不能與操作系統實現N:M綁定,所以放棄了。現在Quasar憑借ForkJoinPool這個成熟的線程調度庫。
另外,如果你希望你的代碼能夠跑在Fiber里面,需要一個很大的前提條件,那就是你所有的庫,必須是異步無阻塞的,也就說必須類似于node.js上的庫,所有的邏輯都是異步回調,而自Java里基本上所有的庫都是同步阻塞的,很少見到異步無阻塞的。而且得益于J2EE,以及Java上的三大框架(SSH)洗腦,大部分Java程序員都已經習慣了基于線程,線性的完成一個業務邏輯,很難讓他們接受一種將邏輯割裂的異步編程模型。
但是隨著異步無阻塞這股風氣起來,以及相關的coroutine語言Golang大力推廣,人們越來越知道如何更好的榨干CPU性能(讓CPU避免不必要的等待,減少上下文切換),阻塞的行為基本發生在I/O上,如果能有一個庫能把所有的I/O行為都包裝成異步阻塞的話,那么Quasar就會有用武之地,JVM上公認的是異步網絡通信庫是Netty,通過Netty基本解決了網絡I/O問題,另外還有一個是文件I/O,而這個JDK7提供的NIO2就可以滿足,通過AsynchronousFileChannel即可。
剩下的就是如何將他們封裝成更友好的API了。目前能達到生產級別的這種異步工具庫,JVM上只有Vert.x3,封裝了Netty4,封裝了AsynchronousFileChannel,而且Vert.x官方也出了一個相對應的封裝了Quasar的庫vertx-sync。
Quasar目前是由一家商業公司Parallel Universe控制著,且有自己的一套體系,包括Quasar-actor,Quasar-galaxy等各個模塊,但是Quasar-core是開源的,此外Quasar自己也通過Fiber封裝了很多的第三方庫,目前全都在comsat這個項目里。隨便找一個項目看看,你會發現其實通過Quasar的Fiber去封裝第三方的同步庫還是很簡單的。
寫在最后異步無阻塞的編碼方式其實有很多種實現,比如node.js的提倡的Promise,對應到Java8的就是CompletableFuture。
另外事件響應式也算是一個比較流行的做法,比如ReactiveX系列,RxJava,Rxjs,RxSwift,等。我個人覺得RxJava是一個非常好的函數式響應實現(JDK9會有對應的JDK實現),但是我們不能要求所有的程序員一眼就提煉出業務里的functor,monad(這些能力需要長期浸淫在函數式編程思想里),反而RxJava特別適合用在前端與用戶交互的部分,因為用戶的點擊滑動行為是一個個真實的事件流,這也是為什么RxJava在Android端非常火的原因,而后端基本上都是通過Rest請求過來,每一個請求其實已經限定了業務范圍,不會再有復雜的事件邏輯,所以基本上RxJava在Vert.x這端只是做了一堆的flatmap,再加上微服務化,所有的業務邏輯都已經做了最小的邊界,所以順序的同步的編碼方式更適合寫業務邏輯的后端程序員。
所以這里Golang開了個好頭,但是Golang也有其自身的限制,比如不支持泛型,當然這個仁者見仁智者見智了,包的依賴管理比較弱,此外Golang沒有線程池的概念,如果coroutine里的邏輯發生了阻塞,那么整個程序會hang死。而這點Vert.x提供了一個Worker Pool的概念,可以將需要耗時執行的邏輯包到線程池里面,執行完后異步返回給EventLoop線程。
下一篇我們來研究一下vertx-sync,讓vert.x里所有的異步編碼方式同步化,徹底解決Vert.x里的Callback Hell。
本文系力譜宿云LeapCloud旗下 MaxLeap 團隊成員:劉小溪【原創】,轉載請務必注明作者及原創地址
原創首發地址:https://blog.maxleap.cn/archi...
第二篇續作:次時代Java編程(一):續 vertx-sync實踐
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/65986.html
摘要:因為多線程競爭鎖時會引起上下文切換。減少線程的使用。舉個例子如果說服務器的帶寬只有,某個資源的下載速度是,系統啟動個線程下載該資源并不會導致下載速度編程,所以在并發編程時,需要考慮這些資源的限制。 最近私下做一項目,一bug幾日未解決,總惶恐。一日頓悟,bug不可怕,怕的是項目不存在bug,與其懼怕,何不與其剛正面。 系列文章傳送門: Java多線程學習(一)Java多線程入門 Jav...
摘要:相比與其他操作系統包括其他類系統有很多的優點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。因為多線程競爭鎖時會引起上下文切換。減少線程的使用。很多編程語言中都有協程。所以如何避免死鎖的產生,在我們使用并發編程時至關重要。 系列文章傳送門: Java多線程學習(一)Java多線程入門 Java多線程學習(二)synchronized關鍵字(1) java多線程學習(二)syn...
摘要:定時器例子之前通過調用定時器,需要傳一個回調,然后所有的代碼邏輯都包在里面。這里定時器會阻塞在這一行,直到一秒后才會執行下面的一行。 之前介紹過quasar,如果你希望在vert.x項目里使用coroutine的話,建議使用vertx-sync。本篇將介紹vertx-sync。 showImg(/img/bVzIsu); 本來打算另起一篇,寫其他方面的東西,但是最近比較忙,就先寫一篇實...
摘要:特別是最火的協程框架也無法保存狀態,讓人非常惋惜。但是因為棧的本身無法持久化,所以也就無法持久化。其難度在于,假設整個要持久化的調用棧全部都是內的,比如純的。采取的是暴力地把整個棧區域拷貝到上的方式來保存其狀態。 python主流的協程實現有五種: cPython的generator cPython的greenlet cPython的fibers stackless python ...
閱讀 3279·2021-10-11 11:08
閱讀 4422·2021-09-22 15:54
閱讀 911·2019-08-30 15:56
閱讀 864·2019-08-30 15:55
閱讀 3540·2019-08-30 15:52
閱讀 1351·2019-08-30 15:43
閱讀 1937·2019-08-30 11:14
閱讀 2502·2019-08-29 16:11