摘要:同步器的實現根據其狀態是否獨占而有所不同。這個框架為同步狀態的原子性管理線程的阻塞和解除阻塞以及排隊提供了一種通用的機制。在需要使用同步器的目標類中,繼承了的子類要求被聲明為目標類的非公有內部類。類通過一組方法實現線程的阻塞和解除阻塞。
java.util.concurrent.locks包主要是提供線程通信的鎖,下面看一下包中有哪些類。
Unsafeconcurrent包里的很多方法都是基于sun.misc.Unsafe這個類,Unsafe這個類從名字上可以看出是一個不安全的類,JDK也并沒有把這個類開放給用戶使用(但是我們可以通過一些比較hack的方式使用到這個類)。Unsafe是一個單例的類,通過靜態的getUnsafe()方法獲取到他的實例,可以看到,在方法中會判斷調用Unsafe.getUnsafe()方法的類的類加載器是不是引導類加載器BootstrapClassLoader,一般我們開發的代碼所屬的類加載器會是AppClassLoader及其子類,所以此時會拋出SecurityException,告訴我們unsafe,不要用??!
@CallerSensitive public static Unsafe getUnsafe() { Class var0 = Reflection.getCallerClass(); if(!VM.isSystemDomainLoader(var0.getClassLoader())) { throw new SecurityException("Unsafe"); } else { return theUnsafe; } }
Unsafe類在JDK源碼中經常用到,主要作用是任意內存地址位置處讀寫數據,以及CAS操作。它的大部分操作都是通過JNI(Java Native Interface)完成的,因此它所分配的內存需要手動free,所以是非常危險的。
Java并發中主要用到是的Unsafe中的Compare And Swap操作,CAS 操作包含三個操作數 —— 內存位置(offset)、預期原值(A)和新值(B)。如果內存位置的值與預期原值相匹配,那么處理器會自動將該位置值更新為新值。否則,處理器不做任何操作。無論哪種情況,它都會在 CAS 指令之前返回該位置的值。CAS 有效地說明了“我認為位置 V 應該包含值 A;如果包含該值,則將 B 放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可?!?/p>
// 獲取類的某個字段在類的實例中內存位置的偏移量 public native long objectFieldOffset(Field var1); /* * 下面三個方法是類似的,對var1對象的偏移量是var2的字段進行CAS操作 * 預期值是var4,如果該字段當前值是var4,則更新為var5,否則什么都不做 */ public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5); public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);LockSupport
LockSupport是j.u.c包中并發控制的基礎,它的底層是依賴于Unsafe實現的。LockSupport提供了Thread.suspend()和Thread.resume()的替代方案,因為suspend和resume是不安全的,所以已經被標記為deprecated。suspend()和resume()必須要成對出現,否則非常容易發生死鎖。
因為suspend方法并不會釋放鎖,如果使用suspend的目標線程對一個重要的系統資源持有鎖,那么沒任何線程可以使用這個資源直到要suspend的目標線程被resumed,如果一個線程在resume目標線程之前嘗試持有這個重要的系統資源鎖再去resume目標線程,這兩條線程就相互死鎖了。
public class LockSupport { private LockSupport() {} // Cannot be instantiated. public static void unpark(Thread thread); public static void park(Object blocker); public static void parkNanos(Object blocker, long nanos); public static void parkUntil(Object blocker, long deadline); public static void park(); public static void parkNanos(long nanos); public static void parkUntil(long deadline); }
LockSupport中主要用到park和unpark方法,park阻塞當前線程,unpark解除指定線程的阻塞。而且unpark可以在park之前執行,比Thread的wait/notify更加靈活。
LockSupport中有個叫做permit(許可)的概念,unpark方法有兩種情況:
如果入參的線程是阻塞的,那么解除該線程的阻塞
否則給該線程一個permit,確保該線程下一次執行park的時候不被阻塞,直接返回。
相應的,park也分為兩種情況:
如果一個線程有許可的話,那么它在調用park方法時就會收回它那個許可,但是不會被阻塞,而是直接返回。但是當它再次調用park方法時,因為許可已經被用掉了,于是又成了第2種情況。
如果一個線程沒有許可,那么它在調用park方法時就會被阻塞,直到以下事件之一發生才會解除阻塞。
有其它線程調用unpark方法給它發許可
其他線程調用了當前線程的interrupt方法
阻塞過時(調用parkNanos(long nanos)阻塞指定時間長度或調用parkUntil(long deadline)阻塞直到指定的時間戳)
虛假喚醒(Spurious wakeup)
需要注意的一點是,一個線程一個時刻最多只能有一個許可,即使你多次調用unpark方法它也只能有一個許可.
The three forms of park each also support a blocker object parameter. This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked. (Such tools may access blockers using method getBlocker(Thread).) The use of these forms rather than the original forms without this parameter is strongly encouraged. The normal argument to supply as a blocker within a lock implementation is this.
park, parkUntil, parkNanos這3個方法都分別對應有一個帶Object blocker參數的方法,表示把線程阻塞在這個對象上,類似于synchronized()中的鎖對象,以允許監視工具和診斷工具確定線程受阻塞的原因。Java官方建議使用帶blocker參數的park方法,并用this關鍵字作為blocker參數。
AbstractOwnableSynchronizer可以由線程以獨占方式擁有的同步器。此類為創建鎖和相關同步器(伴隨著所有權的概念)提供了基礎。AbstractOwnableSynchronizer 類本身不管理或使用此信息。但是,子類和工具可以使用適當維護的值幫助控制和監視訪問以及提供診斷。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
這是一個抽象類,在j.u.c包中它有2個子類:AbstractQueuedSynchronizer和AbstractQueuedLongSynchronizer。同步器的實現根據其狀態是否獨占而有所不同。獨占狀態的同步器,在同一時間只有一個線程可以通過阻塞點,而共享狀態的同步器可以同時有多個線程在執行。一般鎖的實現類往往只維護獨占狀態,但是,例如計數信號量在數量許可的情況下,允許多個線程同時執行。為了使框架能得到廣泛應用,這兩種模式都要支持。
AbstractQueuedSynchronizer在JDK1.5之前,線程同步是通過synchronized關鍵字實現的,
從JDK1.5開始提供的java.util.concurrent包中,大部分的同步器(例如鎖,屏障等等)都是基于AbstractQueuedSynchronizer類(下稱AQS類)而構建的。這個框架為同步狀態的原子性管理、線程的阻塞和解除阻塞以及排隊提供了一種通用的機制。
線程同步涉及兩個操作,對臨界資源的競爭和釋放。在j.u.c包中,這兩個操作的設計思想是:
acquire
while (synchronization state does not allow acquire) { enqueue current thread if not already queued; possibly block current thread; } dequeue current thread if it was queued;
release
update synchronization state; if (state may permit a blocked thread to acquire) unblock one or more queued threads;
為了實現上述操作,需要下面三個基本組件的相互協作:
同步狀態的原子性管理;
線程的阻塞與解除阻塞;
隊列的管理;
AQS類的一般用法是繼承,在子類中定義管理同步狀態的方法,并且定義這個AQS實現類在acquire和release操作時同步狀態變化對應的含義。AQS類負責管理線程的阻塞和線程隊列。在需要使用同步器的目標類中,繼承了AQS的子類要求被聲明為目標類的非公有內部類。例如下圖j.u.c包中,在需要使用AQS控制線程同步時,都是在類中聲明一個內部類并繼承AQS。
AQS類支持共享和排他兩種模式,排他模式下,只能有一個線程acquire,共享模式下可以多個線程同時acquire。
1. 同步狀態
AQS類使用單個int(32位)來保存同步狀態,并暴露出getState、setState以及compareAndSetState操作來讀取和更新這個狀態。compareAndSetState僅當同步狀態擁有一個期望值的時候,才會被原子地設置成新值。
private volatile int state; protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
將同步狀態限制為一個32位的整型是出于實踐上的考量。雖然JSR166也提供了64位long字段的原子性操作,但這些操作在很多平臺上還是使用內部鎖的方式來模擬實現的,這會使同步器的性能可能不會很理想。JDK1.6中新增的java.util.concurrent.locks.AbstractQueuedLongSynchronizer類,就是使用long變量維護同步狀態的一個AbstractOwnableSynchronizer版本。目前來說,32位的狀態對大多數應用程序都是足夠的。在j.u.c包中,只有一個同步器類可能需要多于32位來維持狀態,那就是CyclicBarrier類,所以它用了鎖(該包中大多數更高層次的工具亦是如此)。
基于AQS的具體實現類必須根據暴露出的狀態相關的方法定義tryAcquire和tryRelease方法,以控制acquire和release操作。當同步狀態滿足時,tryAcquire方法必須返回true,而當新的同步狀態允許后續acquire時,tryRelease方法也必須返回true。這些方法都接受一個int類型的參數用于傳遞想要的狀態。例如:可重入鎖中,當某個線程從條件等待中返回,然后重新獲取鎖時,為了重新建立循環計數的場景。很多同步器并不需要這樣一個參數,因此忽略它即可。
2. 隊列
整個框架的關鍵就是如何管理被阻塞的線程的隊列,該隊列是嚴格的FIFO隊列,因此,框架不支持基于優先級的同步。
隊列中的元素Node(AQS的內部類)就是保存著線程引用和線程狀態的容器,每個線程對同步器的訪問,都可以看做是隊列中的一個節點。Node的主要包含以下成員變量:
static final class Node { volatile int waitStatus; volatile Node prev; // 前驅節點 volatile Node next; // 后繼節點 volatile Thread thread; // 入隊列時的當前線程 Node nextWaiter; // 存儲condition隊列中的后繼節點 /* waitStatus */ static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /* 標識節點的等待是共享模式或排他模式 */ static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; }
waitStatus的含義:
CANCELLED,值為1,表示當前的線程因超時或中斷被取消;
SIGNAL,值為-1,表示當前節點的后繼節點包含的線程處于阻塞狀態,當前節點線程釋放時需要對后繼進行unpark;
CONDITION,值為-2,表示當前節點在等待condition,也就是在condition隊列中;
PROPAGATE,值為-3,表示在同步器在共享模式下,當前節點釋放后傳播到其他節點;
值為0,表示當前節點在sync隊列中,等待著獲取鎖
enq節點入隊,如果隊列為空則先初始化隊列,創建一個空節點作為頭節點。
private transient volatile Node head; // 隊列頭節點 private transient volatile Node tail; // 隊列尾節點 /* 入隊 */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 延遲初始化,隊列為空時創建一個空Node,head和tail都指向這個Node if (compareAndSetHead(new Node())) tail = head; } else { // 死循環CAS操作,把新節點和隊列當前尾節點做雙向綁定 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
addWaiter先判斷tail如果不為空則進行一次快速的插入,否則使用enq進行可能包括隊列初始化的入隊操作。
/* * 把當前線程用Node包裝起來并入隊 * mode有兩種情況: Node.EXCLUSIVE/Node.SHARED * this.nextWaiter = mode; */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
3. 阻塞
AQS可以根據具體的場景提供exclusive模式和shared模式,在exclusive模式下,同一時刻最多只能有一個線程能夠處于成功獲取的狀態,排他鎖是一個exclusive模式的例子,shared模式則可以多個線程一起獲取成功,如多個許可的Semaphore。
AQS類通過一組aquire/release方法實現線程的阻塞和解除阻塞。在共享模式和獨占模式下,又有所區別。
子類需要去實現以下方法:
/* 獨占模式 */ protected boolean tryAcquire(int arg) protected boolean tryRelease(int arg) /* 共享模式 */ protected int tryAcquireShared(int arg) protected boolean tryReleaseShared(int arg)獨占模式下的acquire
首先嘗試一次tryAcquire, 如果不成功則添加一個Node節點到等待隊列反復重試。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
當前線程包裝為node對象加入隊尾,acquireQueued則在循環中判斷node的前驅節點是不是head,如果是則繼續嘗試tryAcquire,如果acquire成功則說明成功通過了acquire,則將自己設置為新的head。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; /* 死循環中不斷重試acquire */ for (;;) { final Node p = node.predecessor(); /* 嘗試acquire,成功則把自己設為隊列head節點 */ if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } /* acquire失敗后判斷是否park阻塞,還是要繼續重試acquire */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } /* pred是node的前驅節點,此方法用于判斷node節點acquire失敗后是否park阻塞 */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * 前驅節點狀態是SIGNAL,release時會signal喚醒node * 所以node在acquire失敗時應當繼續park阻塞 */ return true; if (ws > 0) { /* * 前驅節點pred狀態是CANCELLED * 向前遍歷隊列,直到找到狀態不是CANCELLED的節點 * 把這個節點和node設置為前驅后繼關系 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 前驅節點的狀態是0或PROPAGATE * 前驅節點狀態更新為SIGNAL,release時喚醒node節點 * node節點則不需要park,繼續嘗試acquire */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } /* 當前線程park,并返回中斷狀態 */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }共享模式下的acquire
子類實現tryAcquireShared(arg), 調用tryAcquireShared返回值小于0說明獲取失敗,等于0表示獲取成功,但是接下來的acquireShared不會成功,大于0說明tryAcquireShared獲取成功并且接下來的acquireShared也可能成功。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
實現共享模式acquire的主要邏輯在下邊的doAcquireShared方法中,把當前線程封裝為Node加入隊列,向前遍歷隊列,直到當前節點的前驅是頭節點,然后嘗試tryAcquireShared,tryAcquireShared成功后(結果>=0),調用setHeadAndPropagate。
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
doAcquireShared中tryAcquireShared返回值大于0,head為null或head的waitStatus小于0,滿足以上條件情況下,判斷當前節點的后繼節點若為null或是共享類型,調用doReleaseShared喚醒后繼節點以確保共享沿隊列繼續傳播。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don"t know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }獨占模式下的release
如果tryRelease返回了true,說明可以喚醒其他線程,則判斷head不為null并且waitStatus不為0的情況下去unpark后繼節點。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
unparkSuccessor中當node的后繼節點為null或waitStatus > 0說明
next已經取消。此時需要從tail向前遍歷找到離node最近的沒有取消的節點進行unpark。如果node的后繼節點s不是null而且waitStatus < 0則unpark節點s。
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }共享模式下的release
tryReleaseShared返回true,調用doReleaseShared,允許一個等待的節點 acquire成功。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
和獨占模式的release只unpark一個后繼節點不同的是,共享模式下 喚醒行為需要向后傳播。doReleaseShared會從head開始往后檢查狀態,如果節點是SIGNAL狀態,就喚醒它的后繼節點。如果是0就標記為PROPAGATE, 等它釋放鎖的時候會再次喚醒后繼節點。
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77178.html
摘要:源碼學習筆記基于包源碼大致分為以下幾組對包集合框架的擴展更好的支持多線程并發操作線程池相關鎖基本數據類型的原子性封裝 Java concurrent 源碼學習筆記基于JDK1.8 concurrent包源碼大致分為以下幾組: 對util包集合框架的擴展(更好的支持多線程并發操作) 線程池相關 鎖 基本數據類型的原子性封裝 showImg(https://segmentfault.c...
摘要:總結總的來說,操作順序是進入隊列喚醒,成功獲得鎖將狀態變為并將其從轉到使再次獲得鎖執行余下代碼。當然這是理由狀態下,為了討論及的原理,實際的操作時序也有可能變化。 AQS Condition 最近面試被問到java concurrent包下有哪些熟悉的,用過的工具。因此來回顧一下,這些工具的底層實現,AbstractQueuedSynchronizer。在網上看到了其他人的一些技術博客...
摘要:總結總的來說,操作順序是進入隊列喚醒,成功獲得鎖將狀態變為并將其從轉到使再次獲得鎖執行余下代碼。當然這是理由狀態下,為了討論及的原理,實際的操作時序也有可能變化。 AQS Condition 最近面試被問到java concurrent包下有哪些熟悉的,用過的工具。因此來回顧一下,這些工具的底層實現,AbstractQueuedSynchronizer。在網上看到了其他人的一些技術博客...
閱讀 2985·2021-10-19 11:46
閱讀 979·2021-08-03 14:03
閱讀 2934·2021-06-11 18:08
閱讀 2905·2019-08-29 13:52
閱讀 2744·2019-08-29 12:49
閱讀 480·2019-08-26 13:56
閱讀 924·2019-08-26 13:41
閱讀 849·2019-08-26 13:35