摘要:并不會(huì)為每個(gè)任務(wù)都創(chuàng)建工作線程,而是根據(jù)實(shí)際情況構(gòu)造線程池時(shí)的參數(shù)確定是喚醒已有空閑工作線程,還是新建工作線程。
本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog...一、引言
前一章——Fork/Join框架(1) 原理,我們從整體上對(duì)Fork/Join框架作了介紹。
回顧一下,F(xiàn)ork/Join框架的核心實(shí)現(xiàn)類是ForkJoinPool線程池,其它核心組件包括:ForkJoinTask(任務(wù))、ForkJoinWorkerThread(工作線程)、WorkQueue(任務(wù)隊(duì)列)。
這一章,我們將深入F/J框架的實(shí)現(xiàn)細(xì)節(jié),看看ForkJoinPool線程池究竟有何特殊之處,F(xiàn)/J框架的整個(gè)任務(wù)調(diào)度流程又是怎樣的。
二、任務(wù)調(diào)度流程在開(kāi)始之前,先來(lái)看下下面這張圖:
上圖包含了F/J框架的整個(gè)任務(wù)調(diào)度流程,這里先簡(jiǎn)要介紹下,以便讀者在有個(gè)印象,后續(xù)的源碼分析將完全按照這張圖進(jìn)行。
F/J框架調(diào)度任務(wù)的流程一共可以分為四大部分。
任務(wù)提交任務(wù)提交是整個(gè)調(diào)度流程的第一步,F(xiàn)/J框架所調(diào)度的任務(wù)來(lái)源有兩種:
①外部提交任務(wù)
所謂外部提交任務(wù),是指通過(guò)ForkJoinPool的execute/submit/invoke方法提交的任務(wù),或者非工作線程(ForkJoinWorkerThread)直接調(diào)用ForkJoinTask的fork/invoke方法提交的任務(wù):
外部提交的任務(wù)的特點(diǎn)就是調(diào)用線程是非工作線程。這個(gè)過(guò)程涉及以下方法:
ForkJoinPool.submit
ForkJoinPool.invoke
ForkJoinPool.execute
ForkJoinTask.fork
ForkJoinTask.invoke
ForkJoinPool.externalPush
ForkJoinPool.externalSubmit
②工作線程fork任務(wù)
所謂工作線程fork任務(wù),是指由ForkJoinPool所維護(hù)的工作線程(ForkJoinWorkerThread)從自身任務(wù)隊(duì)列中獲取任務(wù)(或從其它任務(wù)隊(duì)列竊取),然后執(zhí)行任務(wù)。
工作線程fork任務(wù)的特點(diǎn)就是調(diào)用線程是工作線程。這個(gè)過(guò)程涉及以下方法:
ForkJoinTask.doExec
WorkQueue.push
創(chuàng)建工作線程任務(wù)提交完成后,F(xiàn)orkJoinPool會(huì)根據(jù)情況創(chuàng)建或喚醒工作線程,以便執(zhí)行任務(wù)。
ForkJoinPool并不會(huì)為每個(gè)任務(wù)都創(chuàng)建工作線程,而是根據(jù)實(shí)際情況(構(gòu)造線程池時(shí)的參數(shù))確定是喚醒已有空閑工作線程,還是新建工作線程。這個(gè)過(guò)程還是涉及任務(wù)隊(duì)列的綁定、工作線程的注銷等過(guò)程:
ForkJoinPool.signalWork
ForkJoinPool.tryAddWorker
ForkJoinPool.createWorker
ForkJoinWorkerThread.registerWorker
ForkJoinPool.deregisterWorker
任務(wù)執(zhí)行任務(wù)入隊(duì)后,由工作線程開(kāi)始執(zhí)行,這個(gè)過(guò)程涉及任務(wù)竊取、工作線程等待等過(guò)程:
ForkJoinWorkerThread.run
ForkJoinPool.runWorker
ForkJoinPool.scan
ForkJoinPool.runTask
ForkJoinTask.doExec
ForkJoinPool.execLocalTasks
ForkJoinPool.awaitWork
任務(wù)結(jié)果獲取任務(wù)結(jié)果一般通過(guò)ForkJoinTask的join方法獲得,其主要流程如下圖:
任務(wù)結(jié)果獲取的核心涉及兩點(diǎn):
互助竊取:ForkJoinPool.helpStealer
算力補(bǔ)償:ForkJoinPool.tryCompensate
三、源碼分析通過(guò)第二部分,大致了解了F/J框架調(diào)度任務(wù)的流程,我們來(lái)看下源碼實(shí)現(xiàn)。
任務(wù)提交①外部提交任務(wù)
我們通過(guò)ForkJoinPool的submit(ForkJoinTask
publicForkJoinTask submit(ForkJoinTask task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; }
ForkJoinPool.submit內(nèi)部調(diào)用了externalPush方法:
final void externalPush(ForkJoinTask> task) { WorkQueue[] ws; WorkQueue q; int m; int r = ThreadLocalRandom.getProbe(); int rs = runState; // m & r & SQMASK必為偶數(shù),所以通過(guò)externalPush方法提交的任務(wù)都添加到了偶數(shù)索引的任務(wù)隊(duì)列中(沒(méi)有綁定的工作線程) if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask>[] a; int am, n, s; if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) { int j = ((am & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); U.putIntVolatile(q, QLOCK, 0); if (n <= 1) // 隊(duì)列里只有一個(gè)任務(wù) signalWork(ws, q); // 創(chuàng)建或激活一個(gè)工作線程 return; } U.compareAndSwapInt(q, QLOCK, 1, 0); } // 未命中任務(wù)隊(duì)列時(shí)(WorkQueue == null 或 WorkQueue[i] == null),會(huì)進(jìn)入該方法 externalSubmit(task); }
當(dāng)我們首次創(chuàng)建了ForkJoinPool時(shí),任務(wù)隊(duì)列數(shù)組并沒(méi)有初始化,只有當(dāng)首次提交任務(wù)時(shí),才會(huì)初始化。
externalPush方法包含兩部分:
根據(jù)線程隨機(jī)變量、任務(wù)隊(duì)列數(shù)組信息,計(jì)算命中槽(即本次提交的任務(wù)應(yīng)該添加到任務(wù)隊(duì)列數(shù)組中的哪個(gè)隊(duì)列),如果命中且隊(duì)列中任務(wù)數(shù)<1,則創(chuàng)建或激活一個(gè)工作線程;
否則,調(diào)用externalSubmit初始化隊(duì)列,并入隊(duì)。
/** * 完整版本的externalPush. * 處理線程池提交任務(wù)時(shí)未命中隊(duì)列的情況和異常情況. */ private void externalSubmit(ForkJoinTask> task) { int r; // 線程相關(guān)的隨機(jī)數(shù) if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } for (; ; ) { WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; // CASE1: 線程池已關(guān)閉 if ((rs = runState) < 0) { tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } // CASE2: 初始化線程池 else if ((rs & STARTED) == 0 || // initialize ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { int ns = 0; rs = lockRunState(); try { if ((rs & STARTED) == 0) { U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // 初始化工作隊(duì)列數(shù)組, 數(shù)組大小必須為2的冪次 int p = config & SMASK; int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; workQueues = new WorkQueue[n]; ns = STARTED; // 線程池狀態(tài)轉(zhuǎn)化為STARTED } } finally { unlockRunState(rs, (rs & ~RSLOCK) | ns); } } // CASE3: 入隊(duì)任務(wù) else if ((q = ws[k = r & m & SQMASK]) != null) { if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask>[] a = q.array; int s = q.top; boolean submitted = false; // initial submission or resizing try { // locked version of push if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); submitted = true; } } finally { U.compareAndSwapInt(q, QLOCK, 1, 0); } if (submitted) { signalWork(ws, q); return; } } move = true; // move on failure } // CASE4: 創(chuàng)建一個(gè)任務(wù)隊(duì)列 else if (((rs = runState) & RSLOCK) == 0) { q = new WorkQueue(this, null); q.hint = r; q.config = k | SHARED_QUEUE; // k為任務(wù)隊(duì)列在隊(duì)列數(shù)組中的索引: k == r & m & SQMASK, 在CASE3的IF判斷中賦值 q.scanState = INACTIVE; // 任務(wù)隊(duì)列狀態(tài)為INACTIVE rs = lockRunState(); if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; // else terminated unlockRunState(rs, rs & ~RSLOCK); } else move = true; // move if busy if (move) r = ThreadLocalRandom.advanceProbe(r); } }
externalSubmit方法的邏輯很清晰,一共分為4種情況:
CASE1:線程池已經(jīng)關(guān)閉,則執(zhí)行終止操作,并拒絕該任務(wù)的提交;
CASE2:線程池未初始化,則進(jìn)行初始化,主要就是初始化任務(wù)隊(duì)列數(shù)組;
CASE3:命中了任務(wù)隊(duì)列,則將任務(wù)入隊(duì),并嘗試創(chuàng)建/喚醒一個(gè)工作線程(Worker);
CASE4:未命中任務(wù)隊(duì)列,則在偶數(shù)索引處創(chuàng)建一個(gè)任務(wù)隊(duì)列
②工作線程fork任務(wù)
工作線程fork的任務(wù)其實(shí)就是子任務(wù),ForkJoinTask.fork方法完成。
看下ForkJoinTask.fork方法,當(dāng)調(diào)用線程為工作線程時(shí),直接添加到其自身隊(duì)列中:
public final ForkJoinTaskfork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) // 如果調(diào)用線程為【工作線程】 ((ForkJoinWorkerThread) t).workQueue.push(this); // 直接添加到線程的自身隊(duì)列中 else ForkJoinPool.common.externalPush(this); // 外部(其它線程)提交的任務(wù) return this; }
WorkQueue.push方法,任務(wù)存入自身隊(duì)列的棧頂(top):
final void push(ForkJoinTask> task) { ForkJoinTask>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); // 任務(wù)存入棧頂(top+1) if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); // 喚醒或創(chuàng)建一個(gè)工作線程 } else if (n >= m) growArray(); // 擴(kuò)容 } }
如果當(dāng)前 WorkQueue 為新建的等待隊(duì)列(top - base <= 1),則調(diào)用signalWork方法為當(dāng)前 WorkQueue 新建或喚醒一個(gè)工作線程;創(chuàng)建工作線程
如果 WorkQueue 中的任務(wù)數(shù)組容量過(guò)小,則調(diào)用growArray方法對(duì)其進(jìn)行兩倍擴(kuò)容,
從流程圖可以看出,任務(wù)提交后,會(huì)調(diào)用signalWork方法創(chuàng)建或喚醒一個(gè)工作線程,該方法的核心其實(shí)就兩個(gè)分支:
工作線程數(shù)不足:創(chuàng)建一個(gè)工作線程;
工作線程數(shù)足夠:?jiǎn)拘岩粋€(gè)空閑(阻塞)的工作線程。
/** * 嘗試創(chuàng)建或喚醒一個(gè)工作線程. * * @param ws 任務(wù)隊(duì)列數(shù)組 * @param q 當(dāng)前操作的任務(wù)隊(duì)列WorkQueue */ final void signalWork(WorkQueue[] ws, WorkQueue q) { long c; int sp, i; WorkQueue v; Thread p; while ((c = ctl) < 0L) { // too few active // CASE1: 工作線程數(shù)不足 if ((sp = (int) c) == 0) { if ((c & ADD_WORKER) != 0L) tryAddWorker(c); // 增加工作線程 break; } // CASE2: 存在空閑工作線程,則喚醒 if (ws == null) // unstarted/terminated break; if (ws.length <= (i = sp & SMASK)) // terminated break; if ((v = ws[i]) == null) // terminating break; int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState int d = sp - v.scanState; // screen CAS long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; // activate v if ((p = v.parker) != null) U.unpark(p); break; } if (q != null && q.base == q.top) // no more work break; } }
先來(lái)看創(chuàng)建工作線程的方法tryAddWorker,其實(shí)就是設(shè)置下字段值(活躍/總工作線程池?cái)?shù)),然后調(diào)用createWorker真正創(chuàng)建一個(gè)工作線程:
private void tryAddWorker(long c) { boolean add = false; do { // 設(shè)置活躍工作線程數(shù)、總工作線程池?cái)?shù) long nc = ((AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT))); if (ctl == c) { int rs, stop; // check if terminating if ((stop = (rs = lockRunState()) & STOP) == 0) add = U.compareAndSwapLong(this, CTL, c, nc); unlockRunState(rs, rs & ~RSLOCK); if (stop != 0) break; // 創(chuàng)建工作線程 if (add) { createWorker(); break; } } } while (((c = ctl) & ADD_WORKER) != 0L && (int) c == 0); } ? private boolean createWorker() { ForkJoinWorkerThreadFactory fac = factory; Throwable ex = null; ForkJoinWorkerThread wt = null; try { // 使用線程池工廠創(chuàng)建線程 if (fac != null && (wt = fac.newThread(this)) != null) { wt.start(); // 啟動(dòng)線程 return true; } } catch (Throwable rex) { ex = rex; } // 創(chuàng)建出現(xiàn)異常,則注銷該工作線程 deregisterWorker(wt, ex); return false; }
如果創(chuàng)建過(guò)程中出現(xiàn)異常,則調(diào)用deregisterWorker注銷線程:
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; // 1.移除workQueue if (wt != null && (w = wt.workQueue) != null) { // 獲取ForkJoinWorkerThread的等待隊(duì)列 WorkQueue[] ws; int idx = w.config & SMASK; // 計(jì)算workQueue索引 int rs = lockRunState(); // 獲取runState鎖和當(dāng)前池運(yùn)行狀態(tài) if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) ws[idx] = null; // 移除workQueue unlockRunState(rs, rs & ~RSLOCK); // 解除runState鎖 } // 2.減少CTL數(shù) long c; // decrement counts do { } while (!U.compareAndSwapLong (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c)))); // 3.處理被移除workQueue內(nèi)部相關(guān)參數(shù) if (w != null) { w.qlock = -1; // ensure set w.transferStealCount(this); w.cancelAll(); // cancel remaining tasks } // 4.如果線程未終止,替換被移除的workQueue并喚醒內(nèi)部線程 for (; ; ) { // possibly replace WorkQueue[] ws; int m, sp; // 嘗試終止線程池 if (tryTerminate(false, false) || w == null || w.array == null || (runState & STOP) != 0 || (ws = workQueues) == null || (m = ws.length - 1) < 0) // already terminating break; // 喚醒被替換的線程,依賴于下一步 if ((sp = (int) (c = ctl)) != 0) { // wake up replacement if (tryRelease(c, ws[sp & m], AC_UNIT)) break; } // 創(chuàng)建工作線程替換 else if (ex != null && (c & ADD_WORKER) != 0L) { tryAddWorker(c); // create replacement break; } else // don"t need replacement break; } // 5.處理異常 if (ex == null) // help clean on way out ForkJoinTask.helpExpungeStaleExceptions(); else // rethrow ForkJoinTask.rethrow(ex); }
deregisterWorker方法用于工作線程運(yùn)行完畢之后終止線程或處理工作線程異常,主要就是清除已關(guān)閉的工作線程或回滾創(chuàng)建線程之前的操作,并把傳入的異常拋給 ForkJoinTask 來(lái)處理。
工作線程在構(gòu)造的過(guò)程中,會(huì)保存線程池信息和與自己綁定的任務(wù)隊(duì)列信息。它通過(guò)ForkJoinPool.registerWorker方法將自己注冊(cè)到線程池中:
protected ForkJoinWorkerThread(ForkJoinPool pool) { // Use a placeholder until a useful name can be set in registerWorker super("aForkJoinWorkerThread"); this.pool = pool; this.workQueue = pool.registerWorker(this); }
final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; wt.setDaemon(true); // configure thread if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); // 創(chuàng)建一個(gè)工作隊(duì)列, 并于該工作線程綁定 WorkQueue w = new WorkQueue(this, wt); int i = 0; // 記錄隊(duì)列在任務(wù)隊(duì)列數(shù)組中的索引, 始終為奇數(shù) int mode = config & MODE_MASK; int rs = lockRunState(); try { WorkQueue[] ws; int n; if ((ws = workQueues) != null && (n = ws.length) > 0) { int s = indexSeed += SEED_INCREMENT; // unlikely to collide int m = n - 1; i = ((s << 1) | 1) & m; // 經(jīng)計(jì)算后, i為奇數(shù) if (ws[i] != null) { // 槽沖突, 即WorkQueue[i]位置已經(jīng)有了任務(wù)隊(duì)列 // 重新計(jì)算索引i int probes = 0; // step by approx half n int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } // 設(shè)置隊(duì)列狀態(tài)為SCANNING w.hint = s; // use as random seed w.config = i | mode; w.scanState = i; // publication fence ws[i] = w; } } finally { unlockRunState(rs, rs & ~RSLOCK); } wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); return w; }
前文講過(guò),工作線程(Worker)自身的任務(wù)隊(duì)列,其數(shù)組下標(biāo)始終是奇數(shù),registerWorker方法的主要作用就是在任務(wù)隊(duì)列數(shù)組WorkQueue[]找到一個(gè)空的奇數(shù)位,然后在該位置創(chuàng)建WorkQueue。
至此,線程池的任務(wù)提交工作和工作線程創(chuàng)建工作就全部完成了,接下來(lái)開(kāi)始工作線程的執(zhí)行。
任務(wù)執(zhí)行ForkJoinWorkerThread啟動(dòng)后,會(huì)執(zhí)行它的run方法,該方法內(nèi)部調(diào)用了ForkJoinPool.runWorker方法來(lái)執(zhí)行任務(wù):
public void run() { if (workQueue.array == null) { // only run once Throwable exception = null; try { onStart(); // 鉤子方法 pool.runWorker(workQueue); } catch (Throwable ex) { exception = ex; } finally { try { onTermination(exception); } catch (Throwable ex) { if (exception == null) exception = ex; } finally { pool.deregisterWorker(this, exception); } } } }
runWorker方法的核心流程如下:
scan:嘗試獲取一個(gè)任務(wù);
runTask:執(zhí)行取得的任務(wù);
awaitWork:沒(méi)有任務(wù)則阻塞。
final void runWorker(WorkQueue w) { w.growArray(); // 初始化任務(wù)隊(duì)列 int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift for (ForkJoinTask> t; ; ) { // CASE1: 嘗試獲取一個(gè)任務(wù) if ((t = scan(w, r)) != null) w.runTask(t); // 獲取成功, 執(zhí)行任務(wù) // CASE2: 獲取失敗, 阻塞等待任務(wù)入隊(duì) else if (!awaitWork(w, r)) // 等待失敗, 跳出該方法后, 工作線程會(huì)被注銷 break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } }
注意:如果awaitWork返回false,等不到任務(wù),則跳出runWorker的循環(huán),回到run中執(zhí)行finally,最后調(diào)用deregisterWorker注銷工作線程。
任務(wù)竊取——scan
private ForkJoinTask> scan(WorkQueue w, int r) { WorkQueue[] ws; int m; if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { int ss = w.scanState; // initially non-negative for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0; ; ) { WorkQueue q; ForkJoinTask>[] a; ForkJoinTask> t; int b, n; long c; // 根據(jù)隨機(jī)數(shù)r定位一個(gè)任務(wù)隊(duì)列 if ((q = ws[k]) != null) { // 獲取workQueue if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { // non-empty long i = (((a.length - 1) & b) << ASHIFT) + ABASE; if ((t = ((ForkJoinTask>) U.getObjectVolatile(a, i))) != null && // 取base位置任務(wù) q.base == b) { // 成功獲取到任務(wù) if (ss >= 0) { if (U.compareAndSwapObject(a, i, t, null)) { q.base = b + 1; // 更新base位 if (n < -1) signalWork(ws, q); // 創(chuàng)建或喚醒工作線程來(lái)運(yùn)行任務(wù) return t; } } else if (oldSum == 0 && // try to activate w.scanState < 0) tryRelease(c = ctl, ws[m & (int) c], AC_UNIT); // 喚醒棧頂工作線程 } // base位置任務(wù)為空或base位置偏移,隨機(jī)移位重新掃描 if (ss < 0) // refresh ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } checkSum += b; } if ((k = (k + 1) & m) == origin) { // continue until stable // 運(yùn)行到這里說(shuō)明已經(jīng)掃描了全部的 workQueues,但并未掃描到任務(wù) if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive break; // 對(duì)當(dāng)前WorkQueue進(jìn)行滅活操作 int ns = ss | INACTIVE; // try to inactivate long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int) c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; // back out } checkSum = 0; } } } return null; }
掃描并嘗試偷取一個(gè)任務(wù)。隨機(jī)選擇一個(gè)WorkQueue,獲取base位的 ForkJoinTask,成功取到后更新base位并返回任務(wù);如果取到的 WorkQueue 中任務(wù)數(shù)大于1,則調(diào)用signalWork創(chuàng)建或喚醒其他工作線程。
阻塞等待——awaitWork
如果scan方法未掃描到任務(wù),會(huì)調(diào)用awaitWork等待獲取任務(wù):
private boolean awaitWork(WorkQueue w, int r) { if (w == null || w.qlock < 0) // w is terminating return false; for (int pred = w.stackPred, spins = SPINS, ss; ; ) { if ((ss = w.scanState) >= 0) // 正在掃描,跳出循環(huán) break; else if (spins > 0) { r ^= r << 6; r ^= r >>> 21; r ^= r << 7; if (r >= 0 && --spins == 0) { // randomize spins WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc; if (pred != 0 && (ws = workQueues) != null && (j = pred & SMASK) < ws.length && (v = ws[j]) != null && // see if pred parking (v.parker == null || v.scanState >= 0)) spins = SPINS; // continue spinning } } else if (w.qlock < 0) // 當(dāng)前workQueue已經(jīng)終止,返回false recheck after spins return false; else if (!Thread.interrupted()) { // 判斷線程是否被中斷,并清除中斷狀態(tài) long c, prevctl, parkTime, deadline; int ac = (int) ((c = ctl) >> AC_SHIFT) + (config & SMASK); // 活躍線程數(shù) if ((ac <= 0 && tryTerminate(false, false)) || // 無(wú)active線程,嘗試終止 (runState & STOP) != 0) // pool terminating return false; if (ac <= 0 && ss == (int) c) { // is last waiter // 計(jì)算活躍線程數(shù)(高32位)并更新為下一個(gè)棧頂?shù)膕canState(低32位) prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); int t = (short) (c >>> TC_SHIFT); // shrink excess spares if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))//總線程過(guò)量 return false; // else use timed wait // 計(jì)算空閑超時(shí)時(shí)間 parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; } else prevctl = parkTime = deadline = 0L; Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport w.parker = wt; // 設(shè)置parker,準(zhǔn)備阻塞 if (w.scanState < 0 && ctl == c) // recheck before park U.park(false, parkTime); // 阻塞指定的時(shí)間 U.putOrderedObject(w, QPARKER, null); U.putObject(wt, PARKBLOCKER, null); if (w.scanState >= 0) // 正在掃描,說(shuō)明等到任務(wù),跳出循環(huán) break; if (parkTime != 0L && ctl == c && deadline - System.nanoTime() <= 0L && U.compareAndSwapLong(this, CTL, c, prevctl)) // 未等到任務(wù),更新ctl,返回false return false; // shrink pool } } return true; }
任務(wù)執(zhí)行——runTask
竊取到任務(wù)后,調(diào)用WorkQueue.runTask方法執(zhí)行任務(wù):
final void runTask(ForkJoinTask> task) { if (task != null) { scanState &= ~SCANNING; // mark as busy (currentSteal = task).doExec(); // 更新currentSteal并執(zhí)行任務(wù) U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC execLocalTasks(); // 依次執(zhí)行本地任務(wù) ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow transferStealCount(pool); // 增加偷取任務(wù)數(shù) scanState |= SCANNING; if (thread != null) thread.afterTopLevelExec(); // 執(zhí)行鉤子函數(shù) } }
1.首先調(diào)用FutureTask.deExec()執(zhí)行任務(wù),其內(nèi)部會(huì)調(diào)用FutureTask.exec()方法,該方法為抽象方法,由子類實(shí)現(xiàn)。
子類實(shí)現(xiàn)該方法時(shí),一般會(huì)進(jìn)行fork,導(dǎo)致生成子任務(wù),并最終添加到調(diào)用線程自身地任務(wù)隊(duì)列中:
final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); // exec為抽象方法, 由子類實(shí)現(xiàn) } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; }
2.除了執(zhí)行竊取到的任務(wù),工作線程還會(huì)執(zhí)行自己隊(duì)列中的任務(wù),即WorkQueue.execLocalTasks方法:
final void execLocalTasks() { int b = base, m, s; ForkJoinTask>[] a = array; if (b - (s = top - 1) <= 0 && a != null && (m = a.length - 1) >= 0) { if ((config & FIFO_QUEUE) == 0) { // LIFO, 從top -> base 遍歷執(zhí)行任務(wù) for (ForkJoinTask> t; ; ) { if ((t = (ForkJoinTask>) U.getAndSetObject (a, ((m & s) << ASHIFT) + ABASE, null)) == null) break; U.putOrderedInt(this, QTOP, s); t.doExec(); if (base - (s = top - 1) > 0) break; } } else // FIFO, 從base -> top 遍歷執(zhí)行任務(wù) pollAndExecAll(); } }
構(gòu)建線程池時(shí)的asyncMode參數(shù),決定了工作線程執(zhí)行自身隊(duì)列中的任務(wù)的方式。如果 asyncMode == true,則以FIFO的方式執(zhí)行任務(wù);否則,以LIFO的方式執(zhí)行任務(wù)。任務(wù)結(jié)果獲取
ForkJoinTask.join()可以用來(lái)獲取任務(wù)的執(zhí)行結(jié)果。join方法的執(zhí)行邏輯如下:
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
可以看到,內(nèi)部先調(diào)用doJoin方法:
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }
?
doJoin方法會(huì)判斷調(diào)用線程是否是工作線程:
1.如果是非工作線程調(diào)用的join,則最終調(diào)用externalAwaitDone()阻塞等待任務(wù)的完成。
2.如果是工作線程調(diào)用的join,則存在以下情況:
如果需要join的任務(wù)已經(jīng)完成,直接返回運(yùn)行結(jié)果;
如果需要join的任務(wù)剛剛好是當(dāng)前線程所擁有的隊(duì)列的top位置,則立即執(zhí)行它。
如果該任務(wù)不在top位置,則調(diào)用awaitJoin方法等待
關(guān)鍵看下ForkJoinPool.awaitJoin等待過(guò)程中發(fā)生了什么:
final int awaitJoin(WorkQueue w, ForkJoinTask> task, long deadline) { int s = 0; if (task != null && w != null) { ForkJoinTask> prevJoin = w.currentJoin; // 獲取給定Worker的join任務(wù) U.putOrderedObject(w, QCURRENTJOIN, task); // 把currentJoin替換為給定任務(wù) // 判斷是否為CountedCompleter類型的任務(wù) CountedCompleter> cc = (task instanceof CountedCompleter) ? (CountedCompleter>) task : null; for (; ; ) { if ((s = task.status) < 0) // 已經(jīng)完成|取消|異常 跳出循環(huán) break; if (cc != null) // CountedCompleter任務(wù)由helpComplete來(lái)完成join helpComplete(w, cc, 0); else if (w.base == w.top || w.tryRemoveAndExec(task)) //嘗試執(zhí)行 helpStealer(w, task); // 隊(duì)列為空或執(zhí)行失敗,任務(wù)可能被偷,幫助偷取者執(zhí)行該任務(wù) if ((s = task.status) < 0) // 已經(jīng)完成|取消|異常,跳出循環(huán) break; // 計(jì)算任務(wù)等待時(shí)間 long ms, ns; if (deadline == 0L) ms = 0L; else if ((ns = deadline - System.nanoTime()) <= 0L) break; else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L) ms = 1L; if (tryCompensate(w)) { // 執(zhí)行補(bǔ)償操作 task.internalWait(ms); // 補(bǔ)償執(zhí)行成功,任務(wù)等待指定時(shí)間 U.getAndAddLong(this, CTL, AC_UNIT); // 更新活躍線程數(shù) } } U.putOrderedObject(w, QCURRENTJOIN, prevJoin); // 循環(huán)結(jié)束,替換為原來(lái)的join任務(wù) } return s; }
ForkJoinPool.awaitJoin方法中有三個(gè)重要方法:
tryRemoveAndExec
helpStealer
tryCompensate
這里說(shuō)下這三個(gè)方法的主要作用,不貼代碼了:
tryRemoveAndExec:
當(dāng)工作線程正在等待join的任務(wù)時(shí),它會(huì)從top位開(kāi)始自旋向下查找該任務(wù):
如果找到則移除并執(zhí)行它;
如果找不到,說(shuō)明說(shuō)明任務(wù)可能被偷,則調(diào)用helpStealer方法反過(guò)來(lái)幫助偷取者執(zhí)行它自己的任務(wù)。
helpStealer:
先定位的偷取者的任務(wù)隊(duì)列;
從偷取者的base索引開(kāi)始,每次偷取一個(gè)任務(wù)執(zhí)行。
tryCompensate:
tryCompensate主要用來(lái)補(bǔ)償工作線程因?yàn)樽枞鴮?dǎo)致的算力損失,當(dāng)工作線程自身的隊(duì)列不為空,且還有其它空閑工作線程時(shí),如果自己阻塞了,則在此之前會(huì)喚醒一個(gè)工作線程。
四、總結(jié)本章和上一章——Fork/Join框架(1) 原理,從思想、使用、實(shí)現(xiàn)等方面較完整地分析了Fork/Join框架,F(xiàn)ork/Join框架的使用需要根據(jù)實(shí)際情況劃分子任務(wù)的大小。
理解F/J框架需要先從整體上了解框架調(diào)度任務(wù)的流程(參考本章開(kāi)頭的調(diào)度圖),可以自己通過(guò)示例模擬一個(gè)任務(wù)的調(diào)度過(guò)程,然后根據(jù)實(shí)際運(yùn)用過(guò)程中遇到的問(wèn)題,再去調(diào)試及在相應(yīng)的源碼中查看實(shí)現(xiàn)原理。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/71970.html
摘要:同時(shí),它會(huì)通過(guò)的方法將自己注冊(cè)到線程池中。線程池中的每個(gè)工作線程都有一個(gè)自己的任務(wù)隊(duì)列,工作線程優(yōu)先處理自身隊(duì)列中的任務(wù)或順序,由線程池構(gòu)造時(shí)的參數(shù)決定,自身隊(duì)列為空時(shí),以的順序隨機(jī)竊取其它隊(duì)列中的任務(wù)。 showImg(https://segmentfault.com/img/bVbizJb?w=1802&h=762); 本文首發(fā)于一世流云的專欄:https://segmentfau...
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見(jiàn)的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對(duì)等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:注意線程與本地操作系統(tǒng)的線程是一一映射的。固定線程數(shù)的線程池提供了兩種創(chuàng)建具有固定線程數(shù)的的方法,固定線程池在初始化時(shí)確定其中的線程總數(shù),運(yùn)行過(guò)程中會(huì)始終維持線程數(shù)量不變。 showImg(https://segmentfault.com/img/bVbhK58?w=1920&h=1080); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... ...
摘要:本文首發(fā)于一世流云的專欄一模式簡(jiǎn)介模式是多線程設(shè)計(jì)模式中的一種常見(jiàn)模式,它的主要作用就是異步地執(zhí)行任務(wù),并在需要的時(shí)候獲取結(jié)果。二中的模式在多線程基礎(chǔ)之模式中,我們?cè)?jīng)給出過(guò)模式的通用類關(guān)系圖。 showImg(https://segmentfault.com/img/bVbiwcx?w=1000&h=667); 本文首發(fā)于一世流云的專欄:https://segmentfault.co...
摘要:這里呢,我直接給出高并發(fā)場(chǎng)景通常都會(huì)考慮的一些解決思路和手段結(jié)尾如何有效的準(zhǔn)備面試中并發(fā)類問(wèn)題,我已經(jīng)給出我的理解。 showImg(https://segmentfault.com/img/bV7Viy?w=550&h=405); 主題 又到面試季了,從群里,看到許多同學(xué)分享了自己的面試題目,我也抽空在網(wǎng)上搜索了一些許多公司使用的面試題,目前校招和社招的面試題基本都集中在幾個(gè)大方向上...
閱讀 2109·2023-04-26 00:50
閱讀 2479·2021-10-13 09:39
閱讀 2201·2021-09-22 15:34
閱讀 1605·2021-09-04 16:41
閱讀 1336·2019-08-30 15:55
閱讀 2433·2019-08-30 15:53
閱讀 1707·2019-08-30 15:52
閱讀 748·2019-08-29 16:19