摘要:參考類似的思路,最簡單的做法,我們可以直接定義一個,當隊列滿時改為調用來實現生產者的阻塞這樣,我們就無需再關心和的邏輯,只要把精力集中在生產者和消費者線程的實現邏輯上,只管往線程池提交任務就行了。
在各種并發編程模型中,生產者-消費者模式大概是最常用的了。在實際工作中,對于生產消費的速度,通常需要做一下權衡。通常來說,生產任務的速度要大于消費的速度。一個細節問題是,隊列長度,以及如何匹配生產和消費的速度。
一個典型的生產者-消費者模型如下:
在并發環境下利用J.U.C提供的Queue實現可以很方便地保證生產和消費過程中的線程安全。這里需要注意的是,Queue必須設置初始容量,防止生產者生產過快導致隊列長度暴漲,最終觸發OutOfMemory。
對于一般的生產快于消費的情況。當隊列已滿時,我們并不希望有任何任務被忽略或得不到執行,此時生產者可以等待片刻再提交任務,更好的做法是,把生產者阻塞在提交任務的方法上,待隊列未滿時繼續提交任務,這樣就沒有浪費的空轉時間了。阻塞這一點也很容易,BlockingQueue就是為此打造的,ArrayBlockingQueue和LinkedBlockingQueue在構造時都可以提供容量做限制,其中LinkedBlockingQueue是在實際操作隊列時在每次拿到鎖以后判斷容量。
更進一步,當隊列為空時,消費者拿不到任務,可以等一會兒再拿,更好的做法是,用BlockingQueue的take方法,阻塞等待,當有任務時便可以立即獲得執行,建議調用take的帶超時參數的重載方法,超時后線程退出。這樣當生產者事實上已經停止生產時,不至于讓消費者無限等待。
于是一個高效的支持阻塞的生產消費模型就實現了。
等一下,既然J.U.C已經幫我們實現了線程池,為什么還要采用這一套東西?直接用ExecutorService不是更方便?
我們來看一下ThreadPoolExecutor的基本結構:
可以看到,在ThreadPoolExecutor中,BlockingQueue和Consumer部分已經幫我們實現好了,并且直接采用線程池的實現還有很多優勢,例如線程數的動態調整等。
但問題在于,即便你在構造ThreadPoolExecutor時手動指定了一個BlockingQueue作為隊列實現,事實上當隊列滿時,execute方法并不會阻塞,原因在于ThreadPoolExecutor調用的是BlockingQueue非阻塞的offer方法:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
這時候就需要做一些事情來達成一個結果:當生產者提交任務,而隊列已滿時,能夠讓生產者阻塞住,等待任務被消費。
關鍵在于,在并發環境下,隊列滿不能由生產者去判斷,不能調用ThreadPoolExecutor.getQueue().size()來判斷隊列是否滿。
線程池的實現中,當隊列滿時會調用構造時傳入的RejectedExecutionHandler去拒絕任務的處理。默認的實現是AbortPolicy,直接拋出一個RejectedExecutionException。
幾種拒絕策略在這里就不贅述了,這里和我們的需求比較接近的是CallerRunsPolicy,這種策略會在隊列滿時,讓提交任務的線程去執行任務,相當于讓生產者臨時去干了消費者干的活兒,這樣生產者雖然沒有被阻塞,但提交任務也會被暫停。
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a CallerRunsPolicy. */ public CallerRunsPolicy() { } /** * Executes task r in the caller"s thread, unless the executor * has been shut down, in which case the task is discarded. * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
但這種策略也有隱患,當生產者較少時,生產者消費任務的時間里,消費者可能已經把任務都消費完了,隊列處于空狀態,當生產者執行完任務后才能再繼續生產任務,這個過程中可能導致消費者線程的饑餓。
參考類似的思路,最簡單的做法,我們可以直接定義一個RejectedExecutionHandler,當隊列滿時改為調用BlockingQueue.put來實現生產者的阻塞:
new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { try { executor.getQueue().put(r); } catch (InterruptedException e) { // should not be interrupted } } } };
這樣,我們就無需再關心Queue和Consumer的邏輯,只要把精力集中在生產者和消費者線程的實現邏輯上,只管往線程池提交任務就行了。
相比最初的設計,這種方式的代碼量能減少不少,而且能避免并發環境的很多問題。當然,你也可以采用另外的手段,例如在提交時采用信號量做入口限制等,但是如果僅僅是要讓生產者阻塞,那就顯得復雜了。
via ifeve
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/64042.html
摘要:下面是線程相關的熱門面試題,你可以用它來好好準備面試。線程安全問題都是由全局變量及靜態變量引起的。持有自旋鎖的線程在之前應該釋放自旋鎖以便其它線程可以獲得自旋鎖。 最近看到網上流傳著,各種面試經驗及面試題,往往都是一大堆技術題目貼上去,而沒有答案。 不管你是新程序員還是老手,你一定在面試中遇到過有關線程的問題。Java語言一個重要的特點就是內置了對并發的支持,讓Java大受企業和程序員...
摘要:源碼分析創建可緩沖的線程池。源碼分析使用創建線程池源碼分析的構造函數構造函數參數核心線程數大小,當線程數,會創建線程執行最大線程數,當線程數的時候,會把放入中保持存活時間,當線程數大于的空閑線程能保持的最大時間。 之前創建線程的時候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...
摘要:一和并發包中的和主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當于的使用。寫鎖與讀鎖之間互斥,一個線程在寫時,不允許讀操作。的注意事項不支持重入,即不可反復獲取同一把鎖。沒有返回值,也就是說無法獲取執行結果。 一、Lock 和 Condition Java 并發包中的 Lock 和 Condition 主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當于 synchron...
摘要:第二還是大家對線程池的理解不夠深刻,比如今天要探討的內容。我認為線程池它就是一個調度任務的工具。而在線程池這個場景中卻恰好就是要利用它只是一個普通方法調用。 showImg(https://segmentfault.com/img/remote/1460000018653817); 背景 上周分享了一篇《一個線程罷工的詭異事件》,最近也在公司內部分享了這個案例。 無獨有偶,在內部分享的...
摘要:本文探討并發中的其它問題線程安全可見性活躍性等等。當閉鎖到達結束狀態時,門打開并允許所有線程通過。在從返回時被叫醒時,線程被放入鎖池,與其他線程競爭重新獲得鎖。 本文探討Java并發中的其它問題:線程安全、可見性、活躍性等等。 在行文之前,我想先推薦以下兩份資料,質量很高:極客學院-Java并發編程讀書筆記-《Java并發編程實戰》 線程安全 《Java并發編程實戰》中提到了太多的術語...
閱讀 2822·2021-10-13 09:48
閱讀 3785·2021-10-13 09:39
閱讀 3597·2021-09-22 16:04
閱讀 1828·2021-09-03 10:48
閱讀 843·2021-08-03 14:04
閱讀 2363·2019-08-29 15:18
閱讀 3406·2019-08-26 12:19
閱讀 2873·2019-08-26 12:08