摘要:前驅節點的引用后繼節點的引用這個就是線程本尊的數據結構其實也挺簡單的,就是四個屬性而已,大家先要有這個概念在心里。這里需要知道這點進入阻塞隊列排隊的線程會被掛起,而喚醒的操作是由前驅節點完成的。
前言
在我們學校java并發編程的時候,并發包絕對是最值得我們研究的資源。在分析并發包源碼的時候,少不了需要AbstractQueuedSynchronizer(以下簡寫AQS),它是整個并發包的基礎工具類,是實現ReentranLock、CountDownLatch、Semaphore、FutrureTask等類的基礎。
本文從ReentranLock的公平鎖源碼出發,分析下AQS怎么工作的,希望能給大家一點幫助。
先來看看 AQS 有哪些屬性,搞清楚這些基本就知道 AQS 是什么套路了,畢竟可以猜嘛!
// 頭結點,你直接把它當做 當前持有鎖的線程 可能是最好理解的 private transient volatile Node head; // 阻塞的尾節點,每個新的節點進來,都插入到最后,也就形成了一個隱視的鏈表 private transient volatile Node tail; // 這個是最重要的,不過也是最簡單的,代表當前鎖的狀態,0代表沒有被占用,大于0代表有線程持有當前鎖 // 之所以說大于0,而不是等于1,是因為鎖可以重入嘛,每次重入都加上1 private volatile int state; // 代表當前持有獨占鎖的線程,舉個最重要的使用例子,因為鎖可以重入 // reentrantLock.lock()可以嵌套調用多次,所以每次用這個來判斷當前線程是否已經擁有了鎖 // if (currentThread == getExclusiveOwnerThread()) {state++} private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer
怎么樣,看樣子應該是很簡單的吧,畢竟也就四個屬性啊。
AbstractQueuedSynchronizer 的等待隊列示意如下所示,注意了,之后分析過程中所說的 queue,也就是阻塞隊列不包含 head,不包含 head,不包含 head。
等待隊列中每個線程被包裝成一個 node,數據結構是鏈表,一起看看源碼吧:
static final class Node { /** Marker to indicate a node is waiting in shared mode */ // 標識節點當前在共享模式下 static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ // 標識節點當前在獨占模式下 static final Node EXCLUSIVE = null; // ======== 下面的幾個int常量是給waitStatus用的 =========== /** waitStatus value to indicate thread has cancelled */ // 代碼此線程取消了爭搶這個鎖 static final int CANCELLED = 1; /** waitStatus value to indicate successor"s thread needs unparking */ // 官方的描述是,其表示當前node的后繼節點對應的線程需要被喚醒 static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ // 本文不分析condition,所以略過吧,下一篇文章會介紹這個 static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ // 同樣的不分析,略過吧 static final int PROPAGATE = -3; // ===================================================== // 取值為上面的1、-1、-2、-3,或者0(以后會講到) // 這么理解,暫時只需要知道如果這個值 大于0 代表此線程取消了等待, // 也許就是說半天搶不到鎖,不搶了,ReentrantLock是可以指定timeouot的。。。 volatile int waitStatus; // 前驅節點的引用 volatile Node prev; // 后繼節點的引用 volatile Node next; // 這個就是線程本尊 volatile Thread thread; }
Node 的數據結構其實也挺簡單的,就是 thread + waitStatus + pre + next 四個屬性而已,大家先要有這個概念在心里。
上面的是基礎知識,后面會多次用到,心里要時刻記著它們,心里想著這個結構圖就可以了。下面,我們開始說 ReentrantLock 的公平鎖。多嘴一下,我說的阻塞隊列不包含 head 節點。
首先,我們先看下 ReentrantLock 的使用方式。
// 我用個web開發中的service概念吧 public class OrderService { // 使用static,這樣每個線程拿到的是同一把鎖,當然,spring mvc中service默認就是單例,別糾結這個 private static ReentrantLock reentrantLock = new ReentrantLock(true); public void createOrder() { // 比如我們同一時間,只允許一個線程創建訂單 reentrantLock.lock(); // 通常,lock 之后緊跟著 try 語句 try { // 這塊代碼同一時間只能有一個線程進來(獲取到鎖的線程), // 其他的線程在lock()方法上阻塞,等待獲取到鎖,再進來 // 執行代碼... // 執行代碼... // 執行代碼... } finally { // 釋放鎖 reentrantLock.unlock(); } } }
ReentrantLock 在內部用了內部類 Sync 來管理鎖,所以真正的獲取鎖和釋放鎖是由 Sync 的實現類來控制的。Sync 有兩個實現,分別為 NonfairSync(非公平鎖)和 FairSync(公平鎖),我們看 FairSync 部分。
abstract static class Sync extends AbstractQueuedSynchronizer { public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } }線程搶鎖
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 爭鎖 final void lock() { acquire(1); } // 來自父類AQS,我直接貼過來這邊,下面分析的時候同樣會這樣做,不會給讀者帶來閱讀壓力 // 我們看到,這個方法,如果tryAcquire(arg) 返回true, 也就結束了。 // 否則,acquireQueued方法會將線程壓到隊列中 public final void acquire(int arg) { // 此時 arg == 1 // 首先調用tryAcquire(1)一下,名字上就知道,這個只是試一試 // 因為有可能直接就成功了呢,也就不需要進隊列排隊了, // 對于公平鎖的語義就是:本來就沒人持有鎖,根本沒必要進隊列等待(又是掛起,又是等待被喚醒的) if (!tryAcquire(arg) && // tryAcquire(arg)沒有成功,這個時候需要把當前線程掛起,放到阻塞隊列中。 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } } /** * Fair version of tryAcquire. Don"t grant access unless * recursive call or no waiters or is first. */ // 嘗試直接獲取鎖,返回值是boolean,代表是否獲取到鎖 // 返回true:1.沒有線程在等待鎖;2.重入鎖,線程本來就持有鎖,也就可以理所當然可以直接獲取 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // state == 0 此時此刻沒有線程持有鎖 if (c == 0) { // 雖然此時此刻鎖是可以用的,但是這是公平鎖,既然是公平,就得講究先來后到, // 看看有沒有別人在隊列中等了半天了 if (!hasQueuedPredecessors() && // 如果沒有線程在等待,那就用CAS嘗試一下,成功了就獲取到鎖了, // 不成功的話,只能說明一個問題,就在剛剛幾乎同一時刻有個線程搶先了 =_= // 因為剛剛還沒人的,我判斷過了??? compareAndSetState(0, acquires)) { // 到這里就是獲取到鎖了,標記一下,告訴大家,現在是我占用了鎖 setExclusiveOwnerThread(current); return true; } } // 會進入這個else if分支,說明是重入了,需要操作:state=state+1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 如果到這里,說明前面的if和else if都沒有返回true,說明沒有獲取到鎖 // 回到上面一個外層調用方法繼續看: // if (!tryAcquire(arg) // && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // selfInterrupt(); return false; } // 假設tryAcquire(arg) 返回false,那么代碼將執行: // acquireQueued(addWaiter(Node.EXCLUSIVE), arg), // 這個方法,首先需要執行:addWaiter(Node.EXCLUSIVE) /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ // 此方法的作用是把線程包裝成node,同時進入到隊列中 // 參數mode此時是Node.EXCLUSIVE,代表獨占模式 private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 以下幾行代碼想把當前node加到鏈表的最后面去,也就是進到阻塞隊列的最后 Node pred = tail; // tail!=null => 隊列不為空(tail==head的時候,其實隊列是空的,不過不管這個吧) if (pred != null) { // 設置自己的前驅 為當前的隊尾節點 node.prev = pred; // 用CAS把自己設置為隊尾, 如果成功后,tail == node了 if (compareAndSetTail(pred, node)) { // 進到這里說明設置成功,當前node==tail, 將自己與之前的隊尾相連, // 上面已經有 node.prev = pred // 加上下面這句,也就實現了和之前的尾節點雙向連接了 pred.next = node; // 線程入隊了,可以返回了 return node; } } // 仔細看看上面的代碼,如果會到這里, // 說明 pred==null(隊列是空的) 或者 CAS失敗(有線程在競爭入隊) // 讀者一定要跟上思路,如果沒有跟上,建議先不要往下讀了,往回仔細看,否則會浪費時間的 enq(node); return node; } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node"s predecessor */ // 采用自旋的方式入隊 // 之前說過,到這個方法只有兩種可能:等待隊列為空,或者有線程競爭入隊, // 自旋在這邊的語義是:CAS設置tail過程中,競爭一次競爭不到,我就多次競爭,總會排到的 private Node enq(final Node node) { for (;;) { Node t = tail; // 之前說過,隊列為空也會進來這里 if (t == null) { // Must initialize // 初始化head節點 // 細心的讀者會知道原來head和tail初始化的時候都是null,反正我不細心 // 還是一步CAS,你懂的,現在可能是很多線程同時進來呢 if (compareAndSetHead(new Node())) // 給后面用:這個時候head節點的waitStatus==0, 看new Node()構造方法就知道了 // 這個時候有了head,但是tail還是null,設置一下, // 把tail指向head,放心,馬上就有線程要來了,到時候tail就要被搶了 // 注意:這里只是設置了tail=head,這里可沒return哦,沒有return,沒有return // 所以,設置完了以后,繼續for循環,下次就到下面的else分支了 tail = head; } else { // 下面幾行,和上一個方法 addWaiter 是一樣的, // 只是這個套在無限循環里,反正就是將當前線程排到隊尾,有線程競爭的話排不上重復排 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } // 現在,又回到這段代碼了 // if (!tryAcquire(arg) // && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // selfInterrupt(); // 下面這個方法,參數node,經過addWaiter(Node.EXCLUSIVE),此時已經進入阻塞隊列 // 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的話, // 意味著上面這段代碼將進入selfInterrupt(),所以正常情況下,下面應該返回false // 這個方法非常重要,應該說真正的線程掛起,然后被喚醒后去獲取鎖,都在這個方法里了 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // p == head 說明當前節點雖然進到了阻塞隊列,但是是阻塞隊列的第一個,因為它的前驅是head // 注意,阻塞隊列不包含head節點,head一般指的是占有鎖的線程,head后面的才稱為阻塞隊列 // 所以當前節點可以去試搶一下鎖 // 這里我們說一下,為什么可以去試試: // 首先,它是隊頭,這個是第一個條件,其次,當前的head有可能是剛剛初始化的node, // enq(node) 方法里面有提到,head是延時初始化的,而且new Node()的時候沒有設置任何線程 // 也就是說,當前的head不屬于任何一個線程,所以作為隊頭,可以去試一試, // tryAcquire已經分析過了, 忘記了請往前看一下,就是簡單用CAS試操作一下state if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 到這里,說明上面的if分支沒有成功,要么當前node本來就不是隊頭, // 要么就是tryAcquire(arg)沒有搶贏別人,繼續往下看 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev * * @param pred node"s predecessor holding status * @param node the node * @return {@code true} if thread should block */ // 剛剛說過,會到這里就是沒有搶到鎖唄,這個方法說的是:"當前線程沒有搶到鎖,是否需要掛起當前線程?" // 第一個參數是前驅節點,第二個參數才是代表當前線程的節點 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前驅節點的 waitStatus == -1 ,說明前驅節點狀態正常,當前線程需要掛起,直接可以返回true if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; // 前驅節點 waitStatus大于0 ,之前說過,大于0 說明前驅節點取消了排隊。這里需要知道這點: // 進入阻塞隊列排隊的線程會被掛起,而喚醒的操作是由前驅節點完成的。 // 所以下面這塊代碼說的是將當前節點的prev指向waitStatus<=0的節點, // 簡單說,就是為了找個好爹,因為你還得依賴它來喚醒呢,如果前驅節點取消了排隊, // 找前驅節點的前驅節點做爹,往前循環總能找到一個好爹的 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don"t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 仔細想想,如果進入到這個分支意味著什么 // 前驅節點的waitStatus不等于-1和1,那也就是只可能是0,-2,-3 // 在我們前面的源碼中,都沒有看到有設置waitStatus的,所以每個新的node入隊時,waitStatu都是0 // 用CAS將前驅節點的waitStatus設置為Node.SIGNAL(也就是-1) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) // 這個方法結束根據返回值我們簡單分析下: // 如果返回true, 說明前驅節點的waitStatus==-1,是正常情況,那么當前線程需要被掛起,等待以后被喚醒 // 我們也說過,以后是被前驅節點喚醒,就等著前驅節點拿到鎖,然后釋放鎖的時候叫你好了 // 如果返回false, 說明當前不需要被掛起,為什么呢?往后看 // 跳回到前面是這個方法 // if (shouldParkAfterFailedAcquire(p, node) && // parkAndCheckInterrupt()) // interrupted = true; // 1. 如果shouldParkAfterFailedAcquire(p, node)返回true, // 那么需要執行parkAndCheckInterrupt(): // 這個方法很簡單,因為前面返回true,所以需要掛起線程,這個方法就是負責掛起線程的 // 這里用了LockSupport.park(this)來掛起線程,然后就停在這里了,等待被喚醒======= private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } // 2. 接下來說說如果shouldParkAfterFailedAcquire(p, node)返回false的情況 // 仔細看shouldParkAfterFailedAcquire(p, node),我們可以發現,其實第一次進來的時候,一般都不會返回true的,原因很簡單,前驅節點的waitStatus=-1是依賴于后繼節點設置的。也就是說,我都還沒給前驅設置-1呢,怎么可能是true呢,但是要看到,這個方法是套在循環里的,所以第二次進來的時候狀態就是-1了。 // 解釋下為什么shouldParkAfterFailedAcquire(p, node)返回false的時候不直接掛起線程: // => 是為了應對在經過這個方法后,node已經是head的直接后繼節點了。剩下的讀者自己想想吧。
說到這里,也就明白了,多看幾遍 final boolean acquireQueued(final Node node, int arg) 這個方法吧。自己推演下各個分支怎么走,哪種情況下會發生什么,走到哪里。
解鎖操作最后,就是還需要介紹下喚醒的動作了。我們知道,正常情況下,如果線程沒獲取到鎖,線程會被 LockSupport.park(this); 掛起停止,等待被喚醒。
// 喚醒的代碼還是比較簡單的,你如果上面加鎖的都看懂了,下面都不需要看就知道怎么回事了 public void unlock() { sync.release(1); } public final boolean release(int arg) { // 往后看吧 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 回到ReentrantLock看tryRelease方法 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 是否完全釋放鎖 boolean free = false; // 其實就是重入的問題,如果c==0,也就是說沒有嵌套鎖了,可以釋放了,否則還不能釋放掉 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } /** * Wakes up node"s successor, if one exists. * * @param node the node */ // 喚醒后繼節點 // 從上面調用處知道,參數node是head頭結點 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; // 如果head節點當前waitStatus<0, 將其修改為0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 下面的代碼就是喚醒后繼節點,但是有可能后繼節點取消了等待(waitStatus==1) // 從隊尾往前找,找到waitStatus<=0的所有節點中排在最前面的 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 從后往前找,仔細看代碼,不必擔心中間有節點取消(waitStatus==1)的情況 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒線程 LockSupport.unpark(s.thread); }
喚醒線程以后,被喚醒的線程將從以下代碼中繼續往前走:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 剛剛線程被掛起在這里了 return Thread.interrupted(); } // 又回到這個方法了:acquireQueued(final Node node, int arg),這個時候,node的前驅是head了
好了,后面就不分析源碼了,剩下的還有問題自己去仔細看看代碼吧。
總結總結一下吧。
在并發環境下,加鎖和解鎖需要以下三個部件的協調:
鎖狀態。我們要知道鎖是不是被別的線程占有了,這個就是 state 的作用,它為 0 的時候代表沒有線程占有鎖,可以去爭搶這個鎖,用 CAS 將 state 設為 1,如果 CAS 成功,說明搶到了鎖,這樣其他線程就搶不到了,如果鎖重入的話,state進行+1 就可以,解鎖就是減 1,直到 state 又變為 0,代表釋放鎖,所以 lock() 和 unlock() 必須要配對啊。然后喚醒等待隊列中的第一個線程,讓其來占有鎖。
線程的阻塞和解除阻塞。AQS 中采用了 LockSupport.park(thread) 來掛起線程,用 unpark 來喚醒線程。
阻塞隊列。因為爭搶鎖的線程可能很多,但是只能有一個線程拿到鎖,其他的線程都必須等待,這個時候就需要一個 queue 來管理這些線程,AQS 用的是一個 FIFO 的隊列,就是一個鏈表,每個 node 都持有后繼節點的引用。AQS 采用了 CLH 鎖的變體來實現,感興趣的讀者可以參考這篇文章關于CLH的介紹,寫得簡單明了。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/72340.html
摘要:作者畢來生微信鎖狀態轉換分類以后幫助我們提供了線程同步機制,通過顯示定義同步鎖來實現對象之間的同步。等待重新嘗試因為在中是用關鍵字聲明的,故可以在線程間可見再次判斷一下能否持有鎖可能線程同步代碼執行得比較快,已經釋放了鎖,不可以就返回。 作者 : 畢來生微信: 878799579 鎖狀態轉換 showImg(https://segmentfault.com/img/remote/...
摘要:表示的是兩個,當其中任意一個計算完并發編程之是線程安全并且高效的,在并發編程中經常可見它的使用,在開始分析它的高并發實現機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯網高并發場景。 干貨:深度剖析分布式搜索引擎設計 分布式,高可用,和機器學習一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...
摘要:用于生成虛擬機當前時刻的線程快照。線程快照就是當前虛擬機內每一條線程正在執行的方法堆棧的集合,生成線程快照的主要目的就是定位線程出現長時間停頓的原因,如線程死鎖死循環請求外部資源導致的長時間等待等都是導致線程長時間停頓的常見原因。 在JDK的命令行中,一般開發人員最耳熟能詳的肯定就是java,javac,javap等常用命令,不過在jdk/bin下還有許多其他的命令行工具,它們被用來監...
閱讀 2285·2021-11-15 11:37
閱讀 2954·2021-09-01 10:41
閱讀 787·2019-12-27 11:58
閱讀 747·2019-08-30 15:54
閱讀 715·2019-08-30 13:52
閱讀 2930·2019-08-29 12:22
閱讀 1075·2019-08-28 18:27
閱讀 1452·2019-08-26 18:42