摘要:注意是一個假節點,阻塞的節點是作為后面的節點出現的。總之在非公平鎖場景下嘗試去獲取鎖,如果獲取上了,則置一下狀態,并設置自己為獨占線程,并支持重入鎖功能。方法用于創建一個節點值為當前線程并維護一個雙向鏈表。阻塞了當前線程。
部分段落來自于http://javadoop.com/post/Abst...,他的文章相當不錯。
ReentrantLock基于Sync內部類來完成鎖。Sync繼承于AbstractQueuedSynchronizer。Sync有兩個不同的子類NonfairSync和FairSync。
ReentrantLock的大部分方法都是基于AbstractQueuedSynchronizer實現,大部分僅僅是對AbstractQueuedSynchronizer的轉發。因此,了解AbstractQueuedSynchronizer就非常重要。
作為AbstractQueuedSynchronizer的實現者需要實現isHeldExclusively,tryAcquire,tryRelease,(可選tryAcquireShared,tryReleaseShared)
那么我們看看對于一個常用的套路,ReentrantLock是如何實現同步的
lock.lock(); try{ i++; }finally { lock.unlock(); }
lock.lock()內部實現為調用了sync.lock(),之后又會調用NonfairSync或FairSync的lock(),你看果然重度使用了AQS吧,這里我們先記住這個位置,一會我們還會回來分析。
public void lock() { sync.lock(); }
先介紹一下AQS里面的屬性,不復雜就4個主要的屬性:AQS里面阻塞的節點是作為隊列出現的,維護了一個head節點和tail節點,同時維護了一個阻塞狀態,如果state=0表示沒有鎖,如果state>0表示鎖被重入了幾次。
注意head是一個假節點,阻塞的節點是作為head后面的節點出現的。
// 頭結點,你直接把它當做 當前持有鎖的線程 可能是最好理解的 private transient volatile Node head; // 阻塞的尾節點,每個新的節點進來,都插入到最后,也就形成了一個隱視的鏈表 private transient volatile Node tail; // 這個是最重要的,不過也是最簡單的,代表當前鎖的狀態,0代表沒有被占用,大于0代表有線程持有當前鎖 // 之所以說大于0,而不是等于1,是因為鎖可以重入嘛,每次重入都加上1 private volatile int state; // 代表當前持有獨占鎖的線程,舉個最重要的使用例子,因為鎖可以重入 // reentrantLock.lock()可以嵌套調用多次,所以每次用這個來判斷當前線程是否已經擁有了鎖 // if (currentThread == getExclusiveOwnerThread()) {state++} private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer
接著看一下FairSync和NonfairSync的實現,FairSync和NonfairSync都繼承了Sync,而且Sync又繼承了AbstractQueuedSynchronizer。可以看到FairSync和NonfairSync直接或間接的實現了isHeldExclusively,tryAcquire,tryRelease這三個方法。
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //如果沒有鎖上,則設置為鎖上并設置自己為獨占線程 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果鎖上了,而且獨占線程是自己,那么重新設置state+1,并且返回true else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //否則返回false return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don"t need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { //如果沒有人鎖上,那么就設置我自己為獨占線程,否則再acquire一次 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //調用到了AQS的acquire里面 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don"t grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
之前我們說到回到ReentrantLock的lock()調用了sync.lock();現在我們回來看看非公平鎖的邏輯是:如果搶到鎖了,則設置自己的線程為占有鎖的線程,否則調用acquire(1),這個是AQS的方法。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire會調用tryAcquire,而這個是對于不同的實現是不一樣的,非公平鎖NonfairSync里面的tryAcquire,而tryAcquire又會調用到Sync的nonfairTryAcquire。總之tryAcquire在非公平鎖場景下嘗試去獲取鎖,如果獲取上了,則置一下AQS狀態state,并設置自己為獨占線程,并支持重入鎖功能。
addWaiter方法用于創建一個節點(值為當前線程)并維護一個雙向鏈表。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
現在說一下Node的結構,主要有用的field為waitStatus,prev,next,thread。waitStatus目前僅要了解1,0,-1就夠了。 0是默認狀態,1代表爭取鎖取消,-1表示它的后繼節點對應的線程需要被喚醒。也就是說這個waitStatus其實代表的不是自己的狀態,而是后繼節點的狀態。可以看見默認進隊的節點的waitStatus都是0
static final class Node { /** Marker to indicate a node is waiting in shared mode */ // 標識節點當前在共享模式下 static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ // 標識節點當前在獨占模式下 static final Node EXCLUSIVE = null; // ======== 下面的幾個int常量是給waitStatus用的 =========== /** waitStatus value to indicate thread has cancelled */ // 代碼此線程取消了爭搶這個鎖 static final int CANCELLED = 1; /** waitStatus value to indicate successor"s thread needs unparking */ // 官方的描述是,其表示當前node的后繼節點對應的線程需要被喚醒 static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ // 本文不分析condition,所以略過吧 static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ // 同樣的不分析,略過吧 static final int PROPAGATE = -3; // ===================================================== // 取值為上面的1、-1、-2、-3,或者0(以后會講到) // 這么理解,暫時只需要知道如果這個值 大于0 代表此線程取消了等待, // 也許就是說半天搶不到鎖,不搶了,ReentrantLock是可以指定timeouot的。。。 volatile int waitStatus; // 前驅節點的引用 volatile Node prev; // 后繼節點的引用 volatile Node next; // 這個就是線程本尊 volatile Thread thread; }
acquireQueued的作用是從等待隊列中嘗試去把入隊的那個節點去做park。另外當節點unpark以后,也會在循環中將自己設置成頭結點,然后自己拿到鎖
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //對于隊首節點,剛才也許沒有搶到鎖,現在也許能搶到了,再試一次 if (p == head && tryAcquire(arg)) { //如果搶到了鎖,這個入隊的節點根本不需要park,直接可以執行 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //如果不是隊首節點,或者是隊首但是沒有搶過其他節點 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire。這個方法說的是:"當前線程沒有搶到鎖,是否需要掛起當前線程?第一個參數是前驅節點,第二個參數才是代表當前線程的節點。注意因為默認加入的節點的狀態都是0,這個方法會進來兩次,第一次進來走到else分支里面修改前置節點的waitStatus為-1.第二次進來直接返回true。對于剛加入隊列的節點,修改head節點的waitStatus為-1,對于后來加入的節點,修改它前一個節點的waitStatus為-1。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don"t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt的代碼很簡單,這個this就是ReentrantLock類的實例。阻塞了當前線程。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
再來看看怎么解鎖。
public void unlock() { sync.release(1); }
調用到AQS里面,如果鎖被完全釋放了,那么就unpark head的下一個
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease是由Sync覆蓋的。重置AQS里面的state,返回鎖是否被完全釋放了的判斷。
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //下面的代碼就是喚醒后繼節點,但是有可能后繼節點取消了等待(waitStatus==1) // 從隊尾往前找,找到waitStatus<=0的所有節點中排在最前面的 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
等到unpark以后,parkAndCheckInterrupt的阻塞解除,將繼續for無限循環,因為是隊列里是一個一個阻塞的,此時阻塞節點的前置依次都是head,因此if (p == head && tryAcquire(arg)) 這句話如果它醒來搶鎖成功了將執行成功,阻塞的線程獲取鎖并執行,將自己設置成head,同時也將自己從隊列中清除出去。 注意這里是非公平鎖,因此在tryAcquire有可能還沒有搶過其他線程,那么搶到的那個將會直接執行,而沒有搶到的,又在循環里鎖住了。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68033.html
摘要:公平策略在多個線程爭用鎖的情況下,公平策略傾向于將訪問權授予等待時間最長的線程。使用方式的典型調用方式如下二類原理的源碼非常簡單,它通過內部類實現了框架,接口的實現僅僅是對的的簡單封裝,參見原理多線程進階七鎖框架獨占功能剖析 showImg(https://segmentfault.com/img/remote/1460000016012582); 本文首發于一世流云的專欄:https...
摘要:所謂的重入,就是當本線程想再次獲得鎖,不需要重新申請,它本身就已經鎖了,即重入該鎖。如果不為,則表示有線程已經占有了。總結回顧下要點是一個可重入的鎖被當前占用的線程重入。 上一章《AQS源碼閱讀》講了AQS框架,這次講講它的應用類(注意不是子類實現,待會細講)。ReentrantLock,顧名思義重入鎖,但什么是重入,這個鎖到底是怎樣的,我們來看看類的注解說明showImg(http:...
摘要:的主要功能和關鍵字一致,均是用于多線程的同步。而僅支持通過查詢當前線程是否持有鎖。由于和使用的是同一把可重入鎖,所以線程可以進入方法,并再次獲得鎖,而不會被阻塞住。公平與非公平公平與非公平指的是線程獲取鎖的方式。 1.簡介 可重入鎖ReentrantLock自 JDK 1.5 被引入,功能上與synchronized關鍵字類似。所謂的可重入是指,線程可對同一把鎖進行重復加鎖,而不會被阻...
摘要:不同的是它還多了內部類和內部類,以及讀寫對應的成員變量和方法。另外是給和內部類使用的。內部類前面說到的操作是分配到里面執行的。他們都是接口的實現,所以其實最像應該是這個兩個內部類。而且大體上也沒什么差異,也是用的內部類。 之前講了《AQS源碼閱讀》和《ReentrantLock源碼閱讀》,本次將延續閱讀下ReentrantReadWriteLock,建議沒看過之前兩篇文章的,先大概了解...
摘要:關于,最后有兩點規律需要注意當的等待隊列隊首結點是共享結點,說明當前寫鎖被占用,當寫鎖釋放時,會以傳播的方式喚醒頭結點之后緊鄰的各個共享結點。當的等待隊列隊首結點是獨占結點,說明當前讀鎖被使用,當讀鎖釋放歸零后,會喚醒隊首的獨占結點。 showImg(https://segmentfault.com/img/remote/1460000016012293); 本文首發于一世流云的專欄:...
閱讀 2684·2021-10-22 09:55
閱讀 2008·2021-09-27 13:35
閱讀 1267·2021-08-24 10:02
閱讀 1478·2019-08-30 15:55
閱讀 1198·2019-08-30 14:13
閱讀 3471·2019-08-30 13:57
閱讀 1975·2019-08-30 11:07
閱讀 2447·2019-08-29 17:12