国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

追蹤解析 ThreadPoolExecutor 源碼

gaomysion / 2519人閱讀

摘要:的前位數(shù)用來表示線程的數(shù)量,后面三位用來表示線程池的狀態(tài)。線程池的狀態(tài)有五種,分別是,根據(jù)單詞就能猜出大概。并且為了考慮性能問題,線程池的設(shè)計沒有使用悲觀鎖關(guān)鍵字,而是大量使用了和機(jī)制。

零 前期準(zhǔn)備 0 FBI WARNING

文章異常啰嗦且繞彎。

1 版本

JDK 版本 : OpenJDK 11.0.1

IDE : idea 2018.3

2 ThreadPoolExecutor 簡介

ThreadPoolExecutor 是 jdk4 中加入的工具,被封裝在 jdk 自帶的 Executors 框架中,是 java 中最經(jīng)典的線程池技術(shù)。

ThreadPoolExecutor 類在 concurrent 包下,和其它線程工具類一樣都由 Doug Lea 大神操刀完成。

[ 在看完 Spring ioc 和 Gson 之后有點乏了,換換口味看一些 jdk 的源碼 ]

3 Demo
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {

    public static void main(String[] args){
        //創(chuàng)建線程池
        //這里使用固定線程數(shù)的線程池,線程數(shù)為 5
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for(int i = 0 ; i < 100 ; i ++){
            final int ii = i;
            //創(chuàng)建 Runnable 作為線程池的任務(wù)
            Runnable r = () -> System.out.println(ii);
            //執(zhí)行
            executorService.execute(r);
        }
    }
}
一 線程池的初始化

線程池的初始化調(diào)用的 Executors 框架的靜態(tài)方法:

//Executors.class
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue());
}

繼續(xù)追蹤這個構(gòu)造方法:

//ThreadPoolExecutor.class
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}

繼續(xù)追蹤:

//ThreadPoolExecutor.class
public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler) {

    //驗證參數(shù)的有效性                        
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();

    //本例中不涉及權(quán)限
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();

    //線程數(shù)
    this.corePoolSize = corePoolSize;
    //最大線程數(shù)
    //本例中使用固定線程數(shù)的線程池,所以線程數(shù)和最大線程數(shù)相等
    this.maximumPoolSize = maximumPoolSize;
    //用于存儲任務(wù)的隊列
    //此處使用 LinkedBlockingQueue 來儲存任務(wù),其線程安全
    this.workQueue = workQueue;
    //keepAliveTime 參數(shù)用于表示:
    //對于超出線程和隊列緩存總和的任務(wù),是否要臨時增加線程來處理
    //超出的線程的存在時間是多少
    //這里使用的是定長線程池,所以 keepAliveTime = 0,即不增加線程
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    //用于創(chuàng)建線程的工廠類
    this.threadFactory = threadFactory;
    //handler 用來處理 task 太多時候的拒絕策略
    //此例中使用的是默認(rèn)的,即定義在 ThreadPoolExecutor 中的 defaultHandler 對象
    this.handler = handler;
}
二 Worker

Worker 是 ThreadPoolExecutor 的內(nèi)部類,可以看做是 Runnable 的代理類:

//ThreadPoolExecutor.class
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    
    private static final long serialVersionUID = 6138294804551838833L;
    final Thread thread;
    Runnable firstTask;
    //完成 task 數(shù)量的計數(shù)器
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        //這個方法是 AbstractQueuedSynchronizer 中的方法,功能相當(dāng)于加鎖
        //-1 的意思是后續(xù)的任務(wù)會處于阻塞狀態(tài),即為已經(jīng)加鎖
        setState(-1);
        //在創(chuàng)建的時候存入一個要處理的 task
        //需要注意的是每個 worker 對象被創(chuàng)建出來之后是可以重復(fù)利用來處理多個 task 的
        this.firstTask = firstTask;
        //worker 會用自身作為 Runnable 對象去創(chuàng)建一個線程
        //這里調(diào)用線程工廠進(jìn)行線程創(chuàng)建
        this.thread = getThreadFactory().newThread(this);
    }

    //對于線程變量來說,其啟動的就是 worker 的 run() 方法
    public void run() {
        //runWorker(...) 方法在 ThreadPoolExecutor 里
        runWorker(this);
    }

    //獲取鎖的狀態(tài)
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    //重寫了 AbstractQueuedSynchronizer 中的 tryAcquire(...) 方法
    //嘗試加鎖
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    //重寫了 AbstractQueuedSynchronizer 中的 tryRelease(...) 方法
    //嘗試釋放鎖
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    //真正的加鎖方法
    public void lock() { 
        acquire(1); 
    }
    //嘗試加鎖
    public boolean tryLock() { 
        return tryAcquire(1); 
    }
    //真正的釋放鎖方法
    public void unlock() { 
        release(1); 
    }
    //判斷是否在鎖中
    public boolean isLocked() { 
        return isHeldExclusively(); 
    }
    //中斷線程
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

