摘要:創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過(guò)處理需要,可靈活回收空閑線程,若無(wú)可回收,則新建線程。創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。
轉(zhuǎn)載請(qǐng)注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/...
線程池Wiki 上是這樣解釋的:Thread Pool
作用:利用線程池可以大大減少在創(chuàng)建和銷毀線程上所花的時(shí)間以及系統(tǒng)資源的開(kāi)銷!
下面主要講下線程池中最重要的一個(gè)類 ThreadPoolExecutor 。
ThreadPoolExecutor 構(gòu)造器:
有四個(gè)構(gòu)造器的,挑了參數(shù)最長(zhǎng)的一個(gè)進(jìn)行講解。
七個(gè)參數(shù):
corePoolSize:核心池的大小,在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒(méi)有任何線程,而是等待有任務(wù)到來(lái)才創(chuàng)建線程去執(zhí)行任務(wù),當(dāng)有任務(wù)來(lái)之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中;
maximumPoolSize:線程池最大線程數(shù);
keepAliveTime:表示線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止;
unit:參數(shù)keepAliveTime的時(shí)間單位(DAYS、HOURS、MINUTES、SECONDS 等);
workQueue:阻塞隊(duì)列,用來(lái)存儲(chǔ)等待執(zhí)行的任務(wù);
ArrayBlockingQueue (有界隊(duì)列)
LinkedBlockingQueue (無(wú)界隊(duì)列)
SynchronousQueue
threadFactory:線程工廠,主要用來(lái)創(chuàng)建線程
handler:拒絕處理任務(wù)的策略
AbortPolicy:丟棄任務(wù)并拋出 RejectedExecutionException 異常。(默認(rèn)這種)
DiscardPolicy:也是丟棄任務(wù),但是不拋出異常
DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)
CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
重要方法:
execute():通過(guò)這個(gè)方法可以向線程池提交一個(gè)任務(wù),交由線程池去執(zhí)行;
shutdown():關(guān)閉線程池;
execute() 方法:
注:JDK 1.7 和 1.8 這個(gè)方法有點(diǎn)區(qū)別,下面代碼是 1.8 中的。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //1、如果當(dāng)前的線程數(shù)小于核心線程池的大小,根據(jù)現(xiàn)有的線程作為第一個(gè) Worker 運(yùn)行的線程,新建一個(gè) Worker,addWorker 自動(dòng)的檢查當(dāng)前線程池的狀態(tài)和 Worker 的數(shù)量,防止線程池在不能添加線程的狀態(tài)下添加線程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //2、如果線程入隊(duì)成功,然后還是要進(jìn)行 double-check 的,因?yàn)榫€程在入隊(duì)之后狀態(tài)是可能會(huì)發(fā)生變化的 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // recheck 防止線程池狀態(tài)的突變,如果突變,那么將 reject 線程,防止 workQueue 中增加新線程 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0)//上下兩個(gè)操作都有 addWorker 的操作,但是如果在workQueue.offer 的時(shí)候 Worker 變?yōu)?0,那么將沒(méi)有 Worker 執(zhí)行新的 task,所以增加一個(gè) Worker. addWorker(null, false); } //3、如果 task 不能入隊(duì)(隊(duì)列滿了),這時(shí)候嘗試增加一個(gè)新線程,如果增加失敗那么當(dāng)前的線程池狀態(tài)變化了或者線程池已經(jīng)滿了然后拒絕task else if (!addWorker(command, false)) reject(command); }
其中調(diào)用了 addWorker() 方法:
private boolean addWorker(Runnable firstTask, boolean core) {// firstTask: 新增一個(gè)線程并執(zhí)行這個(gè)任務(wù),可空,增加的線程從隊(duì)列獲取任務(wù);core:是否使用 corePoolSize 作為上限,否則使用 maxmunPoolSize retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * rs!=Shutdown || fistTask!=null || workQueue.isEmpty * 如果當(dāng)前的線程池的狀態(tài) > SHUTDOWN 那么拒絕 Worker 的 add 如果 =SHUTDOWN * 那么此時(shí)不能新加入不為 null 的 Task,如果在 workQueue 為 empty 的時(shí)候不能加入任何類型的 Worker, * 如果不為 empty 可以加入 task 為 null 的 Worker, 增加消費(fèi)的 Worker */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //如果當(dāng)前的數(shù)量超過(guò)了 CAPACITY,或者超過(guò)了 corePoolSize 和 maximumPoolSize(試 core 而定) if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS 嘗試增加線程數(shù),如果失敗,證明有競(jìng)爭(zhēng),那么重新到 retry。 if (compareAndIncrementWorkerCount(c))// AtomicInteger 的 CAS 操作; break retry; c = ctl.get(); // Re-read ctl //判斷當(dāng)前線程池的運(yùn)行狀態(tài),狀態(tài)發(fā)生改變,重試 retry; if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);// Worker 為內(nèi)部類,封裝了線程和任務(wù),通過(guò) ThreadFactory 創(chuàng)建線程,可能失敗拋異常或者返回 null final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable // SHUTDOWN 以后的狀態(tài)和 SHUTDOWN 狀態(tài)下 firstTask 為 null,不可新增線程 throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s;//記錄最大線程數(shù) workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w);//失敗回退,從 wokers 移除 w, 線程數(shù)減一,嘗試結(jié)束線程池(調(diào)用tryTerminate 方法) } return workerStarted; }
示意圖:
執(zhí)行流程:
1、當(dāng)有任務(wù)進(jìn)入時(shí),線程池創(chuàng)建線程去執(zhí)行任務(wù),直到核心線程數(shù)滿為止
2、核心線程數(shù)量滿了之后,任務(wù)就會(huì)進(jìn)入一個(gè)緩沖的任務(wù)隊(duì)列中
當(dāng)任務(wù)隊(duì)列為無(wú)界隊(duì)列時(shí),任務(wù)就會(huì)一直放入緩沖的任務(wù)隊(duì)列中,不會(huì)和最大線程數(shù)量進(jìn)行比較
當(dāng)任務(wù)隊(duì)列為有界隊(duì)列時(shí),任務(wù)先放入緩沖的任務(wù)隊(duì)列中,當(dāng)任務(wù)隊(duì)列滿了之后,才會(huì)將任務(wù)放入線程池,此時(shí)會(huì)與線程池中最大的線程數(shù)量進(jìn)行比較,如果超出了,則默認(rèn)會(huì)拋出異常。然后線程池才會(huì)執(zhí)行任務(wù),當(dāng)任務(wù)執(zhí)行完,又會(huì)將緩沖隊(duì)列中的任務(wù)放入線程池中,然后重復(fù)此操作。
shutdown() 方法:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //判斷是否可以操作目標(biāo)線程 checkShutdownAccess(); //設(shè)置線程池狀態(tài)為 SHUTDOWN, 此處之后,線程池中不會(huì)增加新 Task advanceRunState(SHUTDOWN); //中斷所有的空閑線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //轉(zhuǎn)到 Terminate tryTerminate(); }
參考資料:深入理解java線程池—ThreadPoolExecutor
JDK 自帶四種線程池分析與比較 1、newFixedThreadPool創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。
2、newSingleThreadExecutor創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。
3、newCachedThreadPool創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過(guò)處理需要,可靈活回收空閑線程,若無(wú)可回收,則新建線程。
4、newScheduledThreadPool創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。
四種線程池其實(shí)內(nèi)部方法都是調(diào)用的 ThreadPoolExecutor 類,只不過(guò)利用了其不同的構(gòu)造器方法而已(傳入自己需要傳入的參數(shù)),那么利用這個(gè)特性,我們自己也是可以實(shí)現(xiàn)自己定義的線程池的。
自定義線程池1、創(chuàng)建任務(wù)類
package com.zhisheng.thread.threadpool.demo; /** * Created by 10412 on 2017/7/24. * 任務(wù) */ public class MyTask implements Runnable { private int taskId; //任務(wù) id private String taskName; //任務(wù)名字 public int getTaskId() { return taskId; } public void setTaskId(int taskId) { this.taskId = taskId; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public MyTask(int taskId, String taskName) { this.taskId = taskId; this.taskName = taskName; } @Override public void run() { System.out.println("當(dāng)前正在執(zhí)行 ****** 線程Id-->" + taskId + ",任務(wù)名稱-->" + taskName); try { Thread.currentThread().sleep(5 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("線程Id-->" + taskId + ",任務(wù)名稱-->" + taskName + " ----------- 執(zhí)行完畢!"); } }
2、自定義拒絕策略,實(shí)現(xiàn) RejectedExecutionHandler 接口,重寫(xiě) rejectedExecution 方法
package com.zhisheng.thread.threadpool.demo; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** * Created by 10412 on 2017/7/24. * 自定義拒絕策略,實(shí)現(xiàn) RejectedExecutionHandler 接口 */ public class RejectedThreadPoolHandler implements RejectedExecutionHandler { public RejectedThreadPoolHandler() { } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("WARNING 自定義拒絕策略: Task " + r.toString() + " rejected from " + executor.toString()); } }
3、創(chuàng)建線程池
package com.zhisheng.thread.threadpool.demo; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Created by 10412 on 2017/7/24. */ public class ThreadPool { public static void main(String[] args) { //核心線程數(shù)量為 2,最大線程數(shù)量 4,空閑線程存活的時(shí)間 60s,有界隊(duì)列長(zhǎng)度為 3, //ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3)); //核心線程數(shù)量為 2,最大線程數(shù)量 4,空閑線程存活的時(shí)間 60s, 無(wú)界隊(duì)列, //ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); //核心線程數(shù)量為 2,最大線程數(shù)量 4,空閑線程存活的時(shí)間 60s,有界隊(duì)列長(zhǎng)度為 3, 使用自定義拒絕策略 ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(3), new RejectedThreadPoolHandler()); for (int i = 1; i <= 10; i++) { //創(chuàng)建 10 個(gè)任務(wù) MyTask task = new MyTask(i, "任務(wù)" + i); //運(yùn)行 pool.execute(task); System.out.println("活躍的線程數(shù):"+pool.getActiveCount() + ",核心線程數(shù):" + pool.getCorePoolSize() + ",線程池大小:" + pool.getPoolSize() + ",隊(duì)列的大小" + pool.getQueue().size()); } //關(guān)閉線程池 pool.shutdown(); } }
這里運(yùn)行結(jié)果就不截圖了,我在本地測(cè)試了代碼是沒(méi)問(wèn)題的,感興趣的建議還是自己跑一下,然后分析下結(jié)果是不是和前面分析的一樣,如有問(wèn)題,請(qǐng)?jiān)谖也┛拖旅嬖u(píng)論!
總結(jié)本文一開(kāi)始講了線程池的介紹和好處,然后分析了線程池中最核心的 ThreadPoolExecutor 類中構(gòu)造器的七個(gè)參數(shù)的作用、類中兩個(gè)重要的方法,然后在對(duì)比研究了下 JDK 中自帶的四種線程池的用法和內(nèi)部代碼細(xì)節(jié),最后寫(xiě)了一個(gè)自定義的線程池。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/67500.html
摘要:當(dāng)活動(dòng)線程核心線程非核心線程達(dá)到這個(gè)數(shù)值后,后續(xù)任務(wù)將會(huì)根據(jù)來(lái)進(jìn)行拒絕策略處理。線程池工作原則當(dāng)線程池中線程數(shù)量小于則創(chuàng)建線程,并處理請(qǐng)求。當(dāng)線程池中的數(shù)量等于最大線程數(shù)時(shí)默默丟棄不能執(zhí)行的新加任務(wù),不報(bào)任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點(diǎn)記錄以及采用的解決方案 深入分析 java 線程池的實(shí)現(xiàn)原理 在這篇文章中,作者有條不紊的將 ja...
摘要:中的詳解必修個(gè)多線程問(wèn)題總結(jié)個(gè)多線程問(wèn)題總結(jié)有哪些源代碼看了后讓你收獲很多,代碼思維和能力有較大的提升有哪些源代碼看了后讓你收獲很多,代碼思維和能力有較大的提升開(kāi)源的運(yùn)行原理從虛擬機(jī)工作流程看運(yùn)行原理。 自己實(shí)現(xiàn)集合框架 (三): 單鏈表的實(shí)現(xiàn) 自己實(shí)現(xiàn)集合框架 (三): 單鏈表的實(shí)現(xiàn) 基于 POI 封裝 ExcelUtil 精簡(jiǎn)的 Excel 導(dǎo)入導(dǎo)出 由于 poi 本身只是針對(duì)于 ...
摘要:線程的啟動(dòng)與銷毀都與本地線程同步。操作系統(tǒng)會(huì)調(diào)度所有線程并將它們分配給可用的。框架的成員主要成員線程池接口接口接口以及工具類。創(chuàng)建單個(gè)線程的接口與其實(shí)現(xiàn)類用于表示異步計(jì)算的結(jié)果。參考書(shū)籍并發(fā)編程的藝術(shù)方騰飛魏鵬程曉明著 在java中,直接使用線程來(lái)異步的執(zhí)行任務(wù),線程的每次創(chuàng)建與銷毀需要一定的計(jì)算機(jī)資源開(kāi)銷。每個(gè)任務(wù)創(chuàng)建一個(gè)線程的話,當(dāng)任務(wù)數(shù)量多的時(shí)候,則對(duì)應(yīng)的創(chuàng)建銷毀開(kāi)銷會(huì)消耗大量...
摘要:開(kāi)頭正式開(kāi)啟我入職的里程,現(xiàn)在已是工作了一個(gè)星期了,這個(gè)星期算是我入職的過(guò)渡期,算是知道了學(xué)校生活和工作的差距了,總之,盡快習(xí)慣這種生活吧。當(dāng)時(shí)是看的廖雪峰的博客自己也用做爬蟲(chóng)寫(xiě)過(guò)幾篇博客,不過(guò)有些是在前人的基礎(chǔ)上寫(xiě)的。 showImg(https://segmentfault.com/img/remote/1460000010867984); 開(kāi)頭 2017.08.21 正式開(kāi)啟我...
閱讀 1338·2021-11-11 16:54
閱讀 2385·2021-09-22 10:51
閱讀 2653·2019-08-30 15:44
閱讀 3205·2019-08-29 17:05
閱讀 1444·2019-08-29 17:01
閱讀 2897·2019-08-29 12:28
閱讀 2470·2019-08-26 13:50
閱讀 1730·2019-08-23 16:47