国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

線程池源碼分析——ThreadPoolExecutor

xiguadada / 516人閱讀

摘要:提高線程的可管理性線程池可以統一管理分配調優和監控。線程池的初始化狀態是。調用線程池的接口時,線程池由。當所有的任務已終止,記錄的任務數量為,阻塞隊列為空,線程池會變為狀態。線程池徹底終止,就變成狀態。

序言

我們知道,線程池幫我們重復管理線程,避免創建大量的線程增加開銷。
合理的使用線程池能夠帶來3個很明顯的好處:
1.降低資源消耗:通過重用已經創建的線程來降低線程創建和銷毀的消耗
2.提高響應速度:任務到達時不需要等待線程創建就可以立即執行。
3.提高線程的可管理性:線程池可以統一管理、分配、調優和監控。
java源生的線程池,實現于ThreadPoolExecutor類,這也是我們今天討論的重點

ThreadPoolExecutor類構造方法

Jdk使用ThreadPoolExecutor類來創建線程池,我們來看看它的構造方法。

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
* {@code maximumPoolSize <= 0}
* {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

int corePoolSize, //核心線程的數量

int maximumPoolSize, //最大線程數量

long keepAliveTime, //超出核心線程數量以外的線程空閑時,線程存活的時間

TimeUnit unit, //存活時間的單位,有如下幾種選擇

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒

BlockingQueue workQueue, //保存待執行任務的隊列,常見的也有如下幾種:

ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
PriorityBlockingQueue

ThreadFactory threadFactory, //創建新線程使用的工廠

RejectedExecutionHandler handler // 當任務無法執行時的處理器(線程拒絕策略)

核心類變量 ctl變量

ThreadPoolExecutor中有一個控制狀態的屬性叫ctl,它是一個AtomicInteger類型的變量,它一個int值可以儲存兩個概念的信息:

workerCount:表明當前池中有效的線程數,通過workerCountOf方法獲得,workerCount上限是(2^29)-1。(最后存放在ctl的低29bit)

runState:表明當前線程池的狀態,通過workerCountOf方法獲得,最后存放在ctl的高3bit中,他們是整個線程池的運行生命周期,有如下取值,分別的含義是:

RUNNING:可以新加線程,同時可以處理queue中的線程。線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處于RUNNING狀態,

SHUTDOWN:不增加新線程,但是處理queue中的線程。調用線程池的shutdown()方法時,線程池由RUNNING -> SHUTDOWN。

STOP 不增加新線程,同時不處理queue中的線程。調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。

TIDYING 當所有的任務已終止,ctl記錄的”任務數量”為0,阻塞隊列為空,線程池會變為TIDYING狀態。當線程池變為TIDYING狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變為TIDYING時,進行相應的處理;可以通過重載terminated()函數來實現。

TERMINATED 線程池徹底終止,就變成TERMINATED狀態。線程池處在TIDYING狀態時,執行完terminated()之后,就會由 TIDYING -> TERMINATED。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

COUNT_BITS=32(integer的size)-3=29,于是五種狀態左移29位分別是:

RUNNING: 11100000000000000000000000000000

SHUTDOWN: 00000000000000000000000000000000

STOP: 00100000000000000000000000000000

TIDYING: 01000000000000000000000000000000

TERMINATED:01100000000000000000000000000000

而ThreadPoolExecutor是通過runStateOf和workerCountOf獲得者兩個概念的值的。

runStateOf和workerCountOf方法是如何剝離出ctl變量的兩個有效值呢?這其中我們可以看到CAPACITY是實現一個字段存兩個值的最重要的字段。

CAPACITY變量

CAPACITY=(1 << COUNT_BITS) – 1 轉成二進制為:000 11111111111111111111111111111,他是線程池理論上可以允許的最大的線程數。
所以很明顯,它的重點在于,其高3bit為0,低29bit為1;
這樣,workderCountOf方法中,CAPACITY和ctl進行&運算時,它能獲得高3位都是0,低29位和ctl低29位相同的值,這個值就是workerCount
同理,runStateOf方法,CAPACITY的取反和ctl進行&操作,獲得高3位和ctl高三位相等,低29位都為0的值,這個值就是runState

workQueue
/**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue workQueue;

一個BlockingQueue隊列,本身的結構可以保證訪問的線程安全(這里不展開了)。這是一個排隊等待隊列。當我們線程池里線程達到corePoolSize的時候,一些需要等待執行的線程就放在這個隊列里等待。

workers
/**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet workers = new HashSet();

一個HashSet的集合。線程池里所有可以立即執行的線程都放在這個集合里。這也是我們直觀理解的線程的池子

mainLock
    private final ReentrantLock mainLock = new ReentrantLock();

mainLock是線程池的主鎖,是可重入鎖,當要操作workers set這個保持線程的HashSet時,需要先獲取mainLock,還有當要處理largestPoolSize、completedTaskCount這類統計數據時需要先獲取mainLock

其他重要屬性
private int largestPoolSize;   //用來記錄線程池中曾經出現過的最大線程數
 
private long completedTaskCount;   //用來記錄已經執行完畢的任務個數

private volatile boolean allowCoreThreadTimeOut;   //是否允許為核心線程設置存活時間
核心內部類 Worker

Worker類是線程池中具化一個線程的對象,是線程池的核心,我們來看看源碼:

/**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            //設置AQS的同步狀態private volatile int state,是一個計數器,大于0代表鎖已經被獲取
            // 在調用runWorker()前,禁止interrupt中斷,在interruptIfStarted()方法中會判斷 getState()>=0
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);//根據當前worker創建一個線程對象
            //當前worker本身就是一個runnable任務,也就是不會用參數的firstTask創建線程,而是調用當前worker.run()時調用firstTask.run()
            //后面在addworker中,我們會啟動worker對象中組合的Thread,而我們的執行邏輯runWorker方法是在worker的run方法中被調用。
            //為什么執行thread的run方法會調用worker的run方法呢,原因就是在這里進行了注入,將worker本身this注入到了thread中
        }
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }//runWorker()是ThreadPoolExecutor的方法

        // Lock methods
        //
        // The value 0 represents the unlocked state. 0代表“沒被鎖定”狀態
        // The value 1 represents the locked state. 1代表“鎖定”狀態
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        /**
         * 嘗試獲取鎖
         * 重寫AQS的tryAcquire(),AQS本來就是讓子類來實現的
         */
        protected boolean tryAcquire(int unused) {
            //嘗試一次將state從0設置為1,即“鎖定”狀態,但由于每次都是state 0->1,而不是+1,那么說明不可重入
            //且state==-1時也不會獲取到鎖
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        /**
         * 嘗試釋放鎖
         * 不是state-1,而是置為0
         */
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        /**
         * 中斷(如果運行)
         * shutdownNow時會循環對worker線程執行
         * 且不需要獲取worker鎖,即使在worker運行時也可以中斷
         */
        void interruptIfStarted() {
            Thread t;
            //如果state>=0、t!=null、且t沒有被中斷
            //new Worker()時state==-1,說明不能中斷
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

我們看worker類時,會發現最重要的幾個部分在于它里面定義了一個Thread thread和Runnable firstTask。看到這里,我們可能會比較奇怪,我們只是要一個可以執行的線程,這里放一個Thread和一個Runnable的變量做什么呢?
其實之所以Worker自己實現Runnable,并創建Thread,在firstTask外包一層,是因為要通過Worker負責控制中斷,而firstTask這個工作任務只是負責執行業務,worker的run方法調用了runWorker方法,在這里面,worker里的firstTask的run方法被執行。稍后我們會聚焦這個執行任務的runWorker方法。

核心方法

好了,基本上我們將線程池的幾個主角,ctl,workQueue,workers,Worker簡單介紹了一遍,現在,我們來看看線程池是怎么玩的。

線程的運行 execute方法

這是線程池實現類外露供給外部實現提交線程任務command的核心方法,對于無需了解線程池內部的使用者來說,這個方法就是把某個任務交給線程池,正常情況下,這個任務會在未來某個時刻被執行,實現和注釋如下:

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     * * 在未來的某個時刻執行給定的任務。這個任務用一個新線程執行,或者用一個線程池中已經存在的線程執行
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     * 如果任務無法被提交執行,要么是因為這個Executor已經被shutdown關閉,要么是已經達到其容量上限,任務會被當前的RejectedExecutionHandler處理
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn"t, by returning false.
         * 如果運行的線程少于corePoolSize,嘗試開啟一個新線程去運行command,command作為這個線程的第一個任務
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *  如果任務成功放入隊列,我們仍需要一個雙重校驗去確認是否應該新建一個線程(因為可能存在有些線程在我們上次檢查后死了)
         *  或者 從我們進入這個方法后,pool被關閉了
         *  所以我們需要再次檢查state,如果線程池停止了需要回滾入隊列,如果池中沒有線程了,新開啟 一個線程
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * 如果無法將任務入隊列(可能隊列滿了),需要新開區一個線程(自己:往maxPoolSize發展)
        * 如果失敗了,說明線程池shutdown 或者 飽和了,所以我們拒絕任務
         */
        int c = ctl.get();
        // 1、如果當前線程數少于corePoolSize(可能是由于addWorker()操作已經包含對線程池狀態的判斷,如此處沒加,而入workQueue前加了)
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;

            /**
             * 沒有成功addWorker(),再次獲取c(凡是需要再次用ctl做判斷時,都會再次調用ctl.get())
             * 失敗的原因可能是:
             * 1、線程池已經shutdown,shutdown的線程池不再接收新任務
             * 2、workerCountOf(c) < corePoolSize 判斷后,由于并發,別的線程先創建了worker線程,導致workerCount>=corePoolSize
             */
            c = ctl.get();
        }
        /**
         * 2、如果線程池RUNNING狀態,且入隊列成功
         */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();

            /**
             * 再次校驗放入workerQueue中的任務是否能被執行
             * 1、如果線程池不是運行狀態了,應該拒絕添加新任務,從workQueue中刪除任務
             * 2、如果線程池是運行狀態,或者從workQueue中刪除任務失敗(剛好有一個線程執行完畢,并消耗了這個任務),
             * 確保還有線程執行任務(只要有一個就夠了)
             */
            //如果再次校驗過程中,線程池不是RUNNING狀態,并且remove(command)--workQueue.remove()成功,拒絕當前command
            if (! isRunning(recheck) && remove(command))
                reject(command);

            //如果當前worker數量為0,通過addWorker(null, false)創建一個線程,其任務為null
            //為什么只檢查運行的worker數量是不是0呢?? 為什么不和corePoolSize比較呢??
            //只保證有一個worker線程可以從queue中獲取任務執行就行了??
            //因為只要還有活動的worker線程,就可以消費workerQueue中的任務
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);//第一個參數為null,說明只為新建一個worker線程,沒有指定firstTask
                                       ////第二個參數為true代表占用corePoolSize,false占用maxPoolSize
        }
        /**
         * 3、如果線程池不是running狀態 或者 無法入隊列
         *   嘗試開啟新線程,擴容至maxPoolSize,如果addWork(command, false)失敗了,拒絕當前command
         */
        else if (!addWorker(command, false))
            reject(command);
    }

