摘要:并發設計模式一模式的使用表示線程本地存儲模式。為不同的任務創建不同的線程池,這樣能夠有效的避免死鎖問題。兩階段終止,即將線程的結束分為了兩個階段,第一個階段是一個線程向另一個線程發送終止指令,第二個階段是線程響應終止指令。
Java 并發設計模式 一、Thread Local Storage 模式 1. ThreadLocal 的使用
Thread Local Storage 表示線程本地存儲模式。
大多數并發問題都是由于變量的共享導致的,多個線程同時讀寫同一變量便會出現原子性,可見性等問題。局部變量是線程安全的,本質上也是由于各個線程各自擁有自己的變量,避免了變量的共享。
Java 中使用了 ThreadLocal 來實現避免變量共享的方案。ThreadLocal 保證在線程訪問變量時,會創建一個這個變量的副本,這樣每個線程都有自己的變量值,沒有共享,從而避免了線程不安全的問題。
下面是 ThreadLocal 的一個簡單使用示例:
public class ThreadLocalTest { private static final ThreadLocalthreadLocal = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); public static SimpleDateFormat safeDateFormat() { return threadLocal.get(); } public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask task1 = new FutureTask<>(ThreadLocalTest::safeDateFormat); FutureTask task2 = new FutureTask<>(ThreadLocalTest::safeDateFormat); Thread t1 = new Thread(task1); Thread t2 = new Thread(task2); t1.start(); t2.start(); System.out.println(task1.get() == task2.get());//返回false,表示兩個對象不相等 } }
程序中構造了一個線程安全的 SimpleDateFormat ,兩個線程取到的是不同的示例對象,這樣就保證了線程安全。
2. ThreadLocal 原理淺析線程 Thread 類內部有兩個 ThreadLocalMap 類型的變量:
/* ThreadLocal values pertaining to this thread. This map is maintained * by the ThreadLocal class. */ ThreadLocal.ThreadLocalMap threadLocals = null; /* * InheritableThreadLocal values pertaining to this thread. This map is * maintained by the InheritableThreadLocal class. */ ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
其中第二個變量的用途是創建可繼承父線程變量的子線程,只不過這并不常用,主要介紹第一個。
ThreadLocalMap 是一個用于存儲 ThreadLocal 的特殊 HashMap,map 中 key 就是 ThreadLocal,value 是線程變量值。只不過這個 map 并不被 ThreadLocal 持有,而是被 Thread 持有。
當調用 ThreadLocal 類中的 set 方法時,就會創建 Thread 中的 threadLocals 屬性。
//ThreadLocal的set方法 public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t);//獲取Thread中的ThreadLocalMap if (map != null) map.set(this, value); else createMap(t, value); }
可以看到,最終的 ThreadLocal 對象和變量值并不是創建在 ThreadLocal 內部,而是 Thread 中的 ThreadLocalMap,ThreadLocal 在這里只是充當了代理的作用。
3. ThreadLocal 內存泄漏問題存儲數據的 TheadLocalMap 被 Thread 持有,而不是 ThreadLocal,主要的原因便是 ThreadLocal 的生命周期比 Thread 要長,如果 ThreadLocal 對象一直存在,那么 map 中的線程就不能被回收,容易導致內存泄漏。
而 Thread 持有 ThreadLocalMap,并且 ThreadLocalMap 對 ThreadLocal 的引用還是弱引用,這樣當線程被回收時,map 也能夠被回收,更加安全。
但是 Java 的這種設計并沒有完全避免內存泄漏問題。如果線程池中的線程存活時間過長,那么其持有的 ThreadLocalMap 一直不會被釋放。ThreadLocalMap 中的 Entry 對其 value 是強引用的(對 ThreadLocal 是弱引用),這樣就算 ThreadLocalMap 的生命周期結束了,但是 value 值并沒有被回收。
解決的辦法便是手動釋放 ThreadLocalMap 中對 value 的強引用,可以使用 TheadLocal 的 remove 方法。在 finally 語句塊中執行。例如下面這個簡單的示例:
public class ThreadLocalTest { private final ThreadLocal二、Immutability 模式 1. 不可變的概念threadLocal = new ThreadLocal<>(); public void test(){ //設置變量值 threadLocal.set(10); try { System.out.println(threadLocal.get()); } finally { //釋放 threadLocal.remove(); } } }
Immutability,即不變模式。可以理解為只要對象一經創建,其狀態是不能夠被改變的,無法進行寫操作。
要實現 Immuatability 模式很簡單,將一個類本身及其所有的屬性都設為 final ,并且方法都是只讀的,需要注意的是,如果類的屬性也是引用類型,那么其對應的類也要滿足不可變的特性。final 應該都很熟悉了,用它來修飾類和方法,分別表示類不可繼承、屬性不可改變。
Java 中具備不可變性的類型包括:
String
final 修飾的基本數據類型
Integer、Long、Double 等基本數據類型的包裝類
Collections 中的不可變集合
具備不可變性的類,如果需要有類似修改這樣的功能,那么它不會像普通的對象一樣改變自己的屬性,而是創建新的對象。
下面是 String 的字符串連接方法 concat() 的源碼,仔細觀察,可以看到最后方法返回的時候,創建了一個新的 Sring 對象:
public String concat(String str) { int otherLen = str.length(); if (otherLen == 0) { return this; } int len = value.length; char buf[] = Arrays.copyOf(value, len + otherLen); str.getChars(buf, len); //創建新的對象 return new String(buf, true); }
而 Collections 工具可以將集合變為不可變的,完全禁止寫、修改等操作。示例如下:
//Collections 中構建不可變集合的方法 Collections.unmodifiableList(); Collections.unmodifiableSet(); Collections.unmodifiableMap(); Collections.unmodifiableSortedSet(); Collections.unmodifiableSortedMap(); --- List2. 對象池list = Arrays.asList(1, 2, 3, 4, 5); //構建不可變集合 List unmodifiableList = Collections.unmodifiableList(list); unmodifiableList.remove(1);//拋出異常
對于一個不可變性的類,如果頻繁的對其進行修改操作,那么一直會創建性新的對象,這樣就比較浪費內存空間了,一種解決辦法便是利用對象池。
原理也很簡單,新建對象的時候,去對象池看是否存在對象,如果存在則直接利用,如果不存在才會創建新的對象,創建之后再將對象放到對象池中。
以長整型的包裝類 Long 為例,它緩存了 -128 到 127 的數據,如果創建的是這個區間的對象,那么會直接使用緩存中的對象。例如 Long 中的 valueOf 方法就用到了這個緩存,然后直接返回:
public static Long valueOf(long l) { final int offset = 128; //在這個區間則直接使用緩存中的對象 if (l >= -128 && l <= 127) { // will cache return LongCache.cache[(int)l + offset]; } return new Long(l); }三、Guarded Suspension 模式 1. Guarded Suspension 實現
Guarded Suspension 意為保護性暫停。一個典型的使用場景是:當客戶端線程 T 發送請求后,服務端這時有大量的請求需要處理,這時候就需要排隊,線程 T 進入等待狀態,直到服務端處理完請求并且返回結果。
Guarded Suspension 的實現很簡單,有一個對象 GuardedObject,其內部有一個屬性,即被保護的對象,還有兩個方法,客戶端調用 get() 方法,如果未獲取到結果,則進入等待狀態,即“保護性暫停”;還有一個 notice() 通知方法,當服務端處理完請求后,調用這個方法,并且喚醒等待中的線程。示意圖如下:
示例代碼如下:
public class GuardedObject{ private T obj; private final Lock lock = new ReentrantLock(); private final Condition finished = lock.newCondition(); //調用方線程獲取結果 T get(){ lock.lock(); try { while (未獲取到結果){ finished.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return obj; } //執行完后通知 void notice(T obj){ lock.lock(); try { this.obj = obj; finished.signalAll(); } finally { lock.unlock(); } } }
從代碼中可以看到,Guarded Suspension 模式本質上就是一種等待-通知機制,只不過使用這種模式,在解決實際的問題的時候,需要根據情況進行程序功能的擴展。
2. 使用示例還是上面提到的那個例子,當客戶端發送請求后,需要等待服務端的響應結果,這時候就可以使用 Guarded Suspension 來實現,下面是代碼示例:
public class SendRequest{ //相當于消息隊列 private final BlockingQueue queue = new ArrayBlockingQueue<>(5); //客戶端發送請求 void send(Request request) throws InterruptedException { //將消息存放至隊列中 queue.put(request); //創建Guarded Suspension模式的對象 GuardedObject guardedObject = GuardedObject.create(request.id); //循環等待,獲取結果 Request res = guardedObject.get(Objects::nonNull); } //服務端處理請求 void handle() throws InterruptedException { //從隊列中獲取請求 Request request = queue.take(); //調用請求對應的GuardedObject,并處理請求 GuardedObject.handleRequest(request.id, request); } //請求類 private static class Request{ private int id; private String content; } }
需要注意的是,這里并不是直接使用 new GuardedObject() 的方式來創建對象,這是因為需要找到每個請求和對象之間的對應關系,所以 GuardedObject 內部使用了一個 map 來保存對象,key 是對應的請求 id。
GuardedObject 類代碼如下:
public class GuardedObject四、Balking 模式{ private T obj; private final Lock lock = new ReentrantLock(); private final Condition finished = lock.newCondition(); private static final ConcurrentHashMap map = new ConcurrentHashMap<>(); //創建對象 public static GuardedObject create(int id){ GuardedObject guardedObject = new GuardedObject(); //保存對象和請求的對應關系 map.put(id, guardedObject); return guardedObject; } //處理請求 public static void handleRequest(int id, Object obj){ GuardedObject guardedObject = map.remove(id); if (guardedObject != null){ //具體的處理邏輯省略 //處理完后通知 guardedObject.notice(obj); } } //調用方線程獲取結果 T get(Predicate p){ lock.lock(); try { while (!p.test(obj)){ finished.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return obj; } //執行完后通知 void notice(T obj){ lock.lock(); try { this.obj = obj; finished.signalAll(); } finally { lock.unlock(); } } }
Balking 模式的典型應用場景是,業務邏輯依賴于某個條件變量的狀態,因此這種模式又可以理解為多線程版本的 if。
public class BalkingTest { private boolean flag = false; public void execute(){ if (!flag){ return; } //具體的執行操作省略 flag = false; } public void test(){ //省略業務代碼若干 flag = true; } }
例如上面這個例子,一段業務邏輯會改變 flag 的值,另一個方法會根據 flag 的值來決定是否繼續執行。
這個程序并不是線程安全的,解決的辦法也很簡單,就是加互斥鎖,然后可以將改變 flag 值的邏輯多帶帶拿出來,如下:
public class BalkingTest { private boolean flag = false; public synchronized void execute(){ if (!flag){ return; } //具體的執行操作省略 flag = false; } public void test(){ //省略業務代碼若干 change(); } public synchronized void change(){ flag = true; } }
Balking 模式一般可以使用互斥鎖來實現,并且可以將對條件變量的改變的邏輯和業務邏輯進行分離,這樣能夠減小鎖的粒度,提升性能。Balking 模式大多應用于需要快速失敗的場景,即當條件變量不滿足,則直接失敗。這也是它和 Guarded Suspension 模式的區別,因為 Guarded Suspension 模式在條件不滿足的時候,會一直等待條件滿足。
五、Worker - Thread 模式Worker Thread 模式,對應到現實世界,類似工廠中的工人做任務,當有任務的時候,工人取出任務執行。
解決的辦法是使用線程池,并且使用一個阻塞隊列來存儲任務,線程池中的線程從隊列中取出任務執行。線程池的使用需要注意幾點:
任務隊列盡量使用有界隊列,避免任務過多造成 OOM。
應該明確指定拒絕策略,可以根據實際情況實現 RejectedExecutionHandler 接口自定義拒絕策略。
應該給線程指定一個有意義的名字,最好和業務相關。
為不同的任務創建不同的線程池,這樣能夠有效的避免死鎖問題。
六、Two - Phase Termination 模式 1. 兩階段終止概念Two - Phase Termination,即兩階段終止,主要是為解決如何正確的終止一個線程,這里說的是一個線程終止另一個線程,而不是線程終止自己。
Java 中的線程提供了一個 stop() 方法用來終止線程,這不過這個方法會直接將線程殺死,風險太高,并且這個方法已經被標記為廢棄,不建議使用了。
兩階段終止,即將線程的結束分為了兩個階段,第一個階段是一個線程 T1 向另一個線程 T2 發送終止指令,第二個階段是線程 T2 響應終止指令。
根據 Java 的線程狀態,線程如果要進入 TERMINATED 狀態則必須先進入 RUNNABLE 狀態,而處于 RUNNABLE 狀態的線程有可能轉換到休眠狀態。
Java 的線程提供了 interrupt() 方法,這個方法的作用便是將線程的狀態從休眠狀態轉換到 RUNNABLE 狀態。
切換到 RUNNABLE 狀態之后,線程有兩種方式可以終止,一是執行完 run() 方法,自動進入終止狀態;二是設置一個標志,線程如果檢測到這個標志,則退出 run() 方法,這就是兩階段終止的響應終止指令。
2. 程序示例下面是一個簡單的使用 interrupt() 方法和中斷標志位來終止線程的示例:
public class Test { public static void main(String[] args) { Thread thread = new Thread(() -> { //檢測到中斷則退出 while (!Thread.currentThread().isInterrupted()){ try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); //重新設置中斷標志 Thread.currentThread().interrupt(); } System.out.println("I am roseduan"); } }); thread.start(); thread.interrupt(); } }
程序要每隔三秒打印語句,但是線程啟動之后就直接調用了 interrupt() 方法,所以線程直接退出了。需要注意的是這里在捕獲異常之后,需要重新設置線程的中斷狀態,因為 JVM 的異常處理會清除線程的中斷狀態。
在實際的生產中,并不推薦使用這種方式,因為在 Thread 內部可能會調用其他的方法,而其他的方法并不能夠保證正確的處理了線程中斷,解決的辦法便是自定義一個線程的中斷標志,如下所示:
public class Test { //自定義中斷標志 private volatile boolean isTerminated = false; private Thread thread; public synchronized void start(){ thread = new Thread(() -> { //檢測到中斷則退出 while (!isTerminated) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); //重新設置中斷狀態 Thread.currentThread().interrupt(); } System.out.println("I am roseduan"); } isTerminated = false; }); thread.start(); } //線程終止方法 public synchronized void stop(){ isTerminated = true; thread.interrupt(); } }3. 終止線程池
Java 中并不太會顯式的創建和終止一個線程,使用更多的是線程池。
Java 中的線程池提供了兩個方法來終止,分別是 shutdown() 和 shutdownNow() ,兩個方法的區別如下:
shutdown():拒絕新的任務,等待正在執行的和已經在阻塞隊列中的任務執行完后,再關閉線程池
shutdownNow():直接關閉線程池,拒絕新的任務,并且中斷正在執行的任務,已經在阻塞隊列中的任務也不會被執行了。
七、Producer - Consumer 模式這是較為常用的生產者 - 消費者模式,Java 中的線程池就使用了這種模式,線程的使用方是生產者,提供任務,線程池本身是消費者,取出并執行任務。
生產者 - 消費者模式使用了一個任務隊列,生產者將任務添加到隊列中,消費者從隊列中取出任務執行。
這樣的設計的目的有三個:
解耦,生產者和消費者之間沒有直接的關聯,而是通過隊列進行通信。
其次可以實現異步,例如生產者可以不用管消費者的行為,直接將任務添加到隊列中。消費者也可以不在乎生產者,直接從隊列中取任務。
最后,可以平衡生產者和消費者之間的速度差異。
下面是一個簡單的生產者 - 消費者程序示例:
public class ProducerConsumerTest { private BlockingQueuequeue = new LinkedBlockingQueue<>(100); public void produce() { queue.add(new Task()); } public void consume() { Task task = queue.poll(); while (task != null){ task.execute(); task = queue.poll(); } System.out.println("沒有任務了"); } public static void main(String[] args) throws InterruptedException { Test test = new Test(); //生產者線程,創建10個任務 Thread producer = new Thread(() -> { for (int i = 0; i < 10; i++) { test.produce(); } }); producer.start(); producer.join(); //消費者線程 Thread consumer = new Thread(test::consume); consumer.start(); } } class Task{ public void execute(){ System.out.println("執行任務"); } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/74809.html
摘要:表示的是兩個,當其中任意一個計算完并發編程之是線程安全并且高效的,在并發編程中經常可見它的使用,在開始分析它的高并發實現機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯網高并發場景。 干貨:深度剖析分布式搜索引擎設計 分布式,高可用,和機器學習一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...
摘要:在本例中,講述的無鎖來自于并發包我們將這個無鎖的稱為。在這里,我們使用二維數組來表示的內部存儲,如下變量存放所有的內部元素。為什么使用二維數組去實現一個一維的呢這是為了將來進行動態擴展時可以更加方便。 我們已經比較完整得介紹了有關無鎖的概念和使用方法。相對于有鎖的方法,使用無鎖的方式編程更加考驗一個程序員的耐心和智力。但是,無鎖帶來的好處也是顯而易見的,第一,在高并發的情況下,它比有鎖...
摘要:在中一般來說通過來創建所需要的線程池,如高并發原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細原理解析 - 后端 - 掘金今天我們來研究學習一下AbstractQueuedSynchronizer類的相關原理,java.util.concurrent包中很多類都依賴于這個類所提供的隊列式...
閱讀 2184·2020-06-12 14:26
閱讀 2487·2019-08-29 16:41
閱讀 1890·2019-08-29 15:28
閱讀 2455·2019-08-26 13:43
閱讀 757·2019-08-26 13:37
閱讀 2777·2019-08-23 18:13
閱讀 2800·2019-08-23 15:31
閱讀 1018·2019-08-23 14:10