国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Java多線程筆記(三):線程池

琛h。 / 3337人閱讀

摘要:類則扮演線程池工廠角色,通過可以取得一個具有特定功能的線程池。返回一個可根據(jù)實際情況調(diào)整線程數(shù)量的線程池,線程數(shù)量不確定,若有空閑,則會有限復用線程。所有線程在當前任務(wù)執(zhí)行完后,將返回線程池待復用。

前言

多線程的軟件設(shè)計方案確實可以最大限度地發(fā)揮現(xiàn)代多核處理器的計算能力,提高生產(chǎn)系列的吞吐量和性能。但是,若不加控制和管理的隨意使用線程,對系統(tǒng)的性能反而會產(chǎn)生不利的影響。最容易想到的后果就是線程過多導致CPU忙于切換而無力執(zhí)行其中的工作。

為了避免系統(tǒng)頻繁地創(chuàng)建和銷毀線程,我們可以讓創(chuàng)建的線程進行復用。如果有同學有過數(shù)據(jù)庫開發(fā)的經(jīng)驗,對數(shù)據(jù)庫連接池這個概念應(yīng)該不會陌生。為了避免每次數(shù)據(jù)庫查詢都重新建立和銷毀數(shù)據(jù)庫連接,我們可以使用數(shù)據(jù)庫連接池維護一些數(shù)據(jù)庫連接,使其長期保持在一個激活的狀態(tài)。當系統(tǒng)需要使用數(shù)據(jù)庫時,并不是創(chuàng)建一個新的連接,而是從連接池中獲得一個可用的連接即可。反之,當需要關(guān)閉連接時,并不真的把連接關(guān)閉,而是將這個連接“還”給連接池即可。通過此方法,通過調(diào)節(jié)線程池的基本大小和存活時間,可以幫助線程池回收空閑線程占有的資源,從而使得這些資源可以用于執(zhí)行其他的工作。

為了更好地控制多線程,JDK提供了一套Executor框架。核心成員如下圖所示

以上成員均在java.util.concurrent包中,是JDK并發(fā)包的核心類。其中ThreadPoolExecutor表示一個線程池。Executors類則扮演線程池工廠角色,通過Executors可以取得一個具有特定功能的線程池。從UML圖中亦可知,ThreadPoolExecutor實現(xiàn)了Executor接口,因此通過這個接口,任何Runnable對象都可以被ThreadPoolExecutor線程池調(diào)度。

Java提供了ExecutorService的三種實現(xiàn):

ThraedPoolExecutor:標準線程池

ScheduledThreadPoolExecutor:支持延時任務(wù)的線程池

ForkJoinPool:類似于ThraedPoolExecutor,但是使用work-stealing模式,其會為線程池中的每個線程創(chuàng)建一個隊列,從而使用work-stealing(任務(wù)竊取)算法使得線程可以從其他線程隊列里竊取任務(wù)來執(zhí)行。即如果自己的任務(wù)處理完成了,則可以去忙碌的工作線程那里去竊取任務(wù)執(zhí)行。

在本文,將會主要以ThraedPoolExecutor作為講解例子。

線程池的基本大小(Core POOL SIZE) ,較大大小(Maximum pool size) 以及存活時間等因素共同負責線程的創(chuàng)建和銷毀 。 基本大小也就是線程池的目標大小,即在沒有任務(wù)執(zhí)行是的線程池的大小,并且只有在工作隊列滿了的情況下才會創(chuàng)建超過這個數(shù)量的線程。線程池的較大大小表示可同時活動的線程數(shù)量的上限,如并且果某個線程的空閑時間超過了存活時間,那么將被標記為可回收的,并且當線程池的當前大小超過了基本大小時,這個線程將被終止。

