摘要:概述是從開始提供的一種非阻塞式線程安全鏈表,隸屬于包。當許多線程同時訪問一個公共集合時,是一個合適的選擇。程序的一次輸出為該程序實現了多線程并發添加大量元素到一個公共的鏈表,剛好是的典型使用場景。
Java JUC學習 - ConcurrentLinkedDeque 詳解 0x00 前言
如何實現并發程序,對于Java以及其他高級語言來說都是一件并不容易的事情。在大一上學期的時候,我們學習了鏈表。可以說鏈表是一種很常用的數據結構了,不管是實現鏈式的隊列和棧還是實現樹都需要這種數據結構。但是在最初的時候我們并沒有考慮過鏈表的并發問題,比如下面是一個經典的鏈表插入過程:
//C Source Code //查找函數 LinkNode *SearchNode(LinkList *list, int i); //插入函數 void LinkListInsert(LinkList *list, LinkNode *node, int i) { // 首先找到插入點之前的結點 LinkNode *previous = SearchNode(list, i-1); // 插入操作 node->next = previous->next; previous->next = node; }
如果只有一個線程操作鏈表的話那一定是沒有問題的,但是因為鏈表的插入操作實際上是非原子(nonatomic)訪問,所以多線程操作同一個鏈表的話可能會出現問題。解決這種問題的一種方法是使用線程同步,在訪問的時候加鎖。不過這樣一來編程就會變的很繁瑣,也會出現很多不可預料的問題。在Java中為我們提供了JUC類庫,里面有很多可以用于多線程訪問的類型。今天介紹的就是其中的ConcurrentLinkedDeque類型。
0x01 ConcurrentLinkedDeque概述ConcurrentLinkedDeque是從Java SE 7/JDK 7開始提供的一種Non-blocking Thread-safe List(非阻塞式線程安全鏈表),隸屬于java.util.concurrent包。其類原型為:
public class ConcurrentLinkedDequeextends AbstractCollection implements Deque , Serializable
在API簡介中,對ConcurrentLinkedDeque的表述是:
An unbounded concurrent deque based on linked nodes. Concurrent insertion, removal, and access operations execute safely across multiple threads. A ConcurrentLinkedDeque is an appropriate choice when many threads will share access to a common collection. Like most other concurrent collection implementations, this class does not permit the use of null elements.
大概意思是,ConcurrentLinkedDeque是一種基于鏈接節點的無限并發鏈表。可以安全地并發執行插入、刪除和訪問操作。當許多線程同時訪問一個公共集合時,ConcurrentLinkedDeque是一個合適的選擇。和大多數其他并發的集合類型一樣,這個類不允許使用空元素。
聽起來很美好,但是在使用過程中還要注意一個元素數量的問題,在API文檔中這樣表述:
Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these deques, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.
大概意思是:與大多數集合類型不同,其size方法不是一個常量操作。因為鏈表的異步性質,確定當前元素的數量需要遍歷所有的元素,所以如果在遍歷期間有其他線程修改了這個集合,size方法就可能會報告不準確的結果。
同時,對鏈表的批量操作也不是一個原子操作,在使用的時候要注意,在API文檔中這樣表述:
Bulk operations that add, remove, or examine multiple elements, such as addAll(java.util.Collection extends E>), removeIf(java.util.function.Predicate super E>) or forEach(java.util.function.Consumer super E>), are not guaranteed to be performed atomically.
大概意思是,批量的操作:包括添加、刪除或檢查多個元素,比如addAll(java.util.Collection extends E>)、removeIf(java.util.function.Predicate super E>)或者removeIf(java.util.function.Predicate super E>)或forEach(java.util.function.Consumer super E>)方法,這個類型并不保證以原子方式執行。由此可見如果想保證原子訪問,不得使用批量操作的方法。
瀏覽API的介紹,我們對這個類型有了大體上的了解。實際上ConcurrentLinkedDeque本質上也是一個鏈表,對這個類型的基本操作也和基本的鏈表一樣,基本上就是“創建、插入、刪除、遍歷、清空”等,下面直接借助具體的例子來表述。
0x02 ConcurrentLinkedDeque源代碼解析在本節我將從普通鏈表的實現的角度解析這個非阻塞的線程安全鏈表的實現。首先對于鏈表而言一定是由結點構成的,定義如下:
static final class Node{ // 結點的前繼結點,Java沒有指針,但是普通對象就相當于指針了。 volatile Node prev; // 存儲的泛型元素。 volatile E item; // 結點的后繼結點 volatile Node next; }
其次是該類的一些公有方法的實現,這些方法大多是實現的java.util.Deque
檢查鏈表中是否包含給定的元素
/** * Returns {@code true} if this deque contains the specified element. * More formally, returns {@code true} if and only if this deque contains * at least one element {@code e} such that {@code o.equals(e)}. * * @param o element whose presence in this deque is to be tested * @return {@code true} if this deque contains the specified element */ public boolean contains(Object o) { //如果是空對象直接返回,因為不允許存儲空對象 if (o != null) { //比較經典的for循環結構了,從首個結點開始依次尋找。 for (Nodep = first(); p != null; p = succ(p)) { final E item; //如果結點元素不為空并且還是我們要的元素,就返回true if ((item = p.item) != null && o.equals(item)) return true; } } return false; }
檢查鏈表是否為空
/** * Returns {@code true} if this collection contains no elements. * * @return {@code true} if this collection contains no elements */ public boolean isEmpty() { //直接檢查能否取到首元素即可,下面會講解peekFirst()方法的實現 return peekFirst() == null; }
返回鏈表中包含的元素個數
/** * Returns the number of elements in this deque. If this deque * contains more than {@code Integer.MAX_VALUE} elements, it * returns {@code Integer.MAX_VALUE}. * *Beware that, unlike in most collections, this method is * NOT a constant-time operation. Because of the * asynchronous nature of these deques, determining the current * number of elements requires traversing them all to count them. * Additionally, it is possible for the size to change during * execution of this method, in which case the returned result * will be inaccurate. Thus, this method is typically not very * useful in concurrent applications. * * @return the number of elements in this deque */ public int size() { //這里應該是Java中的標簽(label)語句的用法,我們平時編程比較少用到 restartFromHead: for (;;) { //count計數器 int count = 0; //從首結點 for (Node
p = first(); p != null;) { //如果不是空結點 if (p.item != null) //如果count加1之后達到了最大整數值就退出 if (++count == Integer.MAX_VALUE) break; // @see Collection.size() //如果p的下一個結點仍然為p,則從頭開始 if (p == (p = p.next)) continue restartFromHead; } return count; } }
獲得鏈表首結點
public E peekFirst() { //從第一個結點開始循環,找到第一個非空的結點 for (Nodep = first(); p != null; p = succ(p)) { final E item; if ((item = p.item) != null) return item; } return null; }
獲得鏈表尾結點
public E peekLast() { //從最后一個結點開始,找到第一個非空的結點 for (Nodep = last(); p != null; p = pred(p)) { final E item; if ((item = p.item) != null) return item; } return null; }
下面介紹一些protected方法和private方法。這里僅講解插入到首結點和插入到尾結點的方法,其他方法思路大致相同,這里就不再介紹了。
/** * Links e as first element. */ private void linkFirst(E e) { // 新建一個結點,并且要求結點不為空。 final NodenewNode = newNode(Objects.requireNonNull(e)); restartFromHead: for (;;) //從頭結點開始遍歷鏈表 for (Node h = head, p = h, q;;) { //如果p不是第一個結點則應該退回第一個結點 if ((q = p.prev) != null && (q = (p = q).prev) != null) // Check for head updates every other hop. // If p == q, we are sure to follow head instead. p = (h != (h = head)) ? h : q; else if (p.next == p) // PREV_TERMINATOR continue restartFromHead; else { // p is first node //將新結點的下個結點設為p NEXT.set(newNode, p); // CAS piggyback //設置新結點的上個結點 if (PREV.compareAndSet(p, null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this deque, // and for newNode to become "live". if (p != h) // hop two nodes at a time; failure is OK //調整頭結點 HEAD.weakCompareAndSet(this, h, newNode); return; } // Lost CAS race to another thread; re-read prev } } }
/** * Links e as last element. */ private void linkLast(E e) { // 新建一個結點,并且要求結點不為空。 final NodenewNode = newNode(Objects.requireNonNull(e)); restartFromTail: for (;;) for (Node t = tail, p = t, q;;) { //如果p不是最后一個結點則應該退回最后一個結點 if ((q = p.next) != null && (q = (p = q).next) != null) // Check for tail updates every other hop. // If p == q, we are sure to follow tail instead. p = (t != (t = tail)) ? t : q; else if (p.prev == p) // NEXT_TERMINATOR continue restartFromTail; else { // p is last node //將新結點的上個結點設為p PREV.set(newNode, p); // CAS piggyback if (NEXT.compareAndSet(p, null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this deque, // and for newNode to become "live". if (p != t) // hop two nodes at a time; failure is OK //調整尾結點 TAIL.weakCompareAndSet(this, t, newNode); return; } // Lost CAS race to another thread; re-read next } } }
果然Java庫中的代碼的實現還是比較復雜的,這次只是粗略的了解。有些地方還是不太理解,所以還需要進一步的學習。
0x03 ConcurrentLinkedDeque在實際中的應用不多做解釋了,直接上應用。
package cn.xiaolus.cld; import java.util.concurrent.ConcurrentLinkedDeque; public class CLDMain { private static ConcurrentLinkedDequecld = new ConcurrentLinkedDeque<>(); public static void main(String[] args) { int numThread = Runtime.getRuntime().availableProcessors(); Thread[] threads = new Thread[numThread]; for (int i = 0; i < threads.length; i++) { (threads[i] = new Thread(addTask(), "Thread "+i)).start(); } } public static Runnable addTask() { return new Runnable() { @Override public void run() { int num = Runtime.getRuntime().availableProcessors(); for (int i = 0; i < num; i++) { StringBuilder item = new StringBuilder("Item ").append(i); cld.addFirst(item.toString()); callbackAdd(Thread.currentThread().getName(), item); } callbackFinish(Thread.currentThread().getName()); } }; } public static void callbackAdd(String threadName, StringBuilder item) { StringBuilder builder = new StringBuilder(threadName).append(" added :").append(item); System.out.println(builder); } public static void callbackFinish(String threadName) { StringBuilder builder = new StringBuilder(threadName).append(" has Finished"); System.out.println(builder); System.out.println(new StringBuilder("CurrentSize ").append(cld.size())); } }
程序的一次輸出為:
Thread 0 added :Item 0 Thread 0 added :Item 1 Thread 0 added :Item 2 Thread 0 added :Item 3 Thread 0 has Finished CurrentSize 6 Thread 1 added :Item 0 Thread 2 added :Item 0 Thread 2 added :Item 1 Thread 2 added :Item 2 Thread 2 added :Item 3 Thread 1 added :Item 1 Thread 1 added :Item 2 Thread 2 has Finished Thread 1 added :Item 3 Thread 1 has Finished CurrentSize 13 CurrentSize 13 Thread 3 added :Item 0 Thread 3 added :Item 1 Thread 3 added :Item 2 Thread 3 added :Item 3 Thread 3 has Finished CurrentSize 16
該程序實現了多線程并發添加大量元素到一個公共的鏈表,剛好是ConcurrentLinkedDeque的典型使用場景。同時也驗證了上面的說法,即size()方法需要遍歷鏈表,可能返回錯誤的結果。
源代碼鏈接:Github - xiaolulwr (路偉饒) / JUC-Demo。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68454.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:和方法會一直阻塞調用線程,直到線程被中斷或隊列狀態可用和方法會限時阻塞調用線程,直到超時或線程被中斷或隊列狀態可用。 showImg(https://segmentfault.com/img/bVbgyPy?w=1191&h=670); 本文首發于一世流云專欄:https://segmentfault.com/blog... 一、引言 從本節開始,我們將介紹juc-collectio...
摘要:有三種狀態運行關閉終止。類類,提供了一系列工廠方法用于創建線程池,返回的線程池都實現了接口。線程池的大小一旦達到最大值就會保持不變,在提交新任務,任務將會進入等待隊列中等待。此線程池支持定時以及周期性執行任務的需求。 這是java高并發系列第19篇文章。 本文主要內容 介紹Executor框架相關內容 介紹Executor 介紹ExecutorService 介紹線程池ThreadP...
摘要:在之前,除了類外,并沒有其它適合并發環境的棧數據結構。作為雙端隊列,可以當作棧來使用,并且高效地支持并發環境。 showImg(https://segmentfault.com/img/bVbguF7?w=1280&h=853); 本文首發于一世流云專欄:https://segmentfault.com/blog... 一、引言 在開始講ConcurrentLinkedDeque之前...
摘要:方法由兩個參數,表示期望的值,表示要給設置的新值。操作包含三個操作數內存位置預期原值和新值。如果處的值尚未同時更改,則操作成功。中就使用了這樣的操作。上面操作還有一點是將事務范圍縮小了,也提升了系統并發處理的性能。 這是java高并發系列第21篇文章。 本文主要內容 從網站計數器實現中一步步引出CAS操作 介紹java中的CAS及CAS可能存在的問題 悲觀鎖和樂觀鎖的一些介紹及數據庫...
閱讀 1571·2021-09-24 10:38
閱讀 1498·2021-09-22 15:15
閱讀 3059·2021-09-09 09:33
閱讀 905·2019-08-30 11:08
閱讀 638·2019-08-30 10:52
閱讀 1253·2019-08-30 10:52
閱讀 2344·2019-08-28 18:01
閱讀 520·2019-08-28 17:55