摘要:主要用于實現分而治之的算法,特別是分治之后遞歸調用的函數,例如等。最適合的是計算密集型的任務,如果存在,線程間同步,等會造成線程長時間阻塞的情況時,最好配合使用。
ForkJoinPool
commonPoolForkJoinPool 主要用于實現“分而治之”的算法,特別是分治之后遞歸調用的函數,例如 quick sort 等。
ForkJoinPool 最適合的是計算密集型的任務,如果存在 I/O,線程間同步,sleep() 等會造成線程長時間阻塞的情況時,最好配合使用 ManagedBlocker。
static { // initialize field offsets for CAS etc try { U = sun.misc.Unsafe.getUnsafe(); Class> k = ForkJoinPool.class; CTL = U.objectFieldOffset (k.getDeclaredField("ctl")); RUNSTATE = U.objectFieldOffset (k.getDeclaredField("runState")); STEALCOUNTER = U.objectFieldOffset (k.getDeclaredField("stealCounter")); Class> tk = Thread.class; PARKBLOCKER = U.objectFieldOffset (tk.getDeclaredField("parkBlocker")); Class> wk = WorkQueue.class; QTOP = U.objectFieldOffset (wk.getDeclaredField("top")); QLOCK = U.objectFieldOffset (wk.getDeclaredField("qlock")); QSCANSTATE = U.objectFieldOffset (wk.getDeclaredField("scanState")); QPARKER = U.objectFieldOffset (wk.getDeclaredField("parker")); QCURRENTSTEAL = U.objectFieldOffset (wk.getDeclaredField("currentSteal")); QCURRENTJOIN = U.objectFieldOffset (wk.getDeclaredField("currentJoin")); Class> ak = ForkJoinTask[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } commonMaxSpares = DEFAULT_COMMON_MAX_SPARES; defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); modifyThreadPermission = new RuntimePermission("modifyThread"); common = java.security.AccessController.doPrivileged (new java.security.PrivilegedActionmakeCommonPool() { public ForkJoinPool run() { return makeCommonPool(); }}); int par = common.config & SMASK; // report 1 even if threads disabled commonParallelism = par > 0 ? par : 1; }
/** * Creates and returns the common pool, respecting user settings * specified via system properties. */ private static ForkJoinPool makeCommonPool() { int parallelism = -1; ForkJoinWorkerThreadFactory factory = null; UncaughtExceptionHandler handler = null; try { // ignore exceptions in accessing/parsing properties String pp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.parallelism"); String fp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.threadFactory"); String hp = System.getProperty ("java.util.concurrent.ForkJoinPool.common.exceptionHandler"); if (pp != null) parallelism = Integer.parseInt(pp); if (fp != null) factory = ((ForkJoinWorkerThreadFactory)ClassLoader. getSystemClassLoader().loadClass(fp).newInstance()); if (hp != null) handler = ((UncaughtExceptionHandler)ClassLoader. getSystemClassLoader().loadClass(hp).newInstance()); } catch (Exception ignore) { } if (factory == null) { if (System.getSecurityManager() == null) factory = defaultForkJoinWorkerThreadFactory; else // use security-managed default factory = new InnocuousForkJoinWorkerThreadFactory(); } if (parallelism < 0 && // default 1 less than #cores (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0) parallelism = 1; if (parallelism > MAX_CAP) parallelism = MAX_CAP; return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE, "ForkJoinPool.commonPool-worker-"); }配置參數
通過代碼指定,必須得在commonPool初始化之前(parallel的stream被調用之前,一般可在系統啟動后設置)注入進去,否則無法生效。
通過啟動參數指定無此限制,較為安全
parallelism(即配置線程池個數)
可以通過java.util.concurrent.ForkJoinPool.common.parallelism進行配置,最大值不能超過MAX_CAP,即32767.
static final int MAX_CAP = 0x7fff; //32767
如果沒有指定,則默認為Runtime.getRuntime().availableProcessors() - 1.
代碼指定(必須得在commonPool初始化之前注入進去,否則無法生效)
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
或者參數指定
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
threadFactory
默認為defaultForkJoinWorkerThreadFactory,沒有securityManager的話。
/** * Default ForkJoinWorkerThreadFactory implementation; creates a * new ForkJoinWorkerThread. */ static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } }
代碼指定(必須得在commonPool初始化之前注入進去,否則無法生效)
System.setProperty("java.util.concurrent.ForkJoinPool.common.threadFactory",YourForkJoinWorkerThreadFactory.class.getName());
參數指定
-Djava.util.concurrent.ForkJoinPool.common.threadFactory=com.xxx.xxx.YourForkJoinWorkerThreadFactory
exceptionHandler
如果沒有設置,默認為null
/** * Callback from ForkJoinWorkerThread constructor to establish and * record its WorkQueue. * * @param wt the worker thread * @return the worker"s queue */ final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; wt.setDaemon(true); // configure thread if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); WorkQueue w = new WorkQueue(this, wt); int i = 0; // assign a pool index int mode = config & MODE_MASK; int rs = lockRunState(); try { WorkQueue[] ws; int n; // skip if no array if ((ws = workQueues) != null && (n = ws.length) > 0) { int s = indexSeed += SEED_INCREMENT; // unlikely to collide int m = n - 1; i = ((s << 1) | 1) & m; // odd-numbered indices if (ws[i] != null) { // collision int probes = 0; // step by approx half n int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } w.hint = s; // use as random seed w.config = i | mode; w.scanState = i; // publication fence ws[i] = w; } } finally { unlockRunState(rs, rs & ~RSLOCK); } wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); return w; }
代碼指定(必須得在commonPool初始化之前注入進去,否則無法生效)
System.setProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler",YourUncaughtExceptionHandler.class.getName());
參數指定
-Djava.util.concurrent.ForkJoinPool.common.exceptionHandler=com.xxx.xxx.YourUncaughtExceptionHandlerWorkQueue
// Mode bits for ForkJoinPool.config and WorkQueue.config static final int MODE_MASK = 0xffff << 16; // top half of int static final int LIFO_QUEUE = 0; static final int FIFO_QUEUE = 1 << 16; static final int SHARED_QUEUE = 1 << 31; // must be negative
控制是FIFO還是LIFO
/** * Takes next task, if one exists, in order specified by mode. */ final ForkJoinTask> nextLocalTask() { return (config & FIFO_QUEUE) == 0 ? pop() : poll(); }
ForkJoinPool 的每個工作線程都維護著一個工作隊列(WorkQueue),這是一個雙端隊列(Deque),里面存放的對象是任務(ForkJoinTask)。
每個工作線程在運行中產生新的任務(通常是因為調用了 fork())時,會放入工作隊列的隊尾,并且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。
每個工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(或是來自于剛剛提交到 pool的任務,或是來自于其他工作線程的工作隊列),竊取的任務位于其他線程的工作隊列的隊首,也就是說工作線程在竊取其他工作線程的任務時,使用的是 FIFO 方式。
queue capacity
/** * Capacity of work-stealing queue array upon initialization. * Must be a power of two; at least 4, but should be larger to * reduce or eliminate cacheline sharing among queues. * Currently, it is much larger, as a partial workaround for * the fact that JVMs often place arrays in locations that * share GC bookkeeping (especially cardmarks) such that * per-write accesses encounter serious memory contention. */ static final int INITIAL_QUEUE_CAPACITY = 1 << 13; /** * Maximum size for queue arrays. Must be a power of two less * than or equal to 1 << (31 - width of array entry) to ensure * lack of wraparound of index calculations, but defined to a * value a bit less than this to help users trap runaway * programs before saturating systems. */ static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
超出報異常
/** * Initializes or doubles the capacity of array. Call either * by owner or with lock held -- it is OK for base, but not * top, to move while resizings are in progress. */ final ForkJoinTask>[] growArray() { ForkJoinTask>[] oldA = array; int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY; if (size > MAXIMUM_QUEUE_CAPACITY) throw new RejectedExecutionException("Queue capacity exceeded"); int oldMask, t, b; ForkJoinTask>[] a = array = new ForkJoinTask>[size]; if (oldA != null && (oldMask = oldA.length - 1) >= 0 && (t = top) - (b = base) > 0) { int mask = size - 1; do { // emulate poll from old array, push to new array ForkJoinTask> x; int oldj = ((b & oldMask) << ASHIFT) + ABASE; int j = ((b & mask) << ASHIFT) + ABASE; x = (ForkJoinTask>)U.getObjectVolatile(oldA, oldj); if (x != null && U.compareAndSwapObject(oldA, oldj, x, null)) U.putObjectVolatile(a, j, x); } while (++b != t); } return a; }doc
Java 并發編程筆記:如何使用 ForkJoinPool 以及原理
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/66704.html
摘要:實際上,在并行流上使用新的方法。此外,我們了解到所有并行流操作共享相同的范圍。因此,您可能希望避免實施慢速阻塞流操作,因為這可能會減慢嚴重依賴并行流的應用程序的其他部分。 流可以并行執行,以增加大量輸入元素的運行時性能。并行流ForkJoinPool通過靜態ForkJoinPool.commonPool()方法使用公共可用的流。底層線程池的大小最多使用五個線程 - 具體取決于可用物理C...
默認使用的線程池 不傳executor時默認使用ForkJoinPool.commonPool() IntStream.range(0, 15).parallel().forEach(i -> { System.out.println(Thread.currentThread()); }); 輸出 Thread[ForkJoinPool.commonPoo...
摘要:并發教程原子變量和原文譯者飛龍協議歡迎閱讀我的多線程編程系列教程的第三部分。如果你能夠在多線程中同時且安全地執行某個操作,而不需要關鍵字或上一章中的鎖,那么這個操作就是原子的。當多線程的更新比讀取更頻繁時,這個類通常比原子數值類性能更好。 Java 8 并發教程:原子變量和 ConcurrentMap 原文:Java 8 Concurrency Tutorial: Synchroni...
摘要:在有些情況下,原子操作可以在不使用關鍵字和鎖的情況下解決多線程安全問題。但其內部的結果不是一個單一的值這個類的內部維護了一組變量來減少多線程的爭用。當來自多線程的更新比讀取更頻繁時這個類往往優于其他的原子類。 原文地址: Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMap AtomicInteger java...
摘要:一個簡單的爬蟲代碼已托管這里有一個簡單的例子根據提供的種子爬取數據指定對應的抓取規則自己定義抓取的鏈接簡單的控制臺打印結果建筑工地上的青年如何自我成長知乎國內專做進口行業的公司多不不包括貨代公司知乎如何有效地進行后天 Web Spider 一個簡單的爬蟲 showImg(https://segmentfault.com/img/bVbckso?w=939&h=813); 代碼已托管 這...
閱讀 3129·2021-11-08 13:18
閱讀 2276·2019-08-30 15:55
閱讀 3602·2019-08-30 15:44
閱讀 3063·2019-08-30 13:07
閱讀 2774·2019-08-29 17:20
閱讀 1942·2019-08-29 13:03
閱讀 3403·2019-08-26 10:32
閱讀 3218·2019-08-26 10:15