newFixedThreadPool 工廠方法將線程池的基本大小和較大大小設(shè)置為參數(shù)中的執(zhí)行值,而且創(chuàng)建的線程池不會超時。newCachedThreadPool工廠方法將線程池的較大大小設(shè)置為Integer.MAX_VALUE,而將其基本大小設(shè)置為0,并將超時時間設(shè)置為1分鐘,這種方法創(chuàng)建的線程池可以無限擴展,并且當需求降低時會自動收縮,其他形式的線程池可以通過顯示的ThreadPoolExecutor構(gòu)造函數(shù)來溝通。

Executor框架提供了各種類型的線程池,主要有以下工廠方法。

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

以上方法返回了具有不同工作特性的線程池,具體說明如下:

newFixedThreadPool返回一個固定數(shù)量的線程池。當一個新任務(wù)提交時,如果有空閑線程,則執(zhí)行。否則新任務(wù)暫存在一個任務(wù)隊列中,待有空閑時,便處理在任務(wù)隊列中的任務(wù)。

newSingleThreadExecutor返回一個線程的線程池。當多余一個新任務(wù)提交時,會暫存在一個任務(wù)隊列中,待有空閑時,按先入先出的順序處理在任務(wù)隊列中的任務(wù)。

newCachedThreadPool返回一個可根據(jù)實際情況調(diào)整線程數(shù)量的線程池,線程數(shù)量不確定,若有空閑,則會有限復用線程。否則創(chuàng)建新線程處理任務(wù)。所有線程在當前任務(wù)執(zhí)行完后,將返回線程池待復用。

newSingleThreadScheduledExecutor返回一個ScheduledExecutorService對象,線程池大小為1。ScheduledExecutorService在Executor接口之上擴展了在給定時間執(zhí)行某任務(wù)的功能。如果在某個固定的延時之后執(zhí)行,或周期性執(zhí)行某個任務(wù)。可以用這個工廠。

newScheduledThreadPool,返回一個ScheduledExecutorService對象,但該線程可以指定線程數(shù)量。

固定大小的線程池
public class ExecutorExample {
    public static class MyTask implements Runnable{

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis()+":Thread ID:"+Thread.currentThread().getId() );
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] a ){
        MyTask myTask = new MyTask();

        //創(chuàng)建固定大小線程池
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i =0;i<10;i++){
            es.submit(myTask);
        }
        es.shutdown();
        //總的來說就是5個線程去執(zhí)行10個任務(wù),因此你能看到每個ID都被打印了2遍。

    }
}
任務(wù) 執(zhí)行單位

在此之前,我們得先了解線程池中最基本的執(zhí)行單位——Runable和Callable。

Executor使用Runnable作為其基本的任務(wù)表示形式。Runnable是有一種很大局限的抽象,雖然run能寫入到日志或者將結(jié)果放入某個共享的數(shù)據(jù)結(jié)構(gòu),但它不能返回一個值或者拋出一個受檢查的異常。那么Callble則可以彌補這些缺陷。

Runnable和Callable描述的都是抽象的計算任務(wù)。這些任務(wù)通常都是有范圍的,即都有一個明確的起始點,并且最終會結(jié)束。Executor執(zhí)行的任務(wù)有4個生命周期階段:

創(chuàng)建

提交

開始

完成

由于有些任務(wù)可能要執(zhí)行很長時間,因此通常能夠希望取消這些任務(wù)。在Executor框架中,已提交但尚未開始的任務(wù)可以取消,但對于那些已經(jīng)開始執(zhí)行的任務(wù),只有當它們能夠響應(yīng)中斷,才能取消。取消一個已完成的任務(wù)不會有任何影響。

Future則表示一個任務(wù)的生命周期,并提供了相應(yīng)的方法來判斷是否已經(jīng)完成或取消,以及獲取任務(wù)的結(jié)果和取消的任務(wù)等。在Future規(guī)范中包含的隱含含義是,任務(wù)的生命周期只能前進,不能后退,就像ExecuteService的生命周期一樣,當某個任務(wù)完成后,它就永遠停留在“完成”狀態(tài)上。

