摘要:如果此時,鎖被釋放,需要通知等待線程再次嘗試獲取鎖,公平鎖會讓最先進入隊列的線程獲得鎖。等待隊列節點的操作由于進入阻塞狀態的操作會降低執行效率,所以,會盡力避免試圖獲取獨占性變量的線程進入阻塞狀態。
?今天我們來研究學習一下AbstractQueuedSynchronizer類的相關原理,java.util.concurrent包中很多類都依賴于這個類所提供隊列式同步器,比如說常用的ReentranLock,Semaphore和CountDownLatch等。
?為了方便理解,我們以一段使用ReentranLock的代碼為例,講解ReentranLock每個方法中有關AQS的使用。
?我們都知道ReentranLock的加鎖行為和Synchronized類似,都是可重入的鎖,但是二者的實現方式確實完全不同的,我們之后也會講解Synchronized的原理。除此之外,Synchronized的阻塞無法被中斷,而ReentrantLock則提供了可中斷的阻塞。下面的代碼是ReentranLock的函數,我們就以此為順序,依次講解這些函數背后的實現原理。
ReentrantLock lock = new ReentrantLock(); lock.lock(); lock.unlock();公平鎖和非公平鎖
?ReentranLock分為公平鎖和非公平鎖,二者的區別就在獲取鎖機會是否和排隊順序相關。我們都知道,如果鎖被另一個線程持有,那么申請鎖的其他線程會被掛起等待,加入等待隊列。理論上,先調用lock函數被掛起等待的線程應該排在等待隊列的前端,后調用的就排在后邊。如果此時,鎖被釋放,需要通知等待線程再次嘗試獲取鎖,公平鎖會讓最先進入隊列的線程獲得鎖。而非公平鎖則會喚醒所有線程,讓它們再次嘗試獲取鎖,所以可能會導致后來的線程先獲得了鎖,則就是非公平。
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
?我們會發現FairSync和NonfairSync都繼承了Sync類,而Sync的父類就是AbstractQueuedSynchronizer(后續簡稱AQS)。但是AQS的構造函數是空的,并沒有任何操作。
?之后的源碼分析,如果沒有特別說明,就是指公平鎖。
?ReentranLock的lock函數如下所示,直接調用了sync的lock函數。也就是調用了FairSync的lock函數。
//ReentranLock public void lock() { sync.lock(); } //FairSync final void lock() { //調用了AQS的acquire函數,這是關鍵函數之一 acquire(1); }
?我們接下來就正式開始AQS相關的源碼分析了,acquire函數的作用是獲取同一時間段內只能被一個線程獲取的量,這個量就是抽象化的鎖概念。我們先分析代碼,你慢慢就會明白其中的含義。
public final void acquire(int arg) { // tryAcquire先嘗試獲取"鎖",獲取了就不進入后續流程 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //addWaiter是給當前線程創建一個節點,并將其加入等待隊列 //acquireQueued是當線程已經加入等待隊列之后繼續嘗試獲取鎖. selfInterrupt(); }
?tryAcquire,addWaiter和acquireQueued都是十分重要的函數,我們接下來依次學習一下這些函數,理解它們的作用。
//AQS類中的變量. private volatile int state; //這是FairSync的實現,AQS中未實現,子類按照自己的需要實現該函數 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); //獲取AQS中的state變量,代表抽象概念的鎖. int c = getState(); if (c == 0) { //值為0,那么當前獨占性變量還未被線程占有 //如果當前阻塞隊列上沒有先來的線程在等待,UnfairSync這里的實現就不一致 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //成功cas,那么代表當前線程獲得該變量的所有權,也就是說成功獲得鎖 setExclusiveOwnerThread(current); // setExclusiveOwnerThread將本線程設置為獨占性變量所有者線程 return true; } } else if (current == getExclusiveOwnerThread()) { //如果該線程已經獲取了獨占性變量的所有權,那么根據重入性 //原理,將state值進行加1,表示多次lock //由于已經獲得鎖,該段代碼只會被一個線程同時執行,所以不需要 //進行任何并行處理 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //上述情況都不符合,說明獲取鎖失敗 return false; }
?由上述代碼我們可以發現,tryAcquire就是嘗試獲取那個線程獨占的變量state。state的值表示其狀態:如果是0,那么當前還沒有線程獨占此變量;否在就是已經有線程獨占了這個變量,也就是代表已經有線程獲得了鎖。但是這個時候要再進行一次判斷,看是否是當前線程自己獲得的這個鎖,如果是,就增加state的值。
?這里有幾點需要說明一下,首先是compareAndSetState函數,這是使用CAS操作來設置state的值,而且state值設置了volatile修飾符,通過這兩點來確保修改state的值不會出現多線程問題。然后是公平鎖和非公平鎖的區別問題,在UnfairSync的nonfairTryAcquire函數中不會在相同的位置上調用hasQueuedPredecessors來判斷當前是否已經有線程在排隊等待獲得鎖。
?如果tryAcquire返回true,那么就是獲取鎖成功;如果返回false,那么就是未獲得鎖,需要加入阻塞等待隊列。我們下面就來看一下addWaiter的相關操作。
等待鎖的阻塞隊列?將保存當前線程信息的節點加入到等待隊列的相關函數中涉及到了無鎖隊列的相關算法,由于在AQS中只是將節點添加到隊尾,使用到的無鎖算法也相對簡單。真正的無鎖隊列的算法我們等到分析ConcurrentSkippedListMap時在進行講解。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //先使用快速入列法來嘗試一下,如果失敗,則進行更加完備的入列算法. //只有在必要的情況下才會使用更加復雜耗時的算法,也就是樂觀的態度 Node pred = tail; //列尾指針 if (pred != null) { node.prev = pred; //步驟1:該節點的前趨指針指向tail if (compareAndSetTail(pred, node)){ //步驟二:cas將尾指針指向該節點 pred.next = node;//步驟三:如果成果,讓舊列尾節點的next指針指向該節點. return node; } } //cas失敗,或在pred == null時調用enq enq(node); return node; } private Node enq(final Node node) { for (;;) { //cas無鎖算法的標準for循環,不停的嘗試 Node t = tail; if (t == null) { //初始化 if (compareAndSetHead(new Node())) //需要注意的是head是一個哨兵的作用,并不代表某個要獲取鎖的線程節點 tail = head; } else { //和addWaiter中一致,不過有了外側的無限循環,不停的嘗試,自旋鎖 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
?通過調用addWaiter函數,AQS將當前線程加入到了等待隊列,但是還沒有阻塞當前線程的執行,接下來我們就來分析一下acquireQueued函數。
等待隊列節點的操作?由于進入阻塞狀態的操作會降低執行效率,所以,AQS會盡力避免試圖獲取獨占性變量的線程進入阻塞狀態。所以,當線程加入等待隊列之后,acquireQueued會執行一個for循環,每次都判斷當前節點是否應該獲得這個變量(在隊首了)。如果不應該獲取或在再次嘗試獲取失敗,那么就調用shouldParkAfterFailedAcquire判斷是否應該進入阻塞狀態。如果當前節點之前的節點已經進入阻塞狀態了,那么就可以判定當前節點不可能獲取到鎖,為了防止CPU不停的執行for循環,消耗CPU資源,調用parkAndCheckInterrupt函數來進入阻塞狀態。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //一直執行,直到獲取鎖,返回. final Node p = node.predecessor(); //node的前驅是head,就說明,node是將要獲取鎖的下一個節點. if (p == head && tryAcquire(arg)) { //所以再次嘗試獲取獨占性變量 setHead(node); //如果成果,那么就將自己設置為head p.next = null; // help GC failed = false; return interrupted; //此時,還沒有進入阻塞狀態,所以直接返回false,表示不需要中斷調用selfInterrupt函數 } //判斷是否要進入阻塞狀態.如果`shouldParkAfterFailedAcquire` //返回true,表示需要進入阻塞 //調用parkAndCheckInterrupt;否則表示還可以再次嘗試獲取鎖,繼續進行for循環 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //調用parkAndCheckInterrupt進行阻塞,然后返回是否為中斷狀態 interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //前一個節點在等待獨占性變量釋放的通知,所以,當前節點可以阻塞 return true; if (ws > 0) { //前一個節點處于取消獲取獨占性變量的狀態,所以,可以跳過去 //返回false do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //將上一個節點的狀態設置為signal,返回false, compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //將AQS對象自己傳入 return Thread.interrupted(); }阻塞和中斷
?由上述分析,我們知道了AQS通過調用LockSupport的park方法來執行阻塞當前進程的操作。其實,這里的阻塞就是線程不再執行的含義,通過調用這個函數,線程進入阻塞狀態,上述的lock操作也就阻塞了,等待中斷或在獨占性變量被釋放。
public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker);//設置阻塞對象,用來記錄線程被誰阻塞的,用于線程監控和分析工具來定位 UNSAFE.park(false, 0L);//讓當前線程不再被線程調度,就是當前線程不再執行. setBlocker(t, null); }
?關于中斷的相關知識,我們以后再說,就繼續沿著AQS的主線,看一下釋放獨占性變量的相關操作吧。
unlock操作?與lock操作類似,unlock操作調用了AQS的relase方法,參數和調用acquire時一樣,都是1。
public final boolean release(int arg) { if (tryRelease(arg)) { //釋放獨占性變量,起始就是將status的值減1,因為acquire時是加1 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//喚醒head的后繼節點 return true; } return false; }
?由上述代碼可知,release就是先調用tryRelease來釋放獨占性變量。如果成功,那么就看一下是否有等待鎖的阻塞線程,如果有,就調用unparkSuccessor來喚醒他們。
protected final boolean tryRelease(int releases) { //由于只有一個線程可以獲得獨占先變量,所以,所有操作不需要考慮多線程 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //如果等于0,那么說明鎖應該被釋放了,否則表示當前線程有多次lock操作. free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
?我們可以看到tryRelease中的邏輯也體現了可重入鎖的概念,只有等到state的值為0時,才代表鎖真正被釋放了。所以獨占性變量state的值就代表鎖的有無。當state=0時,表示鎖未被占有,否在表示當前鎖已經被占有。
private void unparkSuccessor(Node node) { ..... //一般來說,需要喚醒的線程就是head的下一個節點,但是如果它獲取鎖的操作被取消,或在節點為null時 //就直接繼續往后遍歷,找到第一個未取消的后繼節點. Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
?調用了unpark方法后,進行lock操作被阻塞的線程就恢復到運行狀態,就會再次執行acquireQueued中的無限for循環中的操作,再次嘗試獲取鎖。
后記?有關AQS和ReentrantLock的分析就差不多結束了。不得不說,我第一次看到AQS的實現時真是震驚,以前都認為Synchronized和ReentrantLock的實現原理是一致的,都是依靠java虛擬機的功能實現的。沒有想到還有AQS這樣一個背后大Boss在提供幫助啊。學習了這個類的原理,我們對JUC的很多類的分析就簡單了很多。此外,AQS涉及的CAS操作和無鎖隊列的算法也為我們學習其他無鎖算法提供了基礎。知識的海洋是無限的啊!
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/72983.html
摘要:表示的是兩個,當其中任意一個計算完并發編程之是線程安全并且高效的,在并發編程中經常可見它的使用,在開始分析它的高并發實現機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯網高并發場景。 干貨:深度剖析分布式搜索引擎設計 分布式,高可用,和機器學習一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...
摘要:在中一般來說通過來創建所需要的線程池,如高并發原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細原理解析 - 后端 - 掘金今天我們來研究學習一下AbstractQueuedSynchronizer類的相關原理,java.util.concurrent包中很多類都依賴于這個類所提供的隊列式...
摘要:在中一般來說通過來創建所需要的線程池,如高并發原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細原理解析 - 后端 - 掘金今天我們來研究學習一下AbstractQueuedSynchronizer類的相關原理,java.util.concurrent包中很多類都依賴于這個類所提供的隊列式...
閱讀 2241·2021-11-23 09:51
閱讀 1073·2021-11-22 15:35
閱讀 4831·2021-11-22 09:34
閱讀 1597·2021-10-08 10:13
閱讀 3018·2021-07-22 17:35
閱讀 2518·2019-08-30 15:56
閱讀 3079·2019-08-29 18:44
閱讀 3089·2019-08-29 15:32