摘要:分層支持分層一種樹形結構,通過構造函數可以指定當前待構造的對象的父結點。當一個的參與者數量變成時,如果有該有父結點,就會將它從父結點中溢移除。當首次將某個結點鏈接到樹中時,會同時向該結點的父結點注冊一個參與者。
本文首發于一世流云專欄:https://segmentfault.com/blog...一、Phaser簡介
Phaser是JDK1.7開始引入的一個同步工具類,適用于一些需要分階段的任務的處理。它的功能與 CyclicBarrier和CountDownLatch有些類似,類似于一個多階段的柵欄,并且功能更強大,我們來比較下這三者的功能:
同步器 | 作用 |
---|---|
CountDownLatch | 倒數計數器,初始時設定計數器值,線程可以在計數器上等待,當計數器值歸0后,所有等待的線程繼續執行 |
CyclicBarrier | 循環柵欄,初始時設定參與線程數,當線程到達柵欄后,會等待其它線程的到達,當到達柵欄的總數滿足指定數后,所有等待的線程繼續執行 |
Phaser | 多階段柵欄,可以在初始時設定參與線程數,也可以中途注冊/注銷參與者,當到達的參與者數量滿足柵欄設定的數量后,會進行階段升級(advance) |
Phaser中有一些比較重要的概念,理解了這些概念才能理解Phaser的功能。
phase(階段)我們知道,在CyclicBarrier中,只有一個柵欄,線程在到達柵欄后會等待其它線程的到達。
Phaser也有柵欄,在Phaser中,柵欄的名稱叫做phase(階段),在任意時間點,Phaser只處于某一個phase(階段),初始階段為0,最大達到Integerr.MAX_VALUE,然后再次歸零。當所有parties參與者都到達后,phase值會遞增。
如果看過之前關于CyclicBarrier的文章,就會知道,Phaser中的phase(階段)這個概念其實和CyclicBarrier中的Generation很相似,只不過Generation沒有計數。
parties(參與者)parties(參與者)其實就是CyclicBarrier中的參與線程的概念。
CyclicBarrier中的參與者在初始構造指定后就不能變更,而Phaser既可以在初始構造時指定參與者的數量,也可以中途通過register、bulkRegister、arriveAndDeregister等方法注冊/注銷參與者。
arrive(到達) / advance(進階)Phaser注冊完parties(參與者)之后,參與者的初始狀態是unarrived的,當參與者到達(arrive)當前階段(phase)后,狀態就會變成arrived。當階段的到達參與者數滿足條件后(注冊的數量等于到達的數量),階段就會發生進階(advance)——也就是phase值+1。
Termination(終止)代表當前Phaser對象達到終止狀態,有點類似于CyclicBarrier中的柵欄被破壞的概念。
Tiering(分層)Phaser支持分層(Tiering) —— 一種樹形結構,通過構造函數可以指定當前待構造的Phaser對象的父結點。之所以引入Tiering,是因為當一個Phaser有大量參與者(parties)的時候,內部的同步操作會使性能急劇下降,而分層可以降低競爭,從而減小因同步導致的額外開銷。
在一個分層Phasers的樹結構中,注冊和撤銷子Phaser或父Phaser是自動被管理的。當一個Phaser的參與者(parties)數量變成0時,如果有該Phaser有父結點,就會將它從父結點中溢移除。
關于Phaser的分層,后續我們在講Phaser原理時會進一步討論。
二、Phaser示例為了更好的理解Phaser的功能,我們來看幾個示例:
示例一通過Phaser控制多個線程的執行時機:有時候我們希望所有線程到達指定點后再同時開始執行,我們可以利用CyclicBarrier或CountDownLatch來實現,這里給出使用Phaser的版本。
public class PhaserTest1 { public static void main(String[] args) { Phaser phaser = new Phaser(); for (int i = 0; i < 10; i++) { phaser.register(); // 注冊各個參與者線程 new Thread(new Task(phaser), "Thread-" + i).start(); } } } class Task implements Runnable { private final Phaser phaser; Task(Phaser phaser) { this.phaser = phaser; } @Override public void run() { int i = phaser.arriveAndAwaitAdvance(); // 等待其它參與者線程到達 // do something System.out.println(Thread.currentThread().getName() + ": 執行完任務,當前phase =" + i + ""); } }
輸出:
Thread-8: 執行完任務,當前phase =1 Thread-4: 執行完任務,當前phase =1 Thread-3: 執行完任務,當前phase =1 Thread-0: 執行完任務,當前phase =1 Thread-5: 執行完任務,當前phase =1 Thread-6: 執行完任務,當前phase =1 Thread-7: 執行完任務,當前phase =1 Thread-9: 執行完任務,當前phase =1 Thread-1: 執行完任務,當前phase =1 Thread-2: 執行完任務,當前phase =1
以上示例中,創建了10個線程,并通過register方法注冊Phaser的參與者數量為10。當某個線程調用arriveAndAwaitAdvance方法后,arrive數量會加1,如果數量沒有滿足總數(參與者數量10),當前線程就是一直等待,當最后一個線程到達后,所有線程都會繼續往下執行。
注意:arriveAndAwaitAdvance方法是不響應中斷的,也就是說即使當前線程被中斷,arriveAndAwaitAdvance方法也不會返回或拋出異常,而是繼續等待。如果希望能夠響應中斷,可以參考awaitAdvanceInterruptibly方法。示例二
通過Phaser實現開關。在以前講CountDownLatch時,我們給出過以CountDownLatch實現開關的示例,也就是說,我們希望一些外部條件得到滿足后,然后打開開關,線程才能繼續執行,我們看下如何用Phaser來實現此功能。
public class PhaserTest2 { public static void main(String[] args) throws IOException { Phaser phaser = new Phaser(1); // 注冊主線程,當外部條件滿足時,由主線程打開開關 for (int i = 0; i < 10; i++) { phaser.register(); // 注冊各個參與者線程 new Thread(new Task2(phaser), "Thread-" + i).start(); } // 外部條件:等待用戶輸入命令 System.out.println("Press ENTER to continue"); BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); reader.readLine(); // 打開開關 phaser.arriveAndDeregister(); System.out.println("主線程打開了開關"); } } class Task2 implements Runnable { private final Phaser phaser; Task2(Phaser phaser) { this.phaser = phaser; } @Override public void run() { int i = phaser.arriveAndAwaitAdvance(); // 等待其它參與者線程到達 // do something System.out.println(Thread.currentThread().getName() + ": 執行完任務,當前phase =" + i + ""); } }
輸出:
主線程打開了開關 Thread-7: 執行完任務,當前phase =1 Thread-4: 執行完任務,當前phase =1 Thread-3: 執行完任務,當前phase =1 Thread-1: 執行完任務,當前phase =1 Thread-0: 執行完任務,當前phase =1 Thread-9: 執行完任務,當前phase =1 Thread-8: 執行完任務,當前phase =1 Thread-2: 執行完任務,當前phase =1 Thread-5: 執行完任務,當前phase =1 Thread-6: 執行完任務,當前phase =1
以上示例中,只有當用戶按下回車之后,任務才真正開始執行。這里主線程Main相當于一個協調者,用來控制開關打開的時機,arriveAndDeregister方法不會阻塞,該方法會將到達數加1,同時減少一個參與者數量,最終返回線程到達時的phase值。
示例三通過Phaser控制任務的執行輪數
public class PhaserTest3 { public static void main(String[] args) throws IOException { int repeats = 3; // 指定任務最多執行的次數 Phaser phaser = new Phaser() { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------"); return phase + 1 >= repeats || registeredParties == 0; } }; for (int i = 0; i < 10; i++) { phaser.register(); // 注冊各個參與者線程 new Thread(new Task3(phaser), "Thread-" + i).start(); } } } class Task3 implements Runnable { private final Phaser phaser; Task3(Phaser phaser) { this.phaser = phaser; } @Override public void run() { while (!phaser.isTerminated()) { //只要Phaser沒有終止, 各個線程的任務就會一直執行 int i = phaser.arriveAndAwaitAdvance(); // 等待其它參與者線程到達 // do something System.out.println(Thread.currentThread().getName() + ": 執行完任務"); } } }
輸出:
---------------PHASE[0],Parties[5] --------------- Thread-4: 執行完任務 Thread-1: 執行完任務 Thread-2: 執行完任務 Thread-3: 執行完任務 Thread-0: 執行完任務 ---------------PHASE[1],Parties[5] --------------- Thread-0: 執行完任務 Thread-3: 執行完任務 Thread-1: 執行完任務 Thread-4: 執行完任務 Thread-2: 執行完任務 ---------------PHASE[2],Parties[5] --------------- Thread-2: 執行完任務 Thread-4: 執行完任務 Thread-1: 執行完任務 Thread-0: 執行完任務 Thread-3: 執行完任務
以上示例中,我們在創建Phaser對象時,覆寫了onAdvance方法,這個方法類似于CyclicBarrier中的barrierAction任務。
也就是說,當最后一個參與者到達時,會觸發onAdvance方法,入參phase表示到達時的phase值,registeredParties表示到達時的參與者數量,返回true表示需要終止Phaser。
我們通過phase + 1 >= repeats ,來控制階段(phase)數的上限為2(從0開始計),最終控制了每個線程的執行任務次數為repeats次。
示例四Phaser支持分層功能,我們先來考慮下如何用利用Phaser的分層來實現高并發時的優化,在示例三中,我們其實創建了10個任務,然后10個線程共用一個Phaser對象,如下圖:
如果任務數繼續增大,那么同步產生的開銷會非常大,利用Phaser分層的功能,我們可以限定每個Phaser對象的最大使用線程(任務數),如下圖:
可以看到,上述Phasers其實構成了一顆多叉樹,如果任務數繼續增多,還可以將Phaser的葉子結點繼續分裂,然后將分裂出的子結點供工作線程使用。
public class PhaserTest4 { private static final int TASKS_PER_PHASER = 4; // 每個Phaser對象對應的工作線程(任務)數 public static void main(String[] args) throws IOException { int repeats = 3; // 指定任務最多執行的次數 Phaser phaser = new Phaser() { @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------"); return phase + 1 >= repeats || registeredParties == 0; } }; Tasker[] taskers = new Tasker[10]; build(taskers, 0, taskers.length, phaser); // 根據任務數,為每個任務分配Phaser對象 for (int i = 0; i < taskers.length; i++) { // 執行任務 Thread thread = new Thread(taskers[i]); thread.start(); } } private static void build(Tasker[] taskers, int lo, int hi, Phaser phaser) { if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(taskers, i, j, new Phaser(phaser)); } } else { for (int i = lo; i < hi; ++i) taskers[i] = new Tasker(i, phaser); } } } class Task4 implements Runnable { private final Phaser phaser; Task4(Phaser phaser) { this.phaser = phaser; this.phaser.register(); } @Override public void run() { while (!phaser.isTerminated()) { //只要Phaser沒有終止, 各個線程的任務就會一直執行 int i = phaser.arriveAndAwaitAdvance(); // 等待其它參與者線程到達 // do something System.out.println(Thread.currentThread().getName() + ": 執行完任務"); } } }
輸出: ---------------PHASE[0],Parties[3] --------------- Thread-9: 執行完任務 Thread-6: 執行完任務 Thread-5: 執行完任務 Thread-4: 執行完任務 Thread-1: 執行完任務 Thread-0: 執行完任務 Thread-7: 執行完任務 Thread-8: 執行完任務 Thread-2: 執行完任務 Thread-3: 執行完任務 ---------------PHASE[1],Parties[3] --------------- Thread-3: 執行完任務 Thread-7: 執行完任務 Thread-0: 執行完任務 Thread-1: 執行完任務 Thread-5: 執行完任務 Thread-8: 執行完任務 Thread-2: 執行完任務 Thread-9: 執行完任務 Thread-6: 執行完任務 Thread-4: 執行完任務 ---------------PHASE[2],Parties[3] --------------- Thread-4: 執行完任務 Thread-2: 執行完任務 Thread-8: 執行完任務 Thread-0: 執行完任務 Thread-3: 執行完任務 Thread-9: 執行完任務 Thread-6: 執行完任務 Thread-7: 執行完任務 Thread-1: 執行完任務 Thread-5: 執行完任務三、Phaser原理
Phaser是本系列至今為止,內部結構最為復雜的同步器之一。在開始深入Phaser原理之前,我們有必要先來講講Phaser的內部組織結構和它的設計思想。
Phaser的內部結構之前我們說過,Phaser支持樹形結構,在示例四中,也給出了一個通過分層提高并發性和程序執行效率的例子。一個復雜分層結構的Phaser樹的內部結構如下圖所示:
上面圖中的幾點關鍵點:
樹的根結點root鏈接著兩個“無鎖棧”——Treiber Stack,用于保存等待線程(比如當線程等待Phaser進入下一階段時,會根據當前階段的奇偶性,把自己掛到某個棧中),所有Phaser對象都共享這兩個棧。
當首次將某個Phaser結點鏈接到樹中時,會同時向該結點的父結點注冊一個參與者。
為什么需要向父結點注冊參與者?
首先我們要明白對于Phaser來說,什么時候會發生躍遷(advance)進入下一階段?
廢話,當然是等它所有參與者都到達的時候。那么它所等待的參與者都包含那幾類呢?
①對于一個孤立的Phaser結點(也可以看成是只有一個根結點的樹)
其等待的參與者,就是顯式注冊的參與者,這也是最常見的情況。
比如下圖,如果有10個Task共用這個Phaser,那等待的參與者數就是10,當10個線程都到達后,Phaser就會躍遷至下一階段。
②對于一個非孤立的Phaser葉子結點,比如下圖中標綠的葉子結點
這種情況和①一樣,子Phaser1和子Phaser2等待的參與者數是4,子Phaser3等待的參與者數是2。
③對于一個非孤立非葉子的Phaser結點,比如上圖中標藍色的結點
這是最特殊的一種情況,這也是Phaser同步器關于分層的主要設計思路。
這種情況,結點所等待的參與者數目包含兩部分:
直接顯式注冊的參與者(通過構造器或register方法)。——等于0
子結點的數目。——等于3
也就是說在上圖中,當左一的子Phaser1的4個參與者都到達后,它會通知父結點Phaser,自己的狀態已經OK了,這時Phaser會認為子Phaser1已經準備就緒,會將自己的到達者數量加1,同理,當子Phaser2和子Phaser3的所有參與者分別到達后,它們也會依次通知Phaser,只有當Phaser(根結點)的到達者數量為3時,才會釋放“無鎖棧”中等待著的線程,并將階段數phase增加1。
這是一種層層遞歸的設計,只要當根結點的所有參與者都到達后(也就是到達參數者數等于其子結點數),所有等待線程才會放行,柵欄才會進入下一階段。
了解了上面這些,我們再來看Phaser的源碼。
同步狀態定義Phaser使用一個long類型來保存同步狀態值State,并按位劃分不同區域的含義,通過掩碼和位運算進行賦值和操作:
“無鎖棧”——Treiber Stack,保存在Phaser樹的根結點中,其余所有Phaser子結點共享這兩個棧:
結點的定義非常簡單,內部保存了線程信息和Phsaer對象信息:
注意:ForkJoinPool.ManagedBlocker是當棧包含ForkJoinWorkerThread類型的QNode阻塞的時候,ForkJoinPool內部會增加一個工作線程來保證并行度,后續講ForkJoin框架時我們會進行分析。
Phaser的構造器Phaser一共有4個構造器,可以看到,最終其實都是調用了Phaser(Phaser parent, int parties)這個構造器。
Phaser(Phaser parent, int parties)的內部實現如下,關鍵就是給當前的Phaser對象指定父結點時,如果當前Phaser的參與者不為0,需要向父Phaser注冊一個參與者(代表當前結點本身):
Phaser提供了兩個注冊參與者的方法:
register:注冊單個參與者
bulkRegister:批量注冊參與者
這兩個方法都很簡單,內部調用了doRegister方法:
/** * 注冊指定數目{#registrations}的參與者 */ private int doRegister(int registrations) { // 首先計算注冊后當前State要調整的值adjust long adjust = ((long) registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; for (; ; ) { long s = (parent == null) ? state : reconcileState(); // reconcileState()調整當前Phaser的State與root一致 int counts = (int) s; int parties = counts >>> PARTIES_SHIFT; // 參與者數目 int unarrived = counts & UNARRIVED_MASK; // 未到達的數目 if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); phase = (int) (s >>> PHASE_SHIFT); // 當前Phaser所處的階段phase if (phase < 0) break; if (counts != EMPTY) { // CASE1: 當前Phaser已經注冊過參與者 if (parent == null || reconcileState() == s) { if (unarrived == 0) // 參與者已全部到達柵欄, 當前Phaser正在Advance, 需要阻塞等待這一過程完成 root.internalAwaitAdvance(phase, null); else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) // 否則,直接更新State break; } } else if (parent == null) { // CASE2: 當前Phaser未注冊過參與者(第一次注冊),且沒有父結點 long next = ((long) phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) // CAS更新當前Phaser的State值 break; } else { // CASE3: 當前Phaser未注冊過參與者(第一次注冊),且有父結點 synchronized (this) { if (state == s) { phase = parent.doRegister(1); // 向父結點注冊一個參與者 if (phase < 0) break; while (!UNSAFE.compareAndSwapLong(this, stateOffset, s, ((long) phase << PHASE_SHIFT) | adjust)) { s = state; phase = (int) (root.state >>> PHASE_SHIFT); } break; } } } } return phase; }
doRegister方法用來給當前Phaser對象注冊參與者,主要有三個分支:
①當前Phaser已經注冊過參與者
如果參與者已經全部到達柵欄,則當前線程需要阻塞等待(因為此時phase正在變化,增加1到下一個phase),否則直接更新State。
②當前Phaser未注冊過參與者(第一次注冊),且沒有父結點
這種情況最簡單,直接更新當前Phaser的State值。
③當前Phaser未注冊過參與者(第一次注冊),且有父結點
說明當前Phaser是新加入的葉子結點,需要向父結點注冊自身,同時更新自身的State值。
注意: reconcileState方法比較特殊,因為當出現樹形結構時,根結點首先進行phase的更新,所以需要顯式同步,使當前結點和根結點保持一致。
另外,阻塞等待調用的是internalAwaitAdvance方法,其實就是根據當前階段phase,將線程包裝成結點加入到root結點所指向的某個“無鎖棧”中:
/** * internalAwaitAdvance的主要邏輯就是:當前參與者線程等待Phaser進入下一個階段(就是phase值變化). * @return 返回新的階段 */ private int internalAwaitAdvance(int phase, QNode node) { // assert root == this; releaseWaiters(phase - 1); // 清空不用的Treiber Stack(奇偶Stack交替使用) boolean queued = false; // 入隊標識 int lastUnarrived = 0; int spins = SPINS_PER_ARRIVAL; long s; int p; while ((p = (int) ((s = state) >>> PHASE_SHIFT)) == phase) { if (node == null) { // spinning in noninterruptible mode int unarrived = (int) s & UNARRIVED_MASK; if (unarrived != lastUnarrived && (lastUnarrived = unarrived) < NCPU) spins += SPINS_PER_ARRIVAL; boolean interrupted = Thread.interrupted(); if (interrupted || --spins < 0) { // need node to record intr node = new QNode(this, phase, false, false, 0L); node.wasInterrupted = interrupted; } } else if (node.isReleasable()) // done or aborted break; else if (!queued) { // 將結點壓入棧頂 AtomicReference參與者到達并等待head = (phase & 1) == 0 ? evenQ : oddQ; QNode q = node.next = head.get(); if ((q == null || q.phase == phase) && (int) (state >>> PHASE_SHIFT) == phase) // avoid stale enq queued = head.compareAndSet(q, node); } else { try { // 阻塞等待 ForkJoinPool.managedBlock(node); } catch (InterruptedException ie) { node.wasInterrupted = true; } } } ? if (node != null) { if (node.thread != null) node.thread = null; // avoid need for unpark() if (node.wasInterrupted && !node.interruptible) Thread.currentThread().interrupt(); if (p == phase && (p = (int) (state >>> PHASE_SHIFT)) == phase) return abortWait(phase); // possibly clean up on abort } releaseWaiters(phase); return p; }
arriveAndAwaitAdvance的主要邏輯如下:
首先將同步狀態值State中的未到達參與者數量減1,然后判斷未到達參與者數量是否為0?
如果不為0,則阻塞當前線程,以等待其他參與者到來;
如果為0,說明當前線程是最后一個參與者,如果有父結點則對父結點遞歸調用該方法。(因為只有根結點的未到達參與者數目為0時),才會進階phase。
四、Phaser類/接口聲明 類聲明 構造器聲明 接口聲明文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/76698.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:線程可以調用的方法進入阻塞,當計數值降到時,所有之前調用阻塞的線程都會釋放。注意的初始計數值一旦降到,無法重置。 showImg(https://segmentfault.com/img/remote/1460000016012041); 本文首發于一世流云的專欄:https://segmentfault.com/blog... 一、CountDownLatch簡介 CountDow...
摘要:二接口簡介可以看做是類的方法的替代品,與配合使用。當線程執行對象的方法時,當前線程會立即釋放鎖,并進入對象的等待區,等待其它線程喚醒或中斷。 showImg(https://segmentfault.com/img/remote/1460000016012601); 本文首發于一世流云的專欄:https://segmentfault.com/blog... 本系列文章中所說的juc-...
摘要:注意線程與本地操作系統的線程是一一映射的。固定線程數的線程池提供了兩種創建具有固定線程數的的方法,固定線程池在初始化時確定其中的線程總數,運行過程中會始終維持線程數量不變。 showImg(https://segmentfault.com/img/bVbhK58?w=1920&h=1080); 本文首發于一世流云專欄:https://segmentfault.com/blog... ...
摘要:公平策略在多個線程爭用鎖的情況下,公平策略傾向于將訪問權授予等待時間最長的線程。使用方式的典型調用方式如下二類原理的源碼非常簡單,它通過內部類實現了框架,接口的實現僅僅是對的的簡單封裝,參見原理多線程進階七鎖框架獨占功能剖析 showImg(https://segmentfault.com/img/remote/1460000016012582); 本文首發于一世流云的專欄:https...
閱讀 728·2021-08-17 10:11
閱讀 1594·2019-08-30 11:15
閱讀 1017·2019-08-26 13:54
閱讀 3502·2019-08-26 11:47
閱讀 1212·2019-08-26 10:20
閱讀 2816·2019-08-23 18:35
閱讀 1213·2019-08-23 17:52
閱讀 1297·2019-08-23 16:19