Future.get方法的行為取決于任務(wù)的狀態(tài)(尚未開始、已經(jīng)運行、已完成)。如果任務(wù)已經(jīng)完成,那么get會立即返回或者拋出一個Exception,如果任務(wù)沒有完成,那么get將阻塞并直到任務(wù)完成。如果任務(wù)拋出了異常,那么get將異常封裝為ExecutionException并重新拋出。如果任務(wù)被取消了,那么get將拋出CancellationException。如果get拋出ExecutionException,那么可以通過getCause來獲得被封裝的初始異常。

計劃任務(wù)

newScheduledThreadPool返回一個ScheduledExecutorService對象,可以根據(jù)實際對線程進行調(diào)度。

//在給定的時間,對任務(wù)進行一次調(diào)度
public ScheduledFuture schedule(Runnable command,long delay, TimeUnit unit);
//用于對任務(wù)進行周期性調(diào)度,任務(wù)調(diào)度的頻率是一定的,它以上一個任務(wù)開始執(zhí)行時間為起點,之后的period時間后調(diào)度下一次任務(wù)。如果任務(wù)的執(zhí)行時間大于調(diào)度時間,那么任務(wù)就會在上一個任務(wù)結(jié)束后,立即被調(diào)用。
public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
//對任務(wù)進行周期性調(diào)度,在上一個任務(wù)結(jié)束后,再經(jīng)過delay長的時間進行任務(wù)調(diào)度。
public ScheduledFuture scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

ScheduledExecutorService不會立即安排執(zhí)行任務(wù),它類似Linux中的crontab工具。如果任務(wù)遇到異常,則后續(xù)的所有子任務(wù)都會停止執(zhí)行。因此,必須保證異常被及時處理,為周期性任務(wù)的穩(wěn)定調(diào)度提供條件。

public class ScheduledExecutorExample {
    public static void main(String[] a){
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        //如果前面的任務(wù)沒有完成,則調(diào)度也不會啟動
        ses.scheduleAtFixedRate(new Runnable(){
            @Override
            public void run() {
                long s = System.currentTimeMillis();
                try {
                    System.out.println(Thread.currentThread().getId() + " 號線程開始工作...");
                    //模擬處理事務(wù)
                    Thread.sleep(1000);
                    long e = System.currentTimeMillis();
                    System.out.println(Thread.currentThread().getId() + " 號線程結(jié)束工作...用時:" +( (e -s)/1000) +"s");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}
核心線程池的內(nèi)部實現(xiàn)

對于核心的幾個線程池,無論是newFixedThreadPool()、newSingleThreadExecutor()還是newCacheThreadPool方法,雖然看起來創(chuàng)建的線程具有完全不同的功能特點,但其內(nèi)部均使用了ThreadPoolExecutor實現(xiàn)。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue());
    }
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,  new LinkedBlockingQueue()));
    }
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  60L, TimeUnit.SECONDS,new SynchronousQueue());
    }

由以上線程池的實現(xiàn)可以看到,它們都只是ThreadPoolExecutor類的封裝。我們看下ThreadPoolExecutor最重要的構(gòu)造函數(shù):

public ThreadPoolExecutor(
            //指定了線程池中的線程數(shù)量
            int corePoolSize,
            //指定了線程池中的最大線程數(shù)量
            int maximumPoolSize,
            //當前線程池數(shù)量超過corePoolSize時,多余的空閑線程的存活時間,即多次時間內(nèi)會被銷毀。
            long keepAliveTime,
            //keepAliveTime的單位
            TimeUnit unit,
            //任務(wù)隊列,被提交但尚未被執(zhí)行的任務(wù)。
            BlockingQueue workQueue,
            //線程工廠,用于創(chuàng)建線程,一般用默認的即可
            ThreadFactory threadFactory,
            //拒絕策略,當任務(wù)太多來不及處理,如何拒絕任務(wù)。
            RejectedExecutionHandler handler)

在這里面,大多數(shù)的參數(shù)都是較好理解的,但是workQueue和handler需要進行詳細說明。

