摘要:的并發(fā)容器并發(fā)集合這是一個高效的并發(fā)你可以把它理解為一個線程安全的。可以看作一個線程安全的這是一個接口,內(nèi)部通過鏈表數(shù)組等方式實現(xiàn)了這個接口。
3. JDK的并發(fā)容器
并發(fā)集合
ConcurrentHashMap:這是一個高效的并發(fā)HashMap.你可以把它理解為一個線程安全的HashMap。
CopyOnWriteArrayList:這是一個List,從名字看就知道它和ArrayList是一族的。在讀多寫少的場合,這個List的性能非常好,遠遠優(yōu)于Vector。
ConcurrentLinkedQueue:高效的并發(fā)隊列,使用鏈表實現(xiàn)。可以看作一個線程安全的LinkedList.
BlockingQueue:這是一個接口,JDK內(nèi)部通過鏈表、數(shù)組等方式實現(xiàn)了這個接口。表示阻塞隊列,非常適合作為數(shù)據(jù)共享的通道。
ConcurrentSkipListMap:跳表的實現(xiàn)。這是一個Map,使用跳表的數(shù)據(jù)結(jié)構(gòu)進行快速查找。
線程安全的HashMap
可用Collections類來使普通HashMap轉(zhuǎn)為線程安全的map
Collections.synchronizedMap(new HashMap())
private static class SynchronizedMapimplements Map , Serializable { private static final long serialVersionUID = 1978198479659022715L; private final Map m; // 傳入的map final Object mutex; // 鎖資源對象,對map的任何操作都會鎖該對象 SynchronizedMap(Map m) { this.m = Objects.requireNonNull(m); mutex = this; } SynchronizedMap(Map m, Object mutex) { this.m = m; this.mutex = mutex; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } public boolean containsValue(Object value) { synchronized (mutex) {return m.containsValue(value);} } public V get(Object key) { synchronized (mutex) {return m.get(key);} } public V put(K key, V value) { synchronized (mutex) {return m.put(key, value);} } public V remove(Object key) { synchronized (mutex) {return m.remove(key);} } public void putAll(Map extends K, ? extends V> map) { synchronized (mutex) {m.putAll(map);} } public void clear() { synchronized (mutex) {m.clear();} } ....... //省略 }
List的線程安全
Collections.synchronizedList(new LinkedList())
高效讀寫隊列ConcurrentLinkedQueue類
高并發(fā)環(huán)境中性能最好的隊列,主要是利用CAS進行無鎖操作,非阻塞隊列
首先我們來看下它的Node節(jié)點:
private static class Node{ volatile E item; //當(dāng)前對象 volatile Node next; //下一個對象,以此來構(gòu)建鏈表 Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { //(期望值,設(shè)置目標(biāo)值),cas操作 return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
ConcurrentLinkedQueue類內(nèi)部的tail指針更新并不是實時的,可能存在拖延現(xiàn)象,每次更新跳躍兩個元素,如下圖:
然后再看一下新增節(jié)點offer()方法:
public boolean offer(E e) { checkNotNull(e); //非空校驗 final NodenewNode = new Node (e); for (Node t = tail, p = t;;) { //for循環(huán) 無出口,知道設(shè)置成功 Node q = p.next; //獲取tail節(jié)點的next對象 if (q == null) { //第一次插入,p.next對象為空 // p 為最后一個節(jié)點 if (p.casNext(null, newNode)) { //插入新元素,此時p=t //每兩次更新tail if (p != t) casTail(t, newNode); return true; } // cas競爭失敗,再次循環(huán) } else if (p == q) //遇到哨兵 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. p = (p != t && t != (t = tail)) ? t : q; //t!=(t=tail) !=并不是原子操作,先取左邊t的值,再取右邊t=tail } }
高效讀取:不變模式下的CopyOnWriteArrayList類
使用場景:讀操作遠遠大于寫操作,讀操作越快越好,寫操作慢一些也沒事
特點:讀取不用加鎖,寫入不會阻塞讀取操作,只有寫入和寫入需要同步等待,讀性能大幅提升
原理:寫入時進行一次自我復(fù)制,修改內(nèi)容寫入副本中,寫完后,再用副本內(nèi)容替代原來的數(shù)據(jù)
public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); //進行復(fù)制 newElements[len] = e; //新數(shù)組代替老數(shù)組 setArray(newElements); return true; } finally { lock.unlock(); } }
數(shù)據(jù)共享通道:BlockingQueue
BlockingQueue是接口,實現(xiàn)類有ArrayBlockingQueue以及LinkedBlockingQueue.當(dāng)BlockingQueue為空時,會等待,當(dāng)有消息進入隊列后,自動喚醒線程,Condition.await()和Condition.signal(),祥見上一篇 Condition重入鎖
注意: 一般生產(chǎn)者消費者模型中,往往采用BlockingQueue而不是ConcurrentLinkedQueeu,因為BlockingQueue帶有阻塞功能,可以控制生產(chǎn)消費者的速率(await和signal)
隨機數(shù)據(jù)結(jié)構(gòu):跳表
使用環(huán)境:高并發(fā)環(huán)境
特點:快速查找,類似平衡樹,平衡樹插入和刪除往往會引起一次全局調(diào)整,而跳表只需局部調(diào)整,且在高并發(fā)環(huán)境下,平衡樹需要全局鎖,而跳表只需要局部;隨機算法,跳表的本質(zhì)是維護多個鏈表;有序性
原理:如下
用于測試方法的執(zhí)行效率,精度達毫秒級.
maven:
org.openjdk.jmh jmh-core 1.18 org.openjdk.jmh jmh-generator-annprocess 1.18 provided
基本概念
1.模式(Model): model表示JMH的測量方式和角度,共四種 Throughput:整體吞吐量,一秒可執(zhí)行多少次 AverageTime:調(diào)用平均時間 SampleTime:隨機取樣,最后輸出取樣結(jié)果,如"99%的調(diào)用在xxx毫秒內(nèi)" SingleShotTime:只運行一次,無warmup(熱身),用于測試啟動時的性能 2.迭代(Iteration) 迭代表示一次測試單位,一般為1秒 3.預(yù)熱(warmup) 預(yù)熱是為了測試在JIT編譯后的性能 4.狀態(tài)(State) 指測試范圍,一種是線程范,一個線程一個對象.另外一種是基準測試范圍(Benchmark),多個線程共享一個實例 5.配置類(Options) 指定一些參數(shù),如指定測試類(include),使用進程個數(shù)(fork),預(yù)熱迭代次數(shù)(warmuoIterations)
代碼
@BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.MICROSECONDS) @State(Scope.Benchmark) public class ListTest { CopyOnWriteArrayList smallCopyOnWriteList = new CopyOnWriteArrayList(); ConcurrentLinkedQueue smallConcurrentList = new ConcurrentLinkedQueue(); CopyOnWriteArrayList bigCopyOnWriteList = new CopyOnWriteArrayList(); ConcurrentLinkedQueue bigConcurrentList = new ConcurrentLinkedQueue(); @Setup public void setup() { for (int i = 0; i < 10; i++) { smallCopyOnWriteList.add(new Object()); smallConcurrentList.add(new Object()); } for (int i = 0; i < 1000; i++) { bigCopyOnWriteList.add(new Object()); bigCopyOnWriteList.add(new Object()); } } @Benchmark public void copyOnWriteGet() { smallCopyOnWriteList.get(0); } @Benchmark public void copyOnWriteSize() { smallCopyOnWriteList.size(); } @Benchmark public void concurrentListGet() { smallConcurrentList.peek(); } @Benchmark public void concurrentListSize() { smallConcurrentList.size(); } @Benchmark public void smallCopyOnWriteWrite() { smallCopyOnWriteList.add(new Object()); smallCopyOnWriteList.remove(0); } @Benchmark public void smallConcurrentListWrite() { smallConcurrentList.add(new Object()); smallConcurrentList.remove(0); } @Benchmark public void bigCopyOnWriteWrite() { bigCopyOnWriteList.add(new Object()); bigCopyOnWriteList.remove(0); } @Benchmark public void bigConcurrentListWrite() { bigConcurrentList.offer(new Object()); bigConcurrentList.remove(0); } public static void main(String[] args) throws RunnerException { Options opt = new OptionsBuilder().include(ListTest.class.getSimpleName()).forks(1).warmupIterations(5) .measurementIterations(5).threads(4).build(); new Runner(opt).run(); } }
性能思考
hashmap和concurrenthashmap的對比
單線程下,hashmap的get方法比concurrenthashmap略慢,size()方法卻快得多,同步hashmap,size方法僅比concurrenthashmap略快一點
CopyOnWriteArrayList類與ConcurrentLinkedQueue類
當(dāng)元素總量不大時,絕大部分場景中CopyOnWriteArrayList性能要優(yōu)于ConcurrentLinkedQueue
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/76116.html
摘要:只要線程池未關(guān)閉該策略直接在調(diào)用者線程中運行當(dāng)前被丟棄的任務(wù)。顯然這樣做不會真的丟棄任務(wù)但是任務(wù)提交線程的性能極有可能會急劇下降。任務(wù)并嘗試再次提交當(dāng)前任務(wù)。 1. 同步控制 synchronized的擴展:重入鎖 同步控制不僅有synchronized配合object.wait()以及object.notify(),也有增強版的reentrantLock(重入鎖) public cl...
摘要:大家好,我是冰河有句話叫做投資啥都不如投資自己的回報率高。馬上就十一國慶假期了,給小伙伴們分享下,從小白程序員到大廠高級技術(shù)專家我看過哪些技術(shù)類書籍。 大家好,我是...
摘要:有時候,由于初期考慮不周,或者后期的需求變化,一些普通變量可能也會有線程安全的需求。它可以讓你在不改動或者極少改動原有代碼的基礎(chǔ)上,讓普通的變量也享受操作帶來的線程安全性,這樣你可以修改極少的代碼,來獲得線程安全的保證。 有時候,由于初期考慮不周,或者后期的需求變化,一些普通變量可能也會有線程安全的需求。如果改動不大,我們可以簡單地修改程序中每一個使用或者讀取這個變量的地方。但顯然,這...
閱讀 1709·2021-09-22 10:02
閱讀 1937·2021-09-02 15:40
閱讀 2840·2019-08-30 15:55
閱讀 2250·2019-08-30 15:44
閱讀 3597·2019-08-30 13:18
閱讀 3230·2019-08-30 11:00
閱讀 1951·2019-08-29 16:57
閱讀 570·2019-08-29 16:41