摘要:線程池同時可以避免創建大量線程的開銷,提高響應速度??梢钥吹剑暮诵木€程數和最大線程數都是指定值,也就是說當線程池中的線程數超過核心線程數后,任務都會被放到阻塞隊列中。
目錄
概述JAVA通過多線程的方式實現并發,為了方便線程池的管理,JAVA采用線程池的方式對線線程的整個生命周期進行管理。1.5后引入的Executor框架的最大優點是把任務的提交和執行解耦。
要執行任務的人只需把Task描述清楚,然后提交即可。這個Task是怎么被執行的,被誰執行的,什么時候執行的,提交的人就不用關心了。
線程池同時可以避免創建大量線程的開銷,提高響應速度。最近在閱讀JVM相關的東西,一個對象的創建需要以下過程:
檢查對應的類是否已經被加載、解析和初始化
類加載后,為新生對象分配內存
將分配到的內存空間初始為 0
對對象進行關鍵信息的設置,比如對象的hashcode等
然后執行 init 方法初始化對象
如果每次都是如此的創建線程->執行任務->銷毀線程,會造成很大的性能開銷。復用已創建好的線程可以提高系統的性能,借助池化技術的思想,通過預先創建好多個線程,放在池中,這樣可以在需要使用線程的時候直接獲取,避免多次重復創建、銷毀帶來的開銷。
線程池的“池” ThreadPoolExecutor前面提到一個名詞——池化技術,那么到底什么是池化技術呢?池化技術簡單點來說,就是提前保存大量的資源,以備不時之需。在機器資源有限的情況下,使用池化技術可以大大的提高資源的利用率,提升性能等。
在編程領域,比較典型的池化技術有:
線程池、連接池、內存池、對象池等。
在Java中創建線程池可以使用ThreadPoolExecutor,其繼承關系如下圖
其構造函數為:
代碼塊
Java
public ThreadPoolExecutor(int corePoolSize, //核心線程的數量 int maximumPoolSize, //最大線程數量 long keepAliveTime, //超出核心線程數量以外的線程空余存活時間 TimeUnit unit, //存活時間的單位 BlockingQueueworkQueue, //保存待執行任務的隊列 ThreadFactory threadFactory, //創建新線程使用的工廠 RejectedExecutionHandler handler // 當任務無法執行時的處理器 ) {...}
corePoolSize:核心線程池數量
在線程數少于核心數量時,有新任務進來就新建一個線程,即使有的線程沒事干
等超出核心數量后,就不會新建線程了,空閑的線程就得去任務隊列里取任務執行了
maximumPoolSize:最大線程數量
包括核心線程池數量 + 核心以外的數量
如果任務隊列滿了,并且池中線程數小于最大線程數,會再創建新的線程執行任務
keepAliveTime:核心池以外的線程存活時間,即沒有任務的外包的存活時間
如果給線程池設置 allowCoreThreadTimeOut(true),則核心線程在空閑時頭上也會響起死亡的倒計時
如果任務是多而容易執行的,可以調大這個參數,那樣線程就可以在存活的時間里有更大可能接受新任務
workQueue:保存待執行任務的阻塞隊列
不同的任務類型有不同的選擇,下一小節介紹
threadFactory:每個線程創建的地方
可以給線程起個好聽的名字,設置個優先級啥的
handler:飽和策略,大家都很忙,咋辦呢,有四種策略
AbortPolicy:直接拋出 RejectedExecutionException 異常,本策略也是默認的飽和策略
CallerRunsPolicy:只要線程池沒關閉,就直接用調用者所在線程來運行任務
DiscardPolicy:悄悄把任務放生,不做了
DiscardOldestPolicy:把隊列里待最久的那個任務扔了,然后再調用 execute() 嘗試執行
我們也可以實現自己的 RejectedExecutionHandler 接口自定義策略,比如如記錄日志什么的
如果把線程比作員工,那么線程池可以比作一個團隊,核心池比作團隊中正式員工數,核心池外的比作外包員工。
線程池中任務的執行順序通過Executors靜態工廠也可以構建常用的線程池,在詳細介紹之前,還需要先了解線程池中任務的執行順序
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn"t, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
從注釋中可以看到處理邏輯,從判斷條件中可以看到核心模塊
第一個紅框:workerCountOf方法根據ctl的低29位,得到線程池的當前線程數,如果線程數小于corePoolSize,則執行addWorker方法創建新的線程執行任務;
第二個紅框:判斷線程池是否在運行,如果在,任務隊列是否允許插入,插入成功再次驗證線程池是否運行,如果不在運行,移除插入的任務,然后拋出拒絕策略。如果在運行,沒有線程了,就啟用一個線程。
第三個紅框:如果添加非核心線程失敗,就直接拒絕了。
概略圖:
詳細流程圖:
Executors按照上面的總結,可以逐一分析Executors工廠類提供的現成的線程池:
1.newFixedThreadPoolpublic static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
不招外包,有固定數量核心成員的正?;ヂ摼W團隊。
可以看到,FixedThreadPool 的核心線程數和最大線程數都是指定值,也就是說當線程池中的線程數超過核心線程數后,任務都會被放到阻塞隊列中。
此外 keepAliveTime 為 0,也就是多余的空余線程會被立即終止(由于這里沒有多余線程,這個參數也沒什么意義了)。
而這里選用的阻塞隊列是 LinkedBlockingQueue,使用的是默認容量 Integer.MAX_VALUE,相當于沒有上限。
因此這個線程池執行任務的流程如下:
線程數少于核心線程數,也就是設置的線程數時,新建線程執行任務
線程數等于核心線程數后,將任務加入阻塞隊列
由于隊列容量非常大,可以一直加加加
執行完任務的線程反復去隊列中取任務執行
FixedThreadPool 用于負載比較重的服務器,為了資源的合理利用,需要限制當前線程數量。
2.newSingleThreadExecutorpublic static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
不招外包,只有一個核心成員的創業團隊。
從參數可以看出來,SingleThreadExecutor 相當于特殊的 FixedThreadPool,它的執行流程如下:
線程池中沒有線程時,新建一個線程執行任務
有一個線程以后,將任務加入阻塞隊列,不停加加加
唯一的這一個線程不停地去隊列里取任務執行
聽起來很可憐的樣子 - -。
SingleThreadExecutor 用于串行執行任務的場景,每個任務必須按順序執行,不需要并發執行。
3.newCachedThreadPoolpublic static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
全部外包,沒活最多待 60 秒的外包團隊。
可以看到,CachedThreadPool 沒有核心線程,非核心線程數無上限,也就是全部使用外包,但是每個外包空閑的時間只有 60 秒,超過后就會被回收。
CachedThreadPool 使用的隊列是 SynchronousQueue,這個隊列的作用就是傳遞任務,并不會保存。
因此當提交任務的速度大于處理任務的速度時,每次提交一個任務,就會創建一個線程。極端情況下會創建過多的線程,耗盡 CPU 和內存資源。
它的執行流程如下:
沒有核心線程,直接向 SynchronousQueue 中提交任務
如果有空閑線程,就去取出任務執行;如果沒有空閑線程,就新建一個
執行完任務的線程有 60 秒生存時間,如果在這個時間內可以接到新任務,就可以繼續活下去,否則就拜拜
由于空閑 60 秒的線程會被終止,長時間保持空閑的 CachedThreadPool 不會占用任何資源。
CachedThreadPool 用于并發執行大量短期的小任務,或者是負載較輕的服務器。
4.newScheduledThreadPoolpublic static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); } private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
定期維護的 2B 業務團隊,核心與外包成員都有。
ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor, 最多線程數為 Integer.MAX_VALUE ,使用 DelayedWorkQueue 作為任務隊列。
ScheduledThreadPoolExecutor 添加任務和執行任務的機制與ThreadPoolExecutor 有所不同。
ScheduledThreadPoolExecutor 添加任務提供了另外兩個方法:
scheduleAtFixedRate() :按某種速率周期執行
scheduleWithFixedDelay():在某個延遲后執行
它倆的代碼如下:
public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0L) throw new IllegalArgumentException(); ScheduledFutureTasksft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), unit.toNanos(period), sequencer.getAndIncrement()); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0L) throw new IllegalArgumentException(); ScheduledFutureTask sft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), -unit.toNanos(delay), sequencer.getAndIncrement()); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
可以看到,這兩種方法都是創建了一個 ScheduledFutureTask 對象,調用 decorateTask() 方法轉成 RunnableScheduledFuture 對象,然后添加到隊列中。
看下 ScheduledFutureTask 的主要屬性:
private class ScheduledFutureTaskextends FutureTask implements RunnableScheduledFuture { //添加到隊列中的順序 private final long sequenceNumber; //何時執行這個任務 private volatile long time; //執行的間隔周期 private final long period; //實際被添加到隊列中的 task RunnableScheduledFuture outerTask = this; //在 delay queue 中的索引,便于取消時快速查找 int heapIndex; //... }
DelayQueue 中封裝了一個優先級隊列,這個隊列會對隊列中的 ScheduledFutureTask 進行排序,兩個任務的執行 time 不同時,time 小的先執行;否則比較添加到隊列中的順序 sequenceNumber ,先提交的先執行。
ScheduledThreadPoolExecutor 的執行流程如下:
調用上面兩個方法添加一個任務
線程池中的線程從 DelayQueue 中取任務
然后執行任務
具體執行任務的步驟也比較復雜:
線程從 DelayQueue 中獲取 time 大于等于當前時間的 ScheduledFutureTask
DelayQueue.take()
執行完后修改這個 task 的 time 為下次被執行的時間
然后再把這個 task 放回隊列中
DelayQueue.add()
ScheduledThreadPoolExecutor 用于需要多個后臺線程執行周期任務,同時需要限制線程數量的場景。
“不允許使用”Executors阿里巴巴Java開發手冊中明確指出,『不允許』使用Executors創建線程池。
通過上面的例子,我們知道了Executors創建的線程池存在OOM的風險,那么到底是什么原因導致的呢?我們需要深入Executors的源碼來分析一下。
其實,在上面的報錯信息中,我們是可以看出蛛絲馬跡的,在以上的代碼中其實已經說了,真正的導致OOM的其實是LinkedBlockingQueue.offer方法。
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371) at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
如果對Java中的阻塞隊列有所了解的話,看到這里或許就能夠明白原因了。
Java中的BlockingQueue主要有兩種實現,分別是ArrayBlockingQueue 和 LinkedBlockingQueue。
ArrayBlockingQueue是一個用數組實現的有界阻塞隊列,必須設置容量。
LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列,容量可以選擇進行設置,不設置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE。
這里的問題就出在:不設置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE。也就是說,如果我們不設置LinkedBlockingQueue的容量的話,其默認容量將會是Integer.MAX_VALUE。
而newFixedThreadPool中創建LinkedBlockingQueue時,并未指定容量。此時,LinkedBlockingQueue就是一個無邊界隊列,對于一個無邊界隊列來說,是可以不斷的向隊列中加入任務的,這種情況下就有可能因為任務過多而導致內存溢出問題。
上面提到的問題主要體現在newFixedThreadPool和newSingleThreadExecutor兩個工廠方法上,并不是說newCachedThreadPool和newScheduledThreadPool這兩個方法就安全了,這兩種方式創建的最大線程數可能是Integer.MAX_VALUE,而創建這么多線程,必然就有可能導致OOM。
說回ThreadPoolService addWorker從方法execute的實現可以看出:addWorker主要負責創建新的線程并執行任務,代碼如下(這里代碼有點長,沒關系,也是分塊的,總共有5個關鍵的代碼塊):
第一個紅框:做是否能夠添加工作線程條件過濾:
判斷線程池的狀態,如果線程池的狀態值大于或等SHUTDOWN,則不處理提交的任務,直接返回;
第二個紅框:做自旋,更新創建線程數量:
通過參數core判斷當前需要創建的線程是否為核心線程,如果core為true,且當前線程數小于corePoolSize,則跳出循環,開始創建新的線程。retry 是什么?這個是java中的goto語法。只能運用在break和continue后面。
接著看后面的代碼:
第一個紅框:獲取線程池主鎖。
線程池的工作線程通過Woker類實現,通過ReentrantLock鎖保證線程安全。
第二個紅框:添加線程到workers中(線程池中)。
第三個紅框:啟動新建的線程。
接下來,我們看看workers是什么。
一個hashSet。所以,線程池底層的存儲結構其實就是一個HashSet。
worker線程處理隊列任務第一個紅框:是否是第一次執行任務,或者從隊列中可以獲取到任務。
第二個紅框:獲取到任務后,執行任務開始前操作鉤子。
第三個紅框:執行任務。
第四個紅框:執行任務后鉤子。
這兩個鉤子(beforeExecute,afterExecute)允許我們自己繼承線程池,做任務執行前后處理。
總結到這里,源代碼分析到此為止。接下來做一下簡單的總結。
所謂線程池本質是一個hashSet。多余的任務會放在阻塞隊列中。
只有當阻塞隊列滿了后,才會觸發非核心線程的創建。所以非核心線程只是臨時過來打雜的。直到空閑了,然后自己關閉了。
線程池提供了兩個鉤子(beforeExecute,afterExecute)給我們,我們繼承線程池,在執行任務前后做一些事情。
線程池原理關鍵技術:鎖(lock,cas)、阻塞隊列、hashSet(資源池)
參考文檔Java中線程池,你真的會用嗎?
深入源碼分析Java線程池的實現原理
線程池的使用與執行流程
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/73502.html
摘要:近段時間在準備實習的面試,在網上看到一份面試題,就慢慢試著做,爭取每天積累一點點?,F在每天給自己在面試題編寫的任務是題,有時候忙起來可能就沒有時間寫了,但是爭取日更,即使當天沒更也會在之后的更新補上。 ????近段時間在準備實習的面試,在網上看到一份面試題,就慢慢試著做,爭取每天積累一點點。????暫時手頭上的面試題只有一份,題量還是挺大的,有208題,所以可能講的不是很詳細,只是我自...
摘要:今天給大家總結一下,面試中出鏡率很高的幾個多線程面試題,希望對大家學習和面試都能有所幫助。指令重排在單線程環境下不會出先問題,但是在多線程環境下會導致一個線程獲得還沒有初始化的實例。使用可以禁止的指令重排,保證在多線程環境下也能正常運行。 下面最近發的一些并發編程的文章匯總,通過閱讀這些文章大家再看大廠面試中的并發編程問題就沒有那么頭疼了。今天給大家總結一下,面試中出鏡率很高的幾個多線...
摘要:不過大多數講解還停留在對功能使用的層面,其底層的很多原理,很多人可能并不知曉。每個線程池里的線程就僅僅用于請求那個服務。 歡迎關注微信公眾號:石杉的架構筆記(id:shishan100) 每日更新!精品技術文章準時送上! 目錄 一、業務場景介紹 二、Spring Cloud核心組件:Eureka 三、Spring Cloud核心組件:Feign 四、Spring Cloud核心組件:R...
摘要:不過大多數講解還停留在對功能使用的層面,其底層的很多原理,很多人可能并不知曉。每個線程池里的線程就僅僅用于請求那個服務。 歡迎關注微信公眾號:石杉的架構筆記(id:shishan100) 每日更新!精品技術文章準時送上! 目錄 一、業務場景介紹 二、Spring Cloud核心組件:Eureka 三、Spring Cloud核心組件:Feign 四、Spring Cloud核心組件:R...
摘要:基礎問題的的性能及原理之區別詳解備忘筆記深入理解流水線抽象關鍵字修飾符知識點總結必看篇中的關鍵字解析回調機制解讀抽象類與三大特征時間和時間戳的相互轉換為什么要使用內部類對象鎖和類鎖的區別,,優缺點及比較提高篇八詳解內部類單例模式和 Java基礎問題 String的+的性能及原理 java之yield(),sleep(),wait()區別詳解-備忘筆記 深入理解Java Stream流水...
閱讀 2348·2021-11-15 11:37
閱讀 2625·2021-09-23 11:21
閱讀 2951·2021-09-07 10:11
閱讀 3164·2019-08-30 15:53
閱讀 2826·2019-08-29 15:13
閱讀 1606·2019-08-26 13:57
閱讀 1098·2019-08-26 12:23
閱讀 2438·2019-08-26 11:51