WorkQueue

workQueue指提交但未執(zhí)行的任務(wù)隊列,它是一個BlockingQueue接口的對象,僅用于存放Runnable對象,根據(jù)隊列功能分類,在ThreadPoolExecutor的構(gòu)造函數(shù)中可使用以下幾種BlockingQueue。

直接提交的隊列:

該功能由synchronousQueue對象提供,synchronousQueue對象是一個特殊的BlockingQueue。synchronousQueue沒有容量,每一個插入操作都要等待一個響應(yīng)的刪除操作,反之每一個刪除操作都要等待對應(yīng)的插入操作。如果使用synchronousQueue,提交的任務(wù)不會被真實的保存,而總是將新任務(wù)提交給線程執(zhí)行,如果沒有空閑線程,則嘗試創(chuàng)建線程,如果線程數(shù)量已經(jīng)達到了最大值,則執(zhí)行拒絕策略,因此,使用synchronousQueue隊列,通常要設(shè)置很大的maximumPoolSize值,否則很容易執(zhí)行拒絕策略。

有界的任務(wù)隊列:

有界任務(wù)隊列可以使用ArrayBlockingQueue實現(xiàn)。ArrayBlockingQueue構(gòu)造函數(shù)必須帶有一個容量參數(shù),表示隊列的最大容量。

public ArrayBlockingQueue(int capacity)

當使用有界任務(wù)隊列時,若有新任務(wù)需要執(zhí)行時,如果線程池的實際線程數(shù)量小于corePoolSize,則會優(yōu)先創(chuàng)建線程。若大于corePoolSize,則會將新任務(wù)加入等待隊列。若等待隊列已滿,無法加入,則在總線程數(shù)不大于maximumPoolSize的前提下,創(chuàng)建新的線程執(zhí)行任務(wù)。若大于maximumPoolSize,則執(zhí)行拒絕策略。可見有界隊列僅當在任務(wù)隊列裝滿后,才可能將線程數(shù)量提升到corePoolSize以上,換言之,除非系統(tǒng)非常繁忙,否則確保核心線程數(shù)維持在corePoolSize。

無界的任務(wù)隊列:

無界隊列可以通過LinkedBlockingQueue類實現(xiàn)。與有界隊列相比,除非系統(tǒng)資源耗盡,無界隊列的任務(wù)隊列不存在任務(wù)入隊失敗的情況。若有新任務(wù)需要執(zhí)行時,如果線程池的實際線程數(shù)量小于corePoolSize,則會優(yōu)先創(chuàng)建線程執(zhí)行。但當系統(tǒng)的線程數(shù)量達到corePoolSize后就不再創(chuàng)建了,這里和有界任務(wù)隊列是有明顯區(qū)別的。若后續(xù)還有新任務(wù)加入,而又沒有空閑線程資源,則任務(wù)直接進入隊列等待。若任務(wù)創(chuàng)建和處理的速度差異很大,無界隊列會保持快速增長,知道耗盡系統(tǒng)內(nèi)存。

優(yōu)先任務(wù)隊列:

帶有優(yōu)先級別的隊列,它通過PriorityBlokingQueue實現(xiàn),可以控制任務(wù)執(zhí)行的優(yōu)先順序。它是一個特殊的無界隊列。無論是ArrayBlockingQueue還是LinkedBlockingQueue實現(xiàn)的隊列,都是按照先進先出的算法處理任務(wù),而PriorityBlokingQueue根據(jù)任務(wù)自身優(yōu)先級順序先后執(zhí)行,在確保系統(tǒng)性能同時,也能很好的質(zhì)量保證(總是確保高優(yōu)先級的任務(wù)優(yōu)先執(zhí)行)。

