摘要:用于限定中線程數的最大值。該線程池中的任務隊列維護著等待執行的對象。線程池和消息隊列筆者在實際工程應用中,使用過多線程和消息隊列處理過異步任務。以上是筆者在學習實踐之后對于多線程和消息隊列的粗淺認識,初學者切莫混淆兩者的作用。
多線程編程很難,難點在于多線程代碼的執行不是按照我們直覺上的執行順序。所以多線程編程必須要建立起一個宏觀的認識。
線程池是多線程編程中的一個重要概念。為了能夠更好地使用多線程,學習好線程池當然是必須的。
為什么要使用線程池?平時我們在使用多線程的時候,通常都是架構師配置好了線程池的 Bean,我們需要使用的時候,提交一個線程即可,不需要過多關注其內部原理。
在學習一門新的技術之前,我們還是先了解下為什么要使用它,使用它能夠解決什么問題:
創建/銷毀線程伴隨著系統開銷,過于頻繁的創建/銷毀線程,會很大程度上影響處理效率
例如:
記創建線程消耗時間T1,執行任務消耗時間T2,銷毀線程消耗時間T3
如果T1+T3>T2,那么是不是說開啟一個線程來執行這個任務太不劃算了!
正好,線程池緩存線程,可用已有的閑置線程來執行新任務,避免了T1+T3帶來的系統開銷
線程并發數量過多,搶占系統資源從而導致阻塞
我們知道線程能共享系統資源,如果同時執行的線程過多,就有可能導致系統資源不足而產生阻塞的情況
運用線程池能有效的控制線程最大并發數,避免以上的問題
對線程進行一些簡單的管理
比如:延時執行、定時循環執行的策略等
運用線程池都能進行很好的實現
創建一個線程池在 Java 中,新建一個線程池對象非常簡單,Java 本身提供了工具類java.util.concurrent.Executors,可以使用如下代碼創建一個固定數量線程的線程池:
ExecutorService service = Executors.newFixedThreadPool(10);
注意:以上代碼用來測試還可以,實際使用中最好能夠顯示地指定相關參數。
我們可以看下其內部源碼實現:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
在阿里巴巴代碼規范中,建議我們自己指定線程池的相關參數,為的是讓開發人員能夠自行理解線程池創建中的每個參數,根據實際情況,創建出合理的線程池。接下來,我們來剖析下java.util.concurrent.ThreadPoolExecutor的構造方法參數。
ThreadPoolExecutor 淺析java.util.concurrent.ThreadPoolExecutor有多個構造方法,我們拿參數最多的構造方法來舉例,以下是阿里巴巴代碼規范中給出的創建線程池的范例:
ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
貼一張IDEA中的圖更方便看:
首先最重要的幾個參數,可能就是:corePoolSize,maximumPoolSize,workQueue了,先看下這幾個參數的解釋:
corePoolSize
用于設定 thread pool 需要時刻保持的最小 core threads 的數量,即便這些 core threads 處于空閑狀態啥事都不做也不會將它們回收掉,當然前提是你沒有設置 allowCoreThreadTimeOut 為 true。至于 pool 是如何做到保持這些個 threads 不死的,我們稍后再說。
maximumPoolSize
用于限定 pool 中線程數的最大值。如果你自己構造了 pool 且傳入了一個 Unbounded 的 queue 且沒有設置它的 capacity,那么不好意思,最大線程數會永遠 <= corePoolSize,maximumPoolSize 變成了無效的。
workQueue
該線程池中的任務隊列:維護著等待執行的 Runnable 對象。當所有的核心線程都在干活時,新添加的任務會被添加到這個隊列中等待處理,如果隊列滿了,則新建非核心線程執行任務
由于本文是初步了解線程池,所以先理解這幾個參數,上文對于這三個參數的解釋,基本上跟JDK源碼中的注釋一致(java.util.concurrent.ThreadPoolExecutor#execute里的代碼)。
我們編寫個程序來方便理解:
// 創建線程池 ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); // 等待執行的runnable Runnable runnable = () -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }; // 啟動的任務數量 int counts = 1224; for (int i = 0; i < counts; i++) { service.execute(runnable); } // 監控線程池執行情況的代碼 ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service); while (true) { System.out.println(); int queueSize = tpe.getQueue().size(); System.out.println("當前排隊線程數:" + queueSize); int activeCount = tpe.getActiveCount(); System.out.println("當前活動線程數:" + activeCount); long completedTaskCount = tpe.getCompletedTaskCount(); System.out.println("執行完成線程數:" + completedTaskCount); long taskCount = tpe.getTaskCount(); System.out.println("總線程數:" + taskCount); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }
線程池的容量與我們啟動的任務數量息息相關。
已知:
corePoolSize = 5
maximumPoolSize = 200
workQueue.size() = 1024
我們修改同時 execute 添加到線程池的 Runnable 數量 counts:
counts <= corePoolSize:所有的任務均為核心線程執行,沒有任何 Runnable 被添加到 workQueue中
當前排隊線程數:0 當前活動線程數:3 執行完成線程數:0 總線程數:3
corePoolSize < counts <= corePoolSize + workQueue.size():所有任務均為核心線程執行,當核心線程處于繁忙狀態,則將任務添加到 workQueue 中等待
當前排隊線程數:15 當前活動線程數:5 執行完成線程數:0 總線程數:20
corePoolSize + workQueue.size() < counts <= maximumPoolSize + workQueue.size():corePoolSize 個線程由核心線程執行,超出隊列長度 workQueue.size() 的任務,將另啟動非核心線程執行
當前排隊線程數:1024 當前活動線程數:105 執行完成線程數:0 總線程數:1129
counts > maximumPoolSize + workQueue.size():將會報異常java.util.concurrent.RejectedExecutionException
java.util.concurrent.RejectedExecutionException: Task com.bwjava.util.ExecutorServiceUtilTest$$Lambda$1/314265080@725bef66 rejected from java.util.concurrent.ThreadPoolExecutor@2aaf7cc2[Running, pool size = 200, active threads = 200, queued tasks = 1024, completed tasks = 0]線程池踩坑:線程嵌套導致阻塞
這次的踩坑才是我寫這篇文章的初衷,借此機會好好了解下線程池的各個概念。本身這段時間在研究爬蟲,為了盡量提高爬蟲的效率,用到了多線程處理。由于代碼寫得比較隨性,所以遇到了一個阻塞的問題,研究了一下才搞明白,模擬的代碼如下:
ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); @Test public void testBlock() { Runnable runnableOuter = () -> { try { Runnable runnableInner1 = () -> { try { TimeUnit.SECONDS.sleep(3); // 模擬比較耗時的爬蟲操作 } catch (InterruptedException e) { e.printStackTrace(); } }; Future> submit = service.submit(runnableInner1); submit.get(); // 實際業務中,runnableInner2需要用到此處返回的參數,所以必須get Runnable runnableInner2 = () -> { try { TimeUnit.SECONDS.sleep(5); // 模擬比較耗時的爬蟲操作 } catch (InterruptedException e) { e.printStackTrace(); } }; Future> submit2 = service.submit(runnableInner2); submit2.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }; for (int i = 0; i < 20; i++) { service.execute(runnableOuter); } ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service); while (true) { System.out.println(); int queueSize = tpe.getQueue().size(); System.out.println("當前排隊線程數:" + queueSize); int activeCount = tpe.getActiveCount(); System.out.println("當前活動線程數:" + activeCount); long completedTaskCount = tpe.getCompletedTaskCount(); System.out.println("執行完成線程數:" + completedTaskCount); long taskCount = tpe.getTaskCount(); System.out.println("總線程數:" + taskCount); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }
線程池是前文的線程池,參數完全不變。線程的監控代碼也一致。當我們運行這個單元測試的時候,會發現打印出來的結果一直是如下:
當前排隊線程數:15 當前活動線程數:5 執行完成線程數:0 總線程數:20 當前排隊線程數:20 當前活動線程數:5 執行完成線程數:0 總線程數:25 當前排隊線程數:20 當前活動線程數:5 執行完成線程數:0 總線程數:25 ……略
根本問題是 Runnable 內部還嵌套了 Runnable ,且他們都提交到了一個線程池。下面分步驟說明問題:
runnableOuter 被提交到了線程池
runnableOuter 開始執行,runnableInner1 被提交到線程池,對 runnableInner1 的結果進行 get,導致runnableOuter 被阻塞
于此同時,更多的 runnableOuter 被提交到線程池,核心線程被 runnableOuter 和 runnableInner1 占滿,多余的線程 runnableInner2 被加入 workQueue 中等待執行
runnableInner2 被提交到線程池,但是因為核心線程已滿,被提交到了 workQueue ,也處于阻塞狀態,此時對 runnableInner2 的結果進行 get,導致 runnableOuter 被阻塞
runnableOuter 被阻塞,無法釋放核心線程資源,而 runnableInner2 又因為無法得到核心線程資源,只能呆在 workQueue 里,導致整個程序卡死,無法返回。(有點類似死鎖,互相占有了資源,對方不釋放,我也不釋放)
用圖表示大概為:
既然明白了出錯的原因,那么解決起來就簡單了。這個案例告訴我們,設計一個多線程程序,一定要自頂向下有一個良好的設計,然后再開始編碼,不能夠盲目地使用多線程、線程池,這樣只會導致程序出現莫名其妙的錯誤。
動態修改 corePoolSize & maximumPoolSize其實這個我沒怎么關注過,曾經在一次面試中被問到過。很簡單,java.util.concurrent.ThreadPoolExecutor提供了Setter方法,可以直接設置相關參數。按我目前的實踐經驗,幾乎沒有用到過,但是知道這個聊勝于無吧。特定的復雜場景下應該很有用。
線程池和消息隊列筆者在實際工程應用中,使用過多線程和消息隊列處理過異步任務。很多新手工程師往往弄不清楚這兩者的區別。按筆者的淺見:
多線程是用來充分利用多核 CPU 以提高程序性能的一種開發技術,線程池可以維持一個隊列保存等待處理的多線程任務,但是由于此隊列是內存控制的,所以斷電或系統故障后未執行的任務會丟失。
消息隊列是為消息處理而生的一門技術。其根據消費者的自身消費能力進行消費的特性使其廣泛用于削峰的高并發任務處理。此外利用其去耦合的特性也可以實現代碼上的解耦。消息隊列大多可以對其消息進行持久化,即使斷電也能夠恢復未被消費的任務并繼續處理。
以上是筆者在學習實踐之后對于多線程和消息隊列的粗淺認識,初學者切莫混淆兩者的作用。
參考文獻:
Deep thinking in Java thread pool
線程池,這一篇或許就夠了
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/73575.html
摘要:參數說明,線程池保留的最小線程數。,線程池中允許擁有的最大線程數。,線程池的運行狀態。除非線程池狀態發生了變化,發退回到外層循環重新執行,判斷線程池的狀態。是線程池的核心控制狀態,包含的線程池運行狀態和有效線程數。 Java是一門多線程的語言,基本上生產環境的Java項目都離不開多線程。而線程則是其中最重要的系統資源之一,如果這個資源利用得不好,很容易導致程序低效率,甚至是出問題。 有...
摘要:前言最近發現很多小伙伴對于線程池的原理不是特別的理解,所以想通過這篇文章來讓大家更好的認識線程池的原理,了解到其是如何工作的講解下面我會將線程池比作一個公司的一個部門,介紹線程池如何工作的,同時介紹其中的一些關鍵組件和參數。 前言 最近發現很多小伙伴對于Java線程池ThreadPoolExecutor的原理不是特別的理解,所以想通過這篇文章來讓大家更好的認識線程池的原理,了解到其是如...
摘要:深入理解線程池線程池初探所謂線程池,就是將多個線程放在一個池子里面所謂池化技術,然后需要線程的時候不是創建一個線程,而是從線程池里面獲取一個可用的線程,然后執行我們的任務。最后的的意思是需要確保線程池已經被啟動起來了。 深入理解Java線程池 線程池初探 ?所謂線程池,就是將多個線程放在一個池子里面(所謂池化技術),然后需要線程的時候不是創建一個線程,而是從線程池里面獲取一個可用的線程...
摘要:本文是作者自己對中線程的狀態線程間協作相關使用的理解與總結,不對之處,望指出,共勉。當中的的數目而不是已占用的位置數大于集合番一文通版集合番一文通版垃圾回收機制講得很透徹,深入淺出。 一小時搞明白自定義注解 Annotation(注解)就是 Java 提供了一種元程序中的元素關聯任何信息和著任何元數據(metadata)的途徑和方法。Annotion(注解) 是一個接口,程序可以通過...
閱讀 3517·2021-09-27 13:35
閱讀 3557·2019-08-29 17:09
閱讀 2425·2019-08-26 11:30
閱讀 697·2019-08-26 10:32
閱讀 532·2019-08-26 10:23
閱讀 1193·2019-08-26 10:20
閱讀 3149·2019-08-23 15:26
閱讀 3551·2019-08-23 14:33