摘要:前言在前面的三篇文章中先后介紹了框架的任務組件體系體系源碼并簡單介紹了目前的并行流應用場景框架本質上是對的擴展它依舊支持經典的使用方式即任務池的配合向池中提交任務并異步地等待結果毫無疑問前面的文章已經解釋了框架的新穎性初步了解了工作竊取
前言
在前面的三篇文章中先后介紹了ForkJoin框架的任務組件(ForkJoinTask體系,CountedCompleter體系)源碼,并簡單介紹了目前的并行流應用場景.ForkJoin框架本質上是對Executor-Runnable/Callable-Future/FutureTask的擴展,它依舊支持經典的Executor使用方式,即任務+池的配合,向池中提交任務,并異步地等待結果.
毫無疑問,前面的文章已經解釋了ForkJoin框架的新穎性,初步了解了工作竊取依托的數據結構,ForkJoinTask/CountedCompleter在執行期的行為,也提到它們一定要在ForkJoinPool中進行運行和調度,這也是本文力求解決的問題.
ForkJoinPool源碼ForkJoinPool源碼是ForkJoin框架中最復雜,最難理解的部分,且因為交叉依賴ForkJoinTask,CountedCompleter,ForkJoinWorkerThread,作者在前面多帶帶用兩篇文章分析了它們,以前兩篇文章為基礎,重復部分本文不再詳述.
首先看類簽名.
//禁止偽共享 @sun.misc.Contended //繼承自AbstractExecutorService public class ForkJoinPool extends AbstractExecutorService
前面的幾篇文章不止一次強調過ForkJoin框架的"輕量線程,輕量任務"等概念,也提到少量線程-多數計算,資源空閑時竊取任務.并介紹了基于status狀態的調度(ForkJoinTask系列),不基于status而由子任務觸發完成的調度(CountedCompleter系列),顯然它們的共性就是讓線程在正常調度的前提下盡量少的空閑,最大幅度利用cpu資源,偽共享/緩存行的問題在ForkJoin框架中顯然會是一個更大的性能大殺器.在1.8之前,一般通過補位的方式解決偽共享問題,1.8之后,官方使用@Contended注解,令虛擬機盡量注解標注的字段(字段的情況)或成員字段放置在不同的緩存行,從而規避了偽共享問題.
建立ForkJoinPool可以直接new,也可以使用Executors的入口方法.
//Executors方法,顯然ForkJoinPool被稱作工作竊取線程池.參數指定了并行度. public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, //默認線程工廠,前文中已提過默認的ForkJoinWorkerThread ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } //不提供并行度. public static ExecutorService newWorkStealingPool() { return new ForkJoinPool //使用所有可用的處理器 (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } //對應的,ForkJoinPool的構造器們. //不指定任何參數. public ForkJoinPool() { //并行度取MAX_CAP和可用處理器數的最小值. this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), //默認的線程工廠.無異常處理器,非異步模式. defaultForkJoinWorkerThreadFactory, null, false); } //同上,只是使用參數中的并行度. public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { //并行度需要校驗 this(checkParallelism(parallelism), //校驗線程工廠 checkFactory(factory), //參數指定的未捕獲異常處理器. handler, //前面的幾處代碼asyncMode都是false,會選用LIFO隊列,是true是會選用FIFO隊列,后面詳述. asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //線程名前綴 "ForkJoinPool-" + nextPoolId() + "-worker-"); //檢查許可,不關心. checkPermission(); } //檢查方法很簡單. //并行度不能大于MAX_CAP不能不大于0. private static int checkParallelism(int parallelism) { if (parallelism <= 0 || parallelism > MAX_CAP) throw new IllegalArgumentException(); return parallelism; } //線程工廠非空即可. private static ForkJoinWorkerThreadFactory checkFactory (ForkJoinWorkerThreadFactory factory) { if (factory == null) throw new NullPointerException(); return factory; } //最終構造器,私有.待介紹完一些基礎字段后再述. private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; //config初始化值,用并行度與mode取或,顯然mode是FIFO時,將有一個第17位的1. this.config = (parallelism & SMASK) | mode; //np保存并行度(正數)的相反數(補碼). long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
了解其他線程池源碼的朋友可以去回憶其他線程池的構建,不論是調度線程池還是普通的線程池或者緩存池,他們其實都設置了核心線程數和最大線程數.當然這要看定義"線程池分類"的視角,以Executors入口的api分類,或許可以分類成固定線程池,緩沖池,單線程池,調度池,工作竊取池;但以真正的實現分類,其實只有ThreadPoolExecutor系列(固定線程池,單線程池都直接是ThreadPoolExecutor,調度池是它的子類,緩沖池也是ThreadPoolExecutor,只是阻塞隊列限定為SynchronizedQueue)和ForkJoinPool系列(工作竊取池).
作者更傾向于用實現的方式區分,也間接參照Executors的api使用用途的區分方式.如果不使用Executors的入口api,不論哪種ThreadPoolExecutor系列,我們都可以提供線程池的大小配置,阻塞隊列,線程空閑存活時間及單位,池滿拒絕策略,線程工廠等,而所謂的緩存池和固定池的區別只是隊列的區別.
調度池的構造參數與ThreadPoolExecutor無異,只是內限了阻塞隊列的類型,它雖然是ThreadPoolExecutor的擴展,卻不僅沒有拓充參數,反而減少了兩個參數:阻塞隊列和最大線程數.阻塞隊列被默認設置為內部類DelayQueue,它實現了BlockingQueue,最大線程數則為整數上限,同時新增的對任務的延時或重試等屬性則是依托于內部維護的一個FutureTask的擴展,并未增加到構造參數.
而到了ForkJoinPool,我們看到的是截然不同于ThreadPoolExecutor系列的構建方式.首先根本沒有提供核心線程和最大線程數,線程空閑存活時間的參數和阻塞隊列以及池滿拒絕策略;線程工廠也僅能提供生產ForkJoinWorkerThread的工廠bean;還具備一些ThreadPoolExecutor沒有的參數,如未捕獲異常處理器,同步異步模式,工作線程前綴(其實別的類型的線程工廠也可以提供線程前綴,默認就是常見的pool-前綴)等.
顯然從參數看便可猜測出若干不同于其他線程池的功能.但我們更關心其中的一些參數設置.
一般的參數都能見名知義,僅有config和ctl難以理解,此處也不詳細介紹,只說他們的初值的初始化.
config是并行度與SMASK取與運算再與mode取或,這里并行度最大是15位整數(MAX_CAP=0x7FFF),而SMASK作用于整數后16位,mode在FIFO為1<<16,LIFO是0.很好計算.
ctl其實是一個控制信號,我們后面會在具體源碼就地解釋,它的計算先通過了一個局部變量np.
np的計算方法是將并行度的相反數(補碼)轉換為長整型.前面簡單分析,并行度不會大于MAX_CAP,因此np至少前49位全部是1.
計算ctl時,將np左移AC_SHIFT即為取后16位,將np左移TC_SHIFT即取它的后32位,分別與AC_MASK和TC_SHIFT,表示取np的后16位分別放置于ctl的前16位和33至48位.而ctl的后32位初值為0.
因為生成的ctl前16位和后16位相等,如果仔細用數學驗證,可以發現,對前16位和后16位的末位同時加1,當添加了parallel次后,ctl將歸0.這也是添加worker限制的重要數理依據.
前面列舉了獲取ForkJoinPool實例的幾種方法,初步展示了構造一個ForkJoinPool的屬性,也暴露了一些實現細節,而這些細節依賴于一些字段和成員函數,我們先從它們開始.
//ForkJoinWorkerThread的線程工廠. public static interface ForkJoinWorkerThreadFactory { //創建新線程要實現的方法. public ForkJoinWorkerThread newThread(ForkJoinPool pool); } //前面看到的默認線程工廠. static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } //創建InnocuousForkJoinWorkerThread的線程工廠,上一文已經介紹過. static final class InnocuousForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { private static final AccessControlContext innocuousAcc; static { Permissions innocuousPerms = new Permissions(); innocuousPerms.add(modifyThreadPermission); innocuousPerms.add(new RuntimePermission( "enableContextClassLoaderOverride")); innocuousPerms.add(new RuntimePermission( "modifyThreadGroup")); innocuousAcc = new AccessControlContext(new ProtectionDomain[] { new ProtectionDomain(null, innocuousPerms) }); } public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread) java.security.AccessController.doPrivileged( new java.security.PrivilegedAction() { public ForkJoinWorkerThread run() { return new ForkJoinWorkerThread. InnocuousForkJoinWorkerThread(pool); }}, innocuousAcc); } } //空任務 static final class EmptyTask extends ForkJoinTask { private static final long serialVersionUID = -7721805057305804111L; EmptyTask() { status = ForkJoinTask.NORMAL; } //狀態直接是已正常完成. public final Void getRawResult() { return null; } public final void setRawResult(Void x) {} public final boolean exec() { return true; } }
以上是線程工廠和一個默認的EmptyTask.接下來看一些跨池和工作隊列的公用常量.
// 與邊界有關的常量 static final int SMASK = 0xffff; // 后16位. static final int MAX_CAP = 0x7fff; // 前面在定并行度時參考的最大容量. static final int EVENMASK = 0xfffe; // 后16位驗偶數 static final int SQMASK = 0x007e; // 最大64個偶數槽,從第2位至7位共6位,2的6次方. // 與WorkQueue有關 static final int SCANNING = 1; // 對WorkQueue正在運行任務的標記 static final int INACTIVE = 1 << 31; // 標記負數 static final int SS_SEQ = 1 << 16; // 版本號使用,第17位1 // ForkJoinPool和WorkQueue的config有關常量. static final int MODE_MASK = 0xffff << 16; // 能濾取前16位. static final int LIFO_QUEUE = 0;//前面提到過的,非async模式(false),值取0. static final int FIFO_QUEUE = 1 << 16;//async模式(true),值取1. static final int SHARED_QUEUE = 1 << 31; // 共享隊列標識,符號位表示負.
以上的字段含義只是粗略的描述,先有一個印象,后面看到時自然理解其含義.
接下來看核心的WorkQueue內部類.
//前面的文章說過,它是一個支持工作竊取和外部提交任務的隊列.顯然,它的實例對內存部局十分敏感, //WorkQueue本身的實例,或者內部數組元素均應避免共享同一緩存行. @sun.misc.Contended static final class WorkQueue { //隊列內部數組的初始容量,默認是2的12次方,它必須是2的幾次方,且不能小于4. //但它應該設置一個較大的值來減少隊列間的緩存行共享. //在前面的java運行時和54篇java官方文檔術語中曾提到,jvm通常會將 //數組放在能夠共享gc標記(如卡片標記)的位置,這樣每一次寫都會造成嚴重內存競態. static final int INITIAL_QUEUE_CAPACITY = 1 << 13; //最大內部數組容量,默認64M,也必須是2的平方,但不大于1<<(31-數組元素項寬度), //根據官方注釋,這可以確保無需計算索引概括,但定義一個略小于此的值有助于用戶在 //系統飽合前捕獲失控的程序. static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M // unsafe機制有關的字段. private static final sun.misc.Unsafe U; private static final int ABASE; private static final int ASHIFT; private static final long QTOP; private static final long QLOCK; private static final long QCURRENTSTEAL; static { try { U = sun.misc.Unsafe.getUnsafe(); Class> wk = WorkQueue.class; Class> ak = ForkJoinTask[].class; //top字段的句柄. QTOP = U.objectFieldOffset (wk.getDeclaredField("top")); //qlock字段的句柄. QLOCK = U.objectFieldOffset (wk.getDeclaredField("qlock")); //currentSteal的句柄 QCURRENTSTEAL = U.objectFieldOffset (wk.getDeclaredField("currentSteal")); //ABASE是ForkJoinTask數組的首地址. ABASE = U.arrayBaseOffset(ak); //scale代表數組元素的索引大小.它必須是2的平方. int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); //計算ASHIFT,它是31與scale的高位0位數量的差值.因為上一步約定了scale一定是一個正的2的幾次方, //ASHIFT的結果一定會大于1.可以理解ASHIFT是數組索引大小的有效位數. ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } } //插曲,在Integer類的numberOfLeadingZeros方法,果然一流的程序是數學. public static int numberOfLeadingZeros(int i) { // HD, Figure 5-6 if (i == 0) //i本身已是0,毫無疑問地返回32.本例中i是2起,所以不會. return 32; //先將n初始化1.最后會減掉首位1. int n = 1; //i的前16位不存在非零值,則將n加上16并移除i的前16位.將i轉換為一個以原i后16位開頭的新值. if (i >>> 16 == 0) { n += 16; i <<= 16; } //不論前一步結果如何,若此時i的前8位不存在非零值,則n加上8,i移除前8位.將i轉換為原i的后24位開頭的新值. if (i >>> 24 == 0) { n += 8; i <<= 8; } //不論前一步結果如何,若此時i的前4位不存在非零值,則n加上4,i移除前4位.將i轉換為原i的后28位開頭的新值. if (i >>> 28 == 0) { n += 4; i <<= 4; } //不論前一步結果如何,若此時i的前2位不存在非零值,則n加上2,i移除前2位.將i轉換為原i的后30位開頭的新值. if (i >>> 30 == 0) { n += 2; i <<= 2; } //經過前面的運算,i的前30位的非零值數量已經記入n, //在前一步的基礎上,此時i的前1位若存在非零值,則n-1,否則n保留原值. n -= i >>> 31; return n; } //回到WorkQueue // 實例字段 volatile int scanState; // 版本號,小于0代表不活躍,注釋解釋奇數代表正在掃描,但從代碼語義上看正好相反. int stackPred; // 前一個池棧控制信號(ctl),它保有前一個棧頂記錄. int nsteals; // 偷盜的任務數 int hint; // 一個隨機數,用于決定偷取任務的索引. int config; // 配置,表示池的索引和模式 volatile int qlock; // 隊列鎖,1表示鎖了,小于0表示終止,其他情況是0. volatile int base; // 底,表示下一個poll操作的插槽索引 int top; // 頂,表示下一個push操作的插槽索引 ForkJoinTask>[] array; // 存放任務元素的數組,初始不分配,首擴容會分配. final ForkJoinPool pool; // 包含該隊列的池,可能在某些時刻是null. final ForkJoinWorkerThread owner; // 持有該隊列的線程,如果隊列是共享的,owner是null. volatile Thread parker; // 在調用park阻塞的owner,非阻塞時為null volatile ForkJoinTask> currentJoin; // 被在awaitJoin中join的task. volatile ForkJoinTask> currentSteal; // 字面意思當前偷的任務,主要用來helpStealer方法使用. //工作隊列構造器,只初始化線程池,owner等字段. WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) { this.pool = pool; this.owner = owner; // Place indices in the center of array (that is not yet allocated) //base和top初始均為INITIAL_QUEUE_CAPACITY的一半,也就是2的11次方. base = top = INITIAL_QUEUE_CAPACITY >>> 1; } //返回本隊列在池中的索引,使用config的2至4位表示.因為config的最后一位是奇偶位,忽略. final int getPoolIndex() { return (config & 0xffff) >>> 1; } //返回隊列中的任務數. final int queueSize() { //非owner的調用者必須先讀base,用base-top,得到的結果小于0則取相反數,否則取0. //忽略即時的負數,它并不嚴格準確. int n = base - top; return (n >= 0) ? 0 : -n; } //判斷隊列是否為空隊.本方法較為精確,對于近空隊列,要檢查是否有至少一個未被占有的任務. final boolean isEmpty() { ForkJoinTask>[] a; int n, m, s; //base大于等于top,說明空了. return ((n = base - (s = top)) >= 0 || //有容量,且恰好計算為1,可能只有一個任務. (n == -1 && //計算為1,再驗數組是不是空的. ((a = array) == null || (m = a.length - 1) < 0 || //取該位置元素的值判空,空則說明isEmpty. //取值的方式是取ForkJoinTask.class首地址加上偏移量(數組長度減一(最后一個元素位置,經典案例32-1)與運算top減一左移ASHIFT(索引大小有效位數)位)的值. U.getObject (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null))); } //將一個任務壓入隊列,前文提過的fork最終就會壓隊.但此方法只能由非共享隊列的持有者調用. //當使用線程池的"外部壓入"externalPush方法時,壓入共享隊列. final void push(ForkJoinTask> task) { ForkJoinTask>[] a; ForkJoinPool p; //保存當時的base top. int b = base, s = top, n; //如果數組被移除則忽略. if ((a = array) != null) { //數組最后一個下標.如長度32,則m取31這個質數.此時保存一個m,對于保存后其他push操作相當于打了屏障. int m = a.length - 1; //向數組中的指定位置壓入該任務.位置包含上面的m和s進行與運算(數組中的位置),結果左移索引有效長度位(索引長度),再加上數組首索引偏移量(起始地址). U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); //將top加1. U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { //計算舊的任務數量,發現不大于1個,說明原來很可能工作線程正在阻塞等待新的任務.需要喚醒它. if ((p = pool) != null) //signalWork會根據情況,添加新的工作線程或喚醒等待任務的線程. p.signalWork(p.workQueues, this); } else if (n >= m)//2. //任務數量超出了,對數組擴容. growArray(); } } //添加任務過程主流程無鎖,包括可能出現的growArray.當原隊列為空時,它會初始化一個數組,否則擴容一倍. //持有者調用時,不需要加鎖,但當其他線程調用時,需要持有鎖.在resize過程中,base可以移動,但top不然. final ForkJoinTask>[] growArray() { //記錄老數組. ForkJoinTask>[] oldA = array; //根據老數組決定新容量,老數組空則INITIAL_QUEUE_CAPACITY否則國倍. int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY; if (size > MAXIMUM_QUEUE_CAPACITY) //新大小大于最大數組大小則拒絕. throw new RejectedExecutionException("Queue capacity exceeded"); int oldMask, t, b; //直接將原來的數組引用替換成新的. ForkJoinTask>[] a = array = new ForkJoinTask>[size]; //如果是初次分配,就此打住返回a,是擴容,且老數組非空則進入下面的循環拷貝. if (oldA != null && (oldMask = oldA.length - 1) >= 0 && (t = top) - (b = base) > 0) { //根據前面的運算,size一定是2的冪,減一用來哈希,這是經典處理辦法. int mask = size - 1; do { ForkJoinTask> x; //老數組base自增過若干次的得到b,它代表的元素對應的索引. int oldj = ((b & oldMask) << ASHIFT) + ABASE; //用b在新數組中找出索引. int j = ((b & mask) << ASHIFT) + ABASE; //老數組中用索引取出元素. x = (ForkJoinTask>)U.getObjectVolatile(oldA, oldj); if (x != null && //老數組置空,放入新數組. U.compareAndSwapObject(oldA, oldj, x, null)) U.putObjectVolatile(a, j, x); //每處理完一個task,就將base自增1,直到top為止. } while (++b != t); } //返回新數組. return a; } //存在下一個任務,彈出,順序是后進先出.此方法僅限非共享隊列的owner調用. final ForkJoinTask> pop() { ForkJoinTask>[] a; ForkJoinTask> t; int m; //還有元素. if ((a = array) != null && (m = a.length - 1) >= 0) { //1.top至少比base大一.注意,每次循環都會讀出新的top,它是volatile修飾的. for (int s; (s = top - 1) - base >= 0;) { //top對應的索引. long j = ((m & s) << ASHIFT) + ABASE; //2.該索引沒有元素,break,返回null.而且就代表這個位置的確是null,與競態無關. //因為此方法僅owner線程使用,不會出現另一個線程計算了同樣的j,且先執行了3的情況. //出現這種情況,則是此位置的任務當先被執行并出棧,或者就從未設置過任務,后續分析這種極端情況. //故如果出現某個任務在數組的中間,提前被執行并置空(非pop或poll方式),那么再對WorkQueue進行pop時將會中斷, //留下一部分null之后的任務不能出棧,所以可以允許任務非pop或poll方式查出并執行,但為了能pop出所有任務,不能中間置null. if ((t = (ForkJoinTask>)U.getObject(a, j)) == null) break; //3.有元素,將該索引位置置null.若cas失敗,說明元素被取出了, //但下次循環即使在2處break并返回null,也不是因為競態,因為每次循環到1都會讀取新的top, //也就有新的j. if (U.compareAndSwapObject(a, j, t, null)) { //數組位置置null的同時top減1. U.putOrderedInt(this, QTOP, s); return t; } } } //循環退出,說明top位置沒有元素,也相當于說明數組為空.顯然此方法的另一個作用是將隊列壓縮,空隊列會將top先降到base+1,再循環最后一次將top降到base. return null; } //如果b是base,使用FIFO的次序嘗試無競態取底部的任務.它會在ForkJoinPool的scan和helpStealer中使用. final ForkJoinTask> pollAt(int b) { ForkJoinTask> t; ForkJoinTask>[] a; if ((a = array) != null) { //和前面一樣的的方式計算b對應的索引j int j = (((a.length - 1) & b) << ASHIFT) + ABASE; if ((t = (ForkJoinTask>)U.getObjectVolatile(a, j)) != null && //j對應位置有task且當前base==b,嘗試將task出隊. base == b && U.compareAndSwapObject(a, j, t, null)) { //出隊成功base增1.不需要額外的同步,因為兩個線程不可能同時在上面的cas成功. //當一切條件匹配(b就是base且j位置有元素),pollAt同一個b只會有一個返回非空的t. //如果多個線程傳入的b不相等,在同一時刻只有一個會等于base. base = b + 1; return t; } } return null; } //用FIFO的次序取下一個任務. final ForkJoinTask> poll() { ForkJoinTask>[] a; int b; ForkJoinTask> t; //1.循環從base取任務,當base增長到top或其他操作重置array為null則終止循環. while ((b = base) - top < 0 && (a = array) != null) { //前面已敘述過取索引的邏輯,使用一個top到base間的數與數組長度-1與運算并左移索引長度位再加上數組基準偏移量.后面不再綴述. int j = (((a.length - 1) & b) << ASHIFT) + ABASE; //取出task t = (ForkJoinTask>)U.getObjectVolatile(a, j); //2.如果發生競態,base已經不是b,直接開啟下一輪循環把新的base讀給b. if (base == b) { if (t != null) { //3.當前t是base任務,用cas置空,base+1,返回t. //如果此處發生競態,則只有一個線程可以成功返回t并重置base(4). //不成功的線程會開啟下一輪循環,此時成功線程可能未來的及執行4更新base, //也可能已經更新base,則導致先前失敗的線程在2處通過,經5種或判隊列空返回,或非空再次循環,而 //在當前成功線程執行4成功后,所有前面失敗的線程可以在1處讀到新的base,這些線程 //在下一次循環中依舊只會有一個成功彈出t并重置base,直到所有線程執行完畢. if (U.compareAndSwapObject(a, j, t, null)) { //4重置加返回 base = b + 1; return t; } } //5.t取出的是空,發現此時臨時變量b(其他成功線程在此輪循環前置的base)已增至top-1,且當前線程又沒能成功的彈出t,說明一定會有一個線程 //將t彈出并更新base到top的值,當前線程沒必要再開下一個循環了,直接break并返回null. //t取出的是空,但是沒到top,說明只是被提前執行并置空了,那么繼續讀取新的base并循環,且若沒有其他線程去更改base,array的長度,或者把top降到 //base,則當前線程就永遠死循環下去了,因為每次循環都是125且每個變量都不變.因此為避免循環,每個任務可以提前執行,但一定不能提前離隊(置null). //也就是說:只能用poll或pop方式彈出任務,其他方式獲得任務并執行是允許的,但不能在執行后置null,留待后續源碼驗證一下. else if (b + 1 == top) // now empty break; } } //從循環退出來有兩種情況,可能是在5處滿足退出條件,或者在2處發現b已經是臟數據,下輪循環不滿足循環條件所致.兩種都應該返回null. return null; } //根據mode來取下一個本隊列元素.根據模式. final ForkJoinTask> nextLocalTask() { //當前WorkQueue的配置了FIFO,則poll,否則pop. //盡管還未看到注冊worker的源碼,在此提前透露下,ForkJoinPool也有一個config(前面講構造函數提過) //該config保存了mode信息,并原樣賦給了WorkQueue的mode.注意,相應的任務會出隊. return (config & FIFO_QUEUE) == 0 ? pop() : poll(); } //根據模式取出下一個任務,但是不出隊. final ForkJoinTask> peek() { ForkJoinTask>[] a = array; int m; //空隊,返回null. if (a == null || (m = a.length - 1) < 0) return null; //根據mode定位要取的索引j. int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base; int j = ((i & m) << ASHIFT) + ABASE; //返回讀出的值,不出隊. return (ForkJoinTask>)U.getObjectVolatile(a, j); } //如果參數t是當前隊的top,則彈出. final boolean tryUnpush(ForkJoinTask> t) { ForkJoinTask>[] a; int s; if ((a = array) != null && (s = top) != base && //1.滿足非空條件.嘗試用t去當當作計算出的索引位置的原任務的值并cas為null來出隊. U.compareAndSwapObject (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) { //cas成功,說明t確實是top,將top減一返回true. U.putOrderedInt(this, QTOP, s); return true; } //2.cas失敗或不滿足1的條件,返回false. return false; } //移除并取消隊列中所有已知的任務,忽略異常. final void cancelAll() { ForkJoinTask> t; if ((t = currentJoin) != null) { //有currentJoin,引用置空,取消并忽略異常. currentJoin = null; ForkJoinTask.cancelIgnoringExceptions(t); } if ((t = currentSteal) != null) { //有currentSteal,引用置空,取消并忽略異常. currentSteal = null; ForkJoinTask.cancelIgnoringExceptions(t); } //除了上面兩個,就只剩下數組中的任務了.按LILO的順序彈出并依次取消,忽略所有異常. while ((t = poll()) != null) ForkJoinTask.cancelIgnoringExceptions(t); } // 以下是執行方法. //按FIFO順序從隊首彈出任務并執行所有非空任務. final void pollAndExecAll() { for (ForkJoinTask> t; (t = poll()) != null;) //很明顯,如果未按嚴格順序執行,先執行中間的一個任務, //再調用本方法,則會半路中止. t.doExec(); } //移除并執行完所有本隊列的任務,如果是先進先出,則執行前面的pollAndExecAll方法. //否則pop循環執行到空為止.按前面的分析,只要堅持只能pop或poll彈出,其他方式執行任務但不能置空的原則, //可以保證pop或poll出現空的情況只能是競態發生的情況. final void execLocalTasks() { int b = base, m, s; ForkJoinTask>[] a = array; //初始滿足條件,top至少比base大1.隊列非空. if (b - (s = top - 1) <= 0 && a != null && (m = a.length - 1) >= 0) { //不是FIFO模式. if ((config & FIFO_QUEUE) == 0) { for (ForkJoinTask> t;;) { //原子getAndSet,查出并彈出原本的task if ((t = (ForkJoinTask>)U.getAndSetObject (a, ((m & s) << ASHIFT) + ABASE, null)) == null) //彈出的task是空,break.說明整個工作流程中,如果未保證嚴格有序, //如先從中間的某個任務開始執行并且出隊了,再調用execLocalTasks,會導致中間停頓. //只執行不出隊,則至少不會中斷.出現t是null的情況只能是競態或末尾. break; //top減一,執行任務. U.putOrderedInt(this, QTOP, s); t.doExec(); //如果base大于等于top,則中止. if (base - (s = top - 1) > 0) break; } } //是FIFO模式,pollAndExecAll. else pollAndExecAll(); } } //重點入口方法來了,前面留下諸多關于執行任務是否出隊的討論,下面來分析入口方法. //該方法的入口是每個工作線程的run方法,因此只有一個線程. final void runTask(ForkJoinTask> task) { //傳入task是空直接不理會. if (task != null) { //標記成忙.scanState是WorkQueue的成員變量,每個WorkQueue只有一個值, //前面說過,一般情況下,每個線程會有一個WorkQueue,所以某種情況來講也可以標記為 //當前ForkJoinWorkerThread繁忙. //SCANNING常量值是1,這個操作實質上就是將scanState變量的個位置0,也就是變成了偶數并標記它要忙了. //顯然偶數才表示忙碌,這也是為什么前面覺得官方注釋scanState是奇數表示"正在掃描"很奇怪. scanState &= ~SCANNING; //將currentSteal設置為傳入的任務,并運行該任務,若該任務內部進行了分叉,則進入相應的入隊邏輯. (currentSteal = task).doExec(); //執行完該任務后,將currentSteal置空.將該task釋放掉,幫助gc. U.putOrderedObject(this, QCURRENTSTEAL, null); //調用前面提到的,根據mode選擇依次pop或poll的方式將自己的工作隊列內的任務出隊并執行的方法. execLocalTasks(); //到此,自己隊列中的所有任務都已經完成.包含偷來的任務fork后又入隊到自己隊列的子任務. //取出owner線程.處理偷取任務有關的一些信息. ForkJoinWorkerThread thread = owner; if (++nsteals < 0) //發現當前WorkQueue偷來的任務數即將溢出了,將它轉到線程池. transferStealCount(pool); //取消忙碌標記. scanState |= SCANNING; if (thread != null) //執行afterTopLevelExec勾子方法,上一節中介紹ForkJoinWorkerThread時已介紹. thread.afterTopLevelExec(); } //方法結束,注意,沒有任何操作將task從所在的數組中移除,不論這個task是哪個WorkQueue中的元素. //同時,此方法原則上講可以多次調用(盡管事實上就一次調用),入口處和出口處分別用忙碌標記來標記scanState,但重復標記顯然不影響執行. } //如果線程池中已經初始化了用于記錄的stealCounter,則用它加上當前WorkQueue的nsteals/或最大整數(發生溢出時). //并初始化當前WorkQueue的nsteals. final void transferStealCount(ForkJoinPool p) { AtomicLong sc; if (p != null && (sc = p.stealCounter) != null) { //線程池中存放了stealCounter,它是一個原子整數. int s = nsteals; nsteals = 0; //恢復0. //若nsteals是負,增加最大整數,否則增加nsteal sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s)); } } //如果task存在,則將它從隊列中移除并執行,發現有位于頂部的取消任務,則移除之,只用于awaitJoin. //如果隊列空并且任務不知道完成了,則返回true. final boolean tryRemoveAndExec(ForkJoinTask> task) { ForkJoinTask>[] a; int m, s, b, n; //進入if的條件,存在非空任務數組,參數task非空. if ((a = array) != null && (m = a.length - 1) >= 0 && task != null) { //循環條件,隊列非空.從s開始遍歷到b,也就是從頂到底.后進先出. while ((n = (s = top) - (b = base)) > 0) { //1.內層循環. for (ForkJoinTask> t;;) { //2.從頂開始的索引j,每次向下找一個. long j = ((--s & m) << ASHIFT) + ABASE; if ((t = (ForkJoinTask>)U.getObject(a, j)) == null) //3.取出的是空,返回值取決于top是不是內層循環是第一次運行,外循環每次會將s更新為新top, //內循環則會每次將s減一.內循環只跑了一次的情況,顯然會返回true. //顯然這種情況下top也沒有被其他線程更新,內循環又是第一次跑,那么將足以說明當前隊列為空,該為false. //true的情況,向下遍歷了幾個元素打到了底,未進入46 10這三種要重開啟一輪外循環的情況,也沒找到task. //不管怎樣,發現空任務就返回. return s + 1 == top;// 比預期短,第一個或第n個出現了空值,但循環條件未false else if (t == task) { //找到的任務t不是空,且是目標任務. boolean removed = false; if (s + 1 == top) { //4.發現是首輪內循環,s+1==top成立,進行pop操作,將task彈出并將top減一. //顯然,task是最頂任務,可以用pop方式,將它置空. if (U.compareAndSwapObject(a, j, task, null)) { U.putOrderedInt(this, QTOP, s); //5.置removed為true. removed = true; } } //6.不是首輪循環,而且base沒有在處理期間發生改變. else if (base == b) //7.嘗試將task替換成一個EmptyTask實例.成功則removed是true, //這樣雖然該任務出了隊,但在隊上還有一個空的任務,而不會出現前面擔心的中間null //的情況,也不改變top或base的值. removed = U.compareAndSwapObject( a, j, task, new EmptyTask()); if (removed) //8.只要任務成功出隊(不論是4還是7,則執行. task.doExec(); //9.只要找到任務,退出內循環,回到外循環重置相應的條件. break; } //10.本輪內循環沒找到匹配task的任務. else if (t.status < 0 && s + 1 == top) {//官方注釋是取消. //11.若t是完成的任務且是首輪內循環且top未變動,將該任務出隊并令top減一. if (U.compareAndSwapObject(a, j, t, null)) U.putOrderedInt(this, QTOP, s); //12.只要進入此分支就退出內循環. break; } if (--n == 0) //13.內循環每執行到此一次,就說明有一次沒找到目標任務,減少n(開始時的base top差值).達0時返回false停止循環. //即每個內循環都只能執行n次,進入外循環時重置n. return false; } //14.結束了任何一輪內循環時,發現目標task已經完成,則停止外循環返回false. if (task.status < 0) return false; } } //15.task參數傳空,或者當前WorkQueue沒有任務,直接返回true. return true; } //簡單梳理一下tryRemoveAndExec的執行流程和生命周期. //a.顯然,一上來就判隊列的空和參數的空,如果第一個if都進不去,按約定返回true. //b.經過1初始化一個內層循環,并初始化了n,它決定內循環最多跑n次,如果內循環一直不break(9找到任務或12發現頂部任務是完成態),也假定一般碰不到14(發現目標任務完成了) //也沒有出現幾種return(3查出null,14某輪內循環目標task發現被完成了),那么最終只會耗盡次數,遍歷到底,在13處return false(確定此輪循環task不在隊列) //c.如果出現了幾種break(9,12),9其實代表查到任務,12代表頂部任務已完成(官方說取消),那就會停止內循環,重新開啟一輪外循環,初始化n,繼續從新的top到base遍歷(b). //但此時,可能找不到task了(它已經在上一輪內循環出隊或被替換成代理),但也可能實際上未出隊(該task不是top,即4,base也發生了改變造成7未執行),那么可能在本輪循環 //找到任務,在b中進入相應的break,并且成功移除并會進入d,也可能沒進入break而是再重復一次b. //d.如果某一次break成功刪除了任務,那么外循環更新了n,base,top,重啟了一次內循環,但是所有找到task的分支不會再有了,如果接下來不再碰到被完成(取消)的頂部任務11-12, //同樣也沒發現目標task完成了(不進14),那么最終的結果就是n次內循環后n降低到0,直接return false. //e.從b-d任何一次內循環在最后發現了task結束,立即返回false.否則,它可能在某一次內循環中彈出并執行了該任務,卻可能一直在等待它完成,因此這個機制可以讓等待task完成前, //幫助當前WorkQueue清理頂部無效任務等操作. //此方法適用于不論共享或者獨有的模式,只在helpComplete時使用. //它會彈出和task相同的CountedCompleter,在前一節講解CountedCompleter時已介紹過此方法. //父Completer僅能在棧鏈上找到它的父和祖先completer并幫助減掛起任務數或完成root,但在此處 //它可以幫助棧鏈上的前置(子任務),前提是要popCC彈出. final CountedCompleter> popCC(CountedCompleter> task, int mode) { int s; ForkJoinTask>[] a; Object o; //當前隊列有元素. if (base - (s = top) < 0 && (a = array) != null) { //老邏輯從頂部確定j. long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; if ((o = U.getObjectVolatile(a, j)) != null && (o instanceof CountedCompleter)) { //當前隊列中存在類型為CountedCompleter的元素.對該completer棧鏈開啟一個循環. CountedCompleter> t = (CountedCompleter>)o; for (CountedCompleter> r = t;;) { //對該CountedCompleter及它的completer棧元素進行遍歷,每一個遍歷到的臨時存放r. //找到r==task,說明有一個completer位于task的執行路徑. if (r == task) { //mode小于0,這個mode其實有誤解性,它的調用者其實是將一個WorkQueue的config傳給了這個mode. //而config只有兩處初始化,一是將線程注冊到池的時候,初始化WorkQueue, //二是外部提交的任務,使用externalSubmit時新建的WorkQueue,config會是負值且沒有owner. //它也說明是共享隊列,需要有鎖定機制.. if (mode < 0) { //另一個字段qlock派上了用場,將它置為1表示加鎖. if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { //加鎖成功,在top和array這過程中未發生變動的情況下,嘗試 //將t出隊,此時t是棧頂上的元素,它的completer棧鏈前方有task. if (top == s && array == a && U.compareAndSwapObject(a, j, t, null)) { U.putOrderedInt(this, QTOP, s - 1); U.putOrderedInt(this, QLOCK, 0); return t; } //不論出隊成功還是失敗,解鎖. U.compareAndSwapInt(this, QLOCK, 1, 0); } } //非共享隊列,直接將t出列. else if (U.compareAndSwapObject(a, j, t, null)) { U.putOrderedInt(this, QTOP, s - 1); return t; } //只要找到,哪怕兩處cas出現不成功的情況,也是競態失敗,break終止循環. break; } //r不等于task,找出r的父并開始下輪循環,直到root或找到task為止. else if ((r = r.completer) == null) // try parent break; } } } //空隊列,頂部不是Completer或者不是task的子任務,返回null. return null; } //嘗試在無競態下偷取此WorkQueue中與給定task處于同一個completer棧鏈上的任務并運行它, //若不成功,返回一個校驗合/控制信號給調用它的helpComplete方法. //返回規則,成功偷取則返回1;返回2代表可重試(被其他小偷擊敗),如果隊列非空但未找到匹配task,返回-1, //其他情況返回一個強制負的基準索引. final int pollAndExecCC(CountedCompleter> task) { int b, h; ForkJoinTask>[] a; Object o; if ((b = base) - top >= 0 || (a = array) == null) //空隊列,與最小整數(負值)取或作為信號h h = b | Integer.MIN_VALUE; // to sense movement on re-poll else { //從底部取索引j long j = (((a.length - 1) & b) << ASHIFT) + ABASE; //用該索引取task取出null,說明被捷足先登了,信號置為可重試. if ((o = U.getObjectVolatile(a, j)) == null) h = 2; // retryable //取出的非空任務類型不是CountedCompleter.說明不匹配,信號-1 else if (!(o instanceof CountedCompleter)) h = -1; // unmatchable else { //是CountedCompleter類型 CountedCompleter> t = (CountedCompleter>)o; for (CountedCompleter> r = t;;) { //基本同上個方法的邏輯,只是上個方法t取的是top,這里取base. //r從t開始找它的父,直到它本身或它的父等于task.將它從底端出隊. if (r == task) { if (base == b && U.compareAndSwapObject(a, j, t, null)) { //出隊成功,因為我們找的是base,且競態成功,直接更新base即可. base = b + 1; //出隊后執行該出隊的任務.返回1代表成功. t.doExec(); h = 1; // success } //base被其他線程修改了,或者cas競態失敗.(其實是一個情況),信號2,可以從新的base開始重試. else h = 2; // lost CAS //只要找到task的子任務就break,返回競態成功或可重試的信號. break; } //迭代函數,當前r不是task,將r指向它的父,直到某一個r的父是task或者是null進入else if. else if ((r = r.completer) == null) { //能夠進來,說明r已經指向了root,卻沒有找到整條鏈上有這個task,返回信號為未匹配到. h = -1; // unmatched break; } } } } return h; } //如果當前線程擁有此隊列且明顯未被鎖定,返回true. final boolean isApparentlyUnblocked() { Thread wt; Thread.State s; //前面提過的scanState會在一上來runTask時和1的反碼取與運算,直到運行完任務才會反向運算. //這個過程,scanState的最后一位會置0,但這與此判斷條件關系不大. //前面對scanState有所注釋,小于0代表不活躍. return (scanState >= 0 && //隊列處于活躍態且當前線程的狀態不是阻塞,不是等待,不是定時等待,則返回true. (wt = owner) != null && (s = wt.getState()) != Thread.State.BLOCKED && s != Thread.State.WAITING && s != Thread.State.TIMED_WAITING); } }
到此終于WorkQueue內部類的代碼告一段落.
這一段介紹了WorkQueue的內部實現機制,以及與上一節有關的提到的CountedCompleter在幫助子任務時處于WorkQueue的實現細節(似乎默認情況下即asnycMode傳true時只會從當前工作線程隊列取頂部元素,從其他隨機隊列的底部開取,有可能可以重復取,具體細節到ForkJoinPool的helpComplete相關源碼再說),以及構建好的WorkQueue會有哪些可能的狀態和相應的字段,以及若干模式(同步異步或者LIFO,FIFO等),出隊入隊的操作,還提出了隊列中元素為什么中間不能為空,如果出現要將中間元素出隊怎么辦?別忘了答案是換成一個EmptyTask.
不妨小結一下WorkQueue的大致結構.
1.它規避了偽共享.
2.它用scanState表示運行狀態,版本號,小于0代表不活躍維護了忙碌標記,也用scanState在runTask入口開始運行任務時標記為忙碌(偶數),結束后再取消忙碌狀態(奇數).注釋解釋奇數代表正在掃描,但從代碼語義上看正好相反
3.它維護了一個可以擴容的數組,也維護了足夠大的top和base,[base,top)或許可以形象地表示它的集合,pop是從top-1開始,poll從base開始,當任務壓入隊成功后,檢查若top-base達到了數組長度,也就是集合[base,top)的元素數達到或者超過了隊列數組長度,將對數組進行擴容,因使用數組長度-1與哈希值的方式,擴容前后原數組元素索引不變,新壓入隊列的元素將在此基礎上無縫添加,因此擴容也規避了出現中間任務null的情況.初始容量在runWorker時分配.
4.它維護了偷取任務的記錄和個數,并在溢出等情況及時累加給池.它也維護了阻塞者線程和主人線程.
5.它可能沒有主人線程(共享隊列),或有主人線程(非共享,注冊入池時生成)
6.它維護了隊列鎖qlock,但目前僅在popCC且當前為共享隊列情況下使用,保證爭搶的同步.
7.其他一些字段如config,currentJoin,hint,parker等,需要在后續的線程池自身代碼中結合前面的源碼繼續了解,包含stackPred據說保持前一個池棧的運行信號.
WorkQueue本質也是一個內部類,它雖然定義了一系列實現,但這些實現方法的調度還是由ForkJoinPool來實現,所以我們還是要回歸到ForkJoinPool自身的方法和公有api上,遇到使用上面WorkQueue定義好的工具方法時,我們再來回顧.
前面已經看了一些影響WorkQueue的位于ForkJoinPool的常量,再來繼續看其他的ForkJoinPool中的一些常量.
//默認線程工廠.前面提過兩個實現 public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; //是否允許啟動者在方法中殺死線程的許可,我們忽略這方面的內容. private static final RuntimePermission modifyThreadPermission; //靜態的common池 static final ForkJoinPool common; common池的并行度. static final int commonParallelism; //tryComensate方法中對構造備用線程的創造. private static int commonMaxSpares; //池順序號,創建工作線程會拼接在名稱上. private static int poolNumberSequence; //同步方法同,遞增的池id. private static final synchronized int nextPoolId() { return ++poolNumberSequence; } // 以下為一些靜態配置常量. //IDLE_TIMEOUT代表了一個初始的納秒單位的超時時間,默認為2s,它用于線程觸發靜止停頓以等待新的任務. //一旦超過了這個 時長,線程將會嘗試收縮worker數量.為了避免某些如長gc等停頓的影響,這個值應該足夠大 private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec //為應對定時器下沖設置的空閑超時容忍度. private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms //它是commonMaxSpares靜態初始化時的初值,這個值遠超普通的需要,但距離 //MAX_CAP和一般的操作系統線程限制要差很遠,這也使得jvm能夠在資源耗盡前 //捕獲資源的濫用. private static final int DEFAULT_COMMON_MAX_SPARES = 256; //在block之前自旋等待的次數,它在awaitRunStateLock方法和awaitWork方法中使用, //但它事實上是0,因此這兩個方法其實在用隨機的自旋次數,設置為0也減少了cpu的使用. //如果將它的值改為大于0的值,那么必須設置為2的冪,至少4.這個值設置達到2048已經可以 //耗費一般上下文切換時間的一小部分. private static final int SPINS = 0; //種子生成器的默認增量.注冊新worker時詳述. private static final int SEED_INCREMENT = 0x9e3779b9;
上面都是一些常量的聲明定義,下面看一些與線程池config和ctl有關的常量,以及前面構造器提過的變量.
// 高低位 private static final long SP_MASK = 0xffffffffL;//long型低32位. private static final long UC_MASK = ~SP_MASK;//long型高32位. // 活躍數. private static final int AC_SHIFT = 48;//移位偏移量,如左移到49位開始. private static final long AC_UNIT = 0x0001L << AC_SHIFT;//1<<48代表一個活躍數單位. private static final long AC_MASK = 0xffffL << AC_SHIFT;//long型高16位(49-64) // 總數量 private static final int TC_SHIFT = 32;//移位偏移量,33位開始. private static final long TC_UNIT = 0x0001L << TC_SHIFT;//1<<32代表一個總數量 private static final long TC_MASK = 0xffffL << TC_SHIFT;//33-48位 private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); //第48位 //與運行狀態有關的位,顯然后面的runState是個int型,這些移位數也明顯是int的范圍. //SHUTDOWN顯然一定是負值,其他值也都是2的冪. private static final int RSLOCK = 1;//run state鎖,簡單來說就是奇偶位. private static final int RSIGNAL = 1 << 1;//2 運行狀態的喚醒. private static final int STARTED = 1 << 2;//4,啟動 private static final int STOP = 1 << 29;//30位,代表停. private static final int TERMINATED = 1 << 30;//31位代表終止. private static final int SHUTDOWN = 1 << 31;//32位代表關閉. //實例字段. volatile long ctl; // 代表池的主要控制信號,long型 volatile int runState; // 可以鎖的運行狀態 final int config; // 同時保存了并行度和模式(開篇的構造函數) int indexSeed; // 索引種子,生成worker的索引 volatile WorkQueue[] workQueues; // 工作隊列的注冊數組. final ForkJoinWorkerThreadFactory factory;//線程工廠 final UncaughtExceptionHandler ueh; // 每一個worker線程的未捕獲異常處理器. final String workerNamePrefix; // 工作線程名稱前綴. volatile AtomicLong stealCounter; // 代表偷取任務數量,前面提過,官方注釋說也用作同步監視器
僅僅看這些字段的簡單描述是無法徹底搞清楚它們的含義的,還是要到應用的代碼來看,我們繼續向下看ForkJoinPool中的一些方法.
//嘗試對當前的runState加鎖標志位,并返回一個runState,這個runState可能是原值(無競態)或新值(競態且成功). //不太準確的語言可以說是"鎖住"runState這個字段,其實不是,從代碼上下文看, //該標志位被設置為1的期間,嘗試去lock的線程可以去更改runState的其他位,比如信號位. //而lockRunState成功的線程則是緊接著去更改ctl控制信號,工作隊列等運行時數據,故可以稱runState在鎖標志這一塊 //可以理解為運行狀態鎖. private int lockRunState() { int rs; //runState已經是奇數,表示已經鎖上了,awaitRunState return ((((rs = runState) & RSLOCK) != 0 || //發現原本沒鎖,嘗試將原rs置為rs+1,即變為奇數. !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ? //原來鎖了或者嘗試競態加鎖不成功,等待加鎖成功,否則直接返回rs. awaitRunStateLock() : rs); } //自旋或阻塞等待runstate鎖可用,這與上面的runState字段有關.也是上一個方法的自旋+阻塞實現. private int awaitRunStateLock() { Object lock; boolean wasInterrupted = false; for (int spins = SPINS, r = 0, rs, ns;;) { //每輪循環重讀rs. if (((rs = runState) & RSLOCK) == 0) { //1.發現rs還是偶數,嘗試將它置為奇數.(鎖) if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) { //2,鎖成功后發現擾動了,則擾動當前線程,catch住不符合安全策略的情況. if (wasInterrupted) { try { //2.1擾動.它將影響到后面awaitWork方法的使用. Thread.currentThread().interrupt(); } catch (SecurityException ignore) { } } //2.2返回的是新的runStatus,相當于原+1,是個奇數. //注意,此方法中只有此處一個出口,也就是說必須要鎖到結果. return ns; } } //在1中發現被鎖了或者2處爭鎖競態失敗. else if (r == 0) //3.所有循環中只會執行一次,如果簡單去看,nextSecondarySeed是一個生成 //偽隨機數的代碼,它不會返回0值.r的初值是0. r = ThreadLocalRandom.nextSecondarySeed(); else if (spins > 0) { //4.有自旋次數,則將r的值進行一些轉換并開啟下輪循環.默認spins是0,不會有自旋次數. //從源碼來看,自旋的唯一作用就是改變r的值,使之可能重新進入3,也會根據r的結果決定是否減 //少一次自旋. //r的算法,將當前r的后6位保留,用r的后26位與前26位異或被保存為r的前26位(a). //再將(a)的結果處理,r的前21位保持不變,后11位與前11位異或并保存為r的后11位(b). //再將(b)的結果處理,r的后7位保持不變,用前25位與后25位異或并保存為r的前25位(c) //個中數學原理,有興趣的研究一下吧. //顯然,自旋次數并不是循環次數,它只能決定進入6中鎖代碼塊前要運行至少幾輪循環. r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift if (r >= 0) //經過上面的折騰r還不小于0的,減少一個自旋次數. //自旋次數不是每次循環都減一,但減到0之后不代表方法停止循環,而是進入2(成功)或者6(阻塞). --spins; } //某一次循環,r不為0,不能進入3,自旋次數也不剩余,不能進入4.則到此. else if ((rs & STARTED) == 0 || (lock = stealCounter) == null) //5.線程池的runState表示還未開啟,或者還未初始化偷鎖(stealCounter),說明 //還沒完成初始化,此處是初始化時的競態,直接讓出當前線程的執行權.等到重新獲取執行權時, //重新循環,讀取新的runState并進行. Thread.yield(); // initialization race else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {//可重入 //6.沒能對runState加鎖,也不是5中的初始化時競態的情況,嘗試加上信號位,以stealCounter進行加鎖. //顯然,這種加信號位的加法不會因為信號位而失敗,而會因為runState的其他字段比如鎖標識位失敗,這時 //重新開始循環即可. synchronized (lock) { //明顯的double check if ((runState & RSIGNAL) != 0) { //6.1當前pool的runState有信號位的值,說明沒有線程去釋放信號位. try { //6.2runState期間沒有被去除信號位,等待. lock.wait(); } catch (InterruptedException ie) { //6.3等待過程中發生異常,且不是記錄一個標記,在2處會因它中斷當前線程. if (!(Thread.currentThread() instanceof ForkJoinWorkerThread)) wasInterrupted = true; } } else //6.4當前runState沒有信號位的值,說明被釋放了,順便喚醒等待同步塊的線程.讓他們繼續轉圈. lock.notifyAll(); } } } } //解鎖runState,前面解釋過,這個鎖可以理解為對runState的鎖標志位進行設定,而設定成功的結果就是可以改信號量ctl. //它會解鎖runState,并會用新的runState替換. private void unlockRunState(int oldRunState, int newRunState) { //首先嘗試cas.cas成功可能會導致上一個方法中進入同步塊的線程改走6.4喚醒阻塞線程. if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) { //cas不成功,直接強制更改. Object lock = stealCounter; runState = newRunState;// 這一步可能清除掉信號位.使上一個方法中已進入同步塊的線程改走6.4 if (lock != null) //強制更換為新的運行狀態后,喚醒所有等待lock的線程. synchronized (lock) { lock.notifyAll(); } } }
上面的幾個方法是對runState字段進行操作的,并利用了信號位,鎖標識位,運行狀態位.
顯然,雖然可以不精確地說加鎖解鎖是對runState的鎖標識位進行設置,嚴格來說,這卻是為ctl/工作隊列等運行時數據服務的(后面再述),顯然精確說是對運行時數據的修改權限加鎖.
同樣的,加鎖過程采用自旋+阻塞的方式,整個循環中同時兼容了線程池還在初始化(處理方式讓出執行權),設定了自旋次數(處理方式,隨機數判斷要不要減少自旋次數,自旋次數降0前不會阻塞)這兩種情況,也順便在阻塞被擾動的情況下暫時忽略擾動,只在成功設置鎖標識位后順手負責擾動當前線程.
簡單剝離這三種情況,加鎖過程是一輪輪的循環,會嘗試設置鎖標識位,成功則返回新標識,不成功則去設置信號位(可能已經有其他線程設
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/75118.html
摘要:前言在前面的文章框架之中梳理了框架的簡要運行格架和異常處理流程顯然要理解框架的調度包含工作竊取等思想需要去中了解而對于的拓展和使用則需要了解它的一些子類前文中偶爾會提到的一個子類直譯為計數的完成器前文也說過的并行流其實就是基于了框架實現因此 前言 在前面的文章ForkJoin框架之ForkJoinTask中梳理了ForkJoin框架的簡要運行格架和異常處理流程,顯然要理解ForkJoi...
摘要:這減輕了手動重復執行相同基準測試的痛苦,并簡化了獲取結果的流程。處理項目的代碼并從標有注釋的方法處生成基準測試程序。用和運行該基準測試得到以下結果。同時,和的基線測試結果也有略微的不同。 Java 8 已經發布一段時間了,許多開發者已經開始使用 Java 8。本文也將討論最新發布在 JDK 中的并發功能更新。事實上,JDK 中已經有多處java.util.concurrent 改動,但...
摘要:前言在前面的文章和響應式編程中提到了和后者毫無疑問是一個線程池前者則是一個類似經典定義的概念官方有一個非常無語的解釋就是運行在的一個任務抽象就是運行的線程池框架包含和若干的子類它的核心在于分治和工作竅取最大程度利用線程池中的工作線程避免忙的 前言 在前面的文章CompletableFuture和響應式編程中提到了ForkJoinTask和ForkJoinPool,后者毫無疑問是一個線程...
摘要:使用方式要把任務提交到線程池,必須創建的一個子類,其中是并行化任務產生的結果如果沒有結果使用類型。對一個子任務調用的話,可以使一個子任務重用當前線程,避免線程池中多分配一個任務帶來的開銷。 【概念 分支和并框架的目的是以遞歸的方式將可以并行的任務拆分成更小的任務,然后將每個子任務的結果合并起來生成整體的結果,它是ExecutorService的一個實現,它把子任務分配給線程池(Fork...
摘要:對于任務的分割,要求各個子任務之間相互獨立,能夠并行獨立地執行任務,互相之間不影響。是叉子分叉的意思,即將大任務分解成并行的小任務,是連接結合的意思,即將所有并行的小任務的執行結果匯總起來。使用方法會阻塞并等待子任務執行完并得到其結果。 Fork/Join是什么? Fork/Join框架是Java7提供的并行執行任務框架,思想是將大任務分解成小任務,然后小任務又可以繼續分解,然后每個小...
閱讀 2577·2021-10-25 09:45
閱讀 1239·2021-10-14 09:43
閱讀 2297·2021-09-22 15:23
閱讀 1519·2021-09-22 14:58
閱讀 1934·2019-08-30 15:54
閱讀 3539·2019-08-30 13:00
閱讀 1354·2019-08-29 18:44
閱讀 1571·2019-08-29 16:59