摘要:前言這篇主要來講解多線程中一個非常經(jīng)典的設(shè)計模式包括它的基礎(chǔ)到拓展希望大家能夠有所收獲生產(chǎn)者消費者模式簡述此設(shè)計模式中主要分兩類線程生產(chǎn)者線程和消費者線程生產(chǎn)者提供數(shù)據(jù)和任務(wù)消費者處理數(shù)據(jù)和任務(wù)該模式的核心就是數(shù)據(jù)和任務(wù)的交互點共享內(nèi)存緩
前言
這篇主要來講解多線程中一個非常經(jīng)典的設(shè)計模式
包括它的基礎(chǔ)到拓展
希望大家能夠有所收獲
此設(shè)計模式中主要分兩類線程:生產(chǎn)者線程和消費者線程
生產(chǎn)者提供數(shù)據(jù)和任務(wù)
消費者處理數(shù)據(jù)和任務(wù)
該模式的核心就是數(shù)據(jù)和任務(wù)的交互點:共享內(nèi)存緩存區(qū)
下面給出簡單易懂的一張圖:
使用BlockingQueue來做緩沖區(qū)是非常合適的
通過BlockingQueue來理解生產(chǎn)者消費者模式
首先我們要知道BlockingQueue是什么?
它是一個實現(xiàn)接口,有很多實現(xiàn)類,比如:
ArrayBlockingQueue:前面講過,這個隊列適合做有界隊列,固定線程數(shù)
LinkedBlockingQueue:它適合做無界隊列
......
以ArrayBlockingQueue為例
它在內(nèi)部放置了一個對象數(shù)組:
final Object[] items;
通過items數(shù)組來進行元素的存取
1(存).向隊列中壓入一個元素:
.offer():如果隊列滿了,返回false
.put():將元素壓入隊列末尾,如果隊列滿了,它就會一直等待
2(取).向隊列中彈出元素(從頭部彈出):
.poll():如果隊列為空,返回null
.take():如果隊列為空,繼續(xù)等待,知道隊列中有元素
了解了上面這些基礎(chǔ)后,我們來看下實際操作是怎樣的
在開始之前我們要有一個Entity類,只存一個long類型的value值進去:
public class MyData { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
有了這個數(shù)據(jù)模型,看下最后的執(zhí)行main方法:
public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); //建立線程池 BlockingQueueblockingQueue = new ArrayBlockingQueue (10); //建立緩存隊列 for (int i=0;i<3;i++){ Producer i = new Producer(queue); executor.execute(i); } //制造三個生產(chǎn)線程 for (int j=0;j<3;j++){ Consumer j = new Consumer(queue); executor.execute(j); } //制造三個消費線程 Thread.sleep(10000); for (int i=0;i<3;i++){ i.stop(); } //停止生產(chǎn) Thread.sleep(5000); executor.shutdown(); }
這里只給出Main,大家可以通過代碼簡單理解使用BlockingQueue做緩沖區(qū)的過程
沒有給出生產(chǎn)者和消費者的具體線程實現(xiàn)類,除了博主比較懶之外,還有是因為使用BlockingQueue做緩沖區(qū)并不推薦使用
雖然BlockingQueue是個不錯的選擇,但它使用了鎖和阻塞來保證線程間的同步,并不具備良好的并發(fā)性能
下面講解一種具有高性能的共享緩沖區(qū)
我們知道BlockingQueue隊列的性能不是特別優(yōu)越
而之前講到過ConcurrentLinkedQueue是一個高性能隊列,因為它使用了大量的CAS操作
同理,如果我們利用CAS操作實現(xiàn)生產(chǎn)者-消費者模式,性能就可以得到客觀的提升
但是大量的CAS操作自己實現(xiàn)起來非常困難
所以推薦使用Disruptor框架
實際工作還是得使用成熟的框架,Disruptor是一款高效的無鎖內(nèi)存隊列
它不像傳統(tǒng)隊列有head和tail指針來操控入列和出列
而是實現(xiàn)了一個固定大小的環(huán)形隊列(RingBuffer),來看下實際模型圖:
生產(chǎn)者向緩沖區(qū)寫入數(shù)據(jù),消費者從緩沖區(qū)讀取數(shù)據(jù),大家都使用了CAS操作
而且由于是環(huán)形隊列的原因,可以做到完全的內(nèi)存復(fù)用
從而大大減少系統(tǒng)分配空間以及回收空間的額外開銷
那么這個框架怎么使用呢?
1.導入包(博主使用了Maven依賴,不同版本大同小異):
com.lmax disruptor 3.3.2
2.依舊創(chuàng)建一個entity類:
public class MyData { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
3.還要寫一個Factory類,細心的同學會看到環(huán)形隊列是固定大小的
這個Factory會在Disruptor實例對象構(gòu)造時,構(gòu)造所有緩沖區(qū)中的對象實例
public class DataFactory implements EventFactory{ @Override public Object newInstance() { return new MyData(); } }
4.生產(chǎn)者(具體每行代碼的作用都已經(jīng)注釋):
public class Producers { private final RingBufferringBuffer; //創(chuàng)建環(huán)形隊列(環(huán)形緩沖區(qū)) public Producers(RingBuffer ringBuffer) { this.ringBuffer = ringBuffer; //將ringBuffer與Producers綁定 } public void putData(ByteBuffer byteBuffer){ //此方法將產(chǎn)生的數(shù)據(jù)推入緩沖區(qū) long sequeue = ringBuffer.next(); //通過.next()方法得到ringBuffer的下一個節(jié)點,并且賦值給sequeue MyData event = ringBuffer.get(sequeue); //將mydata數(shù)據(jù)存入到下一個節(jié)點 event.setValue(byteBuffer.getLong(0)); //mydata的值有ByteBuffer參數(shù)帶入 ringBuffer.publish(sequeue); //將sequeue節(jié)點內(nèi)的數(shù)據(jù)發(fā)布 } }
5.消費者:
public class Consumers implements WorkHandler{ @Override public void onEvent(MyData myData) throws Exception { System.out.println("當前線程為:"+Thread.currentThread().getId()+"線程,它處理的數(shù)據(jù)是:"+myData.getValue()); } }
6.執(zhí)行函數(shù):
public class RunTest { public static void main(String[] args) throws InterruptedException { Executor executor = Executors.newCachedThreadPool(); //創(chuàng)建線程池 DataFactory dataFactory = new DataFactory(); //創(chuàng)建Factory實例 int bufferSize = 1024; //設(shè)置緩存區(qū)大小為1024(必須是2的整數(shù)次冪) Disruptordisruptor = new Disruptor ( dataFactory, bufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy() ); disruptor.handleEventsWithWorkerPool( new Consumers(), new Consumers(), new Consumers(), new Consumers() ); disruptor.start(); //Disruptor啟動 RingBuffer ringBuffer = disruptor.getRingBuffer(); //實例化環(huán)形隊列并與Disruptor綁定 Producers producers = new Producers(ringBuffer); //實例化生產(chǎn)者并綁定ringBuffer ByteBuffer byteBuffe = ByteBuffer.allocate(8); //創(chuàng)建一個容量為256字節(jié)的ByteBuffer for (long n = 0;true;n++){ byteBuffe.putLong(0,n); producers.putData(byteBuffe); Thread.sleep(100); System.out.println("add data "+n); } } }
我們來看下執(zhí)行結(jié)果:
當前線程為:13線程,它處理的數(shù)據(jù)是:1059 add data 1059 當前線程為:11線程,它處理的數(shù)據(jù)是:1060 add data 1060 當前線程為:10線程,它處理的數(shù)據(jù)是:1061 add data 1061 當前線程為:12線程,它處理的數(shù)據(jù)是:1062 add data 1062 當前線程為:13線程,它處理的數(shù)據(jù)是:1063 add data 1063 當前線程為:11線程,它處理的數(shù)據(jù)是:1064 add data 1064 當前線程為:10線程,它處理的數(shù)據(jù)是:1065
可以看出,因為我無限的讓生產(chǎn)線程生產(chǎn)數(shù)據(jù),而RingBuffer中那十幾條消費線程不停的消費數(shù)據(jù)
此外Disruptor不止CAS操作,還提供了四種等待策略讓消費者監(jiān)控緩沖區(qū)的信息:
1.BlockingWaitStrategy:默認策略,最節(jié)省CPU,但在高并發(fā)下性能表現(xiàn)最糟糕
2.SleepingWaitStrategy:等待數(shù)據(jù)時自旋等待,不成功會使用LockSupport方法阻塞自己,通常用于異步日志
3.YieldWaitStrategy:用于低延時場合,在內(nèi)部執(zhí)行Thread.yield()死循環(huán)
4.BusySpinWaitStrategy:消費線程進行死循環(huán)監(jiān)控緩沖區(qū),吃掉所有CPU資源
除了CAS操作,消費者等待策略,Disruptor還使用CPU Cache的優(yōu)化來進行優(yōu)化
根據(jù)Disruptor官方報道:Disruptor的性能比BlockingQueuez至少高一倍以上!
以上便是生產(chǎn)者消費者模式的應(yīng)用
謝謝閱讀,記得點關(guān)注看更新
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/70719.html
摘要:今天就先到這里,大家可以看看這些內(nèi)容的拓展記得點關(guān)注看更新,謝謝閱讀 前言 這是一個長篇博客,希望大家關(guān)注我并且一起學習java高并發(fā)廢話不多說,直接開始 并行和并發(fā) 并行:多個線程同時處理多個任務(wù)并發(fā):多個線程處理同個任務(wù),不一定要同時 下面用圖來描述并行和并發(fā)的區(qū)別:(實現(xiàn)和虛線表示兩個不同的線程) showImg(https://segmentfault.com/img/bVYT...
摘要:可以用代替可以用代替定義的對象的值是不可變的今天就先到這里,大家可以看看這些內(nèi)容的拓展記得點關(guān)注看更新,謝謝閱讀 前言 java高并發(fā)第二篇講的是java線程的基礎(chǔ)依舊不多說廢話 線程和進程 進程是操作系統(tǒng)運行的基礎(chǔ),是一個程序運行的實體,windows上打開任務(wù)管理器就能看到進程線程是輕量級的進程,是程序執(zhí)行的最小單位,是在進程這個容器下進行的 線程基本操作 新建一個線程類有兩種方式...
摘要:前言今天講的多線程的同步控制直接進入正題重入鎖重入鎖可以完全代替,它需要類來實現(xiàn)下面用一個簡單的例子來實現(xiàn)重入鎖以上代碼打印出來的是,可以說明也實現(xiàn)了線程同步它相比更加靈活,因為重入鎖實現(xiàn)了用戶自己加鎖,自己釋放鎖記得一定要釋放,不然其他線 前言 今天講的多線程的同步控制直接進入正題 ReentrantLock重入鎖 重入鎖可以完全代替synchronized,它需要java.util...
摘要:前言本篇主要講解如何去優(yōu)化鎖機制或者克服多線程因為鎖可導致性能下降的問題線程變量有這樣一個場景,前面是一大桶水,個人去喝水,為了保證線程安全,我們要在杯子上加鎖導致大家輪著排隊喝水,因為加了鎖的杯子是同步的,只能有一個人拿著這個唯一的杯子喝 前言 本篇主要講解如何去優(yōu)化鎖機制或者克服多線程因為鎖可導致性能下降的問題 ThreadLocal線程變量 有這樣一個場景,前面是一大桶水,10個...
閱讀 1668·2021-11-12 10:35
閱讀 1614·2021-08-03 14:02
閱讀 2681·2019-08-30 15:55
閱讀 2027·2019-08-30 15:54
閱讀 756·2019-08-30 14:01
閱讀 2427·2019-08-29 17:07
閱讀 2252·2019-08-26 18:37
閱讀 3031·2019-08-26 16:51