我們可以簡單歸納如下(注:圖來源見水印,謝謝大神的歸納):

addWorker

在execute方法中,我們看到核心的邏輯是由addWorker方法來實現的,當我們將一個任務提交給線程池,線程池會如何處理,就是主要由這個方法加以規范:

該方法有兩個參數:

firstTask: worker線程的初始任務,可以為空

core: true:將corePoolSize作為上限,false:將maximumPoolSize作為上限

排列組合,addWorker方法有4種傳參的方式:

1、addWorker(command, true)
2、addWorker(command, false)
3、addWorker(null, false)
4、addWorker(null, true)

在execute方法中就使用了前3種,結合這個核心方法進行以下分析

第一個:線程數小于corePoolSize時,放一個需要處理的task進Workers Set。如果Workers Set長度超過corePoolSize,就返回false
第二個:當隊列被放滿時,就嘗試將這個新來的task直接放入Workers Set,而此時Workers Set的長度限制是maximumPoolSize。如果線程池也滿了的話就返回false
第三個:放入一個空的task進workers Set,長度限制是maximumPoolSize。這樣一個task為空的worker在線程執行的時候會去任務隊列里拿任務,這樣就相當于創建了一個新的線程,只是沒有馬上分配任務
第四個:這個方法就是放一個null的task進Workers Set,而且是在小于corePoolSize時,如果此時Set中的數量已經達到corePoolSize那就返回false,什么也不干。實際使用中是在prestartAllCoreThreads()方法,這個方法用來為線程池預先啟動corePoolSize個worker等待從workQueue中獲取任務執行
    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     * 檢查根據當前線程池的狀態和給定的邊界(core or maximum)是否可以創建一個新的worker
     * 如果是這樣的話,worker的數量做相應的調整,如果可能的話,創建一個新的worker并啟動,參數中的firstTask作為worker的第一個任務
     * 如果方法返回false,可能因為pool已經關閉或者調用過了shutdown
     * 如果線程工廠創建線程失敗,也會失敗,返回false
     * 如果線程創建失敗,要么是因為線程工廠返回null,要么是發生了OutOfMemoryError
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        //外層循環,負責判斷線程池狀態
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            /**
             * 線程池的state越小越是運行狀態,runnbale=-1,shutdown=0,stop=1,tidying=2,terminated=3
             * 要想這個if為true,線程池state必須已經至少是shutdown狀態了
             * 這時候以下3個條件任意一個是false都會進入if語句,即無法addWorker():
             *   1,rs == SHUTDOWN         (隱含:rs>=SHUTDOWN)false情況: 線程池狀態已經超過shutdown,
             *                               可能是stop、tidying、terminated其中一個,即線程池已經終止
             *  2,firstTask == null      (隱含:rs==SHUTDOWN)false情況: firstTask不為空,rs==SHUTDOWN 且 firstTask不為空,
             *                               return false,場景是在線程池已經shutdown后,還要添加新的任務,拒絕
             *  3,! workQueue.isEmpty()  (隱含:rs==SHUTDOWN,firstTask==null)false情況: workQueue為空,
             *                               當firstTask為空時是為了創建一個沒有任務的線程,再從workQueue中獲取任務,
             *                               如果workQueue已經為空,那么就沒有添加新worker線程的必要了
             * return false,
             */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //內層循環,負責worker數量+1
            for (;;) {
                int wc = workerCountOf(c);
                //入參core在這里起作用,表示加入的worker是加入corePool還是非corepool,換句話說,受到哪個size的約束
                //如果worker數量>線程池最大上限CAPACITY(即使用int低29位可以容納的最大值)
                //或者( worker數量>corePoolSize 或  worker數量>maximumPoolSize ),即已經超過了給定的邊界,不添加worker
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //CAS嘗試增加線程數,,如果成功加了wc,那么break跳出檢查
                //如果失敗,證明有競爭,那么重新到retry。
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //如果不成功,重新獲取狀態繼續檢查
                c = ctl.get();  // Re-read ctl
                //如果狀態不等于之前獲取的state,跳出內層循環,繼續去外層循環判斷
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
                // else CAS失敗時因為workerCount改變了,繼續內層循環嘗試CAS對worker數量+1
            }
        }
         //worker數量+1成功的后續操作
         // 添加到workers Set集合,并啟動worker線程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //新建worker//構造方法做了三件事//1、設置worker這個AQS鎖的同步狀態state=-1
            w = new Worker(firstTask);  //2、將firstTask設置給worker的成員變量firstTask
                                        //3、使用worker自身這個runnable,調用ThreadFactory創建一個線程,并設置給worker的成員變量thread
            final Thread t = w.thread;
            if (t != null) {
                //獲取重入鎖,并且鎖上
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                     // rs!=SHUTDOWN ||firstTask!=null
                     // 如果線程池在運行running largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {//如果往HashSet中添加worker成功,啟動線程
                    //通過t.start()方法正式執行線程。在這里一個線程才算是真正的執行起來了。
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            //如果啟動線程失敗
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

同樣的,我們可以歸納一下:

runWorker方法

在addWorker方法中,我們將一個新增進去的worker所組合的線程屬性thread啟動了,但我們知道,在worker的構造方法中,它將自己本身注入到了thread的target屬性里,所以繞了一圈,線程啟動后,調用的還是worker的run方法,而在這里面,runWorker定義了線程執行的邏輯:

/**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don"t need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     * 我們可能使用一個初始化任務開始,即firstTask為null
     * 然后只要線程池在運行,我們就從getTask()獲取任務
     * 如果getTask()返回null,則worker由于改變了線程池狀態或參數配置而退出
     * 其它退出因為外部代碼拋異常了,這會使得completedAbruptly為true,這會導致在processWorkerExit()方法中替換當前線程
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     * 在任何任務執行之前,都需要對worker加鎖去防止在任務運行時,其它的線程池中斷操作
     * clearInterruptsForTaskRun保證除非線程池正在stoping,線程不會被設置中斷標示
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     * 每個任務執行前會調用beforeExecute(),其中可能拋出一個異常,這種情況下會導致線程die(跳出循環,且completedAbruptly==true),沒有執行任務
     * 因為beforeExecute()的異常沒有cache住,會上拋,跳出循環
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread"s
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread"s UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        //標識線程是不是異常終止的
        boolean completedAbruptly = true;
        try {
            //task不為null情況是初始化worker時,如果task為null,則去隊列中取線程--->getTask()
            //可以看到,只要getTask方法被調用且返回null,那么worker必定被銷毀,而確定一個線程是否應該被銷毀的邏輯,在getTask方法中
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //線程開始執行之前執行此方法,可以實現Worker未執行退出,本類中未實現
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//runWorker方法最本質的存在意義,就是調用task的run方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //線程執行后執行,可以實現標識Worker異常中斷的功能,本類中未實現
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;//運行過的task標null
                    w.completedTasks++;
                    w.unlock();
                }
            }
            //標識線程不是異常終止的,是因為不滿足while條件,被迫銷毀的
            completedAbruptly = false;
        } finally {
            //處理worker退出的邏輯
            processWorkerExit(w, completedAbruptly);
        }
    }

我們歸納:

getTask方法

runWorker方法中的getTask()方法是線程處理完一個任務后,從隊列中獲取新任務的實現,也是處理判斷一個線程是否應該被銷毀的邏輯所在:

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:  以下情況會返回null
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     *    超過了maximumPoolSize設置的線程數量(因為調用了setMaximumPoolSize())
     * 2. The pool is stopped.
     *    線程池被stop
     * 3. The pool is shutdown and the queue is empty.
     *    線程池被shutdown,并且workQueue空了
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait.
     *    線程等待任務超時
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     *         返回null表示這個worker要結束了,這種情況下workerCount-1
     */
    private Runnable getTask() {
        // timedOut 主要是判斷后面的poll是否要超時
        boolean timedOut = false; // Did the last poll() time out?

        /**
         * 用于判斷線程池狀態
         */
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            /**
             * 對線程池狀態的判斷,兩種情況會workerCount-1,并且返回null
             * 1,線程池狀態為shutdown,且workQueue為空(反映了shutdown狀態的線程池還是要執行workQueue中剩余的任務的)
             * 2,線程池狀態為>=stop(只有TIDYING和TERMINATED會大于stop)(shutdownNow()會導致變成STOP)(此時不用考慮workQueue的情況)
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();//循環的CAS減少worker數量,直到成功
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?

            //allowCoreThreadTimeOut字段,表示是否允許核心線程超過閑置時間后被摧毀,默認為false
            //我們前面說過,如果getTask方法返回null,那么這個worker只有被銷毀一途
            //于是這個timed有3種情況
            //(1)當線程數沒有超過核心線程數,且默認allowCoreThreadTimeOut為false時
            //          timed值為false。看下面if的判斷邏輯,除非目前線程數大于最大值,否則下面的if始終進不去,該方法不可能返回null,worker也就不會被銷毀。
            //          因為前提"線程數不超過核心線程數"與"線程數大于最大值"兩個命題互斥,所以(1)情況,邏輯進入下面的if(返回null的線程銷毀邏輯)的可能性不存在。
            //          也就是說,當線程數沒有超過核心線程數時,線程不會被銷毀。
            //(2)當當前線程數超過核心線程數,且默認allowCoreThreadTimeOut為false時//timed值為true。
            //(3)如果allowCoreThreadTimeOut為true,則timed始終為true
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //wc > maximumPoolSize則必銷毀,因為這情況下,wc>1也肯定為true
            //wc <= maximumPoolSize,且(timed && timedOut) = true,這種情況下一般也意味著worker要被銷毀,因為超時一般是由阻塞隊列為空造成的,所以workQueue.isEmpty()也大概率為真,進入if邏輯。
            
            //一般情況是這樣,那不一般的情況呢?阻塞隊列沒有為空,但是因為一些原因,還是超時了,這時候取決于wc > 1,它為真就銷毀,為假就不銷毀。
            // 也就是說,如果阻塞隊列還有任務,但是wc=1,線程池里只剩下自己這個線程了,那么就不能銷毀,這個if不滿足,我們的代碼繼續往下走
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //如果timed為true那么使用poll取線程。否則使用take()
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    //workQueue.poll():如果在keepAliveTime時間內,阻塞隊列還是沒有任務,返回null
                    workQueue.take();
                    //workQueue.take():如果阻塞隊列為空,當前線程會被掛起等待;當隊列中有任務加入時,線程被喚醒,take方法返回任務
                //如果正常返回,那么返回取到的task。
                if (r != null)
                    return r;
                //否則,設為超時,重新執行循環,
                timedOut = true;
            } catch (InterruptedException retry) {
            //在阻塞從workQueue中獲取任務時,可以被interrupt()中斷,代碼中捕獲了InterruptedException,重置timedOut為初始值false,再次執行第1步中的判斷,滿足就繼續獲取任務,不滿足return null,會進入worker退出的流程
                timedOut = false;
            }
        }

歸納:

processWorkerExit方法

在runWorker方法中,我們看到當不滿足while條件后,線程池會執行退出線程的操作,這個操作,就封裝在processWorkerExit方法中。

/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //參數:
        //worker:                      要結束的worker
        //completedAbruptly: 是否突然完成(是否因為異常退出)
        
    /**
     * 1、worker數量-1
     * 如果是突然終止,說明是task執行時異常情況導致,即run()方法執行時發生了異常,那么正在工作的worker線程數量需要-1
     * 如果不是突然終止,說明是worker線程沒有task可執行了,不用-1,因為已經在getTask()方法中-1了
     */
    if (completedAbruptly) // If abrupt, then workerCount wasn"t adjusted 代碼和注釋正好相反啊
        decrementWorkerCount();
 
    /**
     * 2、從Workers Set中移除worker
     */
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks; //把worker的完成任務數加到線程池的完成任務數
        workers.remove(w); //從HashSet中移除
    } finally {
        mainLock.unlock();
    }
 
    /**
     * 3、在對線程池有負效益的操作時,都需要“嘗試終止”線程池
     * 主要是判斷線程池是否滿足終止的狀態
     * 如果狀態滿足,但線程池還有線程,嘗試對其發出中斷響應,使其能進入退出流程
     * 沒有線程了,更新狀態為tidying->terminated
     */
    tryTerminate();
 
    /**
     * 4、是否需要增加worker線程
     * 線程池狀態是running 或 shutdown
     * 如果當前線程是突然終止的,addWorker()
     * 如果當前線程不是突然終止的,但當前線程數量 < 要維護的線程數量,addWorker()
     * 故如果調用線程池shutdown(),直到workQueue為空前,線程池都會維持corePoolSize個線程,然后再逐漸銷毀這corePoolSize個線程
     */
    int c = ctl.get();
    //如果狀態是running、shutdown,即tryTerminate()沒有成功終止線程池,嘗試再添加一個worker
    if (runStateLessThan(c, STOP)) {
        //不是突然完成的,即沒有task任務可以獲取而完成的,計算min,并根據當前worker數量判斷是否需要addWorker()
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //allowCoreThreadTimeOut默認為false,即min默認為corePoolSize
             
            //如果min為0,即不需要維持核心線程數量,且workQueue不為空,至少保持一個線程
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
             
            //如果線程數量大于最少數量,直接返回,否則下面至少要addWorker一個
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
         
        //添加一個沒有firstTask的worker
        //只要worker是completedAbruptly突然終止的,或者線程數量小于要維護的數量,就新添一個worker線程,即使是shutdown狀態
        addWorker(null, false);
    }
}