追蹤一下 runWorker(...) 方法:

//ThreadPoolExecutor.class
final void runWorker(Worker w) {
    //獲取當(dāng)前所在的線程的實例對象
    Thread wt = Thread.currentThread();
    //獲取 task
    Runnable task = w.firstTask;
    //取出來之后把 task 置空
    w.firstTask = null;
    //此處釋放鎖
    w.unlock();
    //指示器,此變量為 true 的時候確認(rèn)該方法已經(jīng)執(zhí)行完畢
    boolean completedAbruptly = true;
    try {
        //此處為一個 while 循環(huán),用于不斷的執(zhí)行 task
        //getTask() 方法會從隊列里不斷抓取 task 并進(jìn)行執(zhí)行
        //當(dāng) task 為 null,且隊列里已經(jīng)沒有更多 task 的時候,就會終止循環(huán)
        while (task != null || (task = getTask()) != null) {
            //加鎖,獨占線程
            w.lock();
            //在這里會判斷線程的狀態(tài),如果存在符合中斷的情況,就會直接中斷掉
            if ((runStateAtLeast(ctl.get(), STOP) 
                    || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();

            try {
                //beforeExecute(...) 和 afterExecute(...) 方法在 ThreadPoolExecutor 中并沒有實現(xiàn)
                //是預(yù)留出來給使用者重寫,以達(dá)到業(yè)務(wù)需求的方法
                beforeExecute(wt, task);
                try {
                    //此處執(zhí)行 task
                    task.run();
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                //將執(zhí)行的 task 置空
                task = null;
                //每完成一個 task 就會加 1
                w.completedTasks++;
                //釋放鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //這個方法會銷毀掉 worker
        //同時如果檢測到有新的 task 又會重新創(chuàng)建 Worker
        processWorkerExit(w, completedAbruptly);
    }
}

Worker 是線程池中真正起完成業(yè)務(wù)邏輯的組件,是任務(wù)和線程的封裝。

三 線程池的狀態(tài)控制

線程池的狀態(tài)主要由 ctl 變量來進(jìn)行控制:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl 是一個 AtomicInteger 類型的變量,其實可以簡單理解為一個 int 值,AtomicInteger 只是能夠適應(yīng)高并發(fā)的原子化操作的需要。

ctl 的前 29 位數(shù)用來表示線程(Worker)的數(shù)量,后面三位用來表示線程池的狀態(tài)。

線程池的狀態(tài)有五種,分別是 Running、Shutdown、Stop、Tidying、Terminate,根據(jù)單詞就能猜出大概。

注意的是,這五種狀態(tài)在線程池中都以 int 變量的形式存在,從前到后依次變大,對狀態(tài)的比較有一系列方法:

//ThreadPoolExecutor.class
private static boolean runStateLessThan(int c, int s) {
    //c 的狀態(tài)值要小于 s
    return c < s;
}
//ThreadPoolExecutor.class
private static boolean runStateAtLeast(int c, int s) {
    //c 的狀態(tài)值要大于或等于 s
    return c >= s;
}
//ThreadPoolExecutor.class
private static boolean isRunning(int c) {
    //狀態(tài)里只有 RUNNING 是小于 SHUTDOWN 的
    return c < SHUTDOWN;
}

在這些方法里,傳入的參數(shù) c 一般指的是當(dāng)前線程池狀態(tài),s 是用來對比的參照狀態(tài)。

四 線程池的執(zhí)行

該 part 的起點:

executorService.execute(r);

來追蹤 execute(...) 方法:

public void execute(Runnable command) {
    //有效性驗證
    if (command == null)
        throw new NullPointerException();
    
    //ctl 是一個 AtomicInteger 類型的變量,用來記錄線程池的狀態(tài)
    int c = ctl.get();
    
    //workerCountOf(...) 方法會返回當(dāng)前運行的 Worker 的數(shù)量
    if (workerCountOf(c) < corePoolSize) {
        //Worker 的數(shù)量小于線程池容量的情況下
        //直接增加 Worker 并取出 task 去運行
        if (addWorker(command, true))
            return;
        //如果 Worker 已經(jīng)順利執(zhí)行了 task,應(yīng)該會直接返回掉
        //如果執(zhí)行中出現(xiàn)了其它情況,則會繼續(xù)往下走
        //此處刷新狀態(tài)
        c = ctl.get();
    }
    //當(dāng) Worker 數(shù)量已經(jīng)達(dá)到線程池的指定數(shù)量,或者添加 Worker 的時候出問題的時候,會進(jìn)入此判斷語句
    //先判斷線程池是否處于活躍狀態(tài),且 task 是否已經(jīng)被成功添加到隊列中
    //如果不滿足,會進(jìn)入 else 語句中,先最后嘗試一次 addWorker(...) 方法,如果不成功就拒絕 task
    //reject(...) 方法會調(diào)用 handler 的拒絕策略
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }else if (!addWorker(command, false))
        reject(command);
}
1 reject

這里先提及一下 reject(...) 方法:

//ThreadPoolExecutor.class
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

本質(zhì)是調(diào)用了 handler 對象的相關(guān)方法。在本例中,handler 對象指向了 defaultHandler:

//ThreadPoolExecutor.class
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

defaultHandler 是一個 AbortPolicy 類型的對象,而 AbortPolicy 是 ThreadPoolExecutor 的靜態(tài)內(nèi)部類。

AbortPolicy 起作用的方法為 rejectedExecution(...) 方法:

//AbortPolicy.class
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                        " rejected from " + e.toString());
}

也就是說,在 task 過多的情況下,AbortPolicy 的應(yīng)對策略是拋出異常。

2 addWorker

來看一下核心方法 addWorker(...):

//ThreadPoolExecutor.class
private boolean addWorker(Runnable firstTask, boolean core) {
    //先標(biāo)記這個 for 循環(huán),方便退出循環(huán)
    retry:
    //在每一次循環(huán)開始之前會刷新一次狀態(tài)標(biāo)識
    for (int c = ctl.get();;) {
        //這里先進(jìn)行判斷,如果線程池已經(jīng)關(guān)閉了,或者沒有 task 了,就會返回 false
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            //如果 Worker 數(shù)量已經(jīng)超出了最大值就會直接返回 false
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            //將 ctl 變量的值加 1,如果成功了就會跳出循環(huán)
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            //在狀態(tài)值比 SHUTDOWN 大的時候會直接跳到最外頭的循環(huán)里
            //需要注意的是最外面的 for 循環(huán)會判斷狀態(tài)值是否大于 SHUTDOWN
            //如果大于 SHUTDOWN 的話就返回 false 了
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //創(chuàng)建一個 Worker
        w = new Worker(firstTask);
        //獲取線程對象
        final Thread t = w.thread;
        if (t != null) {
            //加鎖,此處加的是一把全局的鎖
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int c = ctl.get();
                //如果狀態(tài)值 c 是 RUNNING,或者 [c 是 RUNNING 或者 SHUTDOWN 且 firstTask 是 null] 就會進(jìn)入這個判斷語句
                //
                if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
                    //如果這個線程已經(jīng)處于運作狀態(tài),會拋出異常
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    //workers 是一個列表,用于存儲 Worker 對象
                    workers.add(w);
                    //獲取 Worker 的數(shù)量
                    int s = workers.size();
                    //largestPoolSize 用來記錄線程池達(dá)到過的最大線程數(shù)
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //標(biāo)記 Worker 已經(jīng)被添加
                    workerAdded = true;
                }
            } finally {
                //釋放鎖
                mainLock.unlock();
            }
            //先判斷 Worker 是否已經(jīng)被添加到 workers 內(nèi)了
            if (workerAdded) {
                //這是該方法核心的啟動線程方法
                t.start();
                //標(biāo)記 Worker 已經(jīng)開始運行了
                workerStarted = true;
            }
        }
    } finally {
        //如果沒有標(biāo)記 Worker 已經(jīng)開始工作,會在這里銷毀掉 Worker
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
五 一點嘮叨

先總結(jié)一下線程池的業(yè)務(wù)邏輯:

1 接收到 task (即實現(xiàn)了 Runnable 接口的實例對象) [execute(...) 方法]

2 用 task 去嘗試創(chuàng)建一個 Worker 實例 [execute(...) 方法]
    2.1 如果 Worker 數(shù)量沒有達(dá)到線程池的指定最大值 -> 新建
    2.2 如果 Worker 數(shù)量達(dá)到了線程池的指定最大值 -> 不會再創(chuàng)建,而是把 task 儲存起來等待空閑的 Worker 去提取
    2.3 如果 task 隊列也已經(jīng)滿了,無法再添加 -> 觸發(fā)拒絕機(jī)制(handler)

3 Worker 在執(zhí)行的時候調(diào)用其內(nèi)部的 Thread 實例對象的 start() 方法 [addWorker(...) 方法]

4 該 start() 方法會調(diào)用到 Worker 的 run() 方法 [Worker.class 內(nèi)的 run() 方法]

5 Worker 的 run() 方法本質(zhì)上是封裝了 task 的 run() 方法 [runWorker(...) 方法]

主線業(yè)務(wù)邏輯不算復(fù)雜,比較艱難的是為了保證數(shù)據(jù)的一致性,線程池代碼中充斥著大量的狀態(tài)判斷和鎖機(jī)制。

并且為了考慮性能問題,線程池的設(shè)計沒有使用悲觀鎖(synchronized 關(guān)鍵字),而是大量使用了 ASQ 和 ReetrentLock 機(jī)制。

本文僅為個人的學(xué)習(xí)筆記,可能存在錯誤或者表述不清的地方,有緣補(bǔ)充

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/73156.html

相關(guān)文章

  • 追蹤解析 FutureTask 源碼

    摘要:零前期準(zhǔn)備文章異常啰嗦且繞彎。版本版本簡介是中默認(rèn)的實現(xiàn)類,常與結(jié)合進(jìn)行多線程并發(fā)操作。所以方法的主體其實就是去喚醒被阻塞的線程。本文僅為個人的學(xué)習(xí)筆記,可能存在錯誤或者表述不清的地方,有緣補(bǔ)充 零 前期準(zhǔn)備 0 FBI WARNING 文章異常啰嗦且繞彎。 1 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 2 ThreadLocal 簡介 ...

    xcc3641 評論0 收藏0
  • 線程池運行模型源碼解析

    摘要:那么線程池到底是怎么利用類來實現(xiàn)持續(xù)不斷地接收提交的任務(wù)并執(zhí)行的呢接下來,我們通過的源代碼來一步一步抽絲剝繭,揭開線程池運行模型的神秘面紗。 在上一篇文章《從0到1玩轉(zhuǎn)線程池》中,我們了解了線程池的使用方法,以及向線程池中提交任務(wù)的完整流程和ThreadPoolExecutor.execute方法的源代碼。在這篇文章中,我們將會從頭閱讀線程池ThreadPoolExecutor類的源代...

    MockingBird 評論0 收藏0
  • java線程池——ThreadPoolExecutor源碼解析

    摘要:將線程池狀態(tài)置為并不會立即停止,停止接收外部的任務(wù),內(nèi)部正在跑的任務(wù)和隊列里等待的任務(wù),會執(zhí)行完,才真正停止。將線程池狀態(tài)置為。 在Java中,我們經(jīng)常使用的線程池就是ThreadPoolExecutor,此外還有定時的線程池ScheduledExecutorService(),但是需要注意的是Executors.newCachedThreadPool()的線程是沒有上屆的,在使用時,...

    TerryCai 評論0 收藏0
  • 源碼解析Executors.newFixedThreadPool(int)

    摘要:創(chuàng)建一個線程池,具有固定線程數(shù),運行在共享的無界隊列中。固定線程數(shù)源碼如下是的實現(xiàn)類。線程池中允許最大的線程數(shù)。如果線程數(shù)超過了核心線程數(shù),過量的線程在關(guān)閉前等待新任務(wù)的最大時間。處理因為線程邊界和隊列容量導(dǎo)致的堵塞。 1.Executors.newFixedThreadPool(int nThreads):創(chuàng)建一個線程池,具有固定線程數(shù),運行在共享的無界隊列中。在大多數(shù)時候,線程會主...

    source 評論0 收藏0
  • 后端ing

    摘要:當(dāng)活動線程核心線程非核心線程達(dá)到這個數(shù)值后,后續(xù)任務(wù)將會根據(jù)來進(jìn)行拒絕策略處理。線程池工作原則當(dāng)線程池中線程數(shù)量小于則創(chuàng)建線程,并處理請求。當(dāng)線程池中的數(shù)量等于最大線程數(shù)時默默丟棄不能執(zhí)行的新加任務(wù),不報任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點記錄以及采用的解決方案 深入分析 java 線程池的實現(xiàn)原理 在這篇文章中,作者有條不紊的將 ja...

    roadtogeek 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<