摘要:什么是協(xié)程大多數(shù)的開發(fā)人員可能對進程,線程這兩個名字比較熟悉。但是為了追求最大力度的發(fā)揮硬件的性能和提升軟件的速度,出現(xiàn)了協(xié)程或者叫纖程,或者綠色線程。原理利用字節(jié)碼增強,將普通的代碼轉(zhuǎn)換為支持協(xié)程的代碼。
什么是協(xié)程
大多數(shù)的開發(fā)人員可能對進程,線程這兩個名字比較熟悉。但是為了追求最大力度的發(fā)揮硬件的性能和提升軟件的速度,出現(xiàn)了協(xié)程或者叫纖程(Fiber),或者綠色線程(GreenThread)。那我們來聊下什么是協(xié)程,以及在java中是怎么體現(xiàn)和運用協(xié)程的。
在說協(xié)程之前,我們先來回想下,現(xiàn)在大多數(shù)的程序中,都是使用了多線程技術(shù)來解決一些需要長時間阻塞的場景。JAVA中每個線程棧默認1024K,沒有辦法開成千上萬個線程,而且就算通過JVM參數(shù)調(diào)小,CPU也無法分配時間片給每個線程,大多數(shù)的線程還是在等待中,所以我們一般會使用
Runtime.getRuntime().availableProcessors()來配置線程數(shù)的大小(或者會根據(jù)實際情況調(diào)整,就不展開討論了),但是就算是我們開了新的線程,該線程也可能是在等待系統(tǒng)IO的返回或者網(wǎng)絡(luò)IO的返回,而且線程的切換有著大量的開銷。
為了解決上面說的問題,大家可能會想到回調(diào)。現(xiàn)在很多框架都是基于回調(diào)來解決那些耗時的操作。但層數(shù)嵌套多了反而會引起反人類的回調(diào)地獄,并且回調(diào)后就丟失原函數(shù)的上下文。其中的代表呢就比如說nodeJs。
終于可以來聊聊協(xié)程。它的基本原理是:在某個點掛起當前的任務(wù),并且保存棧信息,去執(zhí)行另一個任務(wù);等完成或達到某個條件時,在還原原來的棧信息并繼續(xù)執(zhí)行。上面提到的幾個點大家會想到JVM的結(jié)構(gòu),棧, 程序計數(shù)器等等,但是JVM原生是不支持這樣的操作的(至少java是不支持的,kotlin是可以的)。因此如果要在純java代碼里需要使用協(xié)程的話需要引入第三方包,如kilim,Quasar。而kilim已經(jīng)很久未更新了,那么我們來看看Quasar。
Quasar原理
1、利用字節(jié)碼增強,將普通的java代碼轉(zhuǎn)換為支持協(xié)程的代碼。
2、在調(diào)用pausable方法的時候,如果pause了就保存當前方法棧的State,停止執(zhí)行當前協(xié)程,將控制權(quán)交給調(diào)度器
3、調(diào)度器負責調(diào)度就緒的協(xié)程
4、協(xié)程resume的時候,自動恢復State,根據(jù)協(xié)程的pc計數(shù)跳轉(zhuǎn)到上次執(zhí)行的位置,繼續(xù)執(zhí)行。
這些第三方的框架大部分實現(xiàn)是一致的。通過對字節(jié)碼直接操作,在編譯期把你寫的代碼變?yōu)橹С謪f(xié)程的版本,并在運行時把你所有需要用到協(xié)程的部分由他來控制和調(diào)度,同時也支持在運行期這樣做。
Quasar中使用了拋異常的方式來中斷線程,但是 實際上如果我們捕獲了這個異常就會產(chǎn)生問題,所以一般是以這種方式來注冊:
@Suspendable
public int f() {
try {
// do some stuff
return g() * 2;
} catch(SuspendExecution s) {
//這里不應(yīng)該捕獲到異常.
throw new AssertionError(s);
}
}
在調(diào)度方面,Quasar中默認使用了JDK7以上才有的ForkJoinPool,它的優(yōu)勢就在于空閑的線程會去從其他線程任務(wù)隊列尾部”偷取”任務(wù)來自己處理,因此也叫work-stealing功能。這個功能可以大大的利用CPU資源,不讓線程白白空閑著。
Quasar模塊
Fiber
Fiber可以認為是一個微線程,使用方式基本上和Thread相同,啟動start:
new Fiber
@Override
protected V run() throws SuspendExecution, InterruptedException {
// your code
}
}.start();
new Fiber
public void run() throws SuspendExecution, InterruptedException {
// your code
}
}).start();
其實它更類似于一個CallBack,是可以攜帶返回值的,并且可以拋異常SuspendExecution,InterruptedException。你也可以向其中傳遞SuspendableRunnable 或 SuspendableCallable 給Fiber的構(gòu)造函數(shù)。你甚至可以像線程一樣調(diào)用join(),或者get()來阻塞線程等待他完成。
當Fiber比較大的時候,F(xiàn)iber可以在調(diào)用parkAndSerialize 方法時被序列化,在調(diào)用unparkSerialized時被反序列化。
從以上我們可以看出Fiber與Thread非常類似,極大的減少了遷移的成本。
FiberScheduler
FiberScheduler是Quasar框架中核心的任務(wù)調(diào)度器,負責管理任務(wù)的工作者線程WorkerThread,之前提到的他是一個FiberForkJoinScheduler。
ForkJoinPool的默認初始化個數(shù)為Runtime.getRuntime().availableProcessors()。
instrumentation
當一個類被加載時,Quasar的instrumentation模塊 (使用 Java agent時) 搜索suspendable 方法。每一個suspendable 方法 f通過下面的方式 instrument:
它搜索對其它suspendable方法的調(diào)用。對suspendable方法g的調(diào)用,一些代碼會在這個調(diào)用g的前后被插入,它們會保存和恢復fiber棧本地變量的狀態(tài),記錄這個暫停點。在這個“suspendable function chain”的最后,我們會發(fā)現(xiàn)對Fiber.park的調(diào)用。park暫停這個fiber,扔出 SuspendExecution異常。
當g block的時候,SuspendExecution異常會被Fiber捕獲。 當Fiber被喚醒(使用unpark), 方法f會被調(diào)用, 執(zhí)行記錄顯示它被block在g的調(diào)用上,所以程序會立即跳到f調(diào)用g的那一行,然后調(diào)用它。最終我們會到達暫停點,然后繼續(xù)執(zhí)行。當g返回時, f中插入的代碼會恢復f的本地變量。
過程聽起來很復雜,但是它只會帶來3% ~ 5%的性能的損失。
下面看一個簡單的例子, 方法m2聲明拋出SuspendExecution異常,方法m1調(diào)用m2和m3,所以也聲明拋出這個異常,最后這個異常會被Fiber所捕獲:
public class Helloworld {
static void m1() throws SuspendExecution, InterruptedException {
String m = "m1"; System.out.println("m1 begin"); m = m2(); m = m3(); System.out.println("m1 end"); System.out.println(m);
}
static String m2() throws SuspendExecution, InterruptedException {
return "m2";
}
static String m3() throws SuspendExecution, InterruptedException {
return "m3";
}
static public void main(String[] args) throws ExecutionException, InterruptedException {
new Fiber("Caller", new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException { m1(); } }).start();
}
}
// 反編譯后的代碼
@Instrumented(suspendableCallSites={16, 17}, methodStart=13, methodEnd=21, methodOptimized=false)
static void m1()
throws SuspendExecution, InterruptedException
{
// Byte code:
// 0: aconst_null
// 1: astore_3
// 2: invokestatic 88 co/paralleluniverse/fibers/Stack:getStack ()Lco/paralleluniverse/fibers/Stack;
// 5: dup
// 6: astore_1
// 7: ifnull +42 -> 49
// 10: aload_1
// 11: iconst_1
// 12: istore_2
// 13: invokevirtual 92 co/paralleluniverse/fibers/Stack:nextMethodEntry ()I
// 16: tableswitch default:+24->40, 1:+64->80, 2:+95->111
// 40: aload_1
// 41: invokevirtual 96 co/paralleluniverse/fibers/Stack:isFirstInStackOrPushed ()Z
// 44: ifne +5 -> 49
// 47: aconst_null
// 48: astore_1
// 49: iconst_0
// 50: istore_2
// 51: ldc 2
// 53: astore_0
// 54: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 57: ldc 4
// 59: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 62: aload_1
// 63: ifnull +26 -> 89
// 66: aload_1
// 67: iconst_1
// 68: iconst_1
// 69: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 72: aload_0
// 73: aload_1
// 74: iconst_0
// 75: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 78: iconst_0
// 79: istore_2
// 80: aload_1
// 81: iconst_0
// 82: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 85: checkcast 110 java/lang/String
// 88: astore_0
// 89: invokestatic 6 com/colobu/fiber/Helloworld:m2 ()Ljava/lang/String;
// 92: astore_0
// 93: aload_1
// 94: ifnull +26 -> 120
// 97: aload_1
// 98: iconst_2
// 99: iconst_1
// 100: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 103: aload_0
// 104: aload_1
// 105: iconst_0
// 106: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 109: iconst_0
// 110: istore_2
// 111: aload_1
// 112: iconst_0
// 113: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 116: checkcast 110 java/lang/String
// 119: astore_0
// 120: invokestatic 7 com/colobu/fiber/Helloworld:m3 ()Ljava/lang/String;
// 123: astore_0
// 124: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 127: ldc 8
// 129: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 132: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 135: aload_0
// 136: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 139: aload_1
// 140: ifnull +7 -> 147
// 143: aload_1
// 144: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 147: return
// 148: aload_1
// 149: ifnull +7 -> 156
// 152: aload_1
// 153: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 156: athrow
// Line number table:
// Java source line #13 -> byte code offset #51
// Java source line #15 -> byte code offset #54
// Java source line #16 -> byte code offset #62
// Java source line #17 -> byte code offset #93
// Java source line #18 -> byte code offset #124
// Java source line #19 -> byte code offset #132
// Java source line #21 -> byte code offset #139
// Local variable table:
// start length slot name signature
// 53 83 0 m String
// 6 147 1 localStack co.paralleluniverse.fibers.Stack
// 12 99 2 i int
// 1 1 3 localObject Object
// 156 1 4 localSuspendExecution SuspendExecution
// Exception table:
// from to target type
// 49 148 148 finally
// 49 148 156 co/paralleluniverse/fibers/SuspendExecution
// 49 148 156 co/paralleluniverse/fibers/RuntimeSuspendExecution
}
我并沒有更深入的去了解Quasar的實現(xiàn)細節(jié)以及調(diào)度算法,有興趣的讀者可以翻翻它的代碼。
實戰(zhàn)
public class Helloworld {
@Suspendable
static void m1() throws InterruptedException, SuspendExecution {
String m = "m1"; //System.out.println("m1 begin"); m = m2(); //System.out.println("m1 end"); //System.out.println(m);
}
static String m2() throws SuspendExecution, InterruptedException {
String m = m3(); Strand.sleep(1000); return m;
}
//or define in META-INF/suspendables
@Suspendable
static String m3() {
List l = Stream.of(1,2,3).filter(i -> i%2 == 0).collect(Collectors.toList()); return l.toString();
}
static public void main(String[] args) throws ExecutionException, InterruptedException {
int count = 10000; testThreadpool(count); testFiber(count);
}
static void testThreadpool(int count) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(count); ExecutorService es = Executors.newFixedThreadPool(200); LongAdder latency = new LongAdder(); long t = System.currentTimeMillis(); for (int i =0; i< count; i++) { es.submit(() -> { long start = System.currentTimeMillis(); try { m1(); } catch (InterruptedException e) { e.printStackTrace(); } catch (SuspendExecution suspendExecution) { suspendExecution.printStackTrace(); } start = System.currentTimeMillis() - start; latency.add(start); latch.countDown(); }); } latch.await(); t = System.currentTimeMillis() - t; long l = latency.longValue() / count; System.out.println("thread pool took: " + t + ", latency: " + l + " ms"); es.shutdownNow();
}
static void testFiber(int count) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(count); LongAdder latency = new LongAdder(); long t = System.currentTimeMillis(); for (int i =0; i< count; i++) { new Fiber("Caller", new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException { long start = System.currentTimeMillis(); m1(); start = System.currentTimeMillis() - start; latency.add(start); latch.countDown(); } }).start(); } latch.await(); t = System.currentTimeMillis() - t; long l = latency.longValue() / count; System.out.println("fiber took: " + t + ", latency: " + l + " ms");
}
}
OUTPUT:
1
2
thread pool took: 50341, latency: 1005 ms
fiber took: 1158, latency: 1000 ms
可以看到很明顯的時間差距,存在多線程阻塞的情況下,協(xié)程的性能非常的好,但是。如果把sleep這段去掉,F(xiàn)iber的性能反而更差:
這說明Fiber并不意味著它可以在所有的場景中都可以替換Thread。當fiber的代碼經(jīng)常會被等待其它fiber阻塞的時候,就應(yīng)該使用fiber。
對于那些需要CPU長時間計算的代碼,很少遇到阻塞的時候,就應(yīng)該首選thread
擴展
其實協(xié)程這個概念在其他的語言中有原生的支持,如:
kotlin 1.30之后已經(jīng)穩(wěn)定
: https://www.kotlincn.net/docs...
golang : https://gobyexample.com/gorou...
python : http://www.gevent.org/content...~
在這些語言中協(xié)程就看起來至少沒這么奇怪或者難以理解了,而且開發(fā)起開也相比java簡單很多。
總結(jié)
協(xié)程的概念也不算是很新了,但是在像Java這樣的語言或者特定的領(lǐng)域并不是很火,也并沒有完全普及。不是很明白是它的學習成本高,還是說應(yīng)用場景是在太小了。但是當我聽到這個概念的時候確實是挺好奇,也挺好奇的。也希望之后會有更多的框架和特性來簡化我們苦逼程序員的開發(fā)~~
參考文獻
http://docs.paralleluniverse....
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/74561.html
摘要:本周提交的一份增強建議草案要求將虛擬線程作為標準版的一部分進行預覽。虛擬線程目的是更好地支持編寫和維護高吞吐量并發(fā)應(yīng)用程序。該提案指出,使用虛擬線程不需要學習新的編程模型。我們知道 Go 語言最大亮點之一就是原生支持并發(fā),這得益于 Go 語言的協(xié)程機制。一個 go 語句就可以發(fā)起一個協(xié)程 (goroutin)。 協(xié)程本質(zhì)上是一種用戶態(tài)線程,它不需要操作系統(tǒng)來進行調(diào)度,而是由用戶程序自行管理...
摘要:相比與其他操作系統(tǒng)包括其他類系統(tǒng)有很多的優(yōu)點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。因為多線程競爭鎖時會引起上下文切換。減少線程的使用。很多編程語言中都有協(xié)程。所以如何避免死鎖的產(chǎn)生,在我們使用并發(fā)編程時至關(guān)重要。 系列文章傳送門: Java多線程學習(一)Java多線程入門 Java多線程學習(二)synchronized關(guān)鍵字(1) java多線程學習(二)syn...
摘要:因為多線程競爭鎖時會引起上下文切換。減少線程的使用。舉個例子如果說服務(wù)器的帶寬只有,某個資源的下載速度是,系統(tǒng)啟動個線程下載該資源并不會導致下載速度編程,所以在并發(fā)編程時,需要考慮這些資源的限制。 最近私下做一項目,一bug幾日未解決,總惶恐。一日頓悟,bug不可怕,怕的是項目不存在bug,與其懼怕,何不與其剛正面。 系列文章傳送門: Java多線程學習(一)Java多線程入門 Jav...
摘要:很長一段時間,我都很天真的認為,特別是以為代表的庫,才是協(xié)程的樂土。里是沒法實現(xiàn)協(xié)程,更別說實現(xiàn)這樣可以的協(xié)程的。咱真的是太井底之蛙了。不完全列表如下還有一個據(jù)作者說是最的這些協(xié)程庫的實現(xiàn)方式都是類似的,都是通過字節(jié)碼生成達到的目的。 很長一段時間,我都很天真的認為python,特別是以gevent為代表的庫,才是協(xié)程的樂土。Java里是沒法實現(xiàn)協(xié)程,更別說實現(xiàn)stackless py...
摘要:線程線程,是程序執(zhí)行流的最小單元。由于線程之間的相互制約,致使線程在運行中呈現(xiàn)出間斷性。線程的狀態(tài)機線程也有就緒阻塞和運行三種基本狀態(tài)。在單個程序中同時運行多個線程完成不同的工作,稱為多線程。可以視為不同線程競爭一把鎖。 進程線程協(xié)程 進程 進程是一個實體。每一個進程都有它自己的地址空間, 文本區(qū)域(text region) 數(shù)據(jù)區(qū)域(data region) 堆棧(stack re...
閱讀 1884·2021-11-17 09:33
閱讀 6470·2021-10-12 10:20
閱讀 2299·2021-09-22 15:50
閱讀 1783·2021-09-22 15:10
閱讀 615·2021-09-10 10:51
閱讀 618·2021-09-10 10:50
閱讀 3020·2021-08-11 11:19
閱讀 1776·2019-08-30 15:55