摘要:共享內存相信對并發有所了解的同學都應該知道在推出后,對內存管理有了更高標準的規范了,這使我們開發并發程序也有更好的標準了,不會有一些模糊的定義導致的無法確定的錯誤。
通過前幾篇的學習,相信大家對Akka應該有所了解了,都說解決并發哪家強,JVM上面找Akka,那么Akka到底在解決并發問題上幫我們做了什么呢?
共享內存眾所周知,在處理并發問題上面,最核心的一部分就是如何處理共享內存,很多時候我們都需要花費很多時間和精力在共享內存上,那么在學習Akka對共享內存是如何管理之前,我們先來看看Java中是怎么處理這個問題的。
Java共享內存相信對Java并發有所了解的同學都應該知道在Java5推出JSR 133后,Java對內存管理有了更高標準的規范了,這使我們開發并發程序也有更好的標準了,不會有一些模糊的定義導致的無法確定的錯誤。
首先來看看一下Java內存模型的簡單構圖:
主內存部分數據的拷貝,線程對自己工作內存的操作速度遠遠快于對主內存的操作,但這也往往會引起共享變量不一致的問題,比如以下一個場景:
int a = 0; public void setA() { a = a + 1; }
上面是一個很簡單的例子,a是一個全局變量,然后我們有一個方法去修改這個值,每次增加一,假如我們用100個線程去運行這段代碼,那a最終的結果會是多少呢?
100?顯然不一定,它可能是80,90,或者其他數,這就造成共享變量不一致的問題,那么為什么會導致這個問題呢,就是我們上面所說的,線程去修改a的時候可能就只是修改了自己工作內存中a的副本,但并沒有將a的值及時的刷新到主內存中,這便會導致其他線程可能讀到未被修改a的值,最終出現變量不一致問題。
那么Java中是怎么處理這種問題,如何保證共享變量的一致性的呢?
同步機制大體上Java中有3類同步機制,但它們所解決的問題并不相同,我們先來看一看這三種機制:
final關鍵詞
volatile關鍵詞
synchronized關鍵詞(這里代表了所有類似監視鎖的機制)
寫過Java程序的同學對這個關鍵詞應該再熟悉不過了,其基本含義就是不可變,不可變變量,比如:
final int a = 10; final String b = "hello";
不可變的含義在于當你對這些變量或者對象賦初值后,不能再重新去賦值,但對于對象來說,我們不能修改的是它的引用,但是對象內的內容還是可以修改的。下面是一個簡單的例子:
final User u = new User(1,"a"); u.id = 2; //可以修改 u = new User(2,"b"); //不可修改
所以在利用final關鍵詞用來保證共享變量的一致性時一定要了解清楚自己的需求,選擇合適的方法,另外final變量必須在定義或者構建對象的時候進行初始化,不然會報錯。
2.volatile關鍵詞很多同學在遇到共享變量不一致的問題后,都會說我在聲明變量前加一個volatile就好了,但事實真是這樣嘛?答案顯然不是。那我們來看看volatile到底為我們做了什么。
前面我們說過每個線程都有自己的工作內存,很多時候線程去修改一個變量的值只是修改了自己工作內存中副本的值,這便會導致主內存的值并不是最新的,其他線程讀取到的變量便會出現問題。volatile幫我們解決了這個問題,它有兩個特點:
線程每次都會去主內存中讀取變量
線程每次修改變量后的值都會及時更新到主內存中去
舉個例子:
volatile int a = 0; public void setA() { a = a + 1; }
現在線程在執行這段代碼時,都會強制去主內存中讀取變量的值,修改后也會馬上更新到主內存中去,但是這真的能解決共享變量不一致的問題嘛,其實不然,比如我們有這么一個場景:兩個線程同時讀取了主內存中變量最新的值,這是我們兩個線程都去執行修改操作,最后結果會是什么呢?這里就留給大家自己去思考了,其實也很簡單的。
那么volatile在什么場景下能保證線程安全,按照官方來說,有以下兩個條件:
對變量的寫操作不依賴于當前值
該變量沒有包含在具有其他變量的不變式中
多的方面這里我就不展開了,推薦兩篇我覺得寫的還不錯的文章:volatile的使用及其原理volatile的適用場景
3.synchronized關鍵詞很多同學在學習Java并發過程中最先接觸的就是synchronized關鍵詞了,它確實能解決我們上述的并發問題,那它到時如何幫我們保證共享變量的一致性的呢?
簡而言之的說,線程在訪問請求用synchronized關鍵詞修飾的方法,代碼塊都會要求獲得一個監視器鎖,當線程獲得了監視器鎖后,它才有權限去執行相應的方法或代碼塊,并在執行結束后釋放監視器鎖,這便能保證共享內存的一致性了,因為本文主要是講Akka的共享內存,過多的篇幅就不展開了,這里推薦一篇解析synchronized原理很不錯的文章,有興趣的同學可以去看看:Synchronized及其實現原理
Akka共享內存Akka中的共享內存是基于Actor模型的,Actor模型提倡的是:通過通訊來實現共享內存,而不是用共享內存來實現通訊,這點是跟Java解決共享內存最大的區別,舉個例子:
在Java中我們要去操作共享內存中數據時,每個線程都需要不斷的獲取共享內存的監視器鎖,然后將操作后的數據暴露給其他線程訪問使用,用共享內存來實現各個線程之間的通訊,而在Akka中我們可以將共享可變的變量作為一個Actor內部的狀態,利用Actor模型本身串行處理消息的機制來保證變量的一致性。
當然要使用Akka中的機制也必須滿足一下兩條原則:
消息的發送必須先于消息的接收
同一個Actor對一條消息的處理先于下一條消息處理
第二個原則很好理解,就是上面我們說的Actor內部是串行處理消息,那我們來看看第一個原則,為什么要保證消息的發送先于消息的接收,是為了防止我們在創建消息的時候發生了不確定的錯誤,接收者將可能接收到不正確的消息,導致發生奇怪的異常,主要表現為消息對象未初始化完整時,若沒有這條規則保證,Actor收到的消息便會不完整。
通過前面的學習我們知道Actor是一種比線程更輕量級,抽象程度更高的一種結構,它幫我們規避了我們自己去操作線程,那么Akka底層到底是怎么幫我們去保證共享內存的一致性的呢?
一個Actor它可能會有很多線程同時向它發送消息,之前我們也說到Actor本身是串行處理的消息的,那它是如何保障這種機制的呢?
MailboxMailbox在Actor模型是一個很重要的概念,我們都知道向一個Actor發送的消息首先都會被存儲到它所對應的Mailbox中,那么我們先來看看MailBox的定義結構(本文所引用的代碼都在akka.dispatch.Mailbox.scala中,有興趣的同學也可以去研究一下):
private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {}
很清晰Mailbox內部維護了一個messageQueue這樣的消息隊列,并繼承了Scala自身定義的ForkJoinTask任務執行類和我們很熟悉的Runnable接口,由此可以看出,Mailbox底層還是利用Java中的線程進行處理的。那么我們先來看看它的run方法:
override final def run(): Unit = { try { if (!isClosed) { //Volatile read, needed here processAllSystemMessages() //First, deal with any system messages processMailbox() //Then deal with messages } } finally { setAsIdle() //Volatile write, needed here dispatcher.registerForExecution(this, false, false) } }
為了配合理解,我們這里先來看一下定義:
@inline final def currentStatus: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset) @inline final def isClosed: Boolean = currentStatus == Closed
這里我們可以看出Mailbox本身會維護一個狀態Mailbox.Status,是一個Int變量,而且是可變的,并且用到volatile來保證了它的可見性:
@volatile protected var _statusDoNotCallMeDirectly: Status = _ //0 by default
現在我們在回去看上面的代碼,run方法的執行過程,首先它會去讀取MailBox此時的狀態,因為是一個Volatile read,所以能保證讀取到的是最新的值,然后它會先處理任何的系統消息,這部分不需要我們太過關心,之后便是執行我們發送的消息,這里我們需要詳細看一下processMailbox()的實現:
@tailrec private final def processMailbox( left: Int = java.lang.Math.max(dispatcher.throughput, 1), deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit = if (shouldProcessMessage) { val next = dequeue() //去出下一條消息 if (next ne null) { if (Mailbox.debug) println(actor.self + " processing message " + next) actor invoke next if (Thread.interrupted()) throw new InterruptedException("Interrupted while processing actor messages") processAllSystemMessages() if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0)) processMailbox(left - 1, deadlineNs) //遞歸處理下一條消息 } }
從上述代碼中我們可以清晰的看到,當滿足消息處理的情況下就會進行消息處理,從消息隊列列取出下一條消息就是上面的dequeue(),然后將消息發給具體的Actor進行處理,接下去又是處理系統消息,然后判斷是否還有滿足情況需要下一條消息,若有則再次進行處理,可以看成一個遞歸操作,@tailrec也說明了這一點,它表示的是讓編譯器進行尾遞歸優化。
現在我們來看一下一條消息從發送到最終處理在Akka中到底是怎么執行的,下面的內容是我通過閱讀Akka源碼加自身理解得出的,這里先畫了一張流程圖:
消息的大致流程我都在圖中給出了,還有一些細節,必須序列化消息,獲取狀態等就沒有具體說明有興趣的同學可以自己去閱讀以下Akka的源碼,個人覺得Akka的源碼閱讀性還是很好的,比如:
基本沒有方法超過20行
不會有過多的注釋,但關鍵部分會給出,更能加深自己的理解
當然也有一些困擾,我們在不了解各個類,接口之間的關系時,閱讀體驗就會變得很糟糕,當然我用IDEA很快就解決了這個問題。
我們這里來看看關鍵的部分:Actor是如何保證串行處理消息的?
上圖中有一根判定,是否已有線程在執行任務?我們來看看這個判定的具體邏輯:
@tailrec final def setAsScheduled(): Boolean = { //是否有線程正在調度執行該MailBox的任務 val s = currentStatus /* * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with * Scheduled bit already set. */ if ((s & shouldScheduleMask) != Open) false else updateStatus(s, s | Scheduled) || setAsScheduled() }
從注釋和代碼的邏輯上我們可以看出當已有線程在執行返回false,若沒有則去更改狀態為以調度,直到被其他線程搶占或者更改成功,其中updateStatus()是線程安全的,我們可以看一下它的實現,是一個CAS操作:
@inline protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean = Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus)
到這里我們應該可以大致清楚Actor內部是如何保證共享內存的一致性了,Actor接收消息是多線程的,但處理消息是單線程的,利用MailBox中的Status來保障這一機制。
總結通過上面的內容我們可以總結出以下幾點:
Akka并不是說用了什么特殊魔法來保證并發的,底層使用的還是Java和JVM的同步機制
Akka并沒有使用任何的鎖機制,這就避免了死鎖的可能性
Akka并發執行的處理并沒有使用線程切換,不僅提高了線程的使用效率,也大大減少了線程切換消耗
Akka為我們提供了更高層次的并發抽象模型,讓我們不必關心底層的實現,只需著重實現業務邏輯就行,遵循它的規范,讓框架幫我們處理一切難點吧
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/70070.html
摘要:是一個構建在上,基于模型的的并發框架,為構建伸縮性強,有彈性的響應式并發應用提高更好的平臺。上述例子中的信件就相當于中的消息,與之間只能通過消息通信。當然模型比這要復雜的多,這里主要是簡潔的闡述一下模型的概念。模型的出現解決了這個問題。 Akka是一個構建在JVM上,基于Actor模型的的并發框架,為構建伸縮性強,有彈性的響應式并發應用提高更好的平臺。本文主要是個人對Akka的學習和應...
摘要:原文鏈接解決了什么問題使用模型來克服傳統面向對象編程模型的局限性,并應對高并發分布式系統所帶來的挑戰。在某些情況,這個問題可能會變得更糟糕,工作線程發生了錯誤但是其自身卻無法恢復。 這段時間由于忙畢業前前后后的事情,拖更了很久,表示非常抱歉,回歸后的第一篇文章主要是看到了Akka最新文檔中寫的What problems does the actor model solve?,閱讀完后覺...
摘要:關于三者的一些概括總結離線分析框架,適合離線的復雜的大數據處理內存計算框架,適合在線離線快速的大數據處理流式計算框架,適合在線的實時的大數據處理我是一個以架構師為年之內目標的小小白。 整理自《架構解密從分布式到微服務》第七章——聊聊分布式計算.做了相應補充和修改。 [TOC] 前言 不管是網絡、內存、還是存儲的分布式,它們最終目的都是為了實現計算的分布式:數據在各個計算機節點上流動,同...
摘要:源碼鏈接進階持久化插件有同學可能會問,我對不是很熟悉亦或者覺得單機存儲并不是安全,有沒有支持分布式數據存儲的插件呢,比如某爸的云數據庫答案當然是有咯,良心的我當然是幫你們都找好咯。 這次把這部分內容提到現在寫,是因為這段時間開發的項目剛好在這一塊遇到了一些難點,所以準備把經驗分享給大家,我們在使用Akka時,會經常遇到一些存儲Actor內部狀態的場景,在系統正常運行的情況下,我們不需要...
閱讀 3389·2023-04-26 01:46
閱讀 2913·2023-04-25 20:55
閱讀 5484·2021-09-22 14:57
閱讀 2980·2021-08-27 16:23
閱讀 1717·2019-08-30 14:02
閱讀 2068·2019-08-26 13:44
閱讀 652·2019-08-26 12:08
閱讀 2961·2019-08-26 11:47