總而言之:如果線程池還沒有完全終止,就仍需要保持一定數量的線程。

線程池狀態是running 或 shutdown的情況下:

A、如果當前線程是突然終止的,addWorker()
B、如果當前線程不是突然終止的,但當前線程數量 < 要維護的線程數量,addWorker()
故如果調用線程池shutdown(),直到workQueue為空前,線程池都會維持corePoolSize個線程,然后再逐漸銷毀這corePoolSize個線程
submit方法

前面我們講過execute方法,其作用是將一個任務提交給線程池,以期在未來的某個時間點被執行。
submit方法在作用上,和execute方法是一樣的,將某個任務提交給線程池,讓線程池調度線程去執行它。
那么它和execute方法有什么區別呢?我們來看看submit方法的源碼:
submit方法的實現在ThreadPoolExecutor的父類AbstractExecutorService類中,有三種重載方法:

    /**
     * 提交一個 Runnable 任務用于執行,并返回一個表示該任務的 Future。該Future的get方法在成功完成時將會返回null。
     * submit 參數: task - 要提交的任務 返回:表示任務等待完成的 Future
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * 提交一個Runnable 任務用于執行,并返回一個表示該任務的 Future。該 Future 的 get 方法在成功完成時將會返回給定的結果。
     * submit 參數: task - 要提交的任務 result - 完成任務時要求返回的結果 
     * 返回: 表示任務等待完成的 Future
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public  Future submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * 提交一個Callable的任務用于執行,返回一個表示任務的未決結果的 Future。該 Future 的 get 
方法在成功完成時將會返回該任務的結果。 
     * 如果想立即阻塞任務的等待,則可以使用 result = 
exec.submit(aCallable).get(); 形式的構造。
     * 參數: task - 要提交的任務 返回: 表示任務等待完成的Future
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public  Future submit(Callable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

源碼很簡單,submit方法,將任務task封裝成FutureTask(newTaskFor方法中就是new了一個FutureTask),然后調用execute。所以submit方法和execute的所有區別,都在這FutureTask所帶來的差異化實現上

總而言之,submit方法將一個任務task用future模式封裝成FutureTask對象,提交給線程執行,并將這個FutureTask對象返回,以供主線程該任務被線程池執行之后得到執行結果

注意,獲得執行結果的方法FutureTask.get(),會阻塞執行該方法的線程,尤其是當任務被DiscardPolicy策略和DiscardOldestPolicy拒絕的時候,get方法會一直阻塞在那里,所以我們最好使用自帶超時時間的future。

線程池的關閉 shutdown方法

講完了線程池的基本運轉過程,在方法章的最后,我們來看看負責線程池生命周期最后收尾工作的幾個重要方法,首先是shutdown方法。

/**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * 

This method does not wait for previously submitted tasks to * complete execution. Use {@link #awaitTermination awaitTermination} * to do that. * 開始一個順序的shutdown操作,shutdown之前被執行的已提交任務,新的任務不會再被接收了。如果線程池已經被shutdown了,該方法的調用沒有其他任何效果了。 * **該方法不會等待之前已經提交的任務執行完畢**,awaitTermination方法才有這個效果。 * * @throws SecurityException {@inheritDoc} */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //判斷是否可以操作關閉目標線程。 checkShutdownAccess(); //advanceRunState方法,參數:目標狀態;作用:一直執行,直到成功利用CAS將狀態置為目標值。 //設置線程池狀態為SHUTDOWN,此處之后,線程池中不會增加新Task advanceRunState(SHUTDOWN); //中斷所有的空閑線程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //嘗試進行terminate操作,但其實我們上面將狀態置為shutdown,就已經算是“中止”了一個線程池了,它不會再執行任務,于外部而言,已經失去了作用。而這里,也只是嘗試去將線程池的狀態一擼到底而已,并不是一定要terminate掉。該方法我們后面會說到。 tryTerminate(); }

