摘要:獲取資源失敗,將該線程加入等待隊列尾部,標記為獨占模式。如果有剩余資源則會喚醒下一個線程,且整個過程忽略中斷的影響。
AQS概念及定義
ASQ:AbstractQueuedSynchronizer
它維護了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列,有個內部類Node定義了節點。隊列由AQS的volatile成員變量head和tail組成一個雙向鏈表)
資源共享方式AQS定義兩種資源共享方式:Exclusive(獨占,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch)。
自定義同步器AQS是抽象類,使用了模板方法設計模式,已經將流程定義好,且實現了對等待隊列的維護,因此實現者只需要按需實現AQS預留的四個方法即可。
isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。
tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。
一般來說,自定義同步器要么是獨占方法,要么是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現獨占和共享兩種方式,如ReentrantReadWriteLock。
核心方法分析1.1 acquire(int)
方法定義此方法是獨占模式下線程獲取共享資源的頂層入口。如果獲取到資源,線程直接返回,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。
方法源碼/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }函數流程
1. tryAcquire():嘗試獲取資源。 2. addWaiter(Node.EXCLUSIVE):獲取資源失敗,將該線程加入等待隊列尾部,標記為獨占模式。 3. acquireQueued(Node,int):獲取該node指定數量的資源數,會一直等待成功獲取才返回,返回值是在獲取期間是否中斷過源碼分析
1. tryAcquire()
/** * Attempts to acquire in exclusive mode. This method should query * if the state of the object permits it to be acquired in the * exclusive mode, and if so to acquire it. * *This method is always invoked by the thread performing * acquire. If this method reports failure, the acquire method * may queue the thread, if it is not already queued, until it is * signalled by a release from some other thread. This can be used * to implement method {@link Lock#tryLock()}. * *
The default * implementation throws {@link UnsupportedOperationException}. * * @param arg the acquire argument. This value is always the one * passed to an acquire method, or is the value saved on entry * to a condition wait. The value is otherwise uninterpreted * and can represent anything you like. * @return {@code true} if successful. Upon success, this object has * been acquired. * @throws IllegalMonitorStateException if acquiring would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */ protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
這是個抽象方法,用于給實現者自定義實現,此方法嘗試去獲取獨占資源。如果獲取成功,則直接返回true,否則直接返回false。這也正是tryLock()的語義。
2. addWaiter(Node)
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { //新建Node Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //快速嘗試一次,使用CAS將node放到隊尾,失敗調用enq Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //保證將Node放入隊尾 enq(node); return node; }
enq源碼
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node"s predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; //如果尾節點為空,說明隊列還未進行初始化 if (t == null) { // Must initialize //CAS設置頭結點 if (compareAndSetHead(new Node())) //初始頭尾相同,從下一次循環開始嘗試加入新Node tail = head; } else { node.prev = t; //CAS將當前節點設置為尾節點 if (compareAndSetTail(t, node)) { //設置成功返回當前節點 t.next = node; return t; } } } }
3. acquireQueued(Node, int)
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { //標志是否成功獲取資源 boolean failed = true; try { //是否被中斷 boolean interrupted = false; for (;;) { //獲取前驅Node final Node p = node.predecessor(); //如果自己是隊列中第二個節點,那會進行嘗試獲取,進入這里判斷要么是一次,要么是被前驅節點給unPark喚醒了。 if (p == head && tryAcquire(arg)) { //成功獲取資源,設置自身為頭節點,將原來的頭結點剝離隊列 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //判斷是否需要被park,如果需要進行park并檢測是否被中斷 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //如果獲取資源失敗了將當前node取消, if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire方法
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node"s predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //如果前驅的狀態已經是signal,代表前驅釋放是會通知喚醒你,那么此node可以安心被park if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ //如果前驅已經被取消,那么從當前node一直往前找,直到有非取消的node,直接排在它的后面,此時不需要park,會出去再嘗試一次獲取資源。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don"t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //前驅節點沒有被取消,那么告訴前驅節點釋放的時候通知自己 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt()
/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { //讓該線程進入wait狀態 LockSupport.park(this); //返回期間是被中斷過 return Thread.interrupted(); }
acquireQueued流程總結
檢查自己是否是老二,且是否能獲得資源,能獲得自己成為head節點,否則進入流程2。
2.找到“有效”(not canceled)的前驅,并通知前驅釋放了要“通知”(watiStatus=signal)我,安心被park。
3。被前驅unpark,或interrrupt(),繼續流程1。
首先調用實現者實現的tryAcquire()去獲取資源,如果成功則直接返回。
如果失敗,則新建一個獨占模式的節點加到隊列尾部。
通知一個有效的前驅記得釋放時喚醒自己,在喚醒時自己再進行不斷tryAcquire()直到獲取到資源,返回是否被中斷過。
如果等待過程中被中斷過,則將將中斷補上,調用當前線程的interrupt().
至此acquire流程完結,
1.2 release(int)
方法定義此方法是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。這也正是unlock()的語義。
方法源碼/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { //調用實現者的嘗試解鎖方法,因為已經獲得鎖,所以基本不會失敗 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //喚醒下一個節點 unparkSuccessor(h); return true; } return false; }
unparkSuccessor()
/** * Wakes up node"s successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0)//設置當前節點的狀態允許失敗,失敗了也沒關系。 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //找到下一個需要被喚醒的節點 Node s = node.next; if (s == null || s.waitStatus > 0) {//如果是空或被取消 s = null; //從尾節點開始尋找,直到找到最前面的有效的節點。因為鎖已經釋放,所以從尾節點開始找可以避免因為高并發下復雜的隊列動態變化帶來的邏輯判斷, for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }release小結
首先調用實現者的tryRelease(),失敗則返回false
成功則找到下一個有效的節點并喚醒它。
注意實現者實現tryRelease應該是當state為0時才返回
1.3 acquireShared(int)
方法定義此方法是共享模式下線程獲取共享資源的頂層入口。如果獲取到資源,線程直接返回。如果有剩余資源則會喚醒下一個線程,否則進入wait,且整個過程忽略中斷的影響。
方法源碼/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(int arg) { //嘗試獲取指定數量資源 if (tryAcquireShared(arg) < 0) //獲取資源直到成功 doAcquireShared(arg); }
共享模式下的流程與獨占模式極為相似,首先根據tryAcquireShared(arg)嘗試是否能獲取到資源,能則直接返回,不能則會進入隊列按入隊順序依次喚醒嘗試獲取。
tryAcquireShared(int)
/** * Attempts to acquire in shared mode. This method should query if * the state of the object permits it to be acquired in the shared * mode, and if so to acquire it. * *This method is always invoked by the thread performing * acquire. If this method reports failure, the acquire method * may queue the thread, if it is not already queued, until it is * signalled by a release from some other thread. * *
The default implementation throws {@link * UnsupportedOperationException}. * * @param arg the acquire argument. This value is always the one * passed to an acquire method, or is the value saved on entry * to a condition wait. The value is otherwise uninterpreted * and can represent anything you like. * @return a negative value on failure; zero if acquisition in shared * mode succeeded but no subsequent shared-mode acquire can * succeed; and a positive value if acquisition in shared * mode succeeded and subsequent shared-mode acquires might * also succeed, in which case a subsequent waiting thread * must check availability. (Support for three different * return values enables this method to be used in contexts * where acquires only sometimes act exclusively.) Upon * success, this object has been acquired. * @throws IllegalMonitorStateException if acquiring would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if shared mode is not supported */ protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
這是AQS預留給實現者的方法,用于共享模式下嘗試獲取指定數量的資源,返回值<0代表獲取失敗,=0代表獲取成功且無剩余資源,>0代表還有剩余資源
doAcquireShared(int)方法用于共享模式獲取資源會直到獲取成功才返回
/** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { //添加當前線程的Node模式為共享模式至隊尾, final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //獲取前驅節點 final Node p = node.predecessor(); //如果自己是老二才有嘗試的資格 if (p == head) { //嘗試獲取指定數量資源 int r = tryAcquireShared(arg); if (r >= 0) { //如果成功獲取,將當前節點設置為頭節點,如果有剩余資源喚醒下一有效節點 setHeadAndPropagate(node, r); p.next = null; // help GC //如果有中斷,自己補償中斷 if (interrupted) selfInterrupt(); failed = false; return; } } //判斷是否需要被park,和park后檢查是否被中弄斷 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { //如果獲取失敗,取消當前節點 if (failed) cancelAcquire(node); } }
流程和獨占模式幾乎一模一樣,但是代碼的書寫缺有不同,不知原作者是咋想的。區別于獨占不同的有兩點
添加模式為SHARED1的Node。
在成功獲取到資源后,設置當前節點為head節點時,如果還有剩余資源的話,會喚醒下一個有效的節點,如果資源數量不夠下一節點,下一節點會一直等待,直到其它節點釋放,并不會讓步給后面的節點,取決于FIFO的按順序出隊。
setHeadAndPropagate()看有剩余資源的時候如何喚醒下一節點
/** * Sets head of queue, and checks if successor may be waiting * in shared mode, if so propagating if either propagate > 0 or * PROPAGATE status was set. * * @param node the node * @param propagate the return value from a tryAcquireShared */ private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //將當前節點設置為head節點 setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don"t know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ //如果有剩余資源 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //當下一個有效節點存在且是共享模式時,會喚醒它 if (s == null || s.isShared()) doReleaseShared(); } }
doReleaseShared()喚醒下一共享模式節點
/** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //如果頭結點狀態是“通知后繼” if (ws == Node.SIGNAL) { //將其狀態改為0,表示已通知 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //喚醒后繼 unparkSuccessor(h); } //如果已通知后繼,則改為可傳播,在下次acquire中的shouldParkAfterFailedAcquire會將改為SIGNAL else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //如果頭結點變了,再次循環 if (h == head) // loop if head changed break; } }acquireShared小結
共享模式acquire與獨占模式技術相同,唯一的不同就是在于如果當前節點獲取資源成功且有剩余則會喚醒下一節點,資源可以為多個線程功能分配,而獨占模式則就是一個線程獨占。
1.4 releaseShared(int)
方法定義此方法是共享模式下線程釋放共享資源的頂層入口。如果釋放資源成功,直接返回。如果有剩余資源則會喚醒下一個線程,且整個過程忽略中斷的影響。
方法源碼/** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { //嘗試共享模式獲取資源 if (tryReleaseShared(arg)) { //喚醒下一節點 doReleaseShared(); return true; } return false; }AQS的源碼分析就到這里為止由于本人目前功力尚淺,對AQS的理解停留在代碼級別,下此會將應用補上,如有不對和遺漏歡迎各位補充。
參考文章
Java并發之AQS詳解
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68444.html
為什么讀webpack源碼 因為前端框架離不開webpack,天天都在用的東西啊,怎能不研究 讀源碼能學到很多做項目看書學不到的東西,比如說架構,構造函數,es6很邊緣的用法,甚至給函數命名也會潛移默化的影響等 想寫源碼,不看源碼怎么行,雖然現在還不知道寫什么,就算不寫什么,看看別人寫的總可以吧 知道世界的廣闊,那么多插件,那么多軟件開發師,他們在做什么,同樣是寫js的,怎么他們能這么偉大 好奇...
摘要:前言本文的目的是閱讀理解的源碼,作為集合中重要的一個角色,平時用到十分多的一個類,深入理解它,知其所以然很重要。 前言 本文的目的是閱讀理解HashMap的源碼,作為集合中重要的一個角色,平時用到十分多的一個類,深入理解它,知其所以然很重要。本文基于Jdk1.7,因為Jdk1.8改變了HashMap的數據結構,進行了優化,我們先從基礎閱讀,之后再閱讀理解Jdk1.8的內容 HashMa...
摘要:大多的初學者都會使用中間件來處理異步請求,其理解簡單使用方便具體使用可參考官方文檔。源碼的源碼非常簡潔,出去空格一共只有行,這行中如果不算上則只有行。官方文檔中的一節講解的非常好,也確實幫我理解了中間件的工作原理,非常推薦閱讀。 總覺得文章也應該是有生命力的,歡迎關注我的Github上的博客,這里的文章會依據我本人的見識,逐步更新。 大多redux的初學者都會使用redux-thunk...
摘要:主要邏輯本質上還是回調函數那一套。通過的判斷完成異步和同步的區分。 主要邏輯: 本質上還是回調函數那一套。通過_subscribers的判斷完成異步和同步的區分。通過 resolve,reject -> publish -> invokeCallback -> resolve,reject的遞歸和下一條then的parent是上一條的child來完成then鏈的流轉 同步情況...
摘要:進入傳入地址出來一個復雜對象把掛載到對象上太復雜我們先看可以緩存輸入的文件系統輸入文件系統輸出文件系統,掛載到對象傳入輸入文件,監視文件系統,掛載到對象添加事件流打開插件讀取目錄下文件對文件名進行格式化異步讀取目錄下文件同步方法就 進入webpack.js //傳入地址,new Compiler出來一個復雜對象 compiler = new Compiler(options.conte...
閱讀 1802·2021-11-24 09:39
閱讀 2290·2021-09-30 09:47
閱讀 4144·2021-09-22 15:57
閱讀 1873·2019-08-29 18:36
閱讀 3577·2019-08-29 12:21
閱讀 590·2019-08-29 12:17
閱讀 1263·2019-08-29 11:25
閱讀 724·2019-08-28 18:26