摘要:開篇說明本文分析采用的是約定下面內(nèi)容中代表的是引用地址,引用對應(yīng)的節(jié)點前面已經(jīng)講解了公平模式的內(nèi)容,今天來講解下關(guān)于非公平模式下的是如何進行工作的,在源碼分析的時候,先來簡單看一下非公平模式的簡單原理,它采用的棧這種先進后出的方式進行非公
開篇
說明:本文分析采用的是jdk1.8約定:下面內(nèi)容中Ref-xxx代表的是引用地址,引用對應(yīng)的節(jié)點
前面已經(jīng)講解了公平模式的內(nèi)容,今天來講解下關(guān)于非公平模式下的SynchronousQueue是如何進行工作的,在源碼分析的時候,先來簡單看一下非公平模式的簡單原理,它采用的棧這種FILO先進后出的方式進行非公平處理,它內(nèi)部有三種狀態(tài),分別是REQUEST,DATA,F(xiàn)ULFILLING,其中REQUEST代表的數(shù)據(jù)請求的操作也就是take操作,而DATA表示的是數(shù)據(jù)也就是Put操作將數(shù)據(jù)存放到棧中,用于消費者進行獲取操作,而FULFILLING代表的是可以進行互補操作的狀態(tài),其實和前面講的公平模式也很類似。
當(dāng)有相同模式情況下進行入棧操作,相同操作指的是REQUEST和DATA兩種類型中任意一種進行操作時,模式相同則進行入棧操作,如下圖所示:
同REQUEST進行獲取數(shù)據(jù)時的入棧情況:
同樣的put的操作,進行數(shù)據(jù)操作時為DATA類型的操作,此時隊列情況為:
不同模式下又是如何進行操作的?當(dāng)有不同模式進來的時候,他不是將當(dāng)前的模式壓入棧頂,而是將FullFill模式和當(dāng)前模式進行按位或之后壓入棧頂,也就是壓入一個進行FullFill請求的模式進入棧頂,請求配對操作,如下圖所示:
通過上圖可見,本來棧中有一個DATA模式的數(shù)據(jù)等待消費者進行消費,這時候來了一個REQUEST模式的請求操作來進行消費數(shù)據(jù),這時候并沒有將REQUEST模式直接壓入棧頂,而是將其轉(zhuǎn)換為FULLFILLING模式,并且保留了原有的類型,這是進行FULLFILLING的請求,請求和棧頂下方元素進行匹配,當(dāng)匹配成功后將棧頂和匹配元素同時進行出棧操作,詳細請見下文分析:
/** 消費者模式 */ static final int REQUEST = 0; /** 提供者模式 */ static final int DATA = 1; /** 互補模式 */ static final int FULFILLING = 2; /** 棧頂指針 */ volatile SNode head;方法
方法名 | 描述 |
---|---|
isFulfilling | 判斷指定類型是否是互補模式 |
casHead | 替換當(dāng)前頭結(jié)點 |
snode | 生成SNode節(jié)點對象 |
transfer | 主要處理邏輯 |
awaitFulfill | 等待fulfill操作 |
shouldSpin | 判斷節(jié)點s是頭結(jié)點或是fulfill節(jié)點則返回true |
volatile SNode next; // 棧下一個元素 volatile SNode match; // 匹配的節(jié)點 volatile Thread waiter; // 控制park/unpark的線程 Object item; // 數(shù)據(jù)或請求 int mode; // 模式,上面介紹的三種模式方法
方法名 | 描述 |
---|---|
casNext | 判斷指定類型是否是互補模式 |
tryMatch | 嘗試匹配節(jié)點,如果存在匹配節(jié)點則判斷是否是當(dāng)前節(jié)點,直接返回判斷結(jié)果,如果沒有則替換match內(nèi)容并且喚醒線程 |
tryCancel | 取消當(dāng)前節(jié)點,將當(dāng)前節(jié)點的match節(jié)點設(shè)置為當(dāng)前節(jié)點(this) |
isCancelled | 判斷match節(jié)點是不是等于當(dāng)前節(jié)點 |
經(jīng)過上面內(nèi)容的分析,接下來就進入正題,讓我們整體先看一下下transfer都為我們做了些什么內(nèi)容,下面是transfer源碼內(nèi)容:
E transfer(E e, boolean timed, long nanos) { /* * Basic algorithm is to loop trying one of three actions: * * 1. If apparently empty or already containing nodes of same * mode, try to push node on stack and wait for a match, * returning it, or null if cancelled. * * 2. If apparently containing node of complementary mode, * try to push a fulfilling node on to stack, match * with corresponding waiting node, pop both from * stack, and return matched item. The matching or * unlinking might not actually be necessary because of * other threads performing action 3: * * 3. If top of stack already holds another fulfilling node, * help it out by doing its match and/or pop * operations, and then continue. The code for helping * is essentially the same as for fulfilling, except * that it doesn"t return the item. */ SNode s = null; // constructed/reused as needed int mode = (e == null) ? REQUEST : DATA; for (;;) { SNode h = head; if (h == null || h.mode == mode) { // 棧頂指針為空或者是模式相同 if (timed && nanos <= 0) { // 制定了timed并且時間小于等于0則取消操作。 if (h != null && h.isCancelled()) casHead(h, h.next); // 判斷頭結(jié)點是否被取消了取消了就彈出隊列,將頭結(jié)點指向下一個節(jié)點 else return null; } else if (casHead(h, s = snode(s, e, h, mode))) {// 初始化新節(jié)點并且修改棧頂指針 SNode m = awaitFulfill(s, timed, nanos); // 進行等待操作 if (m == s) { // 返回內(nèi)容是本身則進行清理操作 clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s"s fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (!isFulfilling(h.mode)) { // 嘗試去匹配 if (h.isCancelled()) // 判斷是否已經(jīng)被取消了 casHead(h, h.next); // 彈出取消的節(jié)點并且從新進入主循環(huán) else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//新建一個Full節(jié)點壓入棧頂 for (;;) { // 循環(huán)直到匹配 SNode m = s.next; // s的下一個節(jié)點為匹配節(jié)點 if (m == null) { // 代表沒有等待內(nèi)容了 casHead(s, null); // 彈出full節(jié)點 s = null; // 設(shè)置為null用于下次生成新的節(jié)點 break; // 退回到主循環(huán)中 } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // 彈出s節(jié)點和m節(jié)點兩個節(jié)點 return (E) ((mode == REQUEST) ? m.item : s.item); } else // 如果失去了匹配 s.casNext(m, mn); // 幫助取消連接 } } } else { // 這里是幫助進行fillull SNode m = h.next; // m是頭結(jié)點的匹配節(jié)點 if (m == null) // 如果m不存在則直接將頭節(jié)點賦值為nll casHead(h, null); // 彈出fulfill節(jié)點 else { SNode mn = m.next; if (m.tryMatch(h)) // h節(jié)點嘗試匹配m節(jié)點 casHead(h, mn); // 彈出h和m節(jié)點 else // 丟失匹配則直接將頭結(jié)點的下一個節(jié)點賦值為頭結(jié)點的下下節(jié)點 h.casNext(m, mn); } } } }
模式相同的時候則進行等待操作,入隊等待操作
當(dāng)模式不相同時,首先判斷頭結(jié)點是否是fulfill節(jié)點如果不是則進行匹配操作,如果是fulfill節(jié)點先幫助頭結(jié)點的fulfill節(jié)點進行匹配操作
接下來再來看一下awaitFulfill方法內(nèi)容
SNode awaitFulfill(SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; // 等待線程 Thread w = Thread.currentThread(); // 等待時間設(shè)置 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) // 判斷當(dāng)前線程是否被中斷 s.tryCancel(); // 嘗試取消操作 SNode m = s.match; // 獲取當(dāng)前節(jié)點的匹配節(jié)點,如果節(jié)點不為null代表匹配或取消操作,則返回 if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
通過上面的源碼,其實我們之前分析同步模式的時候差不太多,變化的地方其中包括返回內(nèi)容判斷這里判斷的是match節(jié)點是否為null,還有就是spins時間設(shè)置這里發(fā)現(xiàn)了shoudSpin用來判斷是否進行輪訓(xùn),來看一下shouldSpin方法:
/** * 判斷節(jié)點是否是fulfill節(jié)點,或者是頭結(jié)點為空再或者是頭結(jié)點和當(dāng)前節(jié)點相等時則不需要進行輪訓(xùn)操作 */ boolean shouldSpin(SNode s) { SNode h = head; return (h == s || h == null || isFulfilling(h.mode)); }
實際上就是判斷節(jié)點是否是fulfill節(jié)點,或者是頭結(jié)點為空再或者是頭結(jié)點和當(dāng)前節(jié)點相等時則不需要進行輪訓(xùn)操作,如果滿足上述條件就不小進行輪訓(xùn)等到操作了直接進行等待就行了。
接下來我們來用例子一點點解析原理:
首先先進行一個put操作,這樣可以簡單分析下內(nèi)部信息。
/** * SynchronousQueue原理內(nèi)容 * * @author battleheart */ public class SynchronousQueueDemo1 { public static void main(String[] args) throws Exception { SynchronousQueuequeue = new SynchronousQueue<>(); Thread thread1 = new Thread(() -> { try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } }); thread1.start(); } }
首先它會進入到transfer方法中,進行第一步的判斷他的類型信息,如下所示:
SNode s = null; // constructed/reused as needed int mode = (e == null) ? REQUEST : DATA;
通過上面代碼可以看到e=1所以是DATA類型,接下來進行判斷是如何進行操作,當(dāng)前堆棧是空的,如何判斷堆棧為空呢?上面也講到了head節(jié)點為空時則代表堆棧為空,接下來就要判斷如果head節(jié)點為空或head指向的節(jié)點和當(dāng)前操作內(nèi)容模式相同,則進行等待操作,如下代碼所示:
SNode h = head; if (h == null || h.mode == mode) { // empty or same-mode if (timed && nanos <= 0) { // can"t wait if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s"s fulfiller return (E) ((mode == REQUEST) ? m.item : s.item); } }
顯然頭結(jié)點是空的,所以進入到第一個fi語句中執(zhí)行等待操作,如果指定了timed則判斷時間是否小于0,如果小于0則直接null,反之判斷當(dāng)前節(jié)點是否不是頭結(jié)點以及頭結(jié)點是否取消,潘祖條件彈出頭結(jié)點,并將下一個節(jié)點設(shè)置為頭結(jié)點,上述條件在當(dāng)前例子中都不滿足,所以要進入到下面這段代碼中,首先進行對s進行初始化值,并且進行入棧操作,casHead(h, s = snode(s, e, h, mode)),下面看一下棧中的情況如下圖所示:
當(dāng)執(zhí)行完了入棧操作之后接下來要執(zhí)行awaitFulfill這里的操作就是輪訓(xùn)以及將當(dāng)前節(jié)點的線程賦值,并且掛起當(dāng)前線程。此時的棧的情況如下圖所示:
當(dāng)有同樣的模式進行操作時候也是重復(fù)上述的操作內(nèi)容,我們這里模擬兩次put操作,讓讓我們看一下棧中的情況如下圖所示:
通過上圖可以看到,其實就是將頭結(jié)點移動到了新的節(jié)點上,然后新節(jié)點的next節(jié)點維護這下一個節(jié)點的引用,好了,上述內(nèi)容分析是同模式的操作,接下來我們試著進行take操作時,這時候會發(fā)什么內(nèi)容呢?
/** * SynchronousQueue例子二進行兩次put操作和一次take操作 * * @author battleheart */ public class SynchronousQueueDemo1 { public static void main(String[] args) throws Exception { SynchronousQueuequeue = new SynchronousQueue<>(); Thread thread1 = new Thread(() -> { try { queue.put(1); } catch (InterruptedException e) { e.printStackTrace(); } }); thread1.start(); Thread.sleep(2000); Thread thread2 = new Thread(() -> { try { queue.put(2); } catch (InterruptedException e) { e.printStackTrace(); } }); thread2.start(); Thread.sleep(2000); Thread thread6 = new Thread(() -> { try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } }); thread6.start(); } }
上面例子正好符合上面例子兩次put操作的截圖,進行兩次put操作過后再進行take操作,接下來我們來看一下take操作是如何進行操作的,換句話說當(dāng)有不同模式的操作時又是如何進行處理呢?上面分析的內(nèi)容是同種操作模式下的,當(dāng)有不同操作則會走下面內(nèi)容:
else if (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s"s match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller SNode m = h.next; // m is h"s match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } }
最下面的else我們等會來進行分析,我們看到如果不是同模式的話,則會先判斷是否是fulfill模式,如果不是fulfill模式,則進入到第一個if語句中,顯然通過圖示6可以得出,頭結(jié)點head模式并不是fillfull模式,則進入到該if語句中,上來首先判斷當(dāng)前頭結(jié)點是否被取消了,如果被取消則將頭結(jié)點移動到棧頂下一個節(jié)點,反之則將s節(jié)點賦值為fulfill模式按位或當(dāng)前節(jié)點模式,個人認為目的是既保留了原有模式也變成了fulfill模式,我們開篇就講到了,REQUEST=0,二進制則是00,而DATA=1,其二進制為01,而FULFILLING=2,其二進制表示10,也就是說如果當(dāng)前節(jié)點是REQUEST的話那么節(jié)點的內(nèi)容值時00|10=10,如果節(jié)點是DATA模式則s節(jié)點的模式時01|10=11,這樣的話11既保留了原有模式也是FULFILLING模式,然后將頭節(jié)點移動到當(dāng)前s節(jié)點,也就是將FULFILLING模式節(jié)點入棧操作,目前分析到這里時casHead(h, s=snode(s, e, h, FULFILLING|mode),棧的情況如下圖所示:
接下來運行for循環(huán)里面內(nèi)容,先運行如下內(nèi)容:
SNode m = s.next; // m is s"s match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop }
先判斷當(dāng)前節(jié)點也就是頭結(jié)點s的下一個節(jié)點上圖中head=s節(jié)點,所以s.next節(jié)點代表的是Ref-750,判斷當(dāng)前節(jié)點是否為空,如果為空的話代表沒有可匹配的節(jié)點,先對head進行替換為null代表堆棧為空,然后將當(dāng)前s節(jié)點設(shè)置為null,退出fulfill匹配模式進入到主循環(huán)中,會重新進行對當(dāng)前節(jié)點進行操作,是消費還是匹配,顯然本例子中m節(jié)點是不為空的,所以這里不會運行,跳過之后運行下面內(nèi)容:
SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink
mn節(jié)點在上圖中對應(yīng)的是Ref-681,這里是重點,m.tryMatch(s),m節(jié)點嘗試匹配s節(jié)點,進入到方法里,到這一步是我們再來看一下頭結(jié)點的元素的內(nèi)容:
并且喚醒m節(jié)點的,告訴m節(jié)點,你現(xiàn)在有匹配的對象了你可以被喚醒了,這里喚醒之后就會進入到awaitFulfill下面的操作
SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s"s fulfiller return (E) ((mode == REQUEST) ? m.item : s.item);
運行這里的線程顯然是上圖中的m節(jié)點,因為m節(jié)點被喚醒了,m==s代表的是取消了節(jié)點,顯然沒有進行該操作,然后就是幫助頭結(jié)點進行fulfill操作,這里重點說一下這段代碼:
if ((h = head) != null && h.next == s) casHead(h, s.next);
獲取當(dāng)前頭結(jié)點,也就是上圖中的頭結(jié)點如果不為空而且h.next節(jié)點為m節(jié)點正好是m節(jié)點進行操作時的s節(jié)點,也就是說這個語句是成立的,直接將頭節(jié)點指向了上圖的mn節(jié)點,這里的操作和take中的下面操作是一樣的,也就是幫助fulfill操作彈出棧頂和棧頂匹配的節(jié)點內(nèi)容,下面代碼:
SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match s.casNext(m, mn); // help unlink
重點是casHead的代碼,彈出s和m兩個節(jié)點,此時棧中內(nèi)容如下圖所示:
主要的流程分析完畢了,但是細心的朋友會發(fā)現(xiàn),最后面還有一個幫助fulfill的操作,(transfer中)代碼如下所示:
else { // help a fulfiller SNode m = h.next; // m is h"s match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } }
個人理解是這樣的,我們上面也分析到了如果模式是相同模式情況和如果是不同模式且模式不為匹配模式的情況,但是還會有另外一種情況就是如果是不同模式并且頭結(jié)點是匹配模式的就會進入到幫助去fullfill的情況,我來畫圖說明一下該情況:
如上圖所示,上一個匹配操作沒有進行完然后又來了一個請求操作,他就會幫助head進行匹配操作,也就是運行上面的代碼邏輯,邏輯和匹配內(nèi)容是一樣的。
接下來讓我們看一下取消的clean方法內(nèi)容:
void clean(SNode s) { s.item = null; // 將item值設(shè)置為null s.waiter = null; // 將線程設(shè)置為null SNode past = s.next; // s節(jié)點下一個節(jié)點如果不為空,并且節(jié)點是取消節(jié)點則指向下下個節(jié)點,這里是結(jié)束的標(biāo)識,代表沒有了。 if (past != null && past.isCancelled()) past = past.next; // 如果取消的是頭節(jié)點則運行下面的清理操作,操作邏輯很簡單就是判斷頭結(jié)點是不是取消節(jié)點,如果是則將節(jié)點一定到下一個節(jié)點 SNode p; while ((p = head) != null && p != past && p.isCancelled()) casHead(p, p.next); // 取消不是頭結(jié)點的嵌套節(jié)點。 while (p != null && p != past) { SNode n = p.next; if (n != null && n.isCancelled()) p.casNext(n, n.next); else p = n; } }
通過源碼可以看到首先是先找到一個可以結(jié)束的標(biāo)識past,也就說到這里就結(jié)束了,判斷是否不是頭節(jié)點被取消了,如果是頭節(jié)點被取消了則進行第一個while語句,操作也很簡單就是將頭節(jié)點替換頭結(jié)點的下一個節(jié)點,如果不是頭節(jié)點被取消了則進行下面的while語句操作,其實就是將取消的上一個節(jié)點的下一個節(jié)點指定為被取消節(jié)點的下一個節(jié)點,到此分析完畢了。
結(jié)束語如果有分析不正確的請各位指正,我這邊改正~
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/74553.html
摘要:如果節(jié)點不為說明已經(jīng)有其他線程進行操作將節(jié)點替換為節(jié)點等待有消費者消費線程。如果頭節(jié)點下一個節(jié)點是當(dāng)前節(jié)點以防止其他線程已經(jīng)修改了節(jié)點則運算,否則直接返回。 一、介紹 SynchronousQueue是一個雙棧雙隊列算法,無空間的隊列或棧,任何一個對SynchronousQueue寫需要等到一個對SynchronousQueue的讀操作,反之亦然。一個讀操作需要等待一個寫操作,相當(dāng)于是...
摘要:三總結(jié)主要用于線程之間的數(shù)據(jù)交換,由于采用無鎖算法,其性能一般比單純的其它阻塞隊列要高。它的最大特點時不存儲實際元素,而是在內(nèi)部通過棧或隊列結(jié)構(gòu)保存阻塞線程。 showImg(https://segmentfault.com/img/bVbgOsh?w=900&h=900); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... 一、Synchro...
摘要:引言在包中,很好的解決了在多線程中,如何高效安全傳輸數(shù)據(jù)的問題。同時,也用于自帶線程池的緩沖隊列中,了解也有助于理解線程池的工作模型。 引言 在java.util.Concurrent包中,BlockingQueue很好的解決了在多線程中,如何高效安全傳輸數(shù)據(jù)的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高質(zhì)量的多線程程序帶來極大的便利。同時,BlockingQueue也用于...
摘要:概述前面已經(jīng)講解了關(guān)于的非公平鎖模式,關(guān)于非公平鎖,內(nèi)部其實告訴我們誰先爭搶到鎖誰就先獲得資源,下面就來分析一下公平鎖內(nèi)部是如何實現(xiàn)公平的如果沒有看過非公平鎖的先去了解下非公平鎖,因為這篇文章前面不會講太多內(nèi)部結(jié)構(gòu),直接會對源碼進行分析前文 概述 前面已經(jīng)講解了關(guān)于AQS的非公平鎖模式,關(guān)于NonfairSync非公平鎖,內(nèi)部其實告訴我們誰先爭搶到鎖誰就先獲得資源,下面就來分析一下公平...
摘要:限期阻塞調(diào)用方法等待時間結(jié)束或線程執(zhí)行完畢。終止?fàn)顟B(tài)線程執(zhí)行完畢或出現(xiàn)異常退了。和都會檢查線程何時中斷,并且在發(fā)現(xiàn)中斷時提前放回。工廠方法將線程池的最大大小設(shè)置為,而將基本大小設(shè)置為,并將超時大小設(shè)置為分鐘。 wait()、notify()、notifyAll() Object是所有類的基類,它有5個方法組成了等待、通知機制的核心:notify()、notifyAll()、wait()...
閱讀 2849·2021-11-22 11:56
閱讀 3553·2021-11-15 11:39
閱讀 898·2021-09-24 09:48
閱讀 759·2021-08-17 10:14
閱讀 1322·2019-08-30 15:55
閱讀 2753·2019-08-30 15:55
閱讀 1310·2019-08-30 15:44
閱讀 2775·2019-08-30 10:59