我們可以看到,shutdown方法只不過是中斷喚醒了所有阻塞的線程,并且把線程池狀態置為shutdown,正如注釋所說的,它沒有等待所有正在執行任務的線程執行完任務,把狀態置為shutdown,已經足夠線程池喪失基本的功能了。

在該方法中,線程池如何中斷線程是我們最需要關心的,我們來看一下interruptIdleWorkers方法:

private void interruptIdleWorkers(boolean onlyOne) {//參數onlyOne表示是否值中斷一個線程就退出,在shutdown中該值為false。
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //遍歷workers 對所有worker做中斷處理。
            for (Worker w : workers) {
                Thread t = w.thread;
                // w.tryLock()對Worker獲取鎖,因為正在執行的worker已經加鎖了(見runWorker方法,w.lock()語句)
                //所以這保證了正在運行執行Task的Worker不會被中斷。只有阻塞在getTask方法的空閑線程才會進這個if判斷(被中斷),但中斷不代表線程立刻停止,它要繼續處理到阻塞隊列為空時才會被銷毀。
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

我們可以看到,在中斷方法中,我們調用了worker的tryLock方法去嘗試獲取worker的鎖,所以我們說,worker類這一層的封裝,是用來控制線程中斷的,正在執行任務的線程已經上了鎖,無法被中斷,只有在獲取阻塞隊列中的任務的線程(我們稱為空閑線程)才會有被中斷的可能。
之前我們看過getTask方法,在這個方法中, worker是不加鎖的,所以可以被中斷。我們為什么說“中斷不代表線程立刻停止,它要繼續處理到阻塞隊列為空時才會被銷毀”呢?具體邏輯,我們再來看一下getTask的源碼,以及我們的注釋(我們模擬中斷發生時的場景):

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        /**
         * 當執行過程中拋出InterruptedException 的時候,該異常被catch住,邏輯重新回到這個for循環
         * catch塊在getTask方法的最后。
         */
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            /**
             * 因為邏輯是在拋出中斷異常后來到這里的,那說明線程池的狀態已經在shutdown方法中被置為shutdown了,rs >= SHUTDOWN為true,rs >=STOP為false(只有TIDYING和TERMINATED狀態會大于stop)
             * 這時候,如果workQueue為空,判斷為真,線程被銷毀。
             * 否則,workQueue為非空,判斷為假,線程不會進入銷毀邏輯。
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();//循環的CAS減少worker數量,直到成功
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?

            //因為在catch塊中,timeOut已經為false了。
            //所以只要不發生當前線程數超過最大線程數這種極端情況,命題(wc > maximumPoolSize || (timed && timedOut)一定為false,線程依舊不被銷毀。
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //繼續執行正常的從阻塞隊列中取任務的邏輯,直到阻塞隊列徹底為空,這時候,上面第一個if判斷符合,線程被銷毀,壽命徹底結束。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                //如果正常返回,那么返回取到的task。
                if (r != null)
                    return r;
                //否則,設為超時,重新執行循環,
                timedOut = true;
            } catch (InterruptedException retry) {
                //捕獲中斷異常
                timedOut = false;
            }
        }
    }

總結:正阻塞在getTask()獲取任務的worker在被中斷后,會拋出InterruptedException,不再阻塞獲取任務。捕獲中斷異常后,將繼續循環到getTask()最開始的判斷線程池狀態的邏輯,當線程池是shutdown狀態,且workQueue.isEmpty時,return null,進行worker線程退出邏輯

所以,這就是我們為什么說,shutdown方法不會立刻停止線程池,它的作用是阻止新的任務被添加進來(邏輯在addWorker方法的第一個if判斷中,可以返回去看一下),并且繼續處理完剩下的任務,然后tryTerminated,嘗試關閉。

tryTerminate方法
    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     * 在以下情況將線程池變為TERMINATED終止狀態
     * shutdown 且 正在運行的worker 和 workQueue隊列 都empty
     * stop 且  沒有正在運行的worker
     * 
     * 這個方法必須在任何可能導致線程池終止的情況下被調用,如:
     * 減少worker數量
     * shutdown時從queue中移除任務
     * 
     * 這個方法不是私有的,所以允許子類ScheduledThreadPoolExecutor調用
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /**
             * 線程池是否需要終止
             * 如果以下3中情況任一為true,return,不進行終止
             * 1、還在運行狀態
             * 2、狀態是TIDYING、或 TERMINATED,已經終止過了
             * 3、SHUTDOWN 且 workQueue不為空
             */
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
                /**
                 * 只有shutdown狀態 且 workQueue為空,或者 stop狀態能執行到這一步
                 * 如果此時線程池還有線程(正在運行任務或正在等待任務,總之count不等于0)
                 * 中斷喚醒一個正在等任務的空閑worker
                 *(中斷喚醒的意思就是讓阻塞在阻塞隊列中的worker拋出異常,然后重新判斷狀態,getTask方法邏輯)
                 * 線程被喚醒后再次判斷線程池狀態,會return null,進入processWorkerExit()流程(runWorker邏輯)
                 */
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);//中斷workers集合中的空閑任務,參數為true,只中斷一個。(該邏輯的意義應該在于通知被阻塞在隊列中的線程:別瞎jb等了,這個線程池都要倒閉了,趕緊收拾鋪蓋準備銷毀吧你個逼玩意兒)。
                //嘗試終止失敗,返回。可能大家會有疑問,shutdown只調用了一次tryTerminate方法,如果一次嘗試失敗了,是不是就意味著shutdown方法很可能最終無法終止線程池?
                //其實看注釋,我們知道線程池在進行所有負面效益的操作時都會調用該方法嘗試終止,上面我們中斷了一個阻塞線程讓他被銷毀,他銷毀時也會嘗試終止(這其中又喚醒了一個阻塞線程去銷毀),以此類推,直到最后一個線程執行tryTerminate時,邏輯才有可能走到下面去。
                return;
            }
            /**
             * 如果狀態是SHUTDOWN,workQueue也為空了,正在運行的worker也沒有了,開始terminated
             */
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //CAS:將線程池的ctl變成TIDYING(所有的任務被終止,workCount為0,為此狀態時將會調用terminated()方法),期間ctl有變化就會失敗,會再次for循環
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        //方法為空,需子類實現
                        terminated();
                    } finally {
                        //將狀態置為TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        //最后執行termination.signalAll(),并喚醒所有等待線程池終止這個Condition的線程(也就是調用了awaitTermination方法的線程,這個方法的作用是阻塞調用它的線程,直到調用該方法的線程池真的已經被終止了。)
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

總結一下:tryTerminate被調用的時機主要有:
1,shutdown方法時
2,processWorkerExit方法銷毀一個線程時
3,addWorkerFailed方法添加線程失敗或啟動線程失敗時
4,remove方法,從阻塞隊列中刪掉一個任務時

shutdownNow方法

我們知道,shutdown后線程池將變成shutdown狀態,此時不接收新任務,但會處理完正在運行的 和 在阻塞隊列中等待處理的任務。

我們接下來要說的shutdownNow方法,作用是:shutdownNow后線程池將變成stop狀態,此時不接收新任務,不再處理在阻塞隊列中等待的任務,還會嘗試中斷正在處理中的工作線程。
代碼如下:

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     * 嘗試停止所有活動的正在執行的任務,停止等待任務的處理,并返回正在等待被執行的任務列表
     * 這個任務列表是從任務隊列中排出(刪除)的
     * 

This method does not wait for actively executing tasks to * terminate. Use {@link #awaitTermination awaitTermination} to * do that. * 這個方法不用等到正在執行的任務結束,要等待線程池終止可使用awaitTermination() *

There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * 除了盡力嘗試停止運行中的任務,沒有任何保證 * 取消任務是通過Thread.interrupt()實現的,所以任何響應中斷失敗的任務可能

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/71020.html

相關文章

  • 使用 Executors,ThreadPoolExecutor,創建線程源碼分析理解

    摘要:源碼分析創建可緩沖的線程池。源碼分析使用創建線程池源碼分析的構造函數構造函數參數核心線程數大小,當線程數,會創建線程執行最大線程數,當線程數的時候,會把放入中保持存活時間,當線程數大于的空閑線程能保持的最大時間。 之前創建線程的時候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...

    Chiclaim 評論0 收藏0
  • 后端ing

    摘要:當活動線程核心線程非核心線程達到這個數值后,后續任務將會根據來進行拒絕策略處理。線程池工作原則當線程池中線程數量小于則創建線程,并處理請求。當線程池中的數量等于最大線程數時默默丟棄不能執行的新加任務,不報任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點記錄以及采用的解決方案 深入分析 java 線程池的實現原理 在這篇文章中,作者有條不紊的將 ja...

    roadtogeek 評論0 收藏0
  • 線程源碼分析

    摘要:線程池的作用線程池能有效的處理多個線程的并發問題,避免大量的線程因為互相強占系統資源導致阻塞現象,能夠有效的降低頻繁創建和銷毀線程對性能所帶來的開銷。固定的線程數由系統資源設置。線程池的排隊策略與有關。線程池的狀態值分別是。 線程池的作用 線程池能有效的處理多個線程的并發問題,避免大量的線程因為互相強占系統資源導致阻塞現象,能夠有效的降低頻繁創建和銷毀線程對性能所帶來的開銷。 線程池的...

    enda 評論0 收藏0
  • 一看就懂的Java線程分析詳解

    摘要:任務性質不同的任務可以用不同規模的線程池分開處理。線程池在運行過程中已完成的任務數量。如等于線程池的最大大小,則表示線程池曾經滿了。線程池的線程數量。獲取活動的線程數。通過擴展線程池進行監控。框架包括線程池,,,,,,等。 Java線程池 [toc] 什么是線程池 線程池就是有N個子線程共同在運行的線程組合。 舉個容易理解的例子:有個線程組合(即線程池,咱可以比喻為一個公司),里面有3...

    Yangder 評論0 收藏0
  • Java ThreadPoolExecutor 線程源碼分析

    摘要:線程池常見實現線程池一般包含三個主要部分調度器決定由哪個線程來執行任務執行任務所能夠的最大耗時等線程隊列存放并管理著一系列線程這些線程都處于阻塞狀態或休眠狀態任務隊列存放著用戶提交的需要被執行的任務一般任務的執行的即先提交的任務先被執行調度 線程池常見實現 線程池一般包含三個主要部分: 調度器: 決定由哪個線程來執行任務, 執行任務所能夠的最大耗時等 線程隊列: 存放并管理著一系列線...

    greatwhole 評論0 收藏0
  • Java調度線程ScheduledThreadPoolExecutor源碼分析

    摘要:當面試官問線程池時,你應該知道些什么一執行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創建出一個調度線程池以后,就可以開始提交任務了。 最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。 原文地址:http://www.jianshu....

    kohoh_ 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<