摘要:的多線程機制可彌補拋出未檢查的異常,將終止線程執行,此時會錯誤的認為任務都取消了。如果想要不保留,則需要設置,此時最小的就是線程池最大的線程數。
提供Executor的工廠類
忽略了自定義的ThreadFactory、callable和unconfigurable相關的方法
newFixedxxx:在任意時刻,最多有nThreads個線程在處理task;如果所有線程都在運行時來了新的任務,它會被扔入隊列;如果有線程在執行期間因某種原因終止了運行,如果需要執行后續任務,新的線程將取代它
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
newCachedxxx:新任務到來如果線程池中有空閑的線程就復用,否則新建一個線程。如果一個線程超過60秒沒有使用,它就會被關閉移除線程池
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
newSingleThreadExecutor:僅使用一個線程來處理任務,如果這線程掛了,會產生一個新的線程來代替它。每一個任務被保證按照順序執行,而且一次只執行一個
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
使用newFixedxxx方法也能實現類似的作用,但是ThreadPoolExecutor會提供修改線程數的方法,FinalizableDelegatedExecutorService則沒有修改的途徑,它在DelegatedExecutorService的基礎
上僅提供了執行finalize時候去關閉線程,而DelegatedExecutorService僅暴漏ExecutorService自身的方法
newScheduledThreadPool:提供一個線程池來延遲或者定期執行任務
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); }
newSingleThreadScheduledExecutor:提供單個線程來延遲或者定期執行任務,如果執行的線程掛了,會生成新的。
return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1));
同樣,它保證返回的Executor自身的線程數不可修改
從上述的實現可以看出,核心在于三個部分
ThreadPoolExecutor:提供線程數相關的控制
DelegatedExecutorService:僅暴露ExecutorService自身的方法,保證線程數不變來實現語義場景
ScheduledExecutorService:提供延遲或者定期執行的功能
對應的,相應也有不同的隊列去實現不同的場景
LinkedBlockingQueue:無界阻塞隊列
SynchronousQueue:沒有消費者消費時,新的任務就會被阻塞
DelayQueue:隊列中的任務過期之后才可以執行,否則無法查詢到隊列中的元素
DelegatedExecutorService它僅僅是包裝了ExecutorService的方法,交由傳入的ExecutorService來執行,所謂的UnConfigurable實際也就是它沒有暴漏配置各種參數調整的方法
static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public ListScheduledExecutorServiceshutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future> submit(Runnable task) { return e.submit(task); } public Future submit(Callable task) { return e.submit(task); } public Future submit(Runnable task, T result) { return e.submit(task, result); } public List > invokeAll(Collection extends Callable > tasks) throws InterruptedException { return e.invokeAll(tasks); } public List > invokeAll(Collection extends Callable > tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public T invokeAny(Collection extends Callable > tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public T invokeAny(Collection extends Callable > tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } }
提供一系列的schedule方法,使得任務可以延遲或者周期性的執行,對應schedule方法會返回ScheduledFuture以供確認是否執行以及是否要取消。它的實現ScheduledThreadPoolExecutor也支持立即執行由submit提交的任務
ThreadPoolExecutor僅支持相對延遲時間,比如距離現在5分鐘后執行。類似Timer也可以管理延遲任務和周期任務,但是存在一些缺陷:
所有的定時任務只有一個線程,如果某個任務執行時間長,將影響其它TimerTask的精確性。ScheduledExecutorService的多線程機制可彌補
TimerTask拋出未檢查的異常,將終止線程執行,此時會錯誤的認為任務都取消了。1:可以使用try-catch-finally對相應執行快處理;2:通過execute執行的方法可以設置UncaughtExceptionHandler來接收未捕獲的異常,并作出處理;3:通過submit執行的,將被封裝層ExecutionException重新拋出
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize、maximumPoolSize:ThreadPoolExecutor會根據這兩自動調整線程池的大小,當一個新任務通過execute提交的時候:
如果當前運行的線程數小于corePoolSize就新建線程;
如果當前線程數在corePoolSize與maximumPoolSize之間,則只有在隊列滿的時候才會創建新的線程;
如果已經達到最大線程數,并且隊列都滿了,在這種飽和狀態下就會執行拒絕策略
默認情況下,只有新任務到達的時候才會啟動線程,可通過prestartCoreThread方法實現事先啟動
corePoolSize:默認線程池所需要維護的最小的worker的數量,就算是worker過期了也會保留。如果想要不保留,則需要設置allowCoreThreadTimeOut,此時最小的就是0
maximumPoolSize:線程池最大的線程數。java限制最多為 2^29-1,大約5億個
keepAliveTime、unit:如果當前線程池有超過corePoolSize的線程數,只要有線程空閑時間超過keepAliveTime的設定,就會被終止;unit則是它的時間單位
workQueue:任何BlockingQueue都可以使用,基本上有三種
Direct handoffs,直接交付任務。比如 SynchronousQueue,如果沒有線程消費,提交任務會失敗,當然可以新建一個線程來處理。它適合處理有依賴關系的任務,一般它的maximumPoolSizes會被設置成最大的
Unbounded queues,無界隊列。比如LinkedBlockingQueue,這意味著如果有corePoolSize個線程在執行,那么其他的任務都只能等待。它適合于處理任務都是互相獨立的,
Bounded queues,有界隊列。比如ArrayBlockingQueue,需要考慮隊列大小和最大線程數之間的關系,來達到更好的資源利用率和吞吐量
threadFactory:沒有指定的時候,使用Executors.defaultThreadFactory
RejectedExecutionHandler:通過execute添加的任務,如果Executor已經關閉或者已經飽和了(線程數達到了maximumPoolSize,并且隊列滿了),就會執行,java提供了4種策略:
AbortPolicy,拒絕的時候拋出運行時異常RejectedExecutionException;
CallerRunsPolicy,如果executor沒有關閉,那么由調用execute的線程來執行它;
DiscardPolicy,直接扔掉新的任務;
DiscardOldestPolicy,如果executor沒有關閉,那么扔掉隊列頭部的任務,再次嘗試;
ThreadPoolExecutor可自定義beforeExecutor、afterExecutor可以用來添加日志統計、計時、件事或統計信息收集功能,無論run是正常返回還是拋出異常,afterExecutor都會被執行。如果beforeExecutor拋出RuntimeException,任務和afterExecutor都不會被執行。terminated在所有任務都已經完成,并且所有工作者線程關閉后會調用,此時也可以用來執行發送通知、記錄日志等等。如何估算線程池的大小
計算密集型,通常在擁有$N_{cpu}$個處理器的系統上,線程池大小設置為$N_{cpu}+1$能夠實現最優的利用率;
$N_{cpu}$ cpu的個數
I/O密集型或者其它阻塞型的任務,定義 $N_{cpu}$為CPU的個數,$U_{cpu}$為CPU的利用率,$W/C$為等待時間與計算時間的比率,此時線程池的最優大小為
$$N_{threads}=N_{cpu}*U_{cpu}*(1+W/C)$$
場景說明將一個網站的業務抽象成如下幾塊
接收客戶端請求與處理請求
頁面渲染返回的文本和圖片
獲取頁面的廣告
接收請求與處理請求 理論模型理論上,服務端通過實現約定的接口就可以實現接收請求和處理連續不斷的請求過來
ServerSocket socket = new ServerSocket(80); while(true){ Socket conn = socket.accept(); handleRequest(conn) }
缺點:每次只能處理一個請求,新請求到來時,必須等到正在處理的請求處理完成,才能接收新的請求
顯示的創建多線程為每個請求創建新的線程提供服務
ServerSocket socket = new ServerSocket(80); while(true){ final Socket conn = socket.accept(); Runnable task = new Runnable(){ public void run(){ handleRequest(conn); } } new Thread(task).start(); }
缺點:
線程的創建和銷毀都有一定的開銷,延遲對請求的處理;
創建后的線程多于可用處理器的數量,造成線程閑置,這會給垃圾回收帶來壓力
存活的大量線程競爭CPU資源會產生很多性能開銷
系統上對可創建的線程數存在限制
使用線程池使用java自帶的Executor框架。
private static final Executor exec = Executors.newFixedThreadPool(100); ... ServerSocket socket = new ServerSocket(80); while(true){ final Socket conn = socket.accept(); Runnable task = new Runnable(){ public void run(){ handleRequest(conn); } } exec.execute(task); } ...
線程池策略通過實現預估好的線程需求,限制并發任務的數量,重用現有的線程,解決每次創建線程的資源耗盡、競爭過于激烈和頻繁創建的問題,也囊括了線程的優勢,解耦了任務提交和任務執行。
頁面渲染返回的文本和圖片 串行渲染renderText(source); ListimageData = new ArrayList (); for(ImageInfo info:scaForImageInfo(source)){ imageData.add(info.downloadImage()); } for(ImageData data:imageData){ renderImage(data); }
缺點:圖像的下載大部分時間在等待I/O操作執行完成,這期間CPU幾乎不做任何工作,使得用戶看到最終頁面之前要等待過長的時間
并行化渲染過程可以分成兩個部分,1是渲染文本,1是下載圖像
private static final ExecutorService exec = Executors.newFixedThreadPool(100); ... final Listinfos=scaForImageInfo(source); Callable > task=new Callable
>(){ public List
call(){ List r = new ArrayList (); for(ImageInfo info:infos){ r.add(info.downloadImage()); } return r; } }; Future > future = exec.submit(task); renderText(source); try{ List
imageData = future.get(); for(ImageData data:imageData){ renderImage(data); } }catch(InterruptedException e){ Thread.currentThread().interrupt(); future.cancel(true); }catche(ExecutionException e){ throw launderThrowable(e.getCause()); }
使用Callable來返回下載的圖片結果,使用future來獲得下載的圖片,這樣將減少用戶所需要的等待時間。
缺點:圖片的下載很明顯時間要比文本要慢,這樣的并行化很可能速度可能只提升了1%
使用CompletionService。
private static final ExecutorService exec; ... final Listinfos=scaForImageInfo(source); CompletionService cService = new ExecutorCompletionService (exec); for(final ImageInfo info:infos){ cService.submit(new Callable (){ public ImageData call(){ return info.downloadImage(); } }); } renderText(source); try{ for(int i=0,n=info.size();t f = cService.take(); ImageData imageData=f.get(); renderImage(imageData) } }catch(InterruptedException e){ Thread.currentThread().interrupt(); }catche(ExecutionException e){ throw launderThrowable(e.getCause()); }
核心思路為為每一幅圖像下載都創建一個獨立的任務,并在線程池中執行他們,從而將串行的下載過程轉換為并行的過程
獲取頁面的廣告廣告展示如果在一定的時間以內沒有獲取,可以不再展示,并取消超時的任務。
ExecutorService exe = Executors.newFixedThreadPool(3); ListmyTasks = new ArrayList<>(); for (int i=0;i<3;i++){ myTasks.add(new MyTask(3-i)); } try { List > futures = exe.invokeAll(myTasks, 1, TimeUnit.SECONDS); for (int i=0;i invokeAll方法對于沒有完成的任務會被取消,通過CancellationException可以捕獲,invokeAll返回的序列順序和傳入的task保持一致。結果如下:
task sleep 3 not execute ,because java.util.concurrent.CancellationException task sleep 2 not execute ,because java.util.concurrent.CancellationException task execut 1 s
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/72394.html
摘要:目標線程由運行狀態轉換為就緒狀態,也就是讓出執行權限,讓其他線程得以優先執行,但其他線程能否優先執行時未知的。函數的官方解釋是意思是使調用該函數的線程讓出執行時間給其他已就緒狀態的線程。 線程允許在同一個進程中同時存在多個程序控制流,即通過線程可以實現同時處理多個任務的功能。線程會共享進程范圍內的資源,例如內存句柄和文件句柄,但每個線程都有各自的程序計數器、棧以及局部變量。 多線程的實...
摘要:面向對象面向對象的三種基本特征繼承封裝多態結構化程序設計簡介主要原則自頂向下逐步求精模塊化。在面向對象方法中,類之間共享屬性和操作的機制稱為繼承。 面向對象 面向對象的三種基本特征:繼承、封裝、多態 結構化程序設計簡介 主要原則:自頂向下、逐步求精、模塊化。 結構化分析SA方法對系統進行需求分析;結構化設計SD方法對系統進行概要設計、詳細設計;結構化編程SP方法來實現系統。 結構化程序...
閱讀 3190·2021-11-10 11:35
閱讀 1295·2019-08-30 13:20
閱讀 1117·2019-08-29 16:18
閱讀 2131·2019-08-26 13:54
閱讀 2155·2019-08-26 13:50
閱讀 955·2019-08-26 13:39
閱讀 2473·2019-08-26 12:08
閱讀 1951·2019-08-26 10:37