摘要:原文地址來,控制一下的并發數量問題做一些各種各樣的業務邏輯處理在這里,假設是一個外部傳入的參數不可預測,有可能值非常大,有人會全部丟進去循環。因此用這個方案基本靈活控制并發數量小手一緊。
原文地址:來,控制一下 Goroutine 的并發數量
問題func main() { userCount := math.MaxInt64 for i := 0; i < userCount; i++ { go func(i int) { // 做一些各種各樣的業務邏輯處理 fmt.Printf("go func: %d ", i) time.Sleep(time.Second) }(i) } }
在這里,假設 userCount 是一個外部傳入的參數(不可預測,有可能值非常大),有人會全部丟進去循環。想著全部都并發 goroutine 去同時做某一件事。覺得這樣子會效率會更高,對不對!
那么,你覺得這里有沒有什么問題?
噩夢般的開始當然,在特定場景下,問題可大了。因為在本文被丟進去同時并發的可是一個極端值。我們可以一起觀察下圖的指標分析,看看情況有多 “崩潰”。下圖是上述代碼的表現:
輸出結果... go func: 5839 go func: 5840 go func: 5841 go func: 5842 go func: 5915 go func: 5524 go func: 5916 go func: 8209 go func: 8264 signal: killed
如果你自己執行過代碼,在 “輸出結果” 上你會遇到如下問題:
系統資源占用率不斷上漲
輸出一定數量后:控制臺就不再刷新輸出最新的值了
信號量:signal: killed
系統負載 CPU短時間內系統負載暴增
虛擬內存短時間內占用的虛擬內存暴增
topPID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPRS PGRP PPID STATE BOOSTS ... 73414 test 100.2 01:59.50 9/1 0 18 6801M+ 0B 114G+ 73403 73403 running *0[1]小結
如果仔細看過監控工具的示意圖,就可以知道其實我間隔的執行了兩次,能看到系統間的使用率幅度非常大。當進程被殺掉后,整體又恢復為正常值
在這里,我們回到主題,就是在不控制并發的 goroutine 數量 會發生什么問題?大致如下:
CPU 使用率浮動上漲
Memory 占用不斷上漲。也可以看看 CMPRS,它表示進程的壓縮數據的字節數。已經到達 114G+ 了
主進程崩潰(被殺掉了)
簡單來說,“崩潰” 的原因就是對系統資源的占用過大。常見的比如:打開文件數(too many files open)、內存占用等等
危害對該臺服務器產生非常大的影響,影響自身及相關聯的應用。很有可能導致不可用或響應緩慢,另外啟動了復數 “失控” 的 goroutine,導致程序流轉混亂
解決方案在前面花了大量篇幅,渲染了在存在大量并發 goroutine 數量時,不控制的話會出現 “嚴重” 的問題,接下來一起思考下解決方案。如下:
控制/限制 goroutine 同時并發運行的數量
改變應用程序的邏輯寫法(避免大規模的使用系統資源和等待)
調整服務的硬件配置、最大打開數、內存等閾值
接下來正式的開始解決這個問題,希望你認真閱讀的同時加以思考,因為這個問題在實際項目中真的是太常見了!
問題已經拋出來了,你需要做的是想想有什么辦法解決這個問題。建議你自行思考一下技術方案。再接著往下看 :-)
嘗試 chanfunc main() { userCount := 10 ch := make(chan bool, 2) for i := 0; i < userCount; i++ { ch <- true go Read(ch, i) } //time.Sleep(time.Second) } func Read(ch chan bool, i int) { fmt.Printf("go func: %d ", i) <- ch }
輸出結果:
go func: 1 go func: 2 go func: 3 go func: 4 go func: 5 go func: 6 go func: 7 go func: 8 go func: 0
嗯,我們似乎很好的控制了 2 個 2 個的 “順序” 執行多個 goroutine。但是,問題出現了。你仔細數一下輸出結果,才 9 個值?
這明顯就不對。原因出在當主協程結束時,子協程也是會被終止掉的。因此剩余的 goroutine 沒來及把值輸出,就被送上路了(不信你把 time.Sleep 打開看看,看看輸出數量)
嘗試 sync... var wg = sync.WaitGroup{} func main() { userCount := 10 for i := 0; i < userCount; i++ { wg.Add(1) go Read(i) } wg.Wait() } func Read(i int) { defer wg.Done() fmt.Printf("go func: %d ", i) }
嗯,單純的使用 sync.WaitGroup 也不行。沒有控制到同時并發的 goroutine 數量(代指達不到本文所要求的目標)
小結單純簡單使用 channel 或 sync 都有明顯缺陷,不行。我們再看看組件配合能不能實現
嘗試 chan + sync... var wg = sync.WaitGroup{} func main() { userCount := 10 ch := make(chan bool, 2) for i := 0; i < userCount; i++ { wg.Add(1) go Read(ch, i) } wg.Wait() } func Read(ch chan bool, i int) { defer wg.Done() ch <- true fmt.Printf("go func: %d, time: %d ", i, time.Now().Unix()) time.Sleep(time.Second) <-ch }
輸出結果:
go func: 9, time: 1547911938 go func: 1, time: 1547911938 go func: 6, time: 1547911939 go func: 7, time: 1547911939 go func: 8, time: 1547911940 go func: 0, time: 1547911940 go func: 3, time: 1547911941 go func: 2, time: 1547911941 go func: 4, time: 1547911942 go func: 5, time: 1547911942
從輸出結果來看,確實實現了控制 goroutine 以 2 個 2 個的數量去執行我們的 “業務邏輯”,當然結果集也理所應當的是亂序輸出
方案一:簡單 Semaphore在確立了簡單使用 chan + sync 的方案是可行后,我們重新將流轉邏輯封裝為 gsema,主程序變成如下:
import ( "fmt" "time" "github.com/EDDYCJY/gsema" ) var sema = gsema.NewSemaphore(3) func main() { userCount := 10 for i := 0; i < userCount; i++ { go Read(i) } sema.Wait() } func Read(i int) { defer sema.Done() sema.Add(1) fmt.Printf("go func: %d, time: %d ", i, time.Now().Unix()) time.Sleep(time.Second) }分析方案
在上述代碼中,程序執行流程如下:
設置允許的并發數目為 3 個
循環 10 次,每次啟動一個 goroutine 來執行任務
每一個 goroutine 在內部利用 sema 進行調控是否阻塞
按允許并發數逐漸釋出 goroutine,最后結束任務
看上去人模人樣,沒什么嚴重問題。但卻有一個 “大” 坑,認真看到第二點 “每次啟動一個 goroutine” 這句話。這里有點問題,提前產生那么多的 goroutine 會不會有什么問題,接下來一起分析下利弊,如下:
利
適合量不大、復雜度低的使用場景
幾百幾千個、幾十萬個也是可以接受的(看具體業務場景)
實際業務邏輯在運行前就已經被阻塞等待了(因為并發數受限),基本實際業務邏輯損耗的性能比 goroutine 本身大
goroutine 本身很輕便,僅損耗極少許的內存空間和調度。這種等待響應的情況都是躺好了,等待任務喚醒
Semaphore 操作復雜度低且流轉簡單,容易控制
弊
不適合量很大、復雜度高的使用場景
有幾百萬、幾千萬個 goroutine 的話,就浪費了大量調度 goroutine 和內存空間。恰好你的服務器也接受不了的話
Semaphore 操作復雜度提高,要管理更多的狀態
小結基于什么業務場景,就用什么方案去做事
有足夠的時間,允許你去追求更優秀、極致的方案(用第三方庫也行)
用哪種方案,我認為主要基于以上兩點去思考,都是 OK 的。沒有對錯,只有當前業務場景能不能接受,這個預先啟動的 goroutine 數量你的系統是否能夠接受
當然了,常見/簡單的 Go 應用采用這類技術方案,基本就能解決問題了。因為像本文第一節 “問題” 如此超巨大數量的情況,情況很少。其并不存在那些 “特殊性”。因此用這個方案基本 OK
靈活控制 goroutine 并發數量小手一緊。隔壁老王發現了新的問題。“方案一” 中,在輸入輸出一體的情況下,在常見的業務場景中確實可以
但,這次新的業務場景比較特殊,要控制輸入的數量,以此達到改變允許并發運行 goroutine 的數量。我們仔細想想,要做出如下改變:
輸入/輸出要抽離,才可以分別控制
輸入/輸出要可變,理所應當在 for-loop 中(可設置數值的地方)
允許改變 goroutine 并發數量,但它也必須有一個最大值(因為允許改變是相對)
方案二:靈活 chan + syncpackage main import ( "fmt" "sync" "time" ) var wg sync.WaitGroup func main() { userCount := 10 ch := make(chan int, 5) for i := 0; i < userCount; i++ { wg.Add(1) go func() { defer wg.Done() for d := range ch { fmt.Printf("go func: %d, time: %d ", d, time.Now().Unix()) time.Sleep(time.Second * time.Duration(d)) } }() } for i := 0; i < 10; i++ { ch <- 1 ch <- 2 //time.Sleep(time.Second) } close(ch) wg.Wait() }
輸出結果:
... go func: 1, time: 1547950567 go func: 3, time: 1547950567 go func: 1, time: 1547950567 go func: 2, time: 1547950567 go func: 2, time: 1547950567 go func: 3, time: 1547950567 go func: 1, time: 1547950568 go func: 2, time: 1547950568 go func: 3, time: 1547950568 go func: 1, time: 1547950568 go func: 3, time: 1547950569 go func: 2, time: 1547950569
在 “方案二” 中,我們可以隨時隨地的根據新的業務需求,做如下事情:
變更 channel 的輸入數量
能夠根據特殊情況,變更 channel 的循環值
變更最大允許并發的 goroutine 數量
總的來說,就是可控空間都盡量放開了,是不是更加靈活了呢 :-)
方案三:第三方庫go-playground/pool
nozzle/throttler
Jeffail/tunny
panjf2000/ants
比較成熟的第三方庫也不少,基本都是以生成和管理 goroutine 為目標的池工具。我簡單列了幾個,具體建議大家閱讀下源碼或者多找找,原理相似
總結在本文的開頭,我花了大力氣(極端數量),告訴你同時并發過多的 goroutine 數量會導致系統占用資源不斷上漲。最終該服務崩盤的極端情況。為的是希望你今后避免這種問題,給你留下深刻的印象
接下來我們以 “控制 goroutine 并發數量” 為主題,展開了一番分析。分別給出了三種方案。在我看來,各具優缺點,我建議你挑選合適自身場景的技術方案就可以了
因為,有不同類型的技術方案也能解決這個問題,千人千面。本文推薦的是較常見的解決方案,也歡迎大家在評論區繼續補充 :-)
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/30001.html
摘要:它避免了上下文切換的額外耗費,兼顧了多線程的優點,簡化了高并發程序的復雜。而可以理解為一種語言的協程。線程輕量級進程,,是程序執行流的最小單元。一個標準的線程由線程,當前指令指針,寄存器集合和堆棧組成。其實就是或者等語言中的多線程開發。 grape 全部視頻:https://segmentfault.com/a/11... 原視頻地址:https://biglive.xueersi.c...
摘要:比如主協程啟動個子協程,主協程等待所有子協程退出后再繼續后續流程,這種場景下也可輕易實現。這個例子中,父協程僅僅是等待子協程結束,其實父協程也可以向管道中寫入數據通知子協程結束,這時子協程需要定期地探測管道中是否有消息出現。一.設計原理Go 語言中最常見的、也是經常被人提及的設計模式就是:不要通過共享內存來通信,我們應該使用通信來共享內存通過共享內存來通信是直接讀取內存的數據,而通過通信來共...
摘要:并發表示在一段時間內有多個動作存在。并發帶來的問題在享受并發編程帶來的高性能高吞吐量的同時,也會因為并發編程帶來一些意想不到弊端。并發過程中多線程之間的切換調度,上下文的保存恢復等都會帶來額外的線程切換開銷。 0x01 什么是并發 要理解并發首選我們來區分下并發和并行的概念。 并發:表示在一段時間內有多個動作存在。 并行:表示在同一時間點有多個動作同時存在。 例如:此刻我正在寫博客,但...
摘要:年,騰訊鼓勵和推進內外部開源落地執行。年,開源變得流程化制定和發布了騰訊的開源策略和具體流程,并發布第一批的個官方開源項目。年,騰訊在公司層面成立了技術委員會,開源協同成為騰訊技術發展的核心戰略。 .markdown-body{word-break:break-word;line-height:1.75;font-weight:400;font-size:15px;overflow-x:h...
摘要:每個進程的第一個線程都會隨著該進程的啟動而被創建,它們可以被稱為其所屬進程的主線程。因此,線程也被稱為輕量級進程。與進程調度類似,在線程之間快速切換,制造了線程并行運行的假象。也就是說,線程之間是沒有保護的。其中的指代的就是系統級線程。 并發的發展歷史 其實,在早期計算機并沒有包含操作系統,...
閱讀 3433·2021-11-22 09:34
閱讀 1899·2019-08-30 12:53
閱讀 3490·2019-08-28 18:07
閱讀 2977·2019-08-27 10:55
閱讀 2959·2019-08-26 10:12
閱讀 3584·2019-08-23 18:21
閱讀 1338·2019-08-23 14:10
閱讀 1469·2019-08-23 13:04