摘要:中的線程池運用場景非常廣泛,幾乎所有的一步或者并發執行程序都可以使用。代碼中如果執行了方法,線程池會提前創建并啟動所有核心線程。線程池最大數量線程池允許創建的線程最大數量。被稱為是可重用固定線程數的線程池。
Java中的線程池運用場景非常廣泛,幾乎所有的一步或者并發執行程序都可以使用。那么線程池有什么好處呢,以及他的實現原理是怎么樣的呢?
在開發過程中,合理的使用線程池能夠帶來以下的一些優勢,這些優勢同時也是一些其他池化的優勢,例如數據庫連接池,http連接池等。
降低資源消耗,通過重復利用已經創建的線程降低線程創建和銷毀造成的消耗。
提高響應速度,當任務到達時,任務可以不需要等到線程創建就能立即執行。
提高線程的可管理性 線程的比較稀缺的資源,如果無限的創建,不僅會消耗系統資源,還會降低系統的穩定性。
線程池實現原理我們創建線程池完成之后,當把一個任務提交給線程池處理的時候,線程池的處理流程如下:
1)線程池判斷核心線程池的任務是否都在執行任務,如果不是,則創建一個新的線程來執行任務,如何核心線程池的線程都在執行任務,則進入下一個流程
2)線程池判斷工作隊列是否已滿,如果工作隊列沒有滿,那么新提交的任務將會存儲在工作隊列中,如果工作隊列滿了,那會進入到下一個流程
3)線程池判斷線程池中的線程是否都處于工作狀態,如果沒有,則創建一個新的工作線程來執行任務。如果已經滿了,那么使用飽和策略來處理這個任務
創建線程池可以使用java中已經內置的一些默認配置的線程池,也可以使用ThreadPoolExecutor來自己配置參數來創建線程池,阿里代碼規約中推薦后者來創建線程池,這樣創建的線程池更加符合業務的實際需求。
下面是我創建線程池的一個例子:
ExecutorService executor = new ThreadPoolExecutor(2, 5, 30 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy()); public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
在上面的這段代碼里new ThreadPoolExecutor中一共有6個參數,我們來解析一下這些參數的意義,這樣下次創建線程池的時候能夠更加靈活的使用。
corePoolSize (核心線程的數量):任務提交到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的核心線程能夠執行新任務也會創建線程,等到需要執行的任務數大于線程池核心數大小時就不再創建。代碼中如果執行了prestartAllCoreThreads()方法,線程池會提前創建并啟動所有核心線程。
maximumPoolSize (線程池最大數量):線程池允許創建的線程最大數量。如果隊列滿了,并且創建的線程數小于最大線程數,那么線程池會再創建新的線程執行任務,如果使用無界隊列,那么這個參數設置了意義不大。
keepAliveTime (線程保持活動時間):線程池的工作線程空閑后,保持存活的時間,如果任務很多,切任務執行時間較長,這個值可以設置的大一點。
TimeUnit (上面的那個時間的單位)
BlockingQueue
ArrayBlockingQueue:基于數組的有界阻塞隊列 FIFO
LinkedBlockingQueue:基于鏈表的有界阻塞隊列 FIFO 吞吐量高于ArrayBlockingQueue, FixedThreadPool基于這個實現
SynchronousQueue: 不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入處于阻塞狀態,吞吐量高于LinkedBlockingQueue,靜態方法:Executors.newCachedThreadPool使用了這個隊列
PriorityBlockingQueue 具有優先級的無界阻塞隊列
handler (飽和策略):當隊列和線程池都滿了,說明線程池已經處于飽和狀態,那么必須采取一種策略處理新提交的任務,目前提供四種策略
AbortPolicy:直接拋出異常
CallerRunsPolicy: 只用調用者所在線程來運行任務
DiscardOldestPolicy :丟棄隊列里最近的一個任務,執行當前任務
DiscardPolicy : 不處理丟棄掉
也可以實現RejectedExecutionHandler接口自定義處理方式。
線程池創建完成之后,就可以處理提交的任務,任務如何提交到線程池呢,線程池提供了兩個方法:submit()和execute()方法。
public static void main(String[] args) { //提交沒有返回結果的任務 EXECUTOR_SERVICE.execute(new Runnable() { @Override public void run() { System.out.println("hello world"); } }); Future future = EXECUTOR_SERVICE.submit(new Runnable() { @Override public void run() { System.out.println("hello world1"); } }); //提交有返回結果的任務,返回一個future的對象,調用get方法獲取返回值,阻塞直到返回 Future future1 = EXECUTOR_SERVICE.submit(new Callable
Java對ExecutorService關閉方式有兩種,一是調用shutdown()方法,二是調用shutdownNow()方法。
這兩個方法雖然都可以關閉線程池,但是有一些區別
shutdown()
1、調用之后不允許繼續往線程池內繼續添加線程,如果繼續添加會出現RejectedExecutionException異常;
2、線程池的狀態變為SHUTDOWN狀態;
3、所有在調用shutdown()方法之前提交到ExecutorSrvice的任務都會執行;
4、一旦所有線程結束執行當前任務,ExecutorService才會真正關閉。
shutdownNow()
1、該方法返回尚未執行的 task 的 List;
2、線程池的狀態變為STOP狀態;
3、阻止所有正在等待啟動的任務, 并且停止當前正在執行的任務;
最好的關閉線程池的方法:
package multithread.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * @author pangjianfei * @Date 2019/6/21 * @desc 測試線程池 */ public class TestThreadPoolExecutor { static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10); public static void main(String[] args) { EXECUTOR_SERVICE.execute( ()-> System.out.println("hello world")); Future future = EXECUTOR_SERVICE.submit(() -> System.out.println("hello world1")); Future future1 = EXECUTOR_SERVICE.submit(() -> {System.out.println("hello world1");return "succ";}); Future future2 = EXECUTOR_SERVICE.submit(() -> {Thread.sleep(5000);System.out.println("hello world2");return "線程執行成功了";}); try { System.out.println(future.get()); System.out.println(future1.get()); System.out.println(future2.get()); } catch (Exception e) { e.printStackTrace(); } Runnable run = () -> { long sum = 0; boolean flag = true; while (flag && !Thread.currentThread().isInterrupted()) { sum += 1; if (sum == Long.MAX_VALUE) { flag = false; } } }; EXECUTOR_SERVICE.execute(run); //先調用shutdown()使線程池狀態改變為SHUTDOWN EXECUTOR_SERVICE.shutdown(); try { //2s后檢測線程池內的線程是否執行完畢 if (!EXECUTOR_SERVICE.awaitTermination(2, TimeUnit.SECONDS)) { //內部實際通過interrupt來終止線程,所以當調用shutdownNow()時,isInterrupted()會返回true。 EXECUTOR_SERVICE.shutdownNow(); } } catch (InterruptedException e) { EXECUTOR_SERVICE.shutdownNow(); } } }
一般說來,大家認為線程池的大小經驗值應該這樣設置:(其中N為CPU的個數)
如果是CPU密集型應用,則線程池大小設置為N+1
如果是IO密集型應用,則線程池大小設置為2N+1
IO優化中,更加合理的方式是:最佳線程數目 = (線程等待時間與線程CPU時間之比 + 1)* CPU數目
linux查看CPU的數量:
# 查看物理CPU個數 cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l # 查看每個物理CPU中core的個數(即核數) cat /proc/cpuinfo| grep "cpu cores"| uniq # 查看邏輯CPU的個數 cat /proc/cpuinfo| grep "processor"| wc -l
基于下面的方法可以對線程池中的一些數據進行監控:
//當前排隊線程數 ((ThreadPoolExecutor)EXECUTOR_SERVICE).getQueue().size(); //當前活動的線程數 ((ThreadPoolExecutor)EXECUTOR_SERVICE).getActiveCount(); //執行完成的線程數 ((ThreadPoolExecutor)EXECUTOR_SERVICE).getCompletedTaskCount(); //總線程數 ((ThreadPoolExecutor)EXECUTOR_SERVICE).getTaskCount();Executor框架
Executor框架的調度模型是一種二級調度模型。
在HotSpot VM 的線程模型中,Java線程會被一對一的映射為本地操作系統的線程。Java線程的啟動與銷毀都與本地線程同步。操作系統會調度所有線程并將它們分配給可用的CPU。
在上層,我們通常會創建任務,然后將任務提交到調度器,調度器(Executor框架)將這些任務映射為對應數量的線程;底層,操作系統會將這些線程映射到硬件處理器上,這里不是由應用程序控制。模型圖如下:
Executor框架主要包括三部分:
任務: 定義的被執行的任務需要實現兩個接口之一:Runable、Callable;
任務的執行: 包括任務執行機制的核心接口Executor,以及繼承自Executor的ExecutorService接口。Executor框架有兩個關鍵類實現了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor、ForkJoinPool;
任務的異步計算結果: 包括Future接口和實現Future接口的FutureTask類、ForkJoinTask類。
Executor主要包含的類和接口如下:
Executor是一個接口,他是Executor框架的基礎,他將任務的提交和執行分離。
ThreadPoolExecutor是線程池的核心類,用來執行被提交的任務
ScheduleThreadPoolExecutor是一個實現類,可以在給定的延遲后執行命令,或者定期執行命令,比Timer更加靈活。
Future接口和實現Future接口的FutureTask類,代表異步計算的結果
Executor框架最核心的類就是ThreadPoolExecutor,他是線程池的實現類,我們在上面看到過創建線程池的例子
ExecutorService executor = new ThreadPoolExecutor(2, 5, 30 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy()); 里面包含了幾個核心的參數,通過Executor框架的工具類Executors,可以直接創建3中類型的線程池。
FixedThreadPool被稱為是可重用固定線程數的線程池。為什么會這么說,我們看一下他的實現源碼:
public static ExecutorService newFixedThreadPool(int nThreads) { /** * 核心線程數和最大線程數相同,線程等待時間為0表示多余的空閑線程會被立刻終止,LinkedBlockingQueue默認容量是Integer.MAX_VALUE,基本類似于無界隊列 */ return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
SingleThreadExecutor是單個Worker線程的Executor.他的實現源碼如下:
/** * 可以看到核心線程數和最大線程數都是1,相當于是創建一個單線程順序的執行隊列中的任務 */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
CachedThreadPool是一個會根據需要創建新線程的線程池,源碼如下:
/** * 核心線程數為0,最大線程數為Integer.MAX_VALUE,線程存活時間為1分鐘,而且是使用SynchronousQueue沒有容量的隊列,這種情況下任務提交快的是時候,會創建大量的線程,耗盡CPU的資源 */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
ScheduledThreadPoolExecutor用來在給定的延遲之后運行任務,或者定期執行任務,他比Timer更加靈活,Timer是創建一個后臺線程,而這個線程池可以在構造函數中指定多個對應的后臺線程。源碼如下:
/** * 這里使用的是無界的隊列,如果核心線程池已滿,那么任務會一直提交到隊列 */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
ScheduledThreadPoolExecutor的執行主要包含兩部分
/**參數的作用:1:執行的線程 2、初始化延時 3、兩次開始執行最小時間間隔 4、計時單位*/ /**scheduleAtFixedRate()是按照指定頻率執行一個任務,就是前一個任務沒有執行完成,但是下次執行時間到,仍然會啟動線程執行新的任務*/ scheduledExecutorService.scheduleAtFixedRate(()-> System.out.println("hello"), 10, 10, TimeUnit.SECONDS); /**參數作用同上*/ /**scheduleWithFixedDelay()是按照指定頻率間隔執行,本次執行結果之后,在延時10s執行下一次的任務*/ scheduledExecutorService.scheduleWithFixedDelay(()-> System.out.println("hello"), 10, 10, TimeUnit.SECONDS); 上面的兩種方法都是向ScheduledThreadPoolExecutor的DelayQueue中添加了一個實現了RunnableScheduleFuture接口的ScheduledTask. public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); //這里對commond進行了封裝 ScheduledFutureTasksft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
DelayQueue是一個支持優先級的隊列,這里默認會按照執行時間進行排序,time小的排在前面。線程在獲取到期需要執行的ScheduledTutureTask之后,執行這個任務,執行完成之后會修改time,將其變為下次要執行的時間,修改之后放回到DelayQueue中。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77839.html
摘要:當活動線程核心線程非核心線程達到這個數值后,后續任務將會根據來進行拒絕策略處理。線程池工作原則當線程池中線程數量小于則創建線程,并處理請求。當線程池中的數量等于最大線程數時默默丟棄不能執行的新加任務,不報任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點記錄以及采用的解決方案 深入分析 java 線程池的實現原理 在這篇文章中,作者有條不紊的將 ja...
摘要:任務性質不同的任務可以用不同規模的線程池分開處理。線程池在運行過程中已完成的任務數量。如等于線程池的最大大小,則表示線程池曾經滿了。線程池的線程數量。獲取活動的線程數。通過擴展線程池進行監控。框架包括線程池,,,,,,等。 Java線程池 [toc] 什么是線程池 線程池就是有N個子線程共同在運行的線程組合。 舉個容易理解的例子:有個線程組合(即線程池,咱可以比喻為一個公司),里面有3...
摘要:線程池的工作原理一個線程池管理了一組工作線程,同時它還包括了一個用于放置等待執行任務的任務隊列阻塞隊列。使用線程池可以對線程進行統一的分配和監控。線程池的注意事項雖然線程池是構建多線程應用程序的強大機制,但使用它并不是沒有風險的。 線程池的工作原理一個線程池管理了一組工作線程, 同時它還包括了一個用于放置等待執行 任務的任務隊列(阻塞隊列) 。 一個線程池管理了一組工作線程, 同時它還...
摘要:高并發系列第篇文章。簡單的說,在使用了線程池之后,創建線程變成了從線程池中獲取一個空閑的線程,然后使用,關閉線程變成了將線程歸還到線程池。如果調用了線程池的方法,線程池會提前把核心線程都創造好,并啟動線程池允許創建的最大線程數。 java高并發系列第18篇文章。 本文主要內容 什么是線程池 線程池實現原理 線程池中常見的各種隊列 自定義線程創建的工廠 常見的飽和策略 自定義飽和策略 ...
閱讀 1698·2021-11-12 10:36
閱讀 1615·2021-11-12 10:36
閱讀 3441·2021-11-02 14:46
閱讀 3798·2019-08-30 15:56
閱讀 3533·2019-08-30 15:55
閱讀 1462·2019-08-30 15:44
閱讀 1043·2019-08-30 14:00
閱讀 2735·2019-08-29 18:41