為什么需要降載
微服務集群中,調用鏈路錯綜復雜,作為服務提供者需要有一種保護自己的機制,防止調用方無腦調用壓垮自己,保證自身服務的高可用。
最常見的保護機制莫過于限流機制,使用限流器的前提是必須知道自身的能夠處理的最大并發數,一般在上線前通過壓測來得到最大并發數,而且日常請求過程中每個接口的限流參數都不一樣,同時系統一直在不斷的迭代其處理能力往往也會隨之變化,每次上線前都需要進行壓測然后調整限流參數變得非常繁瑣。
那么有沒有一種更加簡潔的限流機制能實現最大限度的自我保護呢?
什么是自適應降載
自適應降載能非常智能的保護服務自身,根據服務自身的系統負載動態判斷是否需要降載。
設計目標:
- 保證系統不被拖垮。
- 在系統穩定的前提下,保持系統的吞吐量。
那么關鍵就在于如何衡量服務自身的負載呢?
判斷高負載主要取決于兩個指標:
- cpu 是否過載。
- 最大并發數是否過載。
以上兩點同時滿足時則說明服務處于高負載狀態,則進行自適應降載。
同時也應該注意高并發場景 cpu 負載、并發數往往波動比較大,從數據上我們稱這種現象為毛刺,毛刺現象可能會導致系統一直在頻繁的進行自動降載操作,所以我們一般獲取一段時間內的指標均值來使指標更加平滑。實現上可以采用準確的記錄一段時間內的指標然后直接計算平均值,但是需要占用一定的系統資源。
統計學上有一種算法:滑動平均(exponential moving average),可以用來估算變量的局部均值,使得變量的更新與歷史一段時間的歷史取值有關,無需記錄所有的歷史局部變量就可以實現平均值估算,非常節省寶貴的服務器資源。
滑動平均算法原理 參考這篇文章講的非常清楚。
變量 V 在 t 時刻記為 Vt,θt 為變量 V 在 t 時刻的取值,即在不使用滑動平均模型時 Vt=θt,在使用滑動平均模型后,Vt 的更新公式如下:
Vt=β?Vt?1+(1?β)?θt
- β = 0 時 Vt = θt
- β = 0.9 時,大致相當于過去 10 個 θt 值的平均
- β = 0.99 時,大致相當于過去 100 個 θt 值的平均
代碼實現
接下來我們來看下 go-zero 自適應降載的代碼實現。
core/load/adaptiveshedder.go
自適應降載接口定義:
// 回調函數Promise interface { // 請求成功時回調此函數 Pass() // 請求失敗時回調此函數 Fail()}// 降載接口定義Shedder interface { // 降載檢查 // 1. 允許調用,需手動執行 Promise.accept()/reject()上報實際執行任務結構 // 2. 拒絕調用,將會直接返回err:服務過載錯誤 ErrServiceOverloaded Allow() (Promise, error)}
接口定義非常精簡意味使用起來其實非常簡單,對外暴露一個`Allow()(Promise,error)。
go-zero 使用示例:
業務中只需調該方法判斷是否降載,如果被降載則直接結束流程,否則執行業務最后使用返回值 Promise 根據執行結果回調結果即可。
func UnarySheddingInterceptor(shedder load.Shedder, metrics *stat.Metrics) grpc.UnaryServerInterceptor { ensureSheddingStat() return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (val interface{}, err error) { sheddingStat.IncrementTotal() var promise load.Promise // 檢查是否被降載 promise, err = shedder.Allow() // 降載,記錄相關日志與指標 if err != nil { metrics.AddDrop() sheddingStat.IncrementDrop() return } // 最后回調執行結果 defer func() { // 執行失敗 if err == context.DeadlineExceeded { promise.Fail() // 執行成功 } else { sheddingStat.IncrementPass() promise.Pass() } }() // 執行業務方法 return handler(ctx, req) }}
接口實現類定義 :
主要包含三類屬性
- cpu 負載閾值:超過此值意味著 cpu 處于高負載狀態。
- 冷卻期:假如服務之前被降載過,那么將進入冷卻期,目的在于防止降載過程中負載還未降下來立馬加壓導致來回抖動。因為降低負載需要一定的時間,處于冷卻期內應該繼續檢查并發數是否超過限制,超過限制則繼續丟棄請求。
- 并發數:當前正在處理的并發數,當前正在處理的并發平均數,以及最近一段內的請求數與響應時間,目的是為了計算當前正在處理的并發數是否大于系統可承載的最大并發數。
// option參數模式ShedderOption func(opts *shedderOptions)// 可選配置參數shedderOptions struct { // 滑動時間窗口大小 window time.Duration // 滑動時間窗口數量 buckets int // cpu負載臨界值 cpuThreshold int64}// 自適應降載結構體,需實現 Shedder 接口adaptiveShedder struct { // cpu負載臨界值 // 高于臨界值代表高負載需要降載保證服務 cpuThreshold int64 // 1s內有多少個桶 windows int64 // 并發數 flying int64 // 滑動平滑并發數 avgFlying float64 // 自旋鎖,一個服務共用一個降載 // 統計當前正在處理的請求數時必須加鎖 // 無損并發,提高性能 avgFlyingLock syncx.SpinLock // 最后一次拒絕時間 dropTime *syncx.AtomicDuration // 最近是否被拒絕過 droppedRecently *syncx.AtomicBool // 請求數統計,通過滑動時間窗口記錄最近一段時間內指標 passCounter *collection.RollingWindow // 響應時間統計,通過滑動時間窗口記錄最近一段時間內指標 rtCounter *collection.RollingWindow}
自適應降載構造器:
func NewAdaptiveShedder(opts ...ShedderOption) Shedder { // 為了保證代碼統一 // 當開發者關閉時返回默認的空實現,實現代碼統一 // go-zero很多地方都采用了這種設計,比如Breaker,日志組件 if !enabled.True() { return newNopShedder() } // options模式設置可選配置參數 options := shedderOptions{ // 默認統計最近5s內數據 window: defaultWindow, // 默認桶數量50個 buckets: defaultBuckets, // cpu負載 cpuThreshold: defaultCpuThreshold, } for _, opt := range opts { opt(&options) } // 計算每個窗口間隔時間,默認為100ms bucketDuration := options.window / time.Duration(options.buckets) return &adaptiveShedder{ // cpu負載 cpuThreshold: options.cpuThreshold, // 1s的時間內包含多少個滑動窗口單元 windows: int64(time.Second / bucketDuration), // 最近一次拒絕時間 dropTime: syncx.NewAtomicDuration(), // 最近是否被拒絕過 droppedRecently: syncx.NewAtomicBool(), // qps統計,滑動時間窗口 // 忽略當前正在寫入窗口(桶),時間周期不完整可能導致數據異常 passCounter: collection.NewRollingWindow(options.buckets, bucketDuration, collection.IgnoreCurrentBucket()), // 響應時間統計,滑動時間窗口 // 忽略當前正在寫入窗口(桶),時間周期不完整可能導致數據異常 rtCounter: collection.NewRollingWindow(options.buckets, bucketDuration, collection.IgnoreCurrentBucket()), }}
降載檢查 Allow()
:
檢查當前請求是否應該被丟棄,被丟棄業務側需要直接中斷請求保護服務,也意味著降載生效同時進入冷卻期。如果放行則返回 promise,等待業務側執行回調函數執行指標統計。
// 降載檢查func (as *adaptiveShedder) Allow() (Promise, error) { // 檢查請求是否被丟棄 if as.shouldDrop() { // 設置drop時間 as.dropTime.Set(timex.Now()) // 最近已被drop as.droppedRecently.Set(true) // 返回過載 return nil, ErrServiceOverloaded } // 正在處理請求數加1 as.addFlying(1) // 這里每個允許的請求都會返回一個新的promise對象 // promise內部持有了降載指針對象 return &promise{ start: timex.Now(), shedder: as, }, nil}
檢查是否應該被丟棄shouldDrop()
:
// 請求是否應該被丟棄func (as *adaptiveShedder) shouldDrop() bool { // 當前cpu負載超過閾值 // 服務處于冷卻期內應該繼續檢查負載并嘗試丟棄請求 if as.systemOverloaded() || as.stillHot() { // 檢查正在處理的并發是否超出當前可承載的最大并發數 // 超出則丟棄請求 if as.highThru() { flying := atomic.LoadInt64(&as.flying) as.avgFlyingLock.Lock() avgFlying := as.avgFlying as.avgFlyingLock.Unlock() msg := fmt.Sprintf( "dropreq, cpu: %d, maxPass: %d, minRt: %.2f, hot: %t, flying: %d, avgFlying: %.2f", stat.CpuUsage(), as.maxPass(), as.minRt(), as.stillHot(), flying, avgFlying) logx.Error(msg) stat.Report(msg) return true } } return false}
cpu 閾值檢查 systemOverloaded()
:
cpu 負載值計算算法采用的滑動平均算法,防止毛刺現象。每隔 250ms 采樣一次 β 為 0.95,大概相當于歷史 20 次 cpu 負載的平均值,時間周期約為 5s。
// cpu 是否過載func (as *adaptiveShedder) systemOverloaded() bool { return systemOverloadChecker(as.cpuThreshold)}// cpu 檢查函數systemOverloadChecker = func(cpuThreshold int64) bool { return stat.CpuUsage() >= cpuThreshold}// cpu滑動平均值curUsage := internal.RefreshCpu()prevUsage := atomic.LoadInt64(&cpuUsage)// cpu = cpu??1 * beta + cpu? * (1 - beta)// 滑動平均算法usage := int64(float64(prevUsage)*beta + float64(curUsage)*(1-beta))atomic.StoreInt64(&cpuUsage, usage)
檢查是否處于冷卻期 stillHot
:
判斷當前系統是否處于冷卻期,如果處于冷卻期內,應該繼續嘗試檢查是否丟棄請求。主要是防止系統在過載恢復過程中負載還未降下來,立馬又增加壓力導致來回抖動,此時應該嘗試繼續丟棄請求。
func (as *adaptiveShedder) stillHot() bool { // 最近沒有丟棄請求 // 說明服務正常 if !as.droppedRecently.True() { return false } // 不在冷卻期 dropTime := as.dropTime.Load() if dropTime == 0 { return false } // 冷卻時間默認為1s hot := timex.Since(dropTime) < coolOffDuration // 不在冷卻期,正常處理請求中 if !hot { // 重置drop記錄 as.droppedRecently.Set(false) } return hot}
檢查當前正在處理的并發數highThru()
:
一旦 當前處理的并發數 > 并發數承載上限 則進入降載狀態。
這里為什么要加鎖呢?因為自適應降載時全局在使用的,為了保證并發數平均值正確性。
為什么這里要加自旋鎖呢?因為并發處理過程中,可以不阻塞其他的 goroutine 執行任務,采用無鎖并發提高性能。
func (as *adaptiveShedder) highThru() bool { // 加鎖 as.avgFlyingLock.Lock() // 獲取滑動平均值 // 每次請求結束后更新 avgFlying := as.avgFlying // 解鎖 as.avgFlyingLock.Unlock() // 系統此時最大并發數 maxFlight := as.maxFlight() // 正在處理的并發數和平均并發數是否大于系統的最大并發數 return int64(avgFlying) > maxFlight && atomic.LoadInt64(&as.flying) > maxFlight}
如何得到正在處理的并發數與平均并發數呢?
當前正在的處理并發數統計其實非常簡單,每次允許請求時并發數 +1,請求完成后 通過 promise 對象回調-1 即可,并利用滑動平均算法求解平均并發數即可。
type promise struct { // 請求開始時間 // 統計請求處理耗時 start time.Duration shedder *adaptiveShedder}func (p *promise) Fail() { // 請求結束,當前正在處理請求數-1 p.shedder.addFlying(-1)}func (p *promise) Pass() { // 響應時間,單位毫秒 rt := float64(timex.Since(p.start)) / float64(time.Millisecond) // 請求結束,當前正在處理請求數-1 p.shedder.addFlying(-1) p.shedder.rtCounter.Add(math.Ceil(rt)) p.shedder.passCounter.Add(1)}func (as *adaptiveShedder) addFlying(delta int64) { flying := atomic.AddInt64(&as.flying, delta) // 請求結束后,統計當前正在處理的請求并發 if delta < 0 { as.avgFlyingLock.Lock() // 估算當前服務近一段時間內的平均請求數 as.avgFlying = as.avgFlying*flyingBeta + float64(flying)*(1-flyingBeta) as.avgFlyingLock.Unlock() }}
得到了當前的系統數還不夠 ,我們還需要知道當前系統能夠處理并發數的上限,即最大并發數。
請求通過數與響應時間都是通過滑動窗口來實現的,關于滑動窗口的實現可以參考 自適應熔斷器
那篇文章。
當前系統的最大并發數 = 窗口單位時間內的最大通過數量 * 窗口單位時間內的最小響應時間。
// 計算每秒系統的最大并發數// 最大并發數 = 最大請求數(qps)* 最小響應時間(rt)func (as *adaptiveShedder) maxFlight() int64 { // windows = buckets per second // maxQPS = maxPASS * windows // minRT = min average response time in milliseconds // maxQPS * minRT / milliseconds_per_second // as.maxPass()*as.windows - 每個桶最大的qps * 1s內包含桶的數量 // as.minRt()/1e3 - 窗口所有桶中最小的平均響應時間 / 1000ms這里是為了轉換成秒 return int64(math.Max(1, float64(as.maxPass()*as.windows)*(as.minRt()/1e3)))} // 滑動時間窗口內有多個桶// 找到請求數最多的那個// 每個桶占用的時間為 internal ms// qps指的是1s內的請求數,qps: maxPass * time.Second/internalfunc (as *adaptiveShedder) maxPass() int64 { var result float64 = 1 // 當前時間窗口內請求數最多的桶 as.passCounter.Reduce(func(b *collection.Bucket) { if b.Sum > result { result = b.Sum } }) return int64(result)}// 滑動時間窗口內有多個桶// 計算最小的平均響應時間// 因為需要計算近一段時間內系統能夠處理的最大并發數func (as *adaptiveShedder) minRt() float64 { // 默認為1000ms result := defaultMinRt as.rtCounter.Reduce(func(b *collection.Bucket) { if b.Count <= 0 { return } // 請求平均響應時間 avg := math.Round(b.Sum / float64(b.Count)) if avg < result { result = avg } }) return result}
參考資料
項目地址
https://github.com/zeromicro/go-zero
歡迎使用 go-zero
并 star 支持我們!
微信交流群
關注『微服務實踐』公眾號并點擊 交流群 獲取社區群二維碼。