摘要:倒計(jì)時(shí)鎖,線程中調(diào)用使進(jìn)程進(jìn)入阻塞狀態(tài),當(dāng)達(dá)成指定次數(shù)后通過繼續(xù)執(zhí)行每個(gè)線程中剩余的內(nèi)容。實(shí)現(xiàn)分階段的的功能測(cè)試代碼拿客網(wǎng)站群三產(chǎn)創(chuàng)建于年月日。
同步器
為每種特定的同步問題提供了解決方案
SemaphoreSemaphore【信號(hào)標(biāo);旗語(yǔ)】,通過計(jì)數(shù)器控制對(duì)共享資源的訪問。
測(cè)試類:
package concurrent; import concurrent.thread.SemaphoreThread; import java.util.concurrent.Semaphore; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 創(chuàng)建時(shí)間:2016年08月08日 * 描述: */ public class SemaphoreTest { public static void main(String[] args) { //在Thread里聲明并不是同一個(gè)對(duì)象 Semaphore semaphore = new Semaphore(3); SemaphoreThread testA = new SemaphoreThread("A", semaphore); SemaphoreThread testB = new SemaphoreThread("B", semaphore); SemaphoreThread testC = new SemaphoreThread("C", semaphore); SemaphoreThread testD = new SemaphoreThread("D", semaphore); SemaphoreThread testE = new SemaphoreThread("E", semaphore); SemaphoreThread testF = new SemaphoreThread("F", semaphore); SemaphoreThread testG = new SemaphoreThread("G", semaphore); testA.start(); testB.start(); testC.start(); testD.start(); testE.start(); testF.start(); testG.start(); } }
線程寫法:
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.Semaphore; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 創(chuàng)建時(shí)間:2016年08月08日 * 描述: */ public class SemaphoreThread extends Thread { private static final Logger logger = LogManager.getLogger(SemaphoreThread.class); //創(chuàng)建有3個(gè)信號(hào)量的信號(hào)量計(jì)數(shù)器 public Semaphore semaphore; public SemaphoreThread(String name, Semaphore semaphore) { setName(name); this.semaphore = semaphore; } @Override public void run() { try { logger.debug(getName() + " 取號(hào)等待... " + System.currentTimeMillis()); //取出一個(gè)信號(hào) semaphore.acquire(); logger.debug(getName() + " 提供服務(wù)... " + System.currentTimeMillis()); sleep(1000); logger.debug(getName() + " 完成服務(wù)... " + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug(getName() + " 釋放... " + System.currentTimeMillis()); //釋放一個(gè)信號(hào) semaphore.release(); } }
執(zhí)行結(jié)果【以下所有輸出結(jié)果中[]中為線程名稱- 后為輸出的內(nèi)容】:
[C] - C 取號(hào)等待... 1470642024037 [F] - F 取號(hào)等待... 1470642024036 [E] - E 取號(hào)等待... 1470642024036 [B] - B 取號(hào)等待... 1470642024037 [D] - D 取號(hào)等待... 1470642024037 [A] - A 取號(hào)等待... 1470642023965 [D] - D 提供服務(wù)... 1470642024039 [C] - C 提供服務(wù)... 1470642024039 [G] - G 取號(hào)等待... 1470642024036 [F] - F 提供服務(wù)... 1470642024040 [D] - D 完成服務(wù)... 1470642025039 [C] - C 完成服務(wù)... 1470642025039 [D] - D 釋放... 1470642025040 [F] - F 完成服務(wù)... 1470642025040 [C] - C 釋放... 1470642025041 [B] - B 提供服務(wù)... 1470642025042 [A] - A 提供服務(wù)... 1470642025042 [F] - F 釋放... 1470642025043 [E] - E 提供服務(wù)... 1470642025043 [A] - A 完成服務(wù)... 1470642026043 [B] - B 完成服務(wù)... 1470642026043 [B] - B 釋放... 1470642026043 [A] - A 釋放... 1470642026043 [G] - G 提供服務(wù)... 1470642026044 [E] - E 完成服務(wù)... 1470642026045 [E] - E 釋放... 1470642026045 [G] - G 完成服務(wù)... 1470642027045 [G] - G 釋放... 1470642027046
可以看到,當(dāng)3個(gè)信號(hào)量被領(lǐng)取完之后,之后的線程會(huì)阻塞在領(lǐng)取信號(hào)的位置,當(dāng)有信號(hào)量釋放之后才會(huì)繼續(xù)執(zhí)行。
CountDownLatchCountDownLatch【倒計(jì)時(shí)鎖】,線程中調(diào)用countDownLatch.await()使進(jìn)程進(jìn)入阻塞狀態(tài),當(dāng)達(dá)成指定次數(shù)后(通過countDownLatch.countDown())繼續(xù)執(zhí)行每個(gè)線程中剩余的內(nèi)容。
測(cè)試類:
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CountDownLatch; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 創(chuàng)建時(shí)間:2016年08月08日 * 描述: */ public class package concurrent; import concurrent.thread.CountDownLatchThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 創(chuàng)建時(shí)間:2016年08月08日 * 描述: */ public class CountDownLatchTest { private static final Logger logger = LogManager.getLogger(CountDownLatchTest.class); public static void main(String[] args) throws InterruptedException { //設(shè)定當(dāng)達(dá)成三個(gè)計(jì)數(shù)時(shí)觸發(fā) CountDownLatch countDownLatch = new CountDownLatch(3); new CountDownLatchThread("A", countDownLatch).start(); new CountDownLatchThread("B", countDownLatch).start(); new CountDownLatchThread("C", countDownLatch).start(); new CountDownLatchThread("D", countDownLatch).start(); new CountDownLatchThread("E", countDownLatch).start(); for (int i = 3; i > 0; i--) { Thread.sleep(1000); logger.debug(i); countDownLatch.countDown(); } } }
線程類:
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CountDownLatch; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 創(chuàng)建時(shí)間:2016年08月08日 * 描述: */ public class CountDownLatchThread extends Thread { private static final Logger logger = LogManager.getLogger(CountDownLatchThread.class); //計(jì)數(shù)器 private CountDownLatch countDownLatch; public CountDownLatchThread(String name, CountDownLatch countDownLatch) { setName(name); this.countDownLatch = countDownLatch; } @Override public void run() { logger.debug("執(zhí)行操作..."); try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug("等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)..."); try { //讓線程進(jìn)入阻塞狀態(tài),等待計(jì)數(shù)達(dá)成后釋放 countDownLatch.await(); logger.debug("計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行..."); } catch (InterruptedException e) { e.printStackTrace(); } } }
執(zhí)行結(jié)果:
[E] - 執(zhí)行操作... [B] - 執(zhí)行操作... [A] - 執(zhí)行操作... [C] - 執(zhí)行操作... [D] - 執(zhí)行操作... [main] DEBUG concurrent.CountDownLatchTest - 3 [B] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [E] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [C] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [D] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [A] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [main] DEBUG concurrent.CountDownLatchTest - 2 [main] DEBUG concurrent.CountDownLatchTest - 1 [E] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [C] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [B] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [D] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [A] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行...CyclicBarrier
CyclicBarrier【Cyclic周期,循環(huán)的 Barrier屏障,障礙】循環(huán)的等待阻塞的線程個(gè)數(shù)到達(dá)指定數(shù)量后使參與計(jì)數(shù)的線程繼續(xù)執(zhí)行并可執(zhí)行特定線程(使用不同構(gòu)造函數(shù)可以不設(shè)定到達(dá)后執(zhí)行),其他線程仍處于阻塞等待再一次達(dá)成指定個(gè)數(shù)。
測(cè)試類:
package concurrent; import concurrent.thread.CyclicBarrierThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CyclicBarrier; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 創(chuàng)建時(shí)間:2016年08月08日 * 描述: */ public class CyclicBarrierTest { private static final Logger logger = LogManager.getLogger(CyclicBarrierTest.class); public static void main(String[] args) { //可以使用CyclicBarrier(int parties)不設(shè)定到達(dá)后執(zhí)行的內(nèi)容 CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { logger.debug("---計(jì)數(shù)到達(dá)后執(zhí)行的內(nèi)容----"); }); new CyclicBarrierThread("A", cyclicBarrier).start(); new CyclicBarrierThread("B", cyclicBarrier).start(); new CyclicBarrierThread("C", cyclicBarrier).start(); new CyclicBarrierThread("D", cyclicBarrier).start(); new CyclicBarrierThread("E", cyclicBarrier).start(); new CyclicBarrierThread("A2", cyclicBarrier).start(); new CyclicBarrierThread("B2", cyclicBarrier).start(); new CyclicBarrierThread("C2", cyclicBarrier).start(); new CyclicBarrierThread("D2", cyclicBarrier).start(); new CyclicBarrierThread("E2", cyclicBarrier).start(); //需要注意的是,如果線程數(shù)不是上面設(shè)置的等待數(shù)量的整數(shù)倍,比如這個(gè)程序中又加了個(gè)線程, // 那么當(dāng)達(dá)到5個(gè)數(shù)量時(shí),只會(huì)執(zhí)行達(dá)到時(shí)的五個(gè)線程的內(nèi)容, // 剩余一個(gè)線程會(huì)出于阻塞狀態(tài)導(dǎo)致主線程無法退出,程序無法結(jié)束 // new CyclicBarrierThread("F", cyclicBarrier).start();//將這行注釋去掉程序無法自動(dòng)結(jié)束 } }
線程類:
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 創(chuàng)建時(shí)間:2016年08月08日 * 描述: */ public class CyclicBarrierThread extends Thread { private static final Logger logger = LogManager.getLogger(CyclicBarrierThread.class); private CyclicBarrier cyclicBarrier; public CyclicBarrierThread(String name, CyclicBarrier cyclicBarrier) { super(name); this.cyclicBarrier = cyclicBarrier; } @Override public void run() { logger.debug("執(zhí)行操作..."); try { int time = new Random().nextInt(10) * 1000; logger.debug("休眠" + time/1000 + "秒"); sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug("等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)..."); try { //讓線程進(jìn)入阻塞狀態(tài),等待計(jì)數(shù)達(dá)成后釋放 cyclicBarrier.await(); logger.debug("計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行..."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
執(zhí)行結(jié)果:
[A] - 執(zhí)行操作... [A] - 休眠0秒 [E2] - 執(zhí)行操作... [E2] - 休眠5秒 [D2] - 執(zhí)行操作... [D2] - 休眠4秒 [C2] - 執(zhí)行操作... [C2] - 休眠4秒 [B2] - 執(zhí)行操作... [B2] - 休眠6秒 [A2] - 執(zhí)行操作... [A2] - 休眠8秒 [E] - 執(zhí)行操作... [E] - 休眠5秒 [D] - 執(zhí)行操作... [D] - 休眠0秒 [C] - 執(zhí)行操作... [C] - 休眠3秒 [B] - 執(zhí)行操作... [B] - 休眠7秒 [A] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [D] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [C] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [D2] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [C2] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [C2] DEBUG concurrent.CyclicBarrierTest - ---計(jì)數(shù)到達(dá)后執(zhí)行的內(nèi)容---- [C2] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [A] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [C] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [D2] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [D] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [E2] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [E] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [B2] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [B] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [A2] - 等待計(jì)數(shù)器達(dá)到標(biāo)準(zhǔn)... [A2] DEBUG concurrent.CyclicBarrierTest - ---計(jì)數(shù)到達(dá)后執(zhí)行的內(nèi)容---- [E] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [B2] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [E2] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [B] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行... [A2] - 計(jì)數(shù)達(dá)成,繼續(xù)執(zhí)行...
可以想象成以前不正規(guī)的長(zhǎng)途汽車站的模式:
不正規(guī)的長(zhǎng)途汽車站會(huì)等待座位坐滿之后才發(fā)車,到達(dá)目的地之后繼續(xù)等待然后循環(huán)進(jìn)行。每個(gè)人都是一個(gè)Thread,上車后觸發(fā)cyclicBarrier.await();,當(dāng)坐滿時(shí)就是達(dá)到指定達(dá)成數(shù)的時(shí)候,車輛發(fā)車就是達(dá)成后統(tǒng)一執(zhí)行的內(nèi)容,發(fā)車后車上的人們就可以聊天之類的操作了【我們暫且理解為上車后人們就都不能動(dòng)了O(∩_∩)O~】。
CountDownLatch與CyclicBarrier區(qū)別:CountDownLatch是一個(gè)或多個(gè)線程等待計(jì)數(shù)達(dá)成后繼續(xù)執(zhí)行,await()調(diào)用并沒有參與計(jì)數(shù)。
CyclicBarrier則是N個(gè)線程等待彼此執(zhí)行到零界點(diǎn)之后再繼續(xù)執(zhí)行,await()調(diào)用的同時(shí)參與了計(jì)數(shù),并且CyclicBarrier支持條件達(dá)成后執(zhí)行某個(gè)動(dòng)作,而且這個(gè)過程是循環(huán)性的。
ExchangerExchanger
測(cè)試類:
package concurrent; import concurrent.pojo.ExchangerPojo; import concurrent.thread.ExchangerThread; import java.util.HashMap; import java.util.concurrent.Exchanger; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 創(chuàng)建時(shí)間:2016年08月08日 * 描述: */ public class ExchangerTest { public static void main(String[] args) { Exchanger> exchanger = new Exchanger<>(); new ExchangerThread("A", exchanger).start(); new ExchangerThread("B", exchanger).start(); } }
實(shí)體類:
package concurrent.pojo; import com.alibaba.fastjson.JSON; import java.util.Date; import java.util.List; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 創(chuàng)建時(shí)間:2016年08月08日 * 描述: */ public class ExchangerPojo { private int intVal; private String strVal; private ListstrList; private Date date; public ExchangerPojo(int intVal, String strVal, List strList, Date date) { this.intVal = intVal; this.strVal = strVal; this.strList = strList; this.date = date; } public int getIntVal() { return intVal; } public void setIntVal(int intVal) { this.intVal = intVal; } public String getStrVal() { return strVal; } public void setStrVal(String strVal) { this.strVal = strVal; } public List getStrList() { return strList; } public void setStrList(List strList) { this.strList = strList; } public Date getDate() { return date; } public void setDate(Date date) { this.date = date; } @Override public String toString() { return JSON.toJSONString(this); } }
線程類:
package concurrent.thread; import concurrent.pojo.ExchangerPojo; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.*; import java.util.concurrent.Exchanger; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 創(chuàng)建時(shí)間:2016年08月08日 * 描述: */ public class ExchangerThread extends Thread { private Exchanger> exchanger; private static final Logger logger = LogManager.getLogger(ExchangerThread.class); public ExchangerThread(String name, Exchanger > exchanger) { super(name); this.exchanger = exchanger; } @Override public void run() { HashMap map = new HashMap<>(); logger.debug(getName() + "提供者提供數(shù)據(jù)..."); Random random = new Random(); for (int i = 0; i < 3; i++) { int index = random.nextInt(10); List list = new ArrayList<>(); for (int j = 0; j < index; j++) { list.add("list ---> " + j); } ExchangerPojo pojo = new ExchangerPojo(index, getName() + "提供的數(shù)據(jù)", list, new Date()); map.put("第" + i + "個(gè)數(shù)據(jù)", pojo); } try { int time = random.nextInt(10); logger.debug(getName() + "等待" + time + "秒...."); for (int i = time; i > 0; i--) { sleep(1000); logger.debug(getName() + "---->" + i); } //等待exchange是會(huì)進(jìn)入阻塞狀態(tài),可以在一個(gè)線程中與另一線程多次交互,此處就不寫多次了 HashMap getMap = exchanger.exchange(map); time = random.nextInt(10); logger.debug(getName() + "接受到數(shù)據(jù)等待" + time + "秒...."); for (int i = time; i > 0; i--) { sleep(1000); logger.debug(getName() + "---->" + i); } getMap.forEach((x, y) -> { logger.debug(x + " -----> " + y.toString()); }); } catch (InterruptedException e) { e.printStackTrace(); } } }
執(zhí)行結(jié)果:
[B] - B提供者提供數(shù)據(jù)... [A] - A提供者提供數(shù)據(jù)... [A] - A等待2秒.... [B] - B等待0秒.... [A] - A---->2 [A] - A---->1 [B] - B接受到數(shù)據(jù)等待1秒.... [A] - A接受到數(shù)據(jù)等待4秒.... [B] - B---->1 [A] - A---->4 [B] - 第0個(gè)數(shù)據(jù) -----> {"date":1470652252049,"intVal":5,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4"],"strVal":"A提供的數(shù)據(jù)"} [B] - 第1個(gè)數(shù)據(jù) -----> {"date":1470652252049,"intVal":1,"strList":["list ---> 0"],"strVal":"A提供的數(shù)據(jù)"} [B] - 第2個(gè)數(shù)據(jù) -----> {"date":1470652252049,"intVal":4,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3"],"strVal":"A提供的數(shù)據(jù)"} [A] - A---->3 [A] - A---->2 [A] - A---->1 [A] - 第0個(gè)數(shù)據(jù) -----> {"date":1470652252057,"intVal":1,"strList":["list ---> 0"],"strVal":"B提供的數(shù)據(jù)"} [A] - 第1個(gè)數(shù)據(jù) -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的數(shù)據(jù)"} [A] - 第2個(gè)數(shù)據(jù) -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"],"strVal":"B提供的數(shù)據(jù)"}Phaser
Phaser個(gè)人感覺兼具了CountDownLatch與CyclicBarrier的功能,并提供了分階段的能力。
測(cè)試代碼:
package concurrent; import concurrent.thread.PhaserThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.Phaser; /** * 拿客 * 網(wǎng)站:www.coderknock.com * QQ群:213732117 * 三產(chǎn) 創(chuàng)建于 2016年08月08日 21:25:30。 */ public class PhaserTest { private static final Logger logger = LogManager.getLogger(PhaserTest.class); public static void main(String[] args) { Phaser phaser = new Phaser() { /**此方法有2個(gè)作用: * 1、當(dāng)每一個(gè)階段執(zhí)行完畢,此方法會(huì)被自動(dòng)調(diào)用,因此,重載此方法寫入的代碼會(huì)在每個(gè)階段執(zhí)行完畢時(shí)執(zhí)行,相當(dāng)于CyclicBarrier的barrierAction。 * 2、當(dāng)此方法返回true時(shí),意味著Phaser被終止,因此可以巧妙的設(shè)置此方法的返回值來終止所有線程。例如:若此方法返回值為 phase>=3,其含義為當(dāng)整個(gè)線程執(zhí)行了4個(gè)階段后,程序終止。 * */ @Override protected boolean onAdvance(int phase, int registeredParties) { logger.debug("階段--->" + phase); logger.debug("注冊(cè)的線程數(shù)量--->" + registeredParties); return super.onAdvance(phase, registeredParties); } }; for (int i = 3; i > 0; i--) { new PhaserThread("第" + i + "個(gè)", phaser).start(); } } }
線程代碼:
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Random; import java.util.concurrent.Phaser; /** * 拿客 * 網(wǎng)站:www.coderknock.com * QQ群:213732117 * 三產(chǎn) 創(chuàng)建于 2016年08月08日 21:16:55。 */ public class PhaserThread extends Thread { private Phaser phaser; private static final Logger logger = LogManager.getLogger(PhaserThread.class); public PhaserThread(String name, Phaser phaser) { super(name); this.phaser = phaser; //把當(dāng)前線程注冊(cè)到Phaser this.phaser.register(); logger.debug("name為" + name + "的線程注冊(cè)了" + this.phaser.getRegisteredParties() + "個(gè)線程"); } @Override public void run() { logger.debug("進(jìn)入..."); phaser.arrive(); for (int i = 6; i > 0; i--) { int time = new Random().nextInt(5); try { logger.debug("睡眠" + time + "秒"); sleep(time * 1000); if (i == 1) { logger.debug("未完成的線程數(shù)量:" + phaser.getUnarrivedParties()); logger.debug("最后一次觸發(fā),并注銷自身"); phaser.arriveAndDeregister(); logger.debug("未完成的線程數(shù)量:" + phaser.getUnarrivedParties()); } else { logger.debug("未完成的線程數(shù)量:" + phaser.getUnarrivedParties()); logger.debug(i + "--->觸發(fā)并阻塞..."); phaser.arriveAndAwaitAdvance();//相當(dāng)于CyclicBarrier.await(); logger.debug("未完成的線程數(shù)量:" + phaser.getUnarrivedParties()); } } catch (InterruptedException e) { e.printStackTrace(); } } logger.debug("注銷完成之后注冊(cè)的線程數(shù)量--->" + phaser.getRegisteredParties()); } }
執(zhí)行結(jié)果:
[main] - name為第3個(gè)的線程注冊(cè)了1個(gè)線程 [main] - name為第2個(gè)的線程注冊(cè)了2個(gè)線程 [main] - name為第1個(gè)的線程注冊(cè)了3個(gè)線程 [第3個(gè)] - 進(jìn)入... [第2個(gè)] - 進(jìn)入... [第3個(gè)] - 睡眠2秒 [第2個(gè)] - 睡眠1秒 [第1個(gè)] - 進(jìn)入... [第1個(gè)] - 階段--->0 [第1個(gè)] - 注冊(cè)的線程數(shù)量--->3 [第1個(gè)] - 睡眠4秒 [第2個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 6--->觸發(fā)并阻塞... [第3個(gè)] - 未完成的線程數(shù)量:2 [第3個(gè)] - 6--->觸發(fā)并阻塞... [第1個(gè)] - 未完成的線程數(shù)量:1 [第1個(gè)] - 6--->觸發(fā)并阻塞... [第1個(gè)] - 階段--->1 [第1個(gè)] - 注冊(cè)的線程數(shù)量--->3 [第1個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 睡眠1秒 [第3個(gè)] - 睡眠0秒 [第2個(gè)] - 睡眠4秒 [第3個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 5--->觸發(fā)并阻塞... [第1個(gè)] - 未完成的線程數(shù)量:2 [第1個(gè)] - 5--->觸發(fā)并阻塞... [第2個(gè)] - 未完成的線程數(shù)量:1 [第2個(gè)] - 5--->觸發(fā)并阻塞... [第2個(gè)] - 階段--->2 [第2個(gè)] - 注冊(cè)的線程數(shù)量--->3 [第2個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 睡眠0秒 [第3個(gè)] - 睡眠2秒 [第2個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 睡眠2秒 [第2個(gè)] - 4--->觸發(fā)并阻塞... [第3個(gè)] - 未完成的線程數(shù)量:2 [第1個(gè)] - 未完成的線程數(shù)量:2 [第3個(gè)] - 4--->觸發(fā)并阻塞... [第1個(gè)] - 4--->觸發(fā)并阻塞... [第1個(gè)] - 階段--->3 [第1個(gè)] - 注冊(cè)的線程數(shù)量--->3 [第1個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 睡眠2秒 [第3個(gè)] - 睡眠1秒 [第2個(gè)] - 睡眠4秒 [第3個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 3--->觸發(fā)并阻塞... [第1個(gè)] - 未完成的線程數(shù)量:2 [第1個(gè)] - 3--->觸發(fā)并阻塞... [第2個(gè)] - 未完成的線程數(shù)量:1 [第2個(gè)] - 3--->觸發(fā)并阻塞... [第2個(gè)] - 階段--->4 [第2個(gè)] - 注冊(cè)的線程數(shù)量--->3 [第2個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 睡眠2秒 [第1個(gè)] - 睡眠2秒 [第3個(gè)] - 睡眠4秒 [第2個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 2--->觸發(fā)并阻塞... [第1個(gè)] - 2--->觸發(fā)并阻塞... [第3個(gè)] - 未完成的線程數(shù)量:1 [第3個(gè)] - 2--->觸發(fā)并阻塞... [第3個(gè)] - 階段--->5 [第3個(gè)] - 注冊(cè)的線程數(shù)量--->3 [第3個(gè)] - 未完成的線程數(shù)量:3 [第1個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 未完成的線程數(shù)量:3 [第3個(gè)] - 睡眠2秒 [第1個(gè)] - 睡眠3秒 [第2個(gè)] - 睡眠0秒 [第2個(gè)] - 未完成的線程數(shù)量:3 [第2個(gè)] - 最后一次觸發(fā),并注銷自身 [第2個(gè)] - 未完成的線程數(shù)量:2 [第2個(gè)] - 注銷完成之后注冊(cè)的線程數(shù)量--->2 [第3個(gè)] - 未完成的線程數(shù)量:2 [第3個(gè)] - 最后一次觸發(fā),并注銷自身 [第3個(gè)] - 未完成的線程數(shù)量:1 [第3個(gè)] - 注銷完成之后注冊(cè)的線程數(shù)量--->1 [第1個(gè)] - 未完成的線程數(shù)量:1 [第1個(gè)] - 最后一次觸發(fā),并注銷自身 [第1個(gè)] - 階段--->6 [第1個(gè)] - 注冊(cè)的線程數(shù)量--->0 [第1個(gè)] - 未完成的線程數(shù)量:0 [第1個(gè)] - 注銷完成之后注冊(cè)的線程數(shù)量--->0
上面代碼中,當(dāng)所有線程進(jìn)行到arriveAndAwaitAdvance()時(shí)會(huì)觸發(fā)計(jì)數(shù)并且將線程阻塞,等計(jì)數(shù)數(shù)量等于注冊(cè)線程數(shù)量【即所有線程都執(zhí)行到了約定的地方時(shí),會(huì)放行,是所有線程得以繼續(xù)執(zhí)行,并觸發(fā)onAction事件】。我們可以在onAction中根據(jù)不同階段執(zhí)行不同內(nèi)容的操作。
實(shí)現(xiàn)分階段的CountDownLatch的功能只需將上面的測(cè)試類更改如下:
package concurrent; import concurrent.thread.PhaserThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.Phaser; import static jodd.util.ThreadUtil.sleep; /** * 拿客 * 網(wǎng)站:www.coderknock.com * QQ群:213732117 * 三產(chǎn) 創(chuàng)建于 2016年08月08日 21:25:30。 */ public class PhaserTest { private static final Logger logger = LogManager.getLogger(PhaserTest.class); public static void main(String[] args) { //這里其實(shí)相當(dāng)于已經(jīng)注冊(cè)了3個(gè)線程,但是并沒有實(shí)際的線程 int coutNum=3; Phaser phaser = new Phaser(coutNum) { /**此方法有2個(gè)作用: * 1、當(dāng)每一個(gè)階段執(zhí)行完畢,此方法會(huì)被自動(dòng)調(diào)用,因此,重載此方法寫入的代碼會(huì)在每個(gè)階段執(zhí)行完畢時(shí)執(zhí)行,相當(dāng)于CyclicBarrier的barrierAction。 * 2、當(dāng)此方法返回true時(shí),意味著Phaser被終止,因此可以巧妙的設(shè)置此方法的返回值來終止所有線程。例如:若此方法返回值為 phase>=3,其含義為當(dāng)整個(gè)線程執(zhí)行了4個(gè)階段后,程序終止。 * */ @Override protected boolean onAdvance(int phase, int registeredParties) { logger.debug("階段--->" + phase); logger.debug("注冊(cè)的線程數(shù)量--->" + registeredParties); return registeredParties==coutNum;//當(dāng)后只剩下coutNum個(gè)線程時(shí)說明所有真實(shí)的注冊(cè)的線程已經(jīng)運(yùn)行完成,測(cè)試可以終止Phaser } }; for (int i = 3; i > 0; i--) { new PhaserThread("第" + i + "個(gè)", phaser).start(); } //當(dāng)phaser未終止時(shí)循環(huán)注冊(cè)這塊兒可以使用實(shí)際的業(yè)務(wù)處理 while (!phaser.isTerminated()) { sleep(1000); logger.debug("觸發(fā)一次"); phaser.arrive(); //相當(dāng)于countDownLatch.countDown(); } } }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/66787.html
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)常可見它的使用,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來?yè)羝魄皟蓚€(gè)名詞,今天我們首先來說說分布式。 探究...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)常可見它的使用,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來?yè)羝魄皟蓚€(gè)名詞,今天我們首先來說說分布式。 探究...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)常可見它的使用,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購(gòu),是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場(chǎng)景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來?yè)羝魄皟蓚€(gè)名詞,今天我們首先來說說分布式。 探究...
摘要:在中一般來說通過來創(chuàng)建所需要的線程池,如高并發(fā)原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細(xì)原理解析 - 后端 - 掘金今天我們來研究學(xué)習(xí)一下AbstractQueuedSynchronizer類的相關(guān)原理,java.util.concurrent包中很多類都依賴于這個(gè)類所提供的隊(duì)列式...
摘要:在中一般來說通過來創(chuàng)建所需要的線程池,如高并發(fā)原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細(xì)原理解析 - 后端 - 掘金今天我們來研究學(xué)習(xí)一下AbstractQueuedSynchronizer類的相關(guān)原理,java.util.concurrent包中很多類都依賴于這個(gè)類所提供的隊(duì)列式...
面試舊敵之紅黑樹(直白介紹深入理解) - Android - 掘金 讀完本文你將了解到: 什么是紅黑樹 黑色高度 紅黑樹的 5 個(gè)特性 紅黑樹的左旋右旋 指定節(jié)點(diǎn) x 的左旋 右圖轉(zhuǎn)成左圖 指定節(jié)點(diǎn) y 的右旋左圖轉(zhuǎn)成右圖 紅黑樹的平衡插入 二叉查找樹的插入 插入后調(diào)整紅黑樹結(jié)構(gòu) 調(diào)整思想 插入染紅后... java 多線程同步以及線程間通信詳解 & 消費(fèi)者生產(chǎn)者模式 & 死鎖 & Thread...
閱讀 3251·2021-10-13 09:39
閱讀 2013·2021-09-27 13:36
閱讀 3074·2021-09-22 16:02
閱讀 2596·2021-09-10 10:51
閱讀 1578·2019-08-29 17:15
閱讀 1532·2019-08-29 16:14
閱讀 3504·2019-08-26 11:55
閱讀 2549·2019-08-26 11:50