摘要:線程池的作用降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的資源浪費(fèi)。而高位的部分,位表示線程池的狀態(tài)。當(dāng)線程池中的線程數(shù)達(dá)到后,就會(huì)把到達(dá)的任務(wù)放到中去線程池的最大長(zhǎng)度。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于時(shí),才起作用。
線程池的作用
降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的資源浪費(fèi)。
提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),不需要等到線程創(chuàng)建就能立即執(zhí)行。
方便管理線程。線程是稀缺資源,如果無(wú)限制地創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以對(duì)線程進(jìn)行統(tǒng)一的分配,優(yōu)化及監(jiān)控。
設(shè)置線程池大小設(shè)置線程池的大小可以從以下幾個(gè)方面分析入手:
系統(tǒng)中有多少個(gè)cpu?
多大的內(nèi)存?
任務(wù)是計(jì)算密集型、I/O密集集還是二者皆可?
是否需要像JDBC連接這樣的稀缺資源?
對(duì)于計(jì)算密集型的任務(wù),在擁有N個(gè)cpu的機(jī)器上,通常將線程池大小設(shè)置為N+1時(shí),能夠?qū)崿F(xiàn)最優(yōu)的利用率。 對(duì)于包含I/O操作或者其他阻塞操作的任務(wù),由于線程并不會(huì)一直執(zhí)行,因此線程池的規(guī)模應(yīng)該更大。
可通過如下公式進(jìn)行估計(jì):
$$N_{threads} = N_{cpu}*U_{cpu}*(1+frac{W}{C})$$
其中:
$$ U_{cpu} = target CPU utilization, 0 le U_{cpu} le 1$$
$$ frac{W}{C} = ration of wait time to compute time$$
可以通過Rumtime來獲得CUP的數(shù)目:
int N_CPUS = Runtime.getRuntime().availableProcessor();
當(dāng)然,CPU周期并不是唯一影響線程池大小的資源,還包括內(nèi)存、文件句柄、套接字句柄和數(shù)據(jù)庫(kù)連接等。計(jì)算方法:計(jì)算每個(gè)任務(wù)對(duì)該資源的需求量,然后用該資源的可用總量除以每個(gè)任務(wù)的需要量,所得結(jié)果就是線程池的大小上限。
線程池的實(shí)現(xiàn)原理 ThreadPoolExecutorJava的線程池針對(duì)不同應(yīng)用的場(chǎng)景,主要有固定長(zhǎng)度類型、可變長(zhǎng)度類型以及定時(shí)執(zhí)行等幾種。針對(duì)這幾種類型的創(chuàng)建,java中有一個(gè)專門的Executors類提供了一系列的方法封裝了具體的實(shí)現(xiàn)。這些功能和用途不一樣的線程池主要依賴ThreadPoolExecutor, ScheduledThreadPoolExecutor等幾個(gè)類。如前面文章討論所說,這些類和相關(guān)類的主要結(jié)構(gòu)如下:
ThreadPoolExecutor是實(shí)現(xiàn)線程池最核心的類之一。在分析ThreadPoolExecutor的實(shí)現(xiàn)原理之前,讓來看看實(shí)現(xiàn)線程池需要考慮的點(diǎn):
從線程池本身的定義來看,它是將一組事先創(chuàng)建好的線程放在一個(gè)資源池里,當(dāng)需要的時(shí)候就將該線程分配給具體的任務(wù)來執(zhí)行。那么,這個(gè)池子的大小如何確定?線程池肯定要面臨多個(gè)線程資源訪問的情況,是不是本身的結(jié)構(gòu)要保證線程安全呢?如果線程池創(chuàng)建好之后后續(xù)有若干任務(wù)使用了線程資源,當(dāng)池里面的資源使用完之后要如何安排?是給線程擴(kuò)容,創(chuàng)建更多的線程資源,還是增加一個(gè)隊(duì)列,讓一些任務(wù)先在里面排隊(duì)呢?在一些極端的情況下,比如任務(wù)數(shù)量實(shí)在太多線程池處理不過來,對(duì)于這些任務(wù)怎么處理呢?線程執(zhí)行的時(shí)候會(huì)碰到異常或都錯(cuò)誤的情況,這些異常要如何處理?如何保證這些異常的處理不會(huì)導(dǎo)致線程池其他任務(wù)的正常運(yùn)行不出錯(cuò)呢?
總結(jié)一下,這些問題可以歸納為如下幾點(diǎn):
線程池的結(jié)構(gòu);
線程池的任務(wù)分配策略;
線程池的異常和錯(cuò)誤處理機(jī)制;
下面結(jié)合ThreadPoolExecutor的實(shí)現(xiàn)源碼來詳細(xì)分析一下。
線程數(shù)量和線程池狀態(tài)private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl是線程池的主要控制狀態(tài),它是一個(gè)AtomicInteger的數(shù)值,它表示了兩部分內(nèi)容:
workerCount, 表示有效線程數(shù);
runState, 表示線程池狀態(tài),是否運(yùn)行,停止等。
ctl是用一個(gè)integer(32)來包含線程池狀態(tài)和數(shù)量的表示,高三位為線程池的狀態(tài),后(2^29)-1為線程數(shù)限制。 這就是為什么前面用一個(gè)Integer.SIZE-3來作為位數(shù)。這樣這個(gè)整數(shù)的0-28位表示的就是線程的數(shù)目。而高位的部分,29-31位表示線程池的狀態(tài)。這里定義的主要有5種狀態(tài),分別對(duì)應(yīng)值是從-1到3. 他們對(duì)應(yīng)著線程的running, shutdown, stop, tidying, terminated這幾個(gè)狀態(tài)。
線程池的結(jié)構(gòu)除了以上部分外,線程池里還有如下成員:
private final BlockingQueueworkQueue; private final ReentrantLock mainLock = new ReentrantLock(); /** * Set containing all worker threads in pool. Accessed only when holding mainLock. */ private final HashSet workers = new HashSet (); /** * Wait condition to support awaitTermination */ private final Condition termination = mainLock.newCondition(); /** * Tracks largest attained pool size. Accessed only under * mainLock. */ private int largestPoolSize; /** * Counter for completed tasks. Updated only on termination of * worker threads. Accessed only under mainLock. */ private long completedTaskCount; private volatile ThreadFactory threadFactory; /** * Handler called when saturated or shutdown in execute. */ private volatile RejectedExecutionHandler handler; /** * Timeout in nanoseconds for idle threads waiting for work. * Threads use this timeout when there are more than corePoolSize * present or if allowCoreThreadTimeOut. Otherwise they wait * forever for new work. */ private volatile long keepAliveTime; /** * If false (default), core threads stay alive even when idle. * If true, core threads use keepAliveTime to time out waiting * for work. */ private volatile boolean allowCoreThreadTimeOut; /** * Core pool size is the minimum number of workers to keep alive * (and not allow to time out etc) unless allowCoreThreadTimeOut * is set, in which case the minimum is zero. */ private volatile int corePoolSize; /** * Maximum pool size. Note that the actual maximum is internally * bounded by CAPACITY. */ private volatile int maximumPoolSize;
workerQueue: 一個(gè)BlockingQueue
worker: 一個(gè)HashSet
mainLock: 一個(gè)訪問workers所需要使用的鎖。從前面的workQueue,workers這兩個(gè)結(jié)構(gòu)可以看出,如果要往線程池里面增加執(zhí)行任務(wù)或者執(zhí)行完畢一個(gè)任務(wù),都要訪問這兩個(gè)結(jié)構(gòu)。所以大多數(shù)情況下為了保證線程安全,就需要使用mainLock這個(gè)鎖。
corePoolSize:處于活躍狀態(tài)的最少worker數(shù)目。每個(gè)worker會(huì)創(chuàng)建一個(gè)新的線程去執(zhí)行任務(wù)。在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒有任何線程,而是等待有任務(wù)來才創(chuàng)建線程去執(zhí)行,除非調(diào)用了prestartAllCoreThreads()或prestartCoreThread()方法。當(dāng)線程池中的線程數(shù)達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到workerQueue中去;
maximumPoolSize: 線程池的最大長(zhǎng)度。當(dāng)線程池里面的線程數(shù)達(dá)到這個(gè)數(shù)字時(shí)就不能再往里面加了,此時(shí)會(huì)根據(jù)設(shè)置的handler參數(shù),即拒絕處理任務(wù)策略來處理新到來的任務(wù)。
keepAliveTime: 表示線程沒有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),keepAliveTime才起作用。當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會(huì)終止,直到線程池中的線程數(shù)不超過corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時(shí),keepAliveTime參數(shù)也會(huì)起作用,直到線程數(shù)為0。
threadFactory: 線程工廠,主要用來創(chuàng)建線程;
largestPoolSize: 用來記錄線程池中曾經(jīng)有過的最大線程數(shù),跟線程池的容易沒有任何關(guān)系。
handler: 表示當(dāng)拒絕處理任務(wù)時(shí)的策略,有以四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程) ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)任務(wù)的執(zhí)行者——Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/66069.html
摘要:我的是忙碌的一年,從年初備戰(zhàn)實(shí)習(xí)春招,年三十都在死磕源碼,三月份經(jīng)歷了阿里五次面試,四月順利收到實(shí)習(xí)。因?yàn)槲倚睦砗芮宄业哪繕?biāo)是阿里。所以在收到阿里之后的那晚,我重新規(guī)劃了接下來的學(xué)習(xí)計(jì)劃,將我的短期目標(biāo)更新成拿下阿里轉(zhuǎn)正。 我的2017是忙碌的一年,從年初備戰(zhàn)實(shí)習(xí)春招,年三十都在死磕JDK源碼,三月份經(jīng)歷了阿里五次面試,四月順利收到實(shí)習(xí)offer。然后五月懷著忐忑的心情開始了螞蟻金...
摘要:比如需要用多線程或分布式集群統(tǒng)計(jì)一堆用戶的相關(guān)統(tǒng)計(jì)值,由于用戶的統(tǒng)計(jì)值是共享數(shù)據(jù),因此需要保證線程安全。如果類是無(wú)狀態(tài)的,那它永遠(yuǎn)是線程安全的。參考探索并發(fā)編程二寫線程安全的代碼 線程安全類 保證類線程安全的措施: 不共享線程間的變量; 設(shè)置屬性變量為不可變變量; 每個(gè)共享的可變變量都使用一個(gè)確定的鎖保護(hù); 保證線程安全的思路: 1. 通過架構(gòu)設(shè)計(jì) 通過上層的架構(gòu)設(shè)計(jì)和業(yè)務(wù)分析來避...
摘要:內(nèi)存模型是圍繞著在并發(fā)過程中如何處理原子性可見性和有序性這個(gè)特征來建立的,我們來看下哪些操作實(shí)現(xiàn)了這個(gè)特性。可見性可見性是指當(dāng)一個(gè)線程修改了共享變量的值,其他線程能夠立即得知這個(gè)修改。 Java內(nèi)存模型是圍繞著在并發(fā)過程中如何處理原子性、可見性和有序性這3個(gè)特征來建立的,我們來看下哪些操作實(shí)現(xiàn)了這3個(gè)特性。 原子性(atomicity): 由Java內(nèi)存模型來直接保證原子性變量操作包括...
摘要:物理計(jì)算機(jī)并發(fā)問題在介紹內(nèi)存模型之前,先簡(jiǎn)單了解下物理計(jì)算機(jī)中的并發(fā)問題。基于高速緩存的存儲(chǔ)交互引入一個(gè)新的問題緩存一致性。寫入作用于主內(nèi)存變量,把操作從工作內(nèi)存中得到的變量值放入主內(nèi)存的變量中。 物理計(jì)算機(jī)并發(fā)問題 在介紹Java內(nèi)存模型之前,先簡(jiǎn)單了解下物理計(jì)算機(jī)中的并發(fā)問題。由于處理器的與存儲(chǔ)設(shè)置的運(yùn)算速度有幾個(gè)數(shù)量級(jí)的差距,所以現(xiàn)代計(jì)算機(jī)加入一層讀寫速度盡可能接近處理器的高速緩...
摘要:禁止指令重排序優(yōu)化。只有當(dāng)線程對(duì)變量執(zhí)行的前一個(gè)動(dòng)作是時(shí),才能對(duì)執(zhí)行動(dòng)作并且,只有當(dāng)對(duì)變量執(zhí)行的后一個(gè)動(dòng)作是時(shí),線程才能對(duì)變量執(zhí)行動(dòng)作。變量不需要與其他的狀態(tài)變量共同參與不變約束。在某些情況下,的同步機(jī)制性要優(yōu)于鎖。 當(dāng)一個(gè)變量定義為volatile之后,它具備兩種特性: 保證此變量對(duì)所有線程的可見性,這里的可見性是指當(dāng)一條線程修改了這個(gè)變量的值,新值對(duì)于其他線程來說是可以立即得知的...
閱讀 2596·2023-04-25 15:07
閱讀 705·2021-11-24 10:21
閱讀 2298·2021-09-22 10:02
閱讀 3517·2019-08-30 15:43
閱讀 3222·2019-08-30 13:03
閱讀 2287·2019-08-29 17:18
閱讀 3586·2019-08-29 17:07
閱讀 1873·2019-08-29 12:27