摘要:比如主協程啟動個子協程,主協程等待所有子協程退出后再繼續后續流程,這種場景下也可輕易實現。這個例子中,父協程僅僅是等待子協程結束,其實父協程也可以向管道中寫入數據通知子協程結束,這時子協程需要定期地探測管道中是否有消息出現。
Go 語言中最常見的、也是經常被人提及的設計模式就是:
"不要通過共享內存來通信,我們應該使用通信來共享內存"
通過共享內存來通信是直接讀取內存的數據,而通過通信來共享內存,是通過發送消息的方式來進行同步。
而通過發送消息來同步的這種方式常見的就是 Go 采用的通信順序進程 CSP(Communication Sequential Process) 模型以及 Erlang 采用的 Actor 模型,這兩種方式都是通過通信來共享內存。
如下圖所示
大部分的語言采用的都是第一種方式直接去操作內存,然后通過互斥鎖,CAS 等操作來保證并發安全。Go 引入了 Channel 和 Goroutine 實現 CSP 模型來解耦這個操作。
優點:
缺點:
目前的 Channel 收發操作均遵循了先進先出的設計,具體規則如下:
鎖(Lock) 是一種常見的并發控制技術,我們一般會將鎖分成樂觀鎖 和 悲觀鎖,即樂觀并發控制和悲觀并發控制,無鎖(lock-free)隊列更準確的描述是使用樂觀并發控制的隊列。樂觀并發控制也叫樂觀鎖,很多人都會誤以為樂觀鎖是與悲觀鎖差不多,然而它并不是真正的鎖,只是一種并發控制的思想.
樂觀并發控制本質上是基于驗證的協議,我們使用原子指令 CAS(compare-and-swap 或者 compare-and-set)在多線程中同步數據,無鎖隊列的實現也依賴這一原子指令。
從某種程度上說,Channel 是一個用于同步和通信的有鎖隊列,使用互斥鎖解決程序中可能存在的線程競爭問題
Go 語言社區也在 2014 年提出了無鎖 Channel 的實現方案,該方案將 Channel 分成了以下三種類型:
同步 Channel — 無緩沖區,發送方會直接將數據交給(Handoff)接收方
異步channel: 基于環形緩存的傳統生產者消費者模型;
chan struct{} 類型的異步 Channel — struct{} 類型不占用內存空間,不需要實現緩沖區和直接發送(Handoff)的語義;
Go 語言的 Channel 在運行時使用 runtime.hchan 結構體表示。我們在 Go 語言中創建新的 Channel 時,實際上創建的都是如下所示的結構:
type hchan struct { qcount uint // 隊列中元素總數量 dataqsiz uint // 循環隊列的長度 buf unsafe.Pointer // 指向長度為 dataqsiz 的底層數組,只有在有緩沖時這個才有意義 elemsize uint16 // 能夠發送和接受的元素大小 closed uint32 // 是否關閉 elemtype *_type // 元素的類型 sendx uint // 當前已發送的元素在隊列當中的索引位置 recvx uint // 當前已接收的元素在隊列當中的索引位置 recvq waitq // 接收 Goroutine 鏈表 sendq waitq // 發送 Goroutine 鏈表 lock mutex // 互斥鎖}// waitq 是一個雙向鏈表,里面保存了 goroutinetype waitq struct { first *sudog last *sudog}
如下圖所示,channel 底層其實是一個循環隊列
Go 語言中所有 Channel 的創建都會使用 make 關鍵字。創建的表達式使用 make(chan T, cap)
來創建 channel.
如果不向 make 傳遞表示緩沖區大小的參數,那么就會設置一個默認值 0,也就是當前的 Channel 不存在緩沖區。
當想要向 Channel
發送數據時,就需要使用 ch <- i
語句.
在發送數據的邏輯執行之前會先為當前 Channel 加鎖,防止多個線程并發修改數據。
如果 Channel 已經關閉,那么向該 Channel 發送數據時會報 “send on closed channel” 錯誤并中止程序。
如果 Channel 沒有被關閉并且已經有處于讀等待的 Goroutine,會取出最先陷入等待的 Goroutine 并直接向它發送數據:
直接發送的過程稱為兩個部分:
runtime.sendDirect
將發送的數據直接拷貝到 x = <-c 表達式中變量 x 所在的內存地址上;runtime.goready
將等待接收數據的 Goroutine 標記成可運行狀態 Grunnable 并把該 Goroutine 放到發送方所在的處理器的 runnext 上等待執行,該處理器在下一次調度時會立刻喚醒數據的接收方;需要注意的是,發送數據的過程只是將接收方的 Goroutine 放到了處理器的 runnext 中,程序沒有立刻執行該 Goroutine。
如果創建的 Channel 包含緩沖區并且 Channel 中的數據沒有裝滿,會使用 runtime.chanbuf
計算出下一個可以存儲數據的位置,然后通過 runtime.typedmemmove
將發送的數據拷貝到緩沖區中并增加 sendx 索引和 qcount 計數器。
當 Channel 沒有接收者能夠處理數據時,向 Channel 發送數據會被下游阻塞,當然使用 select 關鍵字可以向 Channel 非阻塞地發送消息。
可以簡單梳理和總結一下使用 ch <- i
表達式向 Channel 發送數據時遇到的幾種情況:
可以使用兩種不同的方式去接收 Channel 中的數據:
i <- chi, ok <- ch
會根據緩沖區的大小分別處理不同的情況
當 Channel 的緩沖區中已經包含數據時,從 Channel 中接收數據會直接從緩沖區中 的索引位置中取出數據進行處理:
當 Channel 的發送隊列中不存在等待的 Goroutine 并且緩沖區中也不存在任何數據時,從管道中接收數據的操作會變成阻塞的,然而不是所有的接收操作都是阻塞的,與 select 語句結合使用時就可能會使用到非阻塞的接收操作:
使用 close(ch) 來關閉 channel 最后會調用 runtime 中的 closechan 方法.
channel一般用于協程之間的通信,channel也可以用于并發控制。比如主協程啟動N個子協程,主協程等待所有子協程退出后再繼續后續流程,這種場景下channel也可輕易實現。
package mainimport ( "time" "fmt")func Process(ch chan int) { //Do some work... time.Sleep(time.Second) ch <- 1 //管道中寫入一個元素表示當前協程已結束}func main() { channels := make([]chan int, 10) //創建一個10個元素的切片,元素類型為channel for i:= 0; i < 10; i++ { channels[i] = make(chan int) //切片中放入一個channel go Process(channels[i]) //啟動協程,傳一個管道用于通信 } for i, ch := range channels { //遍歷切片,等待子協程結束 <-ch fmt.Println("Routine ", i, " quit!") }}
輸出:
Routine 0 quit!Routine 1 quit!Routine 2 quit!Routine 3 quit!Routine 4 quit!Routine 5 quit!Routine 6 quit!Routine 7 quit!Routine 8 quit!Routine 9 quit!
上面程序通過創建N個channel來管理N個協程,每個協程都有一個channel用于跟父協程通信,父協程創建完所有協程后等待所有協程結束。
這個例子中,父協程僅僅是等待子協程結束,其實父協程也可以向管道中寫入數據通知子協程結束,這時子協程需要定期地探測管道中是否有消息出現。
關閉 channel 時會釋放所有阻塞的 Goroutine,所以我們就可以利用這個特性來做一對多的通知,除了一對多之外我們還用了 done 做了多對一的通知,當然多對一這種情況還是建議直接使用 WaitGroup 即可
package mainimport ( "fmt" "time")func run(stop <-chan struct{}, done chan<- struct{}) { // 每一秒打印一次 for { select { case <-stop: fmt.Println("stop...") // 接收到停止后,向 done 管道中發送數據,然后退出函數 done <- struct{}{} return // 超時1秒將輸出hello case <-time.After(time.Second): fmt.Println("hello...") } }}func main() { // 一對多,使用無緩沖通道,當關閉chan后,其他程序中接收到關閉信號后會統一執行操作 stop := make(chan struct{}) // 多對一,當關閉后,關閉一個chan, 寫入一個數據到管道中 done := make(chan struct{}, 10) for i := 0; i < 10; i++ { go run(stop, done) } // 模擬超時時間 time.Sleep(5 * time.Second) close(stop) for i := 0; i < 10; i++ { <-done }}
輸出:
hello...hello...hello......hello..stop...stop...stop...stop...stop...stop...stop...stop...stop...stop...
利用無緩沖channel,接收早于發送的特點,只有當數據寫入后,接收才能完成實現數據一致性
package mainimport ( "fmt")// 這里只能讀func read(c <-chan int) { fmt.Println("read:", <-c)}// 這里只能寫func write(c chan<- int) { c <- 0}func main() { c := make(chan int) go write(c) read(c)}
超時控制還是建議使用 context
func run(stop <-chan struct{}, done chan<- struct{}) { // 每一秒打印一次 hello for { select { case <-stop: fmt.Println("stop...") done <- struct{}{} return case <-time.After(time.Second): fmt.Println("hello") } }}
根據控制Channel的緩存大小來控制并發執行的Goroutine的最大數目
var limit = make(chan int, 3)func main() { for _, w := range work { go func() { limit <- 1 w() <-limit }() } select{}}
最后一句select{}是一個空的管道選擇語句,該語句會導致main線程阻塞,從而避免程序過早退出。還有for{}
、<-make(chan int)
等諸多方法可以達到類似的效果。因為main線程被阻塞了,如果需要程序正常退出的話可以通過調用os.Exit(0)實現。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/124536.html
摘要:的用例的用法最早是語言傳開來的看一下我從網上扒的代碼其中符號是往當中寫入數據的操作同時注意一般的位置對于來說是阻塞的由于能夠處理異步操作也就是說能做到異步代碼用同步寫法更多的細節搜索應該就能找到除了也實現了對于的支持也就是 CSP 的用例 CSP 的用法最早是 Go 語言傳開來的, 看一下我從網上扒的代碼: package main import fmt func ping(pin...
摘要:協程與信箱得益于,我們可以基于的協程與快速實現一個信箱模式調度。樣例代碼比如在一個聊天室中,我們可以定義一個房間模型。 什么是Actor? Actor對于PHPer來說,可能會比較陌生,寫過Java的同學會比較熟悉,Java一直都有線程的概念(雖然PHP有Pthread,但不普及),它是一種非共享內存的并發模型,每個Actor內的數據獨立存在,Actor之間通過消息傳遞的形式進行交互調...
摘要:為語言提供了強大的協程編程模式。提供的協程語法借鑒自,在此向開發組致敬協程可以與很好地互補。并發執行使用創建協程,可以讓和兩個函數變成并發執行。協程需要拿到請求的結果。 Swoole4為PHP語言提供了強大的CSP協程編程模式。底層提供了3個關鍵詞,可以方便地實現各類功能。 Swoole4提供的PHP協程語法借鑒自Golang,在此向GO開發組致敬 PHP+Swoole協程可以與...
摘要:它避免了上下文切換的額外耗費,兼顧了多線程的優點,簡化了高并發程序的復雜。而可以理解為一種語言的協程。線程輕量級進程,,是程序執行流的最小單元。一個標準的線程由線程,當前指令指針,寄存器集合和堆棧組成。其實就是或者等語言中的多線程開發。 grape 全部視頻:https://segmentfault.com/a/11... 原視頻地址:https://biglive.xueersi.c...
閱讀 1231·2021-11-23 09:51
閱讀 678·2021-11-19 09:40
閱讀 1337·2021-10-11 10:58
閱讀 2347·2021-09-30 09:47
閱讀 3726·2021-09-22 15:55
閱讀 2159·2021-09-03 10:49
閱讀 1250·2021-09-03 10:33
閱讀 697·2019-08-29 17:12