摘要:本人郵箱歡迎轉載轉載請注明網址代碼已經全部托管有需要的同學自行下載引言講完了和今天講一個跟這兩個類有點類似的移相器中引入了一種新的可重復使用的同步屏障稱為移相器擁有與和類似的功勞但是這個類提供了更加靈活的應用和都是只適用于固定數量的參與者
引言本人郵箱:
歡迎轉載,轉載請注明網址 http://blog.csdn.net/tianshi_kco
github: https://github.com/kco1989/kco
代碼已經全部托管github有需要的同學自行下載
講完了CyclicBarrier和CountDownLatch,今天講一個跟這兩個類有點類似的Phaser.->移相器
例子1 用Phaser代替CyclicBarrierjava7中引入了一種新的可重復使用的同步屏障,稱為移相器Phaser.Phaser擁有與CyclicBarrier和CountDownLatch類似的功勞.但是這個類提供了更加靈活的應用.CountDownLatch和CyclicBarrier都是只適用于固定數量的參與者.移相器適用于可變數目的屏障,在這個意義上,可以在任何時間注冊新的參與者.并且在抵達屏障是可以注銷已經注冊的參與者.因此,注冊到同步移相器的參與者的數目可能會隨著時間的推移而變化.如CyclicBarrier一樣,移相器可以重復使用,這意味著當前參與者到達移相器后,可以再一次注冊自己并等待另一次到達.因此,移相器會有多代.一旦為某個特定相位注冊的所有參與者都到達移相器,就增加相數.相數從零開始,在達到Integer.MAX_VALUE后,再次繞回0.當移相器發生變化時,通過重寫onAdvance方法,可以自行可選操作.這個方法也可用于終止移相器.移相器一旦被終止,所有的同步方法就會立即返回,并嘗試注冊新的失敗的參與者.
移相器的另一個重要特征是:移相器可能是分層的,這允許你以樹形結構來安排移相器以減少競爭.很明顯,更小的組將擁有更少的競爭同步的參與者.因此,將大量的參與者分成較小的組可以減少競爭.雖然創建移相器能增加中的吞吐量,但是這需要更多的開銷.最后,移相器的另一個重要的特征在于監控功能,使用獨立的對象可以監視移相器的當前狀態.監視器可以查詢注冊到移相器的參與者的數量,以及已經到達和還沒有到達某個特定相數的參與者的數量.1
將之前(九)java多線程之CyclicBarrier旅游的例子改寫一下,
Phaser替代CyclicBarrier比較簡單,CyclicBarrier的await()方法可以直接用Phaser的arriveAndAwaitAdvance()方法替代
CyclicBarrier與Phaser:CyclicBarrier只適用于固定數量的參與者,而Phaser適用于可變數目的屏障.
TourismRunnable 旅游類
public class TourismRunnable implements Runnable{ Phaser phaser; Random random; public TourismRunnable(Phaser phaser) { this.phaser = phaser; this.random = new Random(); } @Override public void run() { tourism(); } /** * 旅游過程 */ private void tourism() { goToStartingPoint(); goToHotel(); goToTourismPoint1(); goToTourismPoint2(); goToTourismPoint3(); goToEndPoint(); } /** * 裝備返程 */ private void goToEndPoint() { goToPoint("飛機場,準備登機回家"); } /** * 到達旅游點3 */ private void goToTourismPoint3() { goToPoint("旅游點3"); } /** * 到達旅游點2 */ private void goToTourismPoint2() { goToPoint("旅游點2"); } /** * 到達旅游點1 */ private void goToTourismPoint1() { goToPoint("旅游點1"); } /** * 入住酒店 */ private void goToHotel() { goToPoint("酒店"); } /** * 出發點集合 */ private void goToStartingPoint() { goToPoint("出發點"); } private int getRandomTime(){ int time = this.random.nextInt(400) + 100; try { Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } return time; } private void goToPoint(String point){ try { String name = Thread.currentThread().getName(); System.out.println(name + " 花了 " + getRandomTime() + " 時間才到了" + point); phaser.arriveAndAwaitAdvance(); } catch (Exception e) { e.printStackTrace(); } } }
TestMain 測試類
public class TestMain { public static void main(String[] args) { String name = "明剛紅麗黑白"; Phaser phaser = new Phaser(name.length()); ListtourismThread = new ArrayList<>(); for (char ch : name.toCharArray()){ tourismThread.add(new Thread(new TourismRunnable(phaser), "小" + ch)); } for (Thread thread : tourismThread){ thread.start(); } } }
運行結果
小紅 花了 122 時間才到了出發點 小明 花了 259 時間才到了出發點 小白 花了 267 時間才到了出發點 小麗 花了 306 時間才到了出發點 小剛 花了 385 時間才到了出發點 小黑 花了 486 時間才到了出發點 小白 花了 299 時間才到了酒店 小剛 花了 345 時間才到了酒店 小黑 花了 449 時間才到了酒店 小麗 花了 452 時間才到了酒店 小明 花了 462 時間才到了酒店 小紅 花了 480 時間才到了酒店 小麗 花了 107 時間才到了旅游點1 小紅 花了 141 時間才到了旅游點1 小明 花了 212 時間才到了旅游點1 小黑 花了 286 時間才到了旅游點1 小白 花了 305 時間才到了旅游點1 小剛 花了 386 時間才到了旅游點1 小麗 花了 119 時間才到了旅游點2 小黑 花了 222 時間才到了旅游點2 小明 花了 259 時間才到了旅游點2 小剛 花了 299 時間才到了旅游點2 小紅 花了 354 時間才到了旅游點2 小白 花了 422 時間才到了旅游點2 小麗 花了 112 時間才到了旅游點3 小白 花了 182 時間才到了旅游點3 小剛 花了 283 時間才到了旅游點3 小明 花了 295 時間才到了旅游點3 小紅 花了 386 時間才到了旅游點3 小黑 花了 483 時間才到了旅游點3 小黑 花了 152 時間才到了飛機場,準備登機回家 小白 花了 178 時間才到了飛機場,準備登機回家 小明 花了 248 時間才到了飛機場,準備登機回家 小紅 花了 362 時間才到了飛機場,準備登機回家 小麗 花了 428 時間才到了飛機場,準備登機回家 小剛 花了 432 時間才到了飛機場,準備登機回家
例子2 用Phaser代替CountDownLatchPhaser(int parties) 創建一個指定parties個線程參與同步任務.
``
將之前(十)java多線程之CountDownLatch旅游回來坐飛機的例子改寫一下,
CountDownLatch主要使用的有2個方法
await()方法,可以使線程進入等待狀態,在Phaser中,與之對應的方法是awaitAdvance(int n)。
countDown(),使計數器減一,當計數器為0時所有等待的線程開始執行,在Phaser中,與之對應的方法是arrive()
Airplane飛機類
public class Airplane { private Phaser phaser; private Random random; public Airplane(int peopleNum){ phaser = new Phaser(peopleNum); random = new Random(); } /** * 下機 */ public void getOffPlane(){ try { String name = Thread.currentThread().getName(); Thread.sleep(random.nextInt(500)); System.out.println(name + " 在飛機在休息著...."); Thread.sleep(random.nextInt(500)); System.out.println(name + " 下飛機了"); phaser.arrive(); } catch (InterruptedException e) { e.printStackTrace(); } } public void doWork(){ String name = Thread.currentThread().getName(); System.out.println(name + "準備做 清理 工作"); phaser.awaitAdvance(phaser.getPhase()); System.out.println("飛機的乘客都下機," + name + "可以開始做 清理 工作"); } }
TestMain 測試類(沒有改)
public class TestMain { public static void main(String[] args) { String visitor = "明剛紅麗黑白"; String kongjie = "美惠花"; Airplane airplane = new Airplane(visitor.length()); Setthreads = new HashSet<>(); for (int i = 0; i < visitor.length(); i ++){ threads.add(new Thread(() -> { airplane.getOffPlane(); }, "小" + visitor.charAt(i))); } for (int i = 0; i < kongjie.length(); i ++){ threads.add(new Thread(() ->{ airplane.doWork(); }, "小" + kongjie.charAt(i) + "空姐")); } for (Thread thread : threads){ thread.start(); } } }
運行結果
小花空姐準備做 清理 工作 小惠空姐準備做 清理 工作 小美空姐準備做 清理 工作 小黑 在飛機在休息著.... 小明 在飛機在休息著.... 小紅 在飛機在休息著.... 小麗 在飛機在休息著.... 小剛 在飛機在休息著.... 小明 下飛機了 小紅 下飛機了 小黑 下飛機了 小白 在飛機在休息著.... 小麗 下飛機了 小剛 下飛機了 小白 下飛機了 飛機的乘客都下機,小美空姐可以開始做 清理 工作 飛機的乘客都下機,小花空姐可以開始做 清理 工作 飛機的乘客都下機,小惠空姐可以開始做 清理 工作例子3 高級用法
前面兩個例子都比較簡單,現在我們還用Phaser一個比較高級一點用法.還是用旅游的例子
假如有這么一個場景,在旅游過程中,有可能很湊巧遇到幾個朋友,然后他們聽說你們在旅游,所以想要加入一起繼續接下來的旅游.也有可能,在旅游過程中,突然其中有某幾個人臨時有事,想退出這次旅游了.在自由行的旅游,這是很常見的一些事情.如果現在我們使用CyclicBarrier這個類來實現,我們發現是實現不了,這是用Phaser就可實現這個功能.
首先,我們改寫旅游類 TourismRunnable,這次改動相對比較多一點
public class TourismRunnable implements Runnable{ Phaser phaser; Random random; /** * 每個線程保存一個朋友計數器,比如小紅第一次遇到一個朋友,則取名`小紅的朋友0號`, * 然后旅游到其他景點的時候,如果小紅又遇到一個朋友,這取名為`小紅的朋友1號` */ AtomicInteger frientCount = new AtomicInteger(); public TourismRunnable(Phaser phaser) { this.phaser = phaser; this.random = new Random(); } @Override public void run() { tourism(); } /** * 旅游過程 */ private void tourism() { switch (phaser.getPhase()){ case 0:if(!goToStartingPoint()) break; case 1:if(!goToHotel()) break; case 2:if(!goToTourismPoint1()) break; case 3:if(!goToTourismPoint2()) break; case 4:if(!goToTourismPoint3()) break; case 5:if(!goToEndPoint()) break; } } /** * 準備返程 * @return 返回true,說明還要繼續旅游,否則就臨時退出了 */ private boolean goToEndPoint() { return goToPoint("飛機場,準備登機回家"); } /** * 到達旅游點3 * @return 返回true,說明還要繼續旅游,否則就臨時退出了 */ private boolean goToTourismPoint3() { return goToPoint("旅游點3"); } /** * 到達旅游點2 * @return 返回true,說明還要繼續旅游,否則就臨時退出了 */ private boolean goToTourismPoint2() { return goToPoint("旅游點2"); } /** * 到達旅游點1 * @return 返回true,說明還要繼續旅游,否則就臨時退出了 */ private boolean goToTourismPoint1() { return goToPoint("旅游點1"); } /** * 入住酒店 * @return 返回true,說明還要繼續旅游,否則就臨時退出了 */ private boolean goToHotel() { return goToPoint("酒店"); } /** * 出發點集合 * @return 返回true,說明還要繼續旅游,否則就臨時退出了 */ private boolean goToStartingPoint() { return goToPoint("出發點"); } private int getRandomTime() throws InterruptedException { int time = random.nextInt(400) + 100; Thread.sleep(time); return time; } /** * @param point 集合點 * @return 返回true,說明還要繼續旅游,否則就臨時退出了 */ private boolean goToPoint(String point){ try { if(!randomEvent()){ phaser.arriveAndDeregister(); return false; } String name = Thread.currentThread().getName(); System.out.println(name + " 花了 " + getRandomTime() + " 時間才到了" + point); phaser.arriveAndAwaitAdvance(); return true; } catch (Exception e) { e.printStackTrace(); } return false; } /** * 隨機事件 * @return 返回true,說明還要繼續旅游,否則就臨時退出了 */ private boolean randomEvent() { int r = random.nextInt(100); String name = Thread.currentThread().getName(); if (r < 10){ int friendNum = 1; System.out.println(name + ":在這里竟然遇到了"+friendNum+"個朋友,他們說要一起去旅游..."); phaser.bulkRegister(friendNum); for (int i = 0; i < friendNum; i ++){ new Thread(new TourismRunnable(phaser), name + "的朋友" + frientCount.getAndAdd(1) + "號").start(); } }else if(r > 90){ System.out.println(name + ":突然有事要離開一下,不和他們繼續旅游了...."); return false; } return true; } }
代碼解析
tourism這個方法的case寫法看起有點怪異,如果是為了滿足我們這個需求,這里的case的意思是-->case 第幾次集合: if(是否繼續旅游) 若不繼續則break,否則繼續后面的旅游
phaser.getPhase() 初始值為0,如果全部人到達集合點這個Phase+1,如果phaser.getPhase()達到Integer的最大值,這重新清空為0,在這里表示第幾次集合了
phaser.arriveAndDeregister(); 表示這個人旅游到這個景點之后,就離開這個旅游團了
phaser.arriveAndAwaitAdvance(); 表示這個人在這個景點旅游完,在等待其他人
phaser.bulkRegister(friendNum); 表示這個人在這個景點遇到了friendNum個朋友,他們要加入一起旅游
最后我們的測試代碼還是差不多的,比例子1多了一個到齊后的操作
public class TestMain { public static void main(String[] args) { String name = "明剛紅麗黑白"; Phaser phaser = new SubPhaser(name.length()); ListtourismThread = new ArrayList<>(); for (char ch : name.toCharArray()){ tourismThread.add(new Thread(new TourismRunnable(phaser), "小" + ch)); } for (Thread thread : tourismThread){ thread.start(); } } public static class SubPhaser extends Phaser{ public SubPhaser(int parties) { super(parties); } @Override protected boolean onAdvance(int phase, int registeredParties) { System.out.println(Thread.currentThread().getName() + ":全部"+getArrivedParties()+"個人都到齊了,現在是第"+(phase + 1) +"次集合準備去下一個地方.................. "); return super.onAdvance(phase, registeredParties); } } }
運行輸出以下結果:
小白 花了 109 時間才到了出發點 小紅 花了 135 時間才到了出發點 小麗 花了 218 時間才到了出發點 小黑 花了 297 時間才到了出發點 小明 花了 303 時間才到了出發點 小剛 花了 440 時間才到了出發點 小剛:全部6個人都到齊了,現在是第1次集合準備去下一個地方.................. 小明:突然有事要離開一下,不和他們繼續旅游了.... 小剛:突然有事要離開一下,不和他們繼續旅游了.... 小紅 花了 127 時間才到了酒店 小麗 花了 162 時間才到了酒店 小黑 花了 365 時間才到了酒店 小白 花了 474 時間才到了酒店 小白:全部4個人都到齊了,現在是第2次集合準備去下一個地方.................. 小黑:突然有事要離開一下,不和他們繼續旅游了.... 小麗:突然有事要離開一下,不和他們繼續旅游了.... 小紅 花了 348 時間才到了旅游點1 小白 花了 481 時間才到了旅游點1 小白:全部2個人都到齊了,現在是第3次集合準備去下一個地方.................. 小白 花了 128 時間才到了旅游點2 小紅 花了 486 時間才到了旅游點2 小紅:全部2個人都到齊了,現在是第4次集合準備去下一個地方.................. 小紅 花了 159 時間才到了旅游點3 小白 花了 391 時間才到了旅游點3 小白:全部2個人都到齊了,現在是第5次集合準備去下一個地方.................. 小白:在這里竟然遇到了1個朋友,他們說要一起去旅游... 小白 花了 169 時間才到了飛機場,準備登機回家 小紅 花了 260 時間才到了飛機場,準備登機回家 小白的朋友0號 花了 478 時間才到了飛機場,準備登機回家 小白的朋友0號:全部3個人都到齊了,現在是第6次集合準備去下一個地方..................
通過結果配合我上面的解釋,還是比較好理解的.
遺漏這里還有phaser的中斷和樹形結構沒有舉例子,后續想到比較后的例子,我會繼續做補充的
后記這篇是我目前為止寫的最慢的一篇博文,因為之前沒有使用過phaser,導致在寫的出現很多問題.所以一邊查資料,一邊學習,總算還是把這個phaser給理解了.
打賞如果覺得我的文章寫的還過得去的話,有錢就捧個錢場,沒錢給我捧個人場(幫我點贊或推薦一下)
java7 ?
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69934.html
摘要:分層支持分層一種樹形結構,通過構造函數可以指定當前待構造的對象的父結點。當一個的參與者數量變成時,如果有該有父結點,就會將它從父結點中溢移除。當首次將某個結點鏈接到樹中時,會同時向該結點的父結點注冊一個參與者。 showImg(https://segmentfault.com/img/remote/1460000016010947); 本文首發于一世流云專欄:https://segme...
摘要:倒計時鎖,線程中調用使進程進入阻塞狀態,當達成指定次數后通過繼續執行每個線程中剩余的內容。實現分階段的的功能測試代碼拿客網站群三產創建于年月日。 同步器 為每種特定的同步問題提供了解決方案 Semaphore Semaphore【信號標;旗語】,通過計數器控制對共享資源的訪問。 測試類: package concurrent; import concurrent.th...
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:前言在前面的幾篇文章中詳述了框架的若干組分在相應的官方文檔中總會不時地提起同樣的也提到可以用于幫助運行在中的運行時保持有效的執行并行度其實特指其他都在等待一個的前進時熟悉的朋友都知道它的大概組成部分包含支持并發的容器同步器線程池阻塞隊列原子 前言 在前面的幾篇文章中詳述了ForkJoin框架的若干組分,在相應的官方文檔中總會不時地提起Phaser,同樣的,也提到Phaser可以用于幫助...
摘要:本人郵箱歡迎轉載轉載請注明網址代碼已經全部托管有需要的同學自行下載引言自動的兩個線程池講完今天就講跟他們有關的一個工具類吧理論僅僅是一個線程池的工具類它無法實例話包含都是靜態方法或靜態類創建一個指定線程數量的線程池創建一個可以自定義的線 本人郵箱: 歡迎轉載,轉載請注明網址 http://blog.csdn.net/tianshi_kcogithub: https://github.c...
閱讀 3207·2021-11-19 09:40
閱讀 3005·2021-09-09 09:32
閱讀 792·2021-09-02 09:55
閱讀 1393·2019-08-26 13:23
閱讀 2403·2019-08-26 11:46
閱讀 1229·2019-08-26 10:19
閱讀 2054·2019-08-23 16:53
閱讀 1072·2019-08-23 12:44