摘要:序本文主要來展示一下簡版的線程池的實(shí)現(xiàn)。默認(rèn)提供了幾個工廠方法思路主要用到的是雙端隊列,不過這里我們粗糙的實(shí)現(xiàn)的話,也可以不用到。測試實(shí)例輸出從數(shù)據(jù)來看,還是相對均勻的。
序
本文主要來展示一下簡版的work stealing線程池的實(shí)現(xiàn)。
ExecutorsExecutors默認(rèn)提供了幾個工廠方法
/** * Creates a thread pool that maintains enough threads to support * the given parallelism level, and may use multiple queues to * reduce contention. The parallelism level corresponds to the * maximum number of threads actively engaged in, or available to * engage in, task processing. The actual number of threads may * grow and shrink dynamically. A work-stealing pool makes no * guarantees about the order in which submitted tasks are * executed. * * @param parallelism the targeted parallelism level * @return the newly created thread pool * @throws IllegalArgumentException if {@code parallelism <= 0} * @since 1.8 */ public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } /** * Creates a work-stealing thread pool using all * {@link Runtime#availableProcessors available processors} * as its target parallelism level. * @return the newly created thread pool * @see #newWorkStealingPool(int) * @since 1.8 */ public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }思路
ForkJoinPool主要用到的是雙端隊列,不過這里我們粗糙的實(shí)現(xiàn)的話,也可以不用到deque。
public class WorkStealingChannel{ private static final Logger LOGGER = LoggerFactory.getLogger(WorkStealingChannel.class); BlockingDeque [] managedQueues; AtomicLongMap stat = AtomicLongMap.create(); public WorkStealingChannel() { int nCPU = Runtime.getRuntime().availableProcessors(); int queueCount = nCPU / 2 + 1; managedQueues = new LinkedBlockingDeque[queueCount]; for(int i=0;i (); } } public void put(T item) throws InterruptedException { int targetIndex = Math.abs(item.hashCode() % managedQueues.length); BlockingQueue targetQueue = managedQueues[targetIndex]; targetQueue.put(item); } public T take() throws InterruptedException { int rdnIdx = ThreadLocalRandom.current().nextInt(managedQueues.length); int idx = rdnIdx; while (true){ idx = idx % managedQueues.length; T item = null; if(idx == rdnIdx){ item = managedQueues[idx].poll(); }else{ item = managedQueues[idx].pollLast(); } if(item != null){ LOGGER.info("take ele from queue {}",idx); stat.addAndGet(idx,1); return item; } idx++; if(idx == rdnIdx){ break; } } //走完一輪沒有,則隨機(jī)取一個等待 LOGGER.info("wait for queue:{}",rdnIdx); stat.addAndGet(rdnIdx,1); return managedQueues[rdnIdx].take(); } public AtomicLongMap getStat() { return stat; } }
這里根據(jù)cpu的數(shù)量建立了幾個deque,然后每次put的時候,根據(jù)hashcode取模放到對應(yīng)的隊列。然后獲取的時候,先從隨機(jī)一個隊列取,沒有的話,再robbin round取其他隊列的,還沒有的話,則阻塞等待指定隊列的元素。
測試實(shí)例
public class WorkStealingDemo { static final WorkStealingChannelchannel = new WorkStealingChannel<>(); static volatile boolean running = true; static class Producer extends Thread{ @Override public void run() { while(running){ try { channel.put(UUID.randomUUID().toString()); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class Consumer extends Thread{ @Override public void run() { while(running){ try { String value = channel.take(); System.out.println(value); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void stop(){ running = false; System.out.println(channel.getStat()); } public static void main(String[] args) throws InterruptedException { int nCPU = Runtime.getRuntime().availableProcessors(); int consumerCount = nCPU / 2 + 1; for (int i = 0; i < nCPU; i++) { new Producer().start(); } for (int i = 0; i < consumerCount; i++) { new Consumer().start(); } Thread.sleep(30*1000); stop(); } }
輸出
{0=660972, 1=660613, 2=661537, 3=659846, 4=659918}
從數(shù)據(jù)來看,還是相對均勻的。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/70405.html
執(zhí)行器 在前面的所有示例中,由新的線程(由其Runnable對象定義)和線程本身(由Thread對象定義)完成的任務(wù)之間存在緊密的聯(lián)系,這適用于小型應(yīng)用程序,但在大型應(yīng)用程序中,將線程管理和創(chuàng)建與應(yīng)用程序的其余部分分開是有意義的,封裝這些函數(shù)的對象稱為執(zhí)行器,以下小節(jié)詳細(xì)描述了執(zhí)行器。 執(zhí)行器接口定義三個執(zhí)行器對象類型。 線程池是最常見的執(zhí)行器實(shí)現(xiàn)類型。 Fork/Join是一個利用多個處理器的...
摘要:大多數(shù)待遇豐厚的開發(fā)職位都要求開發(fā)者精通多線程技術(shù)并且有豐富的程序開發(fā)調(diào)試優(yōu)化經(jīng)驗,所以線程相關(guān)的問題在面試中經(jīng)常會被提到。掌握了這些技巧,你就可以輕松應(yīng)對多線程和并發(fā)面試了。進(jìn)入等待通行準(zhǔn)許時,所提供的對象。 最近看到網(wǎng)上流傳著,各種面試經(jīng)驗及面試題,往往都是一大堆技術(shù)題目貼上去,而沒有答案。 不管你是新程序員還是老手,你一定在面試中遇到過有關(guān)線程的問題。Java語言一個重要的特點(diǎn)就...
摘要:同時,它會通過的方法將自己注冊到線程池中。線程池中的每個工作線程都有一個自己的任務(wù)隊列,工作線程優(yōu)先處理自身隊列中的任務(wù)或順序,由線程池構(gòu)造時的參數(shù)決定,自身隊列為空時,以的順序隨機(jī)竊取其它隊列中的任務(wù)。 showImg(https://segmentfault.com/img/bVbizJb?w=1802&h=762); 本文首發(fā)于一世流云的專欄:https://segmentfau...
摘要:方法接受對象數(shù)組作為參數(shù),目標(biāo)是對數(shù)組進(jìn)行升序排序。創(chuàng)建一個對象,并調(diào)用方法將它提交給線程池。此排序算法不直接返回結(jié)果給調(diào)用方,因此基于類。 分支/合并框架 說明 重點(diǎn)是那個浮點(diǎn)數(shù)數(shù)組排序的例子,從主函數(shù)展開,根據(jù)序號看 1、GitHub代碼歡迎star。你們輕輕的一點(diǎn),對我鼓勵特大,我有一個習(xí)慣,看完別人的文章是會點(diǎn)贊的。 2、個人認(rèn)為學(xué)習(xí)語言最好的方式就是模仿、思考別人為什么這么寫...
摘要:并不會為每個任務(wù)都創(chuàng)建工作線程,而是根據(jù)實(shí)際情況構(gòu)造線程池時的參數(shù)確定是喚醒已有空閑工作線程,還是新建工作線程。 showImg(https://segmentfault.com/img/bVbiYSP?w=1071&h=707); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 一、引言 前一章——Fork/Join框架(1) 原理,我們...
閱讀 1580·2021-10-18 13:35
閱讀 2359·2021-10-09 09:44
閱讀 813·2021-10-08 10:05
閱讀 2707·2021-09-26 09:47
閱讀 3560·2021-09-22 15:22
閱讀 427·2019-08-29 12:24
閱讀 1993·2019-08-29 11:06
閱讀 2853·2019-08-26 12:23