摘要:所有示例代碼請見下載于基本概念并發同時擁有兩個或者多個線程,如果程序在單核處理器上運行多個線程將交替地換入或者換出內存這些線程是同時存在的,每個線程都處于執行過程中的某個狀態,如果運行在多核處理器上此時,程序中的每個線程都
所有示例代碼,請見/下載于
https://github.com/Wasabi1234...
同時擁有兩個或者多個線程,如果程序在單核處理器上運行多個線程將交替地換入或者換出內存,這些線程是同時“存在"的,每個線程都處于執行過程中的某個狀態,如果運行在多核處理器上,此時,程序中的每個線程都將分配到一個處理器核上,因此可以同時運行.
1.2 高并發( High Concurrency)互聯網分布式系統架構設計中必須考慮的因素之一,通常是指,通過設計保證系統能夠同時并行處理很多請求.
1.3 區別與聯系并發: 多個線程操作相同的資源,保證線程安全,合理使用資源
高并發:服務能同時處理很多請求,提高程序性能
2 CPU 2.1 CPU 多級緩存為什么需要CPU cache
CPU的頻率太快了,快到主存跟不上
如此,在處理器時鐘周期內,CPU常常需要等待主存,浪費資源。所以cache的出現,是為了緩解CPU和內存之間速度的不匹配問題(結構:cpu-> cache-> memory ).
CPU cache的意義
1) 時間局部性
如果某個數據被訪問,那么在不久的將來它很可能被再次訪問
2) 空間局部性
如果某個數據被訪問,那么與它相鄰的數據很快也可能被訪問
2.2 緩存一致性(MESI)用于保證多個 CPU cache 之間緩存共享數據的一致
M-modified被修改
該緩存行只被緩存在該 CPU 的緩存中,并且是被修改過的,與主存中數據是不一致的,需在未來某個時間點寫回主存,該時間是允許在其他CPU 讀取主存中相應的內存之前,當這里的值被寫入主存之后,該緩存行狀態變為 E
E-exclusive獨享
緩存行只被緩存在該 CPU 的緩存中,未被修改過,與主存中數據一致
可在任何時刻當被其他 CPU讀取該內存時變成 S 態,被修改時變為 M態
S-shared共享
該緩存行可被多個 CPU 緩存,與主存中數據一致
I-invalid無效
亂序執行優化
處理器為提高運算速度而做出違背代碼原有順序的優化
并發的優勢與風險 3 項目準備 3.1 項目初始化
以上二者通常和線程池搭配
下面開始做并發模擬
package com.mmall.concurrency; import com.mmall.concurrency.annoations.NotThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author shishusheng * @date 18/4/1 */ @Slf4j @NotThreadSafe public class ConcurrencyTest { /** * 請求總數 */ public static int clientTotal = 5000; /** * 同時并發執行的線程數 */ public static int threadTotal = 200; public static int count = 0; public static void main(String[] args) throws Exception { //定義線程池 ExecutorService executorService = Executors.newCachedThreadPool(); //定義信號量,給出允許并發的線程數目 final Semaphore semaphore = new Semaphore(threadTotal); //統計計數結果 final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); //將請求放入線程池 for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { //信號量的獲取 semaphore.acquire(); add(); //釋放 semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); //關閉線程池 executorService.shutdown(); log.info("count:{}", count); } /** * 統計方法 */ private static void add() { count++; } }
運行發現結果隨機,所以非線程安全
4線程安全性 4.1 線程安全性當多個線程訪問某個類時,不管運行時環境采用何種調度方式或者這些進程將如何交替執行,并且在主調代碼中不需要任何額外的同步或協同,這個類都能表現出正確的行為,那么就稱這個類是線程安全的
4.2 原子性 4.2.1 Atomic 包AtomicXXX:CAS,Unsafe.compareAndSwapInt
提供了互斥訪問,同一時刻只能有一個線程來對它進行操作
package com.mmall.concurrency.example.atomic; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; /** * @author shishusheng */ @Slf4j @ThreadSafe public class AtomicExample2 { /** * 請求總數 */ public static int clientTotal = 5000; /** * 同時并發執行的線程數 */ public static int threadTotal = 200; /** * 工作內存 */ public static AtomicLong count = new AtomicLong(0); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { System.out.println(); semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); //主內存 log.info("count:{}", count.get()); } private static void add() { count.incrementAndGet(); // count.getAndIncrement(); } }
package com.mmall.concurrency.example.atomic; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicReference; /** * @author shishusheng * @date 18/4/3 */ @Slf4j @ThreadSafe public class AtomicExample4 { private static AtomicReferencecount = new AtomicReference<>(0); public static void main(String[] args) { // 2 count.compareAndSet(0, 2); // no count.compareAndSet(0, 1); // no count.compareAndSet(1, 3); // 4 count.compareAndSet(2, 4); // no count.compareAndSet(3, 5); log.info("count:{}", count.get()); } }
AtomicReference,AtomicReferenceFieldUpdater
AtomicBoolean
AtomicStampReference : CAS的 ABA 問題
4.2.2 鎖synchronized:依賴 JVM
修飾代碼塊:大括號括起來的代碼,作用于調用的對象
修飾方法: 整個方法,作用于調用的對象
修飾靜態方法:整個靜態方法,作用于所有對象
package com.mmall.concurrency.example.count; import com.mmall.concurrency.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author shishusheng */ @Slf4j @ThreadSafe public class CountExample3 { /** * 請求總數 */ public static int clientTotal = 5000; /** * 同時并發執行的線程數 */ public static int threadTotal = 200; public static int count = 0; public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count); } private synchronized static void add() { count++; } }
synchronized 修正計數類方法
修飾類:括號括起來的部分,作用于所有對象
子類繼承父類的被 synchronized 修飾方法時,是沒有 synchronized 修飾的!!!
Lock: 依賴特殊的 CPU 指令,代碼實現
4.2.3 對比synchronized: 不可中斷鎖,適合競爭不激烈,可讀性好
Lock: 可中斷鎖,多樣化同步,競爭激烈時能維持常態
Atomic: 競爭激烈時能維持常態,比Lock性能好; 只能同步一
個值
4.3 可見性一個線程對主內存的修改可以及時的被其他線程觀察到
4.3.1 導致共享變量在線程間不可見的原因線程交叉執行
重排序結合線程交叉執行
共享變量更新后的值沒有在工作內存與主存間及時更新
4.3.2 可見性之synchronizedJMM關于synchronized的規定
線程解鎖前,必須把共享變量的最新值刷新到主內存
線程加鎖時,將清空工作內存中共享變量的值,從而使
用共享變量時需要從主內存中重新讀取最新的值(加鎖與解鎖是同一把鎖)
4.3.3 可見性之volatile通過加入內存屏障和禁止重排序優化來實現
對volatile變量寫操作時,會在寫操作后加入一條store
屏障指令,將本地內存中的共享變量值刷新到主內存
對volatile變量讀操作時,會在讀操作前加入一條load
屏障指令,從主內存中讀取共享變量
volatile使用
volatile boolean inited = false; //線程1: context = loadContext(); inited= true; // 線程2: while( !inited ){ sleep(); } doSomethingWithConfig(context)4.4 有序性
一個線程觀察其他線程中的指令執行順序,由于指令重排序的存在,該觀察結果一般雜亂無序
JMM允許編譯器和處理器對指令進行重排序,但是重排序過程不會影響到單線程程序的執行,卻會影響到多線程并發執行的正確性
4.4.1 happens-before 規則 5發布對象package com.mmall.concurrency.example.singleton; import com.mmall.concurrency.annoations.NotThreadSafe; /** * 懶漢模式 -》 雙重同步鎖單例模式 * 單例實例在第一次使用時進行創建 * @author shishusheng */ @NotThreadSafe public class SingletonExample4 { /** * 私有構造函數 */ private SingletonExample4() { } // 1、memory = allocate() 分配對象的內存空間 // 2、ctorInstance() 初始化對象 // 3、instance = memory 設置instance指向剛分配的內存 // JVM和cpu優化,發生了指令重排 // 1、memory = allocate() 分配對象的內存空間 // 3、instance = memory 設置instance指向剛分配的內存 // 2、ctorInstance() 初始化對象 /** * 單例對象 */ private static SingletonExample4 instance = null; /** * 靜態的工廠方法 * * @return */ public static SingletonExample4 getInstance() { // 雙重檢測機制 // B if (instance == null) { // 同步鎖 synchronized (SingletonExample4.class) { if (instance == null) { // A - 3 instance = new SingletonExample4(); } } } return instance; } }
使用Node實現FIFO隊列,可以用于構建鎖或者其他同步裝置的基礎框架
利用了一個int類型表示狀態
使用方法是繼承
子類通過繼承并通過實現它的方法管理其狀態{acquire 和release} 的方法操縱狀態
可以同時實現排它鎖和共享鎖模式(獨占、共享)
同步組件
CountDownLatchpackage com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author shishusheng */ @Slf4j public class CountDownLatchExample1 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); Thread.sleep(100); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 指定時間內處理任務 * * @author shishusheng * */ @Slf4j public class CountDownLatchExample2 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(10, TimeUnit.MILLISECONDS); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); } }Semaphore用法
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author shishusheng */ @Slf4j public class CyclicBarrierExample1 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * @author shishusheng */ @Slf4j public class CyclicBarrierExample2 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); try { barrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { log.warn("BarrierException", e); } log.info("{} continue", threadNum); } }
package com.mmall.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @author shishusheng */ @Slf4j public class SemaphoreExample3 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { // 嘗試獲取一個許可 if (semaphore.tryAcquire()) { test(threadNum); // 釋放一個許可 semaphore.release(); } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }9 線程池 9.1 newCachedThreadPool 9.2 newFixedThreadPool 9.3 newSingleThreadExecutor
看出是順序執行的
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/72545.html
摘要:筆記來源并發編程與高并發解決方案并發基礎綜述多級緩存緩存一致性亂序執行優化內存模型規定抽象結構同步八種操作及規則并發的優勢與風險并發與高并發基本概念基本概念并發同時擁有兩個或者多個線程,如果程序在單核處理器上運行,多個線程將交替地換入或者換 筆記來源:【IMOOC】Java并發編程與高并發解決方案 并發基礎 綜述: CPU多級緩存:緩存一致性、亂序執行優化 Java內存模型:JM...
摘要:資源獲取方式根據下面的索引,大家可以選擇自己需要的資源,然后在松哥公眾號牧碼小子后臺回復對應的口令,就可以獲取到資源的百度云盤下載地址。公眾號二維碼如下另外本文會定期更新,松哥有新資源的時候會及時分享給大家,歡迎各位小伙伴保持關注。 沒有一條路是容易的,特別是轉行計算機這條路。 松哥接觸過很多轉行做開發的小伙伴,我了解到很多轉行人的不容易,記得松哥大二時剛剛決定轉行計算機,完全不知道這...
摘要:大家好,我是冰河有句話叫做投資啥都不如投資自己的回報率高。馬上就十一國慶假期了,給小伙伴們分享下,從小白程序員到大廠高級技術專家我看過哪些技術類書籍。 大家好,我是...
摘要:并發表示在一段時間內有多個動作存在。并發帶來的問題在享受并發編程帶來的高性能高吞吐量的同時,也會因為并發編程帶來一些意想不到弊端。并發過程中多線程之間的切換調度,上下文的保存恢復等都會帶來額外的線程切換開銷。 0x01 什么是并發 要理解并發首選我們來區分下并發和并行的概念。 并發:表示在一段時間內有多個動作存在。 并行:表示在同一時間點有多個動作同時存在。 例如:此刻我正在寫博客,但...
摘要:另一個是使用鎖的機制來處理線程之間的原子性。依賴于去實現鎖,因此在這個關鍵字作用對象的作用范圍內,都是同一時刻只能有一個線程對其進行操作的。 線程安全性 定義:當多個線程訪問某個類時,不管運行時環境采用何種調度方式或者這些線程將如何交替執行,并且在主調代碼中不需要任何額外的同步或協同,這個類都能表現出正確的行為,那么就稱這個類是線程安全的。 線程安全性主要體現在三個方面:原子性、可見性...
閱讀 1207·2019-08-30 15:55
閱讀 954·2019-08-30 15:55
閱讀 2150·2019-08-30 15:44
閱讀 2879·2019-08-29 14:17
閱讀 1130·2019-08-29 12:45
閱讀 3301·2019-08-26 10:48
閱讀 3133·2019-08-23 18:18
閱讀 2599·2019-08-23 16:47