摘要:對于指的是線程池中正在運行的線程。總結現在就可以知道,大致的線程池實現原理首先,各自存放線程和任務,其中,任務帶有阻塞。
前言
文章主要來自:點這里 。這也是博主的博客,主要分享了自己接觸過的一些后端技術,有不對的地方希望可以提出。
線程池的相關類我們都知道,所謂線程池,那么就是相當于有一個池子,線程就放在這個池子中進行重復利用,能夠減去了線程的創建和銷毀所帶來的代價。但是這樣并不能很好的解釋線程池的原理,下面從代碼的角度分析一下線程池的實現。
對于原理,在 Java 中,有幾個接口,類 值得我們關注:
Executor
ExecutorService
AbstractExecutorService
ThreadPoolExecutor
Executorpublic interface Executor { void execute(Runnable command); }
Executor 接口只有一個 方法,execute,并且需要 傳入一個 Runnable 類型的參數。那么它的作用自然是 具體的執行參數傳入的任務。
ExecutorServicepublic interface ExecutorService extends Executor { void shutdown(); ListshutdownNow(); boolean isShutdown(); Future submit(Callable task); Future submit(Runnable task, T result); Future> submit(Runnable task); List > invokeAll(Collection extends Callable > tasks) throws InterruptedException; ...... }
ExecutorService 接口繼承了 Executor,并且提供了一些其他的方法,比如說:
shutdownNow : 關閉線程池,返回放入了線程池,但是還沒開始執行的線程。
submit : 執行的任務 允許擁有返回值。
invokeAll : 運行把任務放進集合中,進行批量的執行,并且能有返回值
這三個方法也可以說是這個接口重點擴展的方法。
Ps:execute 和 submit 區別:
submit 有返回值,execute 沒有返回值。 所以說可以根據任務有無返回值選擇對應的方法。
submit 方便異常的處理。 如果任務可能會拋出異常,而且希望外面的調用者能夠感知這些異常,那么就需要調用 submit 方法,通過捕獲 Future.get 拋出的異常。
AbstractExecutorServiceAbstractExecutorService 是一個抽象類,主要完成了 對 submit 方法,invokeAll 方法 的實現。 但是其實它的內部還是調用了 execute 方法,例如:
public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFutureThreadPoolExecutorftask = newTaskFor(task, null); execute(ftask); return ftask; }
ThreadPoolExecutor 繼承了 AbstractExecutorService,并且實現了最重要的 execute 方法,是我們主要需要研究的類。另外,整個線程池是如何實現的呢?
在該類中,有兩個成員變量 非常的重要:
private final HashSetworkers = new HashSet (); private final BlockingQueue workQueue;
對于 workers 變量,主要存在了線程對象 Worker,Worker 實現了 Runnable 接口。而對于 workQueue 變量,主要存放了需要執行的任務。 這樣其實可以猜到, 整個線程池的實現原理應該是 workQueue 中不斷的取出需要執行的任務,放在 workers 中進行處理。
另外,當線程池中的線程用完了之后,多余的任務會等待,那么這個等待的過程是 怎么實現的呢? 其實如果熟悉 BlockingQueue,那么就會馬上知道,是利用了 BlockingQueue 的take 方法進行處理。
下面具體代碼分析:
public void execute(Runnable command) { ...... if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } ...... }
首先,這里需要先理解兩個概念。我們在創建線程池的時候,通常會指定兩個變量,一個是maximumPoolSize,另外一個是 corePoolSize。
對于 maximumPoolSize:指的是 線程池中最多允許有多少個線程。
對于 corePoolSize: 指的是線程池中正在運行的線程。
在 線程池中,有這樣的設定,我們加入一個任務進行執行,
如果現在線程池中正在運行的線程數量大于 corePoolSize 指定的值而 小于maximumPoolSize 指定的值,那么就會創建一個線程對該任務進行執行,一旦一個線程被創建運行。
如果線程池中的線程數量大于corePoolSize,那么這個任務執行完畢后,該線程會被回收;如果 小于corePoolSize,那么該線程即使空閑,也不會被回收。下個任務過來,那么就使用這個空閑線程。
對于上述代碼,首先有:
if (workerCountOf(c) < corePoolSize)
也就是說,判斷現在的線程數量是否小于corePoolSize,如果小于,那么就創建一個線程執行該任務,也就是執行
addWorker(command, true)
如果大于,那么就把該任務放進隊列當中,即
workQueue.offer(command)
那么,addWorker 是干什么的呢?
private boolean addWorker(Runnable firstTask, boolean core) { boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } ...... }
在這里可以看到一些關鍵代碼,例如 w = new Worker(firstTask), 以及 workers.add(w); 從這里 我們就可以看到,創建 線程對象 并且加入到 線程 隊列中。但是,我們現在還沒有看到具體是怎么執行任務的,繼續追蹤
w = new Worker(firstTask),如下代碼:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ...... final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } ......
對于 runWorker 方法:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } ...... }
在這段代碼中,就有很多關鍵的信息,比如說,Runnable task = w.firstTask;如果為空,那么就 執行 task = getTask(),如果不為空,那么就 進行 task.run(); 調用其方法,這里也就是具體的執行的任務。
現在知道了是怎么樣執行具體的任務,那么假如任務的數量 大于 線程池的數量,那么是怎么實現等待的呢,這里就需要看到getTask() 的具體實現了,如下:
private Runnable getTask() { for (;;) { ...... try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
這里可以看到, 一個 for 死循環,以及
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
而 workQueue 是 BlockingQueue 類型,也就是帶有阻塞的功能。
這就是 線程如何等待執行的。
總結現在就可以知道,大致的線程池實現原理:
首先,各自存放線程和任務,其中,任務帶有阻塞。
private final HashSetworkers = new HashSet (); private final BlockingQueue workQueue;
然后,在 execute 方法中 進行 addWorker(command,true),也就是創建一個線程,把任務放進去執行;或者是直接把任務放入到任務隊列中。
接著 如果是 addWorker,那么就會 new Worker(task) -》調用其中 run() 方法,在Worker 的run() 方法中,調用 runWorker(this); 方法 -》在該方法中就會具體執行我們的任務 task.run(); 同時這個 runWorker方法相當于是個死循環,正常情況下就會一直取出 任務隊列中的任務來執行,這就保證了線程 不會銷毀。
所以,這也是為什么常說的線程池可以避免線程的頻繁創建和 銷毀帶來的性能消耗。
寫在最后寫出來,說出來才知道對不對,知道不對才能改正,改正了才能成長。
在技術方面,希望大家眼里都容不得沙子。如果有不對的地方或者需要改進的地方希望可以指出,萬分感謝。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/64900.html
摘要:當活動線程核心線程非核心線程達到這個數值后,后續任務將會根據來進行拒絕策略處理。線程池工作原則當線程池中線程數量小于則創建線程,并處理請求。當線程池中的數量等于最大線程數時默默丟棄不能執行的新加任務,不報任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點記錄以及采用的解決方案 深入分析 java 線程池的實現原理 在這篇文章中,作者有條不紊的將 ja...
摘要:進程線程與協程它們都是并行機制的解決方案。選擇是任意性的,并在對實現做出決定時發生。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。此線程池支持定時以及周期性執行任務的需求。 并發與并行的概念 并發(Concurrency): 問題域中的概念—— 程序需要被設計成能夠處理多個同時(或者幾乎同時)發生的事件 并行(Parallel...
摘要:為程序員金三銀四精心挑選的余道面試題與答案,歡迎大家向我推薦你在面試過程中遇到的問題我會把大家推薦的問題添加到下面的常用面試題清單中供大家參考。 為Java程序員金三銀四精心挑選的300余道Java面試題與答案,歡迎大家向我推薦你在面試過程中遇到的問題,我會把大家推薦的問題添加到下面的常用面試題清單中供大家參考。 前兩天寫的以下博客,大家比較認可,熱度不錯,希望可以幫到準備或者正在參加...
摘要:今天給大家總結一下,面試中出鏡率很高的幾個多線程面試題,希望對大家學習和面試都能有所幫助。指令重排在單線程環境下不會出先問題,但是在多線程環境下會導致一個線程獲得還沒有初始化的實例。使用可以禁止的指令重排,保證在多線程環境下也能正常運行。 下面最近發的一些并發編程的文章匯總,通過閱讀這些文章大家再看大廠面試中的并發編程問題就沒有那么頭疼了。今天給大家總結一下,面試中出鏡率很高的幾個多線...
摘要:線程池的工作原理一個線程池管理了一組工作線程,同時它還包括了一個用于放置等待執行任務的任務隊列阻塞隊列。使用線程池可以對線程進行統一的分配和監控。線程池的注意事項雖然線程池是構建多線程應用程序的強大機制,但使用它并不是沒有風險的。 線程池的工作原理一個線程池管理了一組工作線程, 同時它還包括了一個用于放置等待執行 任務的任務隊列(阻塞隊列) 。 一個線程池管理了一組工作線程, 同時它還...
閱讀 3437·2021-09-26 09:46
閱讀 2785·2021-09-13 10:23
閱讀 3519·2021-09-07 10:24
閱讀 2392·2019-08-29 13:20
閱讀 2922·2019-08-28 17:57
閱讀 3076·2019-08-26 13:27
閱讀 1179·2019-08-26 12:09
閱讀 510·2019-08-26 10:27