摘要:類則扮演線程池工廠角色,通過可以取得一個具有特定功能的線程池。返回一個可根據(jù)實際情況調(diào)整線程數(shù)量的線程池,線程數(shù)量不確定,若有空閑,則會有限復用線程。所有線程在當前任務(wù)執(zhí)行完后,將返回線程池待復用。
前言
多線程的軟件設(shè)計方案確實可以最大限度地發(fā)揮現(xiàn)代多核處理器的計算能力,提高生產(chǎn)系列的吞吐量和性能。但是,若不加控制和管理的隨意使用線程,對系統(tǒng)的性能反而會產(chǎn)生不利的影響。最容易想到的后果就是線程過多導致CPU忙于切換而無力執(zhí)行其中的工作。
為了避免系統(tǒng)頻繁地創(chuàng)建和銷毀線程,我們可以讓創(chuàng)建的線程進行復用。如果有同學有過數(shù)據(jù)庫開發(fā)的經(jīng)驗,對數(shù)據(jù)庫連接池這個概念應(yīng)該不會陌生。為了避免每次數(shù)據(jù)庫查詢都重新建立和銷毀數(shù)據(jù)庫連接,我們可以使用數(shù)據(jù)庫連接池維護一些數(shù)據(jù)庫連接,使其長期保持在一個激活的狀態(tài)。當系統(tǒng)需要使用數(shù)據(jù)庫時,并不是創(chuàng)建一個新的連接,而是從連接池中獲得一個可用的連接即可。反之,當需要關(guān)閉連接時,并不真的把連接關(guān)閉,而是將這個連接“還”給連接池即可。通過此方法,通過調(diào)節(jié)線程池的基本大小和存活時間,可以幫助線程池回收空閑線程占有的資源,從而使得這些資源可以用于執(zhí)行其他的工作。
為了更好地控制多線程,JDK提供了一套Executor框架。核心成員如下圖所示
以上成員均在java.util.concurrent包中,是JDK并發(fā)包的核心類。其中ThreadPoolExecutor表示一個線程池。Executors類則扮演線程池工廠角色,通過Executors可以取得一個具有特定功能的線程池。從UML圖中亦可知,ThreadPoolExecutor實現(xiàn)了Executor接口,因此通過這個接口,任何Runnable對象都可以被ThreadPoolExecutor線程池調(diào)度。
Java提供了ExecutorService的三種實現(xiàn):
ThraedPoolExecutor:標準線程池
ScheduledThreadPoolExecutor:支持延時任務(wù)的線程池
ForkJoinPool:類似于ThraedPoolExecutor,但是使用work-stealing模式,其會為線程池中的每個線程創(chuàng)建一個隊列,從而使用work-stealing(任務(wù)竊取)算法使得線程可以從其他線程隊列里竊取任務(wù)來執(zhí)行。即如果自己的任務(wù)處理完成了,則可以去忙碌的工作線程那里去竊取任務(wù)執(zhí)行。
在本文,將會主要以ThraedPoolExecutor作為講解例子。
線程池的基本大小(Core POOL SIZE) ,較大大小(Maximum pool size) 以及存活時間等因素共同負責線程的創(chuàng)建和銷毀 。 基本大小也就是線程池的目標大小,即在沒有任務(wù)執(zhí)行是的線程池的大小,并且只有在工作隊列滿了的情況下才會創(chuàng)建超過這個數(shù)量的線程。線程池的較大大小表示可同時活動的線程數(shù)量的上限,如并且果某個線程的空閑時間超過了存活時間,那么將被標記為可回收的,并且當線程池的當前大小超過了基本大小時,這個線程將被終止。
newFixedThreadPool 工廠方法將線程池的基本大小和較大大小設(shè)置為參數(shù)中的執(zhí)行值,而且創(chuàng)建的線程池不會超時。newCachedThreadPool工廠方法將線程池的較大大小設(shè)置為Integer.MAX_VALUE,而將其基本大小設(shè)置為0,并將超時時間設(shè)置為1分鐘,這種方法創(chuàng)建的線程池可以無限擴展,并且當需求降低時會自動收縮,其他形式的線程池可以通過顯示的ThreadPoolExecutor構(gòu)造函數(shù)來溝通。
Executor框架提供了各種類型的線程池,主要有以下工廠方法。
public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newSingleThreadExecutor() public static ExecutorService newCachedThreadPool() public static ScheduledExecutorService newSingleThreadScheduledExecutor() public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
以上方法返回了具有不同工作特性的線程池,具體說明如下:
newFixedThreadPool返回一個固定數(shù)量的線程池。當一個新任務(wù)提交時,如果有空閑線程,則執(zhí)行。否則新任務(wù)暫存在一個任務(wù)隊列中,待有空閑時,便處理在任務(wù)隊列中的任務(wù)。
newSingleThreadExecutor返回一個線程的線程池。當多余一個新任務(wù)提交時,會暫存在一個任務(wù)隊列中,待有空閑時,按先入先出的順序處理在任務(wù)隊列中的任務(wù)。
newCachedThreadPool返回一個可根據(jù)實際情況調(diào)整線程數(shù)量的線程池,線程數(shù)量不確定,若有空閑,則會有限復用線程。否則創(chuàng)建新線程處理任務(wù)。所有線程在當前任務(wù)執(zhí)行完后,將返回線程池待復用。
newSingleThreadScheduledExecutor返回一個ScheduledExecutorService對象,線程池大小為1。ScheduledExecutorService在Executor接口之上擴展了在給定時間執(zhí)行某任務(wù)的功能。如果在某個固定的延時之后執(zhí)行,或周期性執(zhí)行某個任務(wù)。可以用這個工廠。
newScheduledThreadPool,返回一個ScheduledExecutorService對象,但該線程可以指定線程數(shù)量。
固定大小的線程池public class ExecutorExample { public static class MyTask implements Runnable{ @Override public void run() { System.out.println(System.currentTimeMillis()+":Thread ID:"+Thread.currentThread().getId() ); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] a ){ MyTask myTask = new MyTask(); //創(chuàng)建固定大小線程池 ExecutorService es = Executors.newFixedThreadPool(5); for (int i =0;i<10;i++){ es.submit(myTask); } es.shutdown(); //總的來說就是5個線程去執(zhí)行10個任務(wù),因此你能看到每個ID都被打印了2遍。 } }任務(wù) 執(zhí)行單位
在此之前,我們得先了解線程池中最基本的執(zhí)行單位——Runable和Callable。
Executor使用Runnable作為其基本的任務(wù)表示形式。Runnable是有一種很大局限的抽象,雖然run能寫入到日志或者將結(jié)果放入某個共享的數(shù)據(jù)結(jié)構(gòu),但它不能返回一個值或者拋出一個受檢查的異常。那么Callble則可以彌補這些缺陷。
Runnable和Callable描述的都是抽象的計算任務(wù)。這些任務(wù)通常都是有范圍的,即都有一個明確的起始點,并且最終會結(jié)束。Executor執(zhí)行的任務(wù)有4個生命周期階段:
創(chuàng)建
提交
開始
完成
由于有些任務(wù)可能要執(zhí)行很長時間,因此通常能夠希望取消這些任務(wù)。在Executor框架中,已提交但尚未開始的任務(wù)可以取消,但對于那些已經(jīng)開始執(zhí)行的任務(wù),只有當它們能夠響應(yīng)中斷,才能取消。取消一個已完成的任務(wù)不會有任何影響。
Future則表示一個任務(wù)的生命周期,并提供了相應(yīng)的方法來判斷是否已經(jīng)完成或取消,以及獲取任務(wù)的結(jié)果和取消的任務(wù)等。在Future規(guī)范中包含的隱含含義是,任務(wù)的生命周期只能前進,不能后退,就像ExecuteService的生命周期一樣,當某個任務(wù)完成后,它就永遠停留在“完成”狀態(tài)上。
Future.get方法的行為取決于任務(wù)的狀態(tài)(尚未開始、已經(jīng)運行、已完成)。如果任務(wù)已經(jīng)完成,那么get會立即返回或者拋出一個Exception,如果任務(wù)沒有完成,那么get將阻塞并直到任務(wù)完成。如果任務(wù)拋出了異常,那么get將異常封裝為ExecutionException并重新拋出。如果任務(wù)被取消了,那么get將拋出CancellationException。如果get拋出ExecutionException,那么可以通過getCause來獲得被封裝的初始異常。
計劃任務(wù)newScheduledThreadPool返回一個ScheduledExecutorService對象,可以根據(jù)實際對線程進行調(diào)度。
//在給定的時間,對任務(wù)進行一次調(diào)度 public ScheduledFuture> schedule(Runnable command,long delay, TimeUnit unit); //用于對任務(wù)進行周期性調(diào)度,任務(wù)調(diào)度的頻率是一定的,它以上一個任務(wù)開始執(zhí)行時間為起點,之后的period時間后調(diào)度下一次任務(wù)。如果任務(wù)的執(zhí)行時間大于調(diào)度時間,那么任務(wù)就會在上一個任務(wù)結(jié)束后,立即被調(diào)用。 public ScheduledFuture> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit); //對任務(wù)進行周期性調(diào)度,在上一個任務(wù)結(jié)束后,再經(jīng)過delay長的時間進行任務(wù)調(diào)度。 public ScheduledFuture> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
ScheduledExecutorService不會立即安排執(zhí)行任務(wù),它類似Linux中的crontab工具。如果任務(wù)遇到異常,則后續(xù)的所有子任務(wù)都會停止執(zhí)行。因此,必須保證異常被及時處理,為周期性任務(wù)的穩(wěn)定調(diào)度提供條件。
public class ScheduledExecutorExample { public static void main(String[] a){ ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); //如果前面的任務(wù)沒有完成,則調(diào)度也不會啟動 ses.scheduleAtFixedRate(new Runnable(){ @Override public void run() { long s = System.currentTimeMillis(); try { System.out.println(Thread.currentThread().getId() + " 號線程開始工作..."); //模擬處理事務(wù) Thread.sleep(1000); long e = System.currentTimeMillis(); System.out.println(Thread.currentThread().getId() + " 號線程結(jié)束工作...用時:" +( (e -s)/1000) +"s"); } catch (InterruptedException e) { e.printStackTrace(); } } },0,2, TimeUnit.SECONDS); } }核心線程池的內(nèi)部實現(xiàn)
對于核心的幾個線程池,無論是newFixedThreadPool()、newSingleThreadExecutor()還是newCacheThreadPool方法,雖然看起來創(chuàng)建的線程具有完全不同的功能特點,但其內(nèi)部均使用了ThreadPoolExecutor實現(xiàn)。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue ())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue ()); }
由以上線程池的實現(xiàn)可以看到,它們都只是ThreadPoolExecutor類的封裝。我們看下ThreadPoolExecutor最重要的構(gòu)造函數(shù):
public ThreadPoolExecutor( //指定了線程池中的線程數(shù)量 int corePoolSize, //指定了線程池中的最大線程數(shù)量 int maximumPoolSize, //當前線程池數(shù)量超過corePoolSize時,多余的空閑線程的存活時間,即多次時間內(nèi)會被銷毀。 long keepAliveTime, //keepAliveTime的單位 TimeUnit unit, //任務(wù)隊列,被提交但尚未被執(zhí)行的任務(wù)。 BlockingQueueworkQueue, //線程工廠,用于創(chuàng)建線程,一般用默認的即可 ThreadFactory threadFactory, //拒絕策略,當任務(wù)太多來不及處理,如何拒絕任務(wù)。 RejectedExecutionHandler handler)
在這里面,大多數(shù)的參數(shù)都是較好理解的,但是workQueue和handler需要進行詳細說明。
WorkQueueworkQueue指提交但未執(zhí)行的任務(wù)隊列,它是一個BlockingQueue接口的對象,僅用于存放Runnable對象,根據(jù)隊列功能分類,在ThreadPoolExecutor的構(gòu)造函數(shù)中可使用以下幾種BlockingQueue。
直接提交的隊列:
該功能由synchronousQueue對象提供,synchronousQueue對象是一個特殊的BlockingQueue。synchronousQueue沒有容量,每一個插入操作都要等待一個響應(yīng)的刪除操作,反之每一個刪除操作都要等待對應(yīng)的插入操作。如果使用synchronousQueue,提交的任務(wù)不會被真實的保存,而總是將新任務(wù)提交給線程執(zhí)行,如果沒有空閑線程,則嘗試創(chuàng)建線程,如果線程數(shù)量已經(jīng)達到了最大值,則執(zhí)行拒絕策略,因此,使用synchronousQueue隊列,通常要設(shè)置很大的maximumPoolSize值,否則很容易執(zhí)行拒絕策略。
有界的任務(wù)隊列:
有界任務(wù)隊列可以使用ArrayBlockingQueue實現(xiàn)。ArrayBlockingQueue構(gòu)造函數(shù)必須帶有一個容量參數(shù),表示隊列的最大容量。
public ArrayBlockingQueue(int capacity)
當使用有界任務(wù)隊列時,若有新任務(wù)需要執(zhí)行時,如果線程池的實際線程數(shù)量小于corePoolSize,則會優(yōu)先創(chuàng)建線程。若大于corePoolSize,則會將新任務(wù)加入等待隊列。若等待隊列已滿,無法加入,則在總線程數(shù)不大于maximumPoolSize的前提下,創(chuàng)建新的線程執(zhí)行任務(wù)。若大于maximumPoolSize,則執(zhí)行拒絕策略。可見有界隊列僅當在任務(wù)隊列裝滿后,才可能將線程數(shù)量提升到corePoolSize以上,換言之,除非系統(tǒng)非常繁忙,否則確保核心線程數(shù)維持在corePoolSize。
無界的任務(wù)隊列:
無界隊列可以通過LinkedBlockingQueue類實現(xiàn)。與有界隊列相比,除非系統(tǒng)資源耗盡,無界隊列的任務(wù)隊列不存在任務(wù)入隊失敗的情況。若有新任務(wù)需要執(zhí)行時,如果線程池的實際線程數(shù)量小于corePoolSize,則會優(yōu)先創(chuàng)建線程執(zhí)行。但當系統(tǒng)的線程數(shù)量達到corePoolSize后就不再創(chuàng)建了,這里和有界任務(wù)隊列是有明顯區(qū)別的。若后續(xù)還有新任務(wù)加入,而又沒有空閑線程資源,則任務(wù)直接進入隊列等待。若任務(wù)創(chuàng)建和處理的速度差異很大,無界隊列會保持快速增長,知道耗盡系統(tǒng)內(nèi)存。
優(yōu)先任務(wù)隊列:
帶有優(yōu)先級別的隊列,它通過PriorityBlokingQueue實現(xiàn),可以控制任務(wù)執(zhí)行的優(yōu)先順序。它是一個特殊的無界隊列。無論是ArrayBlockingQueue還是LinkedBlockingQueue實現(xiàn)的隊列,都是按照先進先出的算法處理任務(wù),而PriorityBlokingQueue根據(jù)任務(wù)自身優(yōu)先級順序先后執(zhí)行,在確保系統(tǒng)性能同時,也能很好的質(zhì)量保證(總是確保高優(yōu)先級的任務(wù)優(yōu)先執(zhí)行)。
開發(fā)人員以免有時會將線程池的基本大小設(shè)置為零,從而最終銷毀工作者線程以免阻礙JVM的退出。然而,如果在線程池中沒有使用SynchronousQueue作為其工作隊列(例如在newCachedThreadPool中就是如此,它的核心池設(shè)為0,但它的任務(wù)隊列使用的是SynchronousQueue),那么這種方式將產(chǎn)生一些奇怪的行為。如果線程池中的線程數(shù)量等于線程池的基本大小,那么僅當在工作隊列已滿的情況下ThreadPoolExecutor才會創(chuàng)建新的線程。因此,如果線程池的基本大小為零并且其工作隊列有一定的容量,那么當把任務(wù)提交給該線程池時,只有當線程池的工作隊列被填滿后,才會開始執(zhí)行任務(wù),而這種行為通常不是我們所希望的。在Java6中,可以通過allowCoreThreadTimeOut來使線程池中的所有線程超時。對于一個大小有限的線程池并且在該線程池中包含了一個工作隊列,如果希望 這個線程池在沒有任務(wù)的情況下能銷毀所有的線程 ,那么可以啟用這個特性并將基本大小設(shè)置為零。
調(diào)度邏輯可以總結(jié)為
飽和策略線程池中的線程已經(jīng)用完了,無法繼續(xù)為新任務(wù)服務(wù),同時,等待隊列也已經(jīng)排滿了,再也塞不下新任務(wù)了。這時候我們就需要拒絕策略機制合理的處理這個問題。
JDK內(nèi)置的拒絕策略如下:
AbortPolicy : 默認策略。直接拋出異常DiscardExecutionException,調(diào)用者可以考慮捕獲這個異常,編寫自己的處理代碼。
CallerRunsPolicy : 該策略既不會拋棄任務(wù),也不會拋出異常,而是將某些任務(wù)回退到調(diào)用者。
DiscardOldestPolicy : 丟棄最老的一個請求,也就是即將被執(zhí)行的一個任務(wù),并嘗試再次提交當前任務(wù)。
DiscardPolicy : 該策略默默地丟棄無法處理的任務(wù),不予任何處理。如果允許任務(wù)丟失,這是最好的一種方案。
以上內(nèi)置拒絕策略均實現(xiàn)了RejectedExecutionHandler接口,若以上策略仍無法滿足實際需要,完全可以自己擴展RejectedExecutionHandler接口。RejectedExecutionHandler的定義如下。
public interface RejectedExecutionHandler { /** * @param r 請求執(zhí)行的任務(wù) * @param executor 當前線程池 **/ void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
那么接下來看一個簡單地演示了自定義線程池和拒絕策略的使用:
public class RejectThreadPoolDemo { public static class MyTask implements Runnable { public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(10), new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + "is discard"); } }); for (int i = 0; i < Integer.MAX_VALUE; i++) { es.submit(task); Thread.sleep(10); } } }
這個例子定義一個線程池,里面有5個常駐線程,并且最大線程數(shù)也是5個。這和固定大小的線程池是一樣的。但是它卻擁有一個只有10個容量的等待隊列。那么必定會有在大量的任務(wù)被直接丟棄。
自定義線程創(chuàng)建:ThreadFactory線程池中的線程從何而來?來自ThreadFactory。
ThreadFactory是一個接口,它只有一個方法,用來創(chuàng)建線程:
Thread newThread(Runnable r);
當線程池需要新建線程時,就會調(diào)用這個方法。
自定義線程池可以幫我們做不少事情。我們可以跟蹤線程池在何時創(chuàng)建了多少線程,也可以自定義線程的名稱、組以及優(yōu)先級等信息,甚至可以任性地將所有的線程設(shè)置為守護線程。總之,使用自定義線程可以讓我們更加自由地設(shè)置池中所有的線程的狀態(tài)。下面的案例使用自定義ThreadFactory,一方面記錄了線程的創(chuàng)建,另一方面將所有的線程都設(shè)置為守護線程,這樣,當主線程退出后,將會強制銷毀線程池。
public class ThreadFactoryExample { public static class MyTask implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getName() + " coming..."); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] a ) throws InterruptedException { MyTask myTask = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue擴展線程池(10) , new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("T " + t.getId() + "_" +System.currentTimeMillis()); t.setDaemon(true); System.out.println("Create a Thread Name is : "+t.getName()); return t; } }); for (int i=0;i<10;i++){ es.submit(myTask); } Thread.sleep(2000); } }
ThreadPoolExecutor是可擴展的,它提供了幾個“鉤子”方法可以在子類化中改寫:beforeExecute、afterExecute和terminated,這些方法可以用于擴展ThreadPoolExecutor的行為。
在執(zhí)行任務(wù)的線程中將調(diào)用beforeExecute和afterExecute等方法,在這些方法中還可以添加日志、計時、監(jiān)視或統(tǒng)計信息收集的功能。無論任務(wù)是從run中正常返回,還是拋出一個異常而返回,afterExecute都會被調(diào)用。(如果任務(wù)在完成后帶有一個Error,那么就不會調(diào)用afterExecute。)如果beforeExecute拋出了一個RuntimeException,那么任務(wù)將不會被執(zhí)行,并且afterExecute也不會被調(diào)用。
栗子:
public class ExtThreadPool { public static class MyTask implements Runnable { private String name; public MyTask(String name) { this.name = name; } @Override public void run() { try { System.out.println("現(xiàn)在向控制臺走來的是線程" + Thread.currentThread().getId() + "號" + "名字為:" + name); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] a) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque分治思想:Fork/Join框架()) { /** * 創(chuàng)建ThreadPoolExecutor的匿名內(nèi)部類的子類 * * @param t * the thread that will run task 將要運行任務(wù)的線程 * @param r * the task that will be executed 將要執(zhí)行的任務(wù) **/ @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("start execute .." + ((MyTask) r).name); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("after execute .." + ((MyTask) r).name); } @Override protected void terminated() { System.out.println("exit execute .."); } }; for (int i = 0; i < 10; i++) { MyTask myTask = new MyTask("T_" + i); es.execute(myTask);// execute 和 submit 的區(qū)別在future模式中再說 Thread.sleep(100); } /** * 不會暴力的關(guān)閉,而會等待所有線程執(zhí)行完后關(guān)閉線程 可以簡單的理解為shutdown只是發(fā)送一個關(guān)閉信號, * 但在shutdown之后,線程就不能再接受其他任務(wù)了. **/ es.shutdown(); } }
分治思想經(jīng)常在一些經(jīng)典的算法中能看到,算是一個非常有效地處理大量數(shù)據(jù)的方法。
在Linux平臺中,函數(shù)fork()用來創(chuàng)建子線程,使得系統(tǒng)進程可以多一個執(zhí)行分支。
而join這個方法相信了解java多線程的同學一定不會陌生,它表示等待。也就是使用fork()后系統(tǒng)多一個執(zhí)行分支(線程),所以需要等待這個執(zhí)行分支執(zhí)行完畢,才有可能得到最終的結(jié)果,因此join()就表示等待。
在實際使用中,如果毫無顧忌的使用fork()開啟線程進行處理,那么很有可能導致系統(tǒng)開啟過多的線程而嚴重影響性能。所以,在JDK中給出一個ForkJoinPool線程池,對于fork()方法并不著急開啟線程,而是提交給ForkJoinPool線程池進行處理,以節(jié)省系統(tǒng)資源。使用Fork/Join進行數(shù)據(jù)處理時候的總體結(jié)構(gòu)如下圖。
由于線程池的優(yōu)化,提交的任務(wù)和線程數(shù)量并不是一對一的關(guān)系。在絕大多數(shù)情況下,一個物理線程實際上是需要處理多個邏輯任務(wù)的。因此,每個線程必然需要擁有一個任務(wù)隊列。在實際執(zhí)行過程中,可能遇到這么一種情況:線程A執(zhí)行完了自己的所有任務(wù),而線程B還有一堆任務(wù)等著處理。此時,線程A就會“幫助”線程B,從線程B的任務(wù)列表中拿一個任務(wù)過來處理,盡可能達到平衡。值得注意的是,當線程視圖幫助別的線程時,總是從任務(wù)隊列的底部開始拿數(shù)據(jù),而線程視圖執(zhí)行自己的任務(wù)時,則是從相反的頂部開始拿。因此這種行為也十分有利于避免數(shù)據(jù)競爭。
接下來看一下ForkJoinPool的一個重要的接口:
publicForkJoinPool submit(ForkJoinPoolTask task)
你可以向ForkJoinPool線程池提交一個ForkJoinTask任務(wù)。所謂ForkJoinTask任務(wù)就是支持fork()分解以及join等待的任務(wù)。ForkJoinTask有兩個最重要的子類,RecursiveAction和RecursiveTask。它們分別表示沒有返回值的任務(wù)和可以攜帶返回值的任務(wù)。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/76404.html
摘要:并發(fā)編程實戰(zhàn)水平很高,然而并不是本好書。一是多線程的控制,二是并發(fā)同步的管理。最后,使用和來關(guān)閉線程池,停止其中的線程。當線程調(diào)用或等阻塞時,對這個線程調(diào)用會使線程醒來,并受到,且線程的中斷標記被設(shè)置。 《Java并發(fā)編程實戰(zhàn)》水平很高,然而并不是本好書。組織混亂、長篇大論、難以消化,中文翻譯也較死板。這里是一篇批評此書的帖子,很是貼切。俗話說:看到有這么多人罵你,我就放心了。 然而知...
摘要:基礎(chǔ)問題的的性能及原理之區(qū)別詳解備忘筆記深入理解流水線抽象關(guān)鍵字修飾符知識點總結(jié)必看篇中的關(guān)鍵字解析回調(diào)機制解讀抽象類與三大特征時間和時間戳的相互轉(zhuǎn)換為什么要使用內(nèi)部類對象鎖和類鎖的區(qū)別,,優(yōu)缺點及比較提高篇八詳解內(nèi)部類單例模式和 Java基礎(chǔ)問題 String的+的性能及原理 java之yield(),sleep(),wait()區(qū)別詳解-備忘筆記 深入理解Java Stream流水...
摘要:基礎(chǔ)問題的的性能及原理之區(qū)別詳解備忘筆記深入理解流水線抽象關(guān)鍵字修飾符知識點總結(jié)必看篇中的關(guān)鍵字解析回調(diào)機制解讀抽象類與三大特征時間和時間戳的相互轉(zhuǎn)換為什么要使用內(nèi)部類對象鎖和類鎖的區(qū)別,,優(yōu)缺點及比較提高篇八詳解內(nèi)部類單例模式和 Java基礎(chǔ)問題 String的+的性能及原理 java之yield(),sleep(),wait()區(qū)別詳解-備忘筆記 深入理解Java Stream流水...
摘要:基礎(chǔ)問題的的性能及原理之區(qū)別詳解備忘筆記深入理解流水線抽象關(guān)鍵字修飾符知識點總結(jié)必看篇中的關(guān)鍵字解析回調(diào)機制解讀抽象類與三大特征時間和時間戳的相互轉(zhuǎn)換為什么要使用內(nèi)部類對象鎖和類鎖的區(qū)別,,優(yōu)缺點及比較提高篇八詳解內(nèi)部類單例模式和 Java基礎(chǔ)問題 String的+的性能及原理 java之yield(),sleep(),wait()區(qū)別詳解-備忘筆記 深入理解Java Stream流水...
閱讀 2822·2023-04-26 01:00
閱讀 753·2021-10-11 10:59
閱讀 2981·2019-08-30 11:18
閱讀 2677·2019-08-29 11:18
閱讀 1022·2019-08-28 18:28
閱讀 3014·2019-08-26 18:36
閱讀 2135·2019-08-23 18:16
閱讀 1069·2019-08-23 15:56