摘要:的好處在于,在診斷問題的時候能夠知道的原因推薦使用帶有的操作函數(shù)作用用于掛起當(dāng)前線程,如果許可可用,會立馬返回,并消費掉許可。
LockSupport是用來創(chuàng)建locks的基本線程阻塞基元,比如AQS中實現(xiàn)線程掛起的方法,就是park,對應(yīng)喚醒就是unpark。JDK中有使用的如下
LockSupport提供的是一個許可,如果存在許可,線程在調(diào)用park的時候,會立馬返回,此時許可也會被消費掉,如果沒有許可,則會阻塞。調(diào)用unpark的時候,如果許可本身不可用,則會使得許可可用
許可只有一個,不可累加park源碼跟蹤
park的聲明形式有一下兩大塊
一部分多了一個Object參數(shù),作為blocker,另外的則沒有。blocker的好處在于,在診斷問題的時候能夠知道park的原因
推薦使用帶有Object的park操作park函數(shù)作用
park用于掛起當(dāng)前線程,如果許可可用,會立馬返回,并消費掉許可。
park(Object): 恢復(fù)的條件為 1:線程調(diào)用了unpark; 2:其它線程中斷了線程;3:發(fā)生了不可預(yù)料的事情
parkNanos(Object blocker, long nanos):恢復(fù)的條件為 1:線程調(diào)用了unpark; 2:其它線程中斷了線程;3:發(fā)生了不可預(yù)料的事情;4:過期時間到了
parkUntil(Object blocker, long deadline):恢復(fù)的條件為 1:線程調(diào)用了unpark; 2:其它線程中斷了線程;3:發(fā)生了不可預(yù)料的事情;4:指定的deadLine已經(jīng)到了
以park的源碼為例
public static void park(Object blocker) { //獲取當(dāng)前線程 Thread t = Thread.currentThread(); //記錄當(dāng)前線程阻塞的原因,底層就是unsafe.putObject,就是把對象存儲起來 setBlocker(t, blocker); //執(zhí)行park unsafe.park(false, 0L); //線程恢復(fù)后,去掉阻塞原因 setBlocker(t, null); }
從源碼可以看到真實的實現(xiàn)均在 unsafe
unsafe.park核心實現(xiàn)如下
JavaThread* thread=JavaThread::thread_from_jni_environment(env); ... thread->parker()->park(isAbsolute != 0, time);
就是獲取java線程的parker對象,然后執(zhí)行它的park方法。Parker的定義如下
class Parker : public os::PlatformParker { private: //表示許可 volatile int _counter ; Parker * FreeNext ; JavaThread * AssociatedWith ; // Current association public: Parker() : PlatformParker() { //初始化_counter _counter = 0 ; FreeNext = NULL ; AssociatedWith = NULL ; } protected: ~Parker() { ShouldNotReachHere(); } public: void park(bool isAbsolute, jlong time); void unpark(); // Lifecycle operators static Parker * Allocate (JavaThread * t) ; static void Release (Parker * e) ; private: static Parker * volatile FreeList ; static volatile int ListLock ; };
它繼承了os::PlatformParker,內(nèi)置了一個volatitle的 _counter。PlatformParker則是在不同的操作系統(tǒng)中有不同的實現(xiàn),以linux為例
class PlatformParker : public CHeapObj { protected: //互斥變量類型 pthread_mutex_t _mutex [1] ; //條件變量類型 pthread_cond_t _cond [1] ; public: ~PlatformParker() { guarantee (0, "invariant") ; } public: PlatformParker() { int status; //初始化條件變量,使用 pthread_cond_t之前必須先執(zhí)行初始化 status = pthread_cond_init (_cond, NULL); assert_status(status == 0, status, "cond_init”); // 初始化互斥變量,使用 pthread_mutex_t之前必須先執(zhí)行初始化 status = pthread_mutex_init (_mutex, NULL); assert_status(status == 0, status, "mutex_init"); } }
上述代碼均為POSIX線程接口使用,所以pthread指的也就是posixThread
parker實現(xiàn)如下
void Parker::park(bool isAbsolute, jlong time) { if (_counter > 0) { //已經(jīng)有許可了,用掉當(dāng)前許可 _counter = 0 ; //使用內(nèi)存屏障,確保 _counter賦值為0(寫入操作)能夠被內(nèi)存屏障之后的讀操作獲取內(nèi)存屏障事前的結(jié)果,也就是能夠正確的讀到0 OrderAccess::fence(); //立即返回 return ; } Thread* thread = Thread::current(); assert(thread->is_Java_thread(), "Must be JavaThread"); JavaThread *jt = (JavaThread *)thread; if (Thread::is_interrupted(thread, false)) { // 線程執(zhí)行了中斷,返回 return; } if (time < 0 || (isAbsolute && time == 0) ) { //時間到了,或者是代表絕對時間,同時絕對時間是0(此時也是時間到了),直接返回,java中的parkUtil傳的就是絕對時間,其它都不是 return; } if (time > 0) { //傳入了時間參數(shù),將其存入absTime,并解析成absTime->tv_sec(秒)和absTime->tv_nsec(納秒)存儲起來,存的是絕對時間 unpackTime(&absTime, isAbsolute, time); } //進(jìn)入safepoint region,更改線程為阻塞狀態(tài) ThreadBlockInVM tbivm(jt); if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { //如果線程被中斷,或者是在嘗試給互斥變量加鎖的過程中,加鎖失敗,比如被其它線程鎖住了,直接返回 return; } //這里表示線程互斥變量鎖成功了 int status ; if (_counter > 0) { // 有許可了,返回 _counter = 0; //對互斥變量解鎖 status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; OrderAccess::fence(); return; } #ifdef ASSERT // Don"t catch signals while blocked; let the running threads have the signals. // (This allows a debugger to break into the running thread.) //debug用 sigset_t oldsigs; sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals(); pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs); #endif //將java線程所擁有的操作系統(tǒng)線程設(shè)置成 CONDVAR_WAIT狀態(tài) ,表示在等待某個條件的發(fā)生 OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); //將java的_suspend_equivalent參數(shù)設(shè)置為true jt->set_suspend_equivalent(); // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self() if (time == 0) { //把調(diào)用線程放到等待條件的線程列表上,然后對互斥變量解鎖,(這兩是原子操作),這個時候線程進(jìn)入等待,當(dāng)它返回時,互斥變量再次被鎖住。 //成功返回0,否則返回錯誤編號 status = pthread_cond_wait (_cond, _mutex) ; } else { //同pthread_cond_wait,只是多了一個超時,如果超時還沒有條件出現(xiàn),那么重新獲取胡吃兩然后返回錯誤碼 ETIMEDOUT status = os::Linux::safe_cond_timedwait (_cond, _mutex, &absTime) ; if (status != 0 && WorkAroundNPTLTimedWaitHang) { //WorkAroundNPTLTimedWaitHang 是JVM的運行參數(shù),默認(rèn)為1 //去除初始化 pthread_cond_destroy (_cond) ; //重新初始化 pthread_cond_init (_cond, NULL); } } assert_status(status == 0 || status == EINTR || status == ETIME || status == ETIMEDOUT, status, "cond_timedwait"); #ifdef ASSERT pthread_sigmask(SIG_SETMASK, &oldsigs, NULL); #endif //等待結(jié)束后,許可被消耗,改為0 _counter = 0 ; //釋放互斥量的鎖 status = pthread_mutex_unlock(_mutex) ; assert_status(status == 0, status, "invariant") ; // If externally suspended while waiting, re-suspend if (jt->handle_special_suspend_equivalent_condition()) { jt->java_suspend_self(); } //加入內(nèi)存屏障指令 OrderAccess::fence(); }
從park的實現(xiàn)可以看到
無論是什么情況返回,park方法本身都不會告知調(diào)用方返回的原因,所以調(diào)用的時候一般都會去判斷返回的場景,根據(jù)場景做不同的處理
線程的等待與掛起、喚醒等等就是使用的POSIX的線程API
park的許可通過原子變量_count實現(xiàn),當(dāng)被消耗時,_count為0,只要擁有許可,就會立即返回
OrderAccess::fence();在linux中實現(xiàn)原理如下
inline void OrderAccess::fence() { if (os::is_MP()) { #ifdef AMD64 // 沒有使用mfence,因為mfence有時候性能差于使用 locked addl __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory"); #else __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory"); #endif } }
內(nèi)存重排序網(wǎng)上的驗證ThreadBlockInVM tbivm(jt)
這屬于C++新建變量的語法,也就是調(diào)用構(gòu)造函數(shù)新建了一個變量,變量名為tbivm,參數(shù)為jt。類的實現(xiàn)為
class ThreadBlockInVM : public ThreadStateTransition { public: ThreadBlockInVM(JavaThread *thread) : ThreadStateTransition(thread) { // Once we are blocked vm expects stack to be walkable thread->frame_anchor()->make_walkable(thread); //把線程由運行狀態(tài)轉(zhuǎn)成阻塞狀態(tài) trans_and_fence(_thread_in_vm, _thread_blocked); } ... };
_thread_in_vm 表示線程當(dāng)前在VM中執(zhí)行,_thread_blocked表示線程當(dāng)前阻塞了,他們是globalDefinitions.hpp中定義的枚舉
//這個枚舉是用來追蹤線程在代碼的那一塊執(zhí)行,用來給 safepoint code使用,有4種重要的類型,_thread_new/_thread_in_native/_thread_in_vm/_thread_in_Java。形如xxx_trans的狀態(tài)都是中間狀態(tài),表示線程正在由一種狀態(tài)變成另一種狀態(tài),這種方式使得 safepoint code在處理線程狀態(tài)時,不需要對線程進(jìn)行掛起,使得safe point code運行更快,而給定一個狀態(tài),通過+1就可以得到他的轉(zhuǎn)換狀態(tài) enum JavaThreadState { _thread_uninitialized = 0, // should never happen (missing initialization) _thread_new = 2, // just starting up, i.e., in process of being initialized _thread_new_trans = 3, // corresponding transition state (not used, included for completeness) _thread_in_native = 4, // running in native code . This is a safepoint region, since all oops will be in jobject handles _thread_in_native_trans = 5, // corresponding transition state _thread_in_vm = 6, // running in VM _thread_in_vm_trans = 7, // corresponding transition state _thread_in_Java = 8, // Executing either interpreted or compiled Java code running in Java or in stub code _thread_in_Java_trans = 9, // corresponding transition state (not used, included for completeness) _thread_blocked = 10, // blocked in vm _thread_blocked_trans = 11, // corresponding transition state _thread_max_state = 12 // maximum thread state+1 - used for statistics allocation };
父類ThreadStateTransition中定義trans_and_fence如下
void trans_and_fence(JavaThreadState from, JavaThreadState to) { transition_and_fence(_thread, from, to);} //_thread即構(gòu)造函數(shù)傳進(jìn)來de thread // transition_and_fence must be used on any thread state transition // where there might not be a Java call stub on the stack, in // particular on Windows where the Structured Exception Handler is // set up in the call stub. os::write_memory_serialize_page() can // fault and we can"t recover from it on Windows without a SEH in // place. //transition_and_fence方法必須在任何線程狀態(tài)轉(zhuǎn)換的時候使用 static inline void transition_and_fence(JavaThread *thread, JavaThreadState from, JavaThreadState to) { assert(thread->thread_state() == from, "coming from wrong thread state"); assert((from & 1) == 0 && (to & 1) == 0, "odd numbers are transitions states"); //標(biāo)識線程轉(zhuǎn)換中 thread->set_thread_state((JavaThreadState)(from + 1)); // 設(shè)置內(nèi)存屏障,確保新的狀態(tài)能夠被VM 線程看到 if (os::is_MP()) { if (UseMembar) { // Force a fence between the write above and read below OrderAccess::fence(); } else { // Must use this rather than serialization page in particular on Windows InterfaceSupport::serialize_memory(thread); } } if (SafepointSynchronize::do_call_back()) { SafepointSynchronize::block(thread); } //線程狀態(tài)轉(zhuǎn)換成最終的狀態(tài),對待這里的場景就是阻塞 thread->set_thread_state(to); CHECK_UNHANDLED_OOPS_ONLY(thread->clear_unhandled_oops();) }操作系統(tǒng)線程狀態(tài)的一般取值
在osThread中給定了操作系統(tǒng)線程狀態(tài)的大致取值,它本身是依據(jù)平臺而定
enum ThreadState { ALLOCATED, // Memory has been allocated but not initialized INITIALIZED, // The thread has been initialized but yet started RUNNABLE, // Has been started and is runnable, but not necessarily running MONITOR_WAIT, // Waiting on a contended monitor lock CONDVAR_WAIT, // Waiting on a condition variable OBJECT_WAIT, // Waiting on an Object.wait() call BREAKPOINTED, // Suspended at breakpoint SLEEPING, // Thread.sleep() ZOMBIE // All done, but not reclaimed yet };unpark 源碼追蹤
實現(xiàn)如下
void Parker::unpark() { int s, status ; //給互斥量加鎖,如果互斥量已經(jīng)上鎖,則阻塞到互斥量被解鎖 //park進(jìn)入wait時,_mutex會被釋放 status = pthread_mutex_lock(_mutex); assert (status == 0, "invariant") ; //存儲舊的_counter s = _counter; //許可改為1,每次調(diào)用都設(shè)置成發(fā)放許可 _counter = 1; if (s < 1) { //之前沒有許可 if (WorkAroundNPTLTimedWaitHang) { //默認(rèn)執(zhí)行 ,釋放信號,表明條件已經(jīng)滿足,將喚醒等待的線程 status = pthread_cond_signal (_cond) ; assert (status == 0, "invariant") ; //釋放鎖 status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } else { status = pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; status = pthread_cond_signal (_cond) ; assert (status == 0, "invariant") ; } } else { //一直有許可,釋放掉自己加的鎖,有許可park本身就返回了 pthread_mutex_unlock(_mutex); assert (status == 0, "invariant") ; } }
從源碼可知unpark本身就是發(fā)放許可,并通知等待的線程,已經(jīng)可以結(jié)束等待了
總結(jié)park/unpark能夠精準(zhǔn)的對線程進(jìn)行喚醒和等待。
linux上的實現(xiàn)是通過POSIX的線程API的等待、喚醒、互斥、條件來進(jìn)行實現(xiàn)的
park在執(zhí)行過程中首選看是否有許可,有許可就立馬返回,而每次unpark都會給許可設(shè)置成有,這意味著,可以先執(zhí)行unpark,給予許可,再執(zhí)行park立馬自行,適用于producer快,而consumer還未完成的場景參考地址
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/72726.html
摘要:此對象在線程受阻塞時被記錄,以允許監(jiān)視工具和診斷工具確定線程受阻塞的原因。阻塞當(dāng)前線程,最長不超過納秒,返回條件在的基礎(chǔ)上增加了超時返回。喚醒線程喚醒處于阻塞狀態(tài)的線程。 LockSupport 用法簡介 LockSupport 和 CAS 是Java并發(fā)包中很多并發(fā)工具控制機(jī)制的基礎(chǔ),它們底層其實都是依賴Unsafe實現(xiàn)。 LockSupport是用來創(chuàng)建鎖和其他同步類的基本線程阻塞...
摘要:此對象在線程受阻塞時被記錄,以允許監(jiān)視工具和診斷工具確定線程受阻塞的原因。調(diào)用該線程變量的方法,會喚醒該線程,并拋出異常。對于等待狀態(tài)來說,它比狀態(tài)多了一種喚醒方式,就是超過規(guī)定時間,那么線程會自動醒來。 一. LockSupport類介紹 LockSupport類可以阻塞當(dāng)前線程以及喚醒指定被阻塞的線程。主要是通過park()和unpark(thread)方法來實現(xiàn)阻塞和喚醒線程的操...
摘要:很多開發(fā)人員包括我,尤其是剛進(jìn)入軟件行業(yè)的新手,認(rèn)為設(shè)置線程中斷就是表示線程停止了,不往前執(zhí)行了,其實不是這樣的,線程中斷只是一個狀態(tài)而已,表示已中斷,表示未中斷獲取線程中斷狀態(tài),如果中斷了返回否則返回設(shè)置線程中斷不影響線程的繼續(xù)執(zhí)行,但是 ? ? 很多Java開發(fā)人員(包括我),尤其是剛進(jìn)入軟件行業(yè)的新手,認(rèn)為Java設(shè)置線程中斷就是表示線程停止了,不往前執(zhí)行了, Thread.cu...
摘要:初始時,為,當(dāng)調(diào)用方法時,線程的加,當(dāng)調(diào)用方法時,如果為,則調(diào)用線程進(jìn)入阻塞狀態(tài)。該對象一般供監(jiān)視診斷工具確定線程受阻塞的原因時使用。 showImg(https://segmentfault.com/img/remote/1460000016012503); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 一、LockSupport類簡介...
摘要:源碼閱讀創(chuàng)建鎖和同步類中使用的基礎(chǔ)的線程阻塞原語除非你是多線程專家,而且你要自己設(shè)計和實現(xiàn)阻塞式線程同步機(jī)制比如等等,否則你不需要用和。 LockSupport源碼閱讀 /* * 創(chuàng)建鎖和同步類中使用的基礎(chǔ)的線程阻塞原語 * * 除非你是多線程專家,而且你要自己設(shè)計和實現(xiàn)阻塞式線程同步機(jī)制(比如lock、condition等等),否則你不需要用park和unpark。這兩個原語是...
閱讀 3773·2021-11-23 09:51
閱讀 4386·2021-11-15 11:37
閱讀 3523·2021-09-02 15:21
閱讀 2746·2021-09-01 10:31
閱讀 879·2021-08-31 14:19
閱讀 852·2021-08-11 11:20
閱讀 3308·2021-07-30 15:30
閱讀 1689·2019-08-30 15:54