開發(fā)人員以免有時會將線程池的基本大小設(shè)置為零,從而最終銷毀工作者線程以免阻礙JVM的退出。然而,如果在線程池中沒有使用SynchronousQueue作為其工作隊列(例如在newCachedThreadPool中就是如此,它的核心池設(shè)為0,但它的任務(wù)隊列使用的是SynchronousQueue),那么這種方式將產(chǎn)生一些奇怪的行為。如果線程池中的線程數(shù)量等于線程池的基本大小,那么僅當在工作隊列已滿的情況下ThreadPoolExecutor才會創(chuàng)建新的線程。因此,如果線程池的基本大小為零并且其工作隊列有一定的容量,那么當把任務(wù)提交給該線程池時,只有當線程池的工作隊列被填滿后,才會開始執(zhí)行任務(wù),而這種行為通常不是我們所希望的。在Java6中,可以通過allowCoreThreadTimeOut來使線程池中的所有線程超時。對于一個大小有限的線程池并且在該線程池中包含了一個工作隊列,如果希望 這個線程池在沒有任務(wù)的情況下能銷毀所有的線程 ,那么可以啟用這個特性并將基本大小設(shè)置為零。

調(diào)度邏輯可以總結(jié)為

飽和策略

線程池中的線程已經(jīng)用完了,無法繼續(xù)為新任務(wù)服務(wù),同時,等待隊列也已經(jīng)排滿了,再也塞不下新任務(wù)了。這時候我們就需要拒絕策略機制合理的處理這個問題。
JDK內(nèi)置的拒絕策略如下:

AbortPolicy : 默認策略。直接拋出異常DiscardExecutionException,調(diào)用者可以考慮捕獲這個異常,編寫自己的處理代碼。

CallerRunsPolicy : 該策略既不會拋棄任務(wù),也不會拋出異常,而是將某些任務(wù)回退到調(diào)用者。

DiscardOldestPolicy : 丟棄最老的一個請求,也就是即將被執(zhí)行的一個任務(wù),并嘗試再次提交當前任務(wù)。

DiscardPolicy : 該策略默默地丟棄無法處理的任務(wù),不予任何處理。如果允許任務(wù)丟失,這是最好的一種方案。

以上內(nèi)置拒絕策略均實現(xiàn)了RejectedExecutionHandler接口,若以上策略仍無法滿足實際需要,完全可以自己擴展RejectedExecutionHandler接口。RejectedExecutionHandler的定義如下。

public interface RejectedExecutionHandler {
    /**
     * @param r 請求執(zhí)行的任務(wù)
     * @param executor 當前線程池
     **/
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

那么接下來看一個簡單地演示了自定義線程池和拒絕策略的使用:

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue(10), new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(r.toString() + "is discard");
                    }
                });
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }

}

這個例子定義一個線程池,里面有5個常駐線程,并且最大線程數(shù)也是5個。這和固定大小的線程池是一樣的。但是它卻擁有一個只有10個容量的等待隊列。那么必定會有在大量的任務(wù)被直接丟棄。

自定義線程創(chuàng)建:ThreadFactory

線程池中的線程從何而來?來自ThreadFactory。

ThreadFactory是一個接口,它只有一個方法,用來創(chuàng)建線程:

Thread newThread(Runnable r);

當線程池需要新建線程時,就會調(diào)用這個方法。

自定義線程池可以幫我們做不少事情。我們可以跟蹤線程池在何時創(chuàng)建了多少線程,也可以自定義線程的名稱、組以及優(yōu)先級等信息,甚至可以任性地將所有的線程設(shè)置為守護線程。總之,使用自定義線程可以讓我們更加自由地設(shè)置池中所有的線程的狀態(tài)。下面的案例使用自定義ThreadFactory,一方面記錄了線程的創(chuàng)建,另一方面將所有的線程都設(shè)置為守護線程,這樣,當主線程退出后,將會強制銷毀線程池。

public class ThreadFactoryExample {

    public static class MyTask implements Runnable{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " coming...");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }

    public static void main(String[] a ) throws InterruptedException {
        MyTask myTask = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue(10)
                , new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("T " + t.getId() + "_" +System.currentTimeMillis());
                        t.setDaemon(true);
                        System.out.println("Create a Thread Name is : "+t.getName());
                        return t;
                    }
                });
        for (int i=0;i<10;i++){
            es.submit(myTask);
        }
        Thread.sleep(2000);

    }
}
擴展線程池

ThreadPoolExecutor是可擴展的,它提供了幾個“鉤子”方法可以在子類化中改寫:beforeExecute、afterExecute和terminated,這些方法可以用于擴展ThreadPoolExecutor的行為。

在執(zhí)行任務(wù)的線程中將調(diào)用beforeExecute和afterExecute等方法,在這些方法中還可以添加日志、計時、監(jiān)視或統(tǒng)計信息收集的功能。無論任務(wù)是從run中正常返回,還是拋出一個異常而返回,afterExecute都會被調(diào)用。(如果任務(wù)在完成后帶有一個Error,那么就不會調(diào)用afterExecute。)如果beforeExecute拋出了一個RuntimeException,那么任務(wù)將不會被執(zhí)行,并且afterExecute也不會被調(diào)用。

栗子:

public class ExtThreadPool {

    public static class MyTask implements Runnable {

        private String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                System.out.println("現(xiàn)在向控制臺走來的是線程" + Thread.currentThread().getId() + "號" + "名字為:" + name);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] a) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque()) {
            /**
             * 創(chuàng)建ThreadPoolExecutor的匿名內(nèi)部類的子類
             *
             * @param t
             *            the thread that will run task 將要運行任務(wù)的線程
             * @param r
             *            the task that will be executed 將要執(zhí)行的任務(wù)
             **/

            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("start execute .." + ((MyTask) r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("after execute .." + ((MyTask) r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("exit execute ..");
            }
        };

        for (int i = 0; i < 10; i++) {
            MyTask myTask = new MyTask("T_" + i);
            es.execute(myTask);// execute 和 submit 的區(qū)別在future模式中再說
            Thread.sleep(100);
        }

        /**
         * 不會暴力的關(guān)閉,而會等待所有線程執(zhí)行完后關(guān)閉線程 可以簡單的理解為shutdown只是發(fā)送一個關(guān)閉信號,
         * 但在shutdown之后,線程就不能再接受其他任務(wù)了.
         **/
        es.shutdown();

    }
}
分治思想:Fork/Join框架

分治思想經(jīng)常在一些經(jīng)典的算法中能看到,算是一個非常有效地處理大量數(shù)據(jù)的方法。

在Linux平臺中,函數(shù)fork()用來創(chuàng)建子線程,使得系統(tǒng)進程可以多一個執(zhí)行分支。

而join這個方法相信了解java多線程的同學一定不會陌生,它表示等待。也就是使用fork()后系統(tǒng)多一個執(zhí)行分支(線程),所以需要等待這個執(zhí)行分支執(zhí)行完畢,才有可能得到最終的結(jié)果,因此join()就表示等待。

在實際使用中,如果毫無顧忌的使用fork()開啟線程進行處理,那么很有可能導致系統(tǒng)開啟過多的線程而嚴重影響性能。所以,在JDK中給出一個ForkJoinPool線程池,對于fork()方法并不著急開啟線程,而是提交給ForkJoinPool線程池進行處理,以節(jié)省系統(tǒng)資源。使用Fork/Join進行數(shù)據(jù)處理時候的總體結(jié)構(gòu)如下圖。

由于線程池的優(yōu)化,提交的任務(wù)和線程數(shù)量并不是一對一的關(guān)系。在絕大多數(shù)情況下,一個物理線程實際上是需要處理多個邏輯任務(wù)的。因此,每個線程必然需要擁有一個任務(wù)隊列。在實際執(zhí)行過程中,可能遇到這么一種情況:線程A執(zhí)行完了自己的所有任務(wù),而線程B還有一堆任務(wù)等著處理。此時,線程A就會“幫助”線程B,從線程B的任務(wù)列表中拿一個任務(wù)過來處理,盡可能達到平衡。值得注意的是,當線程視圖幫助別的線程時,總是從任務(wù)隊列的底部開始拿數(shù)據(jù),而線程視圖執(zhí)行自己的任務(wù)時,則是從相反的頂部開始拿。因此這種行為也十分有利于避免數(shù)據(jù)競爭。

接下來看一下ForkJoinPool的一個重要的接口:

public ForkJoinPool submit(ForkJoinPoolTasktask)

你可以向ForkJoinPool線程池提交一個ForkJoinTask任務(wù)。所謂ForkJoinTask任務(wù)就是支持fork()分解以及join等待的任務(wù)。ForkJoinTask有兩個最重要的子類,RecursiveAction和RecursiveTask。它們分別表示沒有返回值的任務(wù)和可以攜帶返回值的任務(wù)。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/76404.html

相關(guān)文章

  • Java并發(fā)編程筆記(一)

    摘要:并發(fā)編程實戰(zhàn)水平很高,然而并不是本好書。一是多線程的控制,二是并發(fā)同步的管理。最后,使用和來關(guān)閉線程池,停止其中的線程。當線程調(diào)用或等阻塞時,對這個線程調(diào)用會使線程醒來,并受到,且線程的中斷標記被設(shè)置。 《Java并發(fā)編程實戰(zhàn)》水平很高,然而并不是本好書。組織混亂、長篇大論、難以消化,中文翻譯也較死板。這里是一篇批評此書的帖子,很是貼切。俗話說:看到有這么多人罵你,我就放心了。 然而知...

    cnsworder 評論0 收藏0
  • 后臺開發(fā)常問面試題集錦(問題搬運工,附鏈接)

    摘要:基礎(chǔ)問題的的性能及原理之區(qū)別詳解備忘筆記深入理解流水線抽象關(guān)鍵字修飾符知識點總結(jié)必看篇中的關(guān)鍵字解析回調(diào)機制解讀抽象類與三大特征時間和時間戳的相互轉(zhuǎn)換為什么要使用內(nèi)部類對象鎖和類鎖的區(qū)別,,優(yōu)缺點及比較提高篇八詳解內(nèi)部類單例模式和 Java基礎(chǔ)問題 String的+的性能及原理 java之yield(),sleep(),wait()區(qū)別詳解-備忘筆記 深入理解Java Stream流水...

    spacewander 評論0 收藏0
  • 后臺開發(fā)常問面試題集錦(問題搬運工,附鏈接)

    摘要:基礎(chǔ)問題的的性能及原理之區(qū)別詳解備忘筆記深入理解流水線抽象關(guān)鍵字修飾符知識點總結(jié)必看篇中的關(guān)鍵字解析回調(diào)機制解讀抽象類與三大特征時間和時間戳的相互轉(zhuǎn)換為什么要使用內(nèi)部類對象鎖和類鎖的區(qū)別,,優(yōu)缺點及比較提高篇八詳解內(nèi)部類單例模式和 Java基礎(chǔ)問題 String的+的性能及原理 java之yield(),sleep(),wait()區(qū)別詳解-備忘筆記 深入理解Java Stream流水...

    xfee 評論0 收藏0
  • 后臺開發(fā)常問面試題集錦(問題搬運工,附鏈接)

    摘要:基礎(chǔ)問題的的性能及原理之區(qū)別詳解備忘筆記深入理解流水線抽象關(guān)鍵字修飾符知識點總結(jié)必看篇中的關(guān)鍵字解析回調(diào)機制解讀抽象類與三大特征時間和時間戳的相互轉(zhuǎn)換為什么要使用內(nèi)部類對象鎖和類鎖的區(qū)別,,優(yōu)缺點及比較提高篇八詳解內(nèi)部類單例模式和 Java基礎(chǔ)問題 String的+的性能及原理 java之yield(),sleep(),wait()區(qū)別詳解-備忘筆記 深入理解Java Stream流水...

    makeFoxPlay 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<