摘要:允許突發流量的平滑限流器的實現。實例執行結果方法返回結果代表獲取所等待的時間。先看下示例代碼運行效果為了預防突然暴增的流量將系統壓垮,很貼心的增加了預熱。
RateLimiter 類圖
RateLimiter:作為抽象類提供一個限流器的基本的抽象方法。
SmoothRateLimiter:平滑限流器實現,提供了Ratelimiter中的抽象限流方法的平滑實現。
SmoothBursty:允許突發流量的平滑限流器的實現。
SmoothWarmingUp:平滑預熱限流器的實現。
public void test() throws InterruptedException { RateLimiter rateLimiter = RateLimiter.create(2); while (true){ System.out.println(rateLimiter.acquire(2)); TimeUnit.SECONDS.sleep(2); System.out.println(rateLimiter.acquire(1)); System.out.println(rateLimiter.acquire(1)); System.out.println(rateLimiter.acquire(10)); } }
執行結果
acquire方法返回結果代表獲取token所等待的時間。
第一行0等待,剛創建限流器,還沒來得及放任何token,此處存儲的token=0,但是無欠款所以預消費2個;
sleep 2秒,按照每秒2個的速度,先“還”了欠款,然后token直接恢復至max = 2;
第二行0,現有2個token,用一個,無需等待。
第三行0,現有1個token,用一個,無需等待。
第四行0,現有0個token,無欠款,直接借10個。
第五行4.999868,幫上一個還欠款,等待5秒直到還完欠款后,又借了2個。
重復第一行......
在使用RateLimiter.create(double)方法初始化限流器時,實際上默認使用的是SmoothBursty
public static RateLimiter create(double permitsPerSecond) { return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer()); } static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); rateLimiter.setRate(permitsPerSecond); return rateLimiter; }SmoothBursty
/** The currently stored permits. 當前存儲的令牌數 */ double storedPermits; /** * The maximum number of stored permits. * 最大存儲令牌數 */ double maxPermits; /** * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits * per second has a stable interval of 200ms. * 添加令牌時間間隔 */ double stableIntervalMicros; /** * The time when the next request (no matter its size) will be granted. After granting a request, * this is pushed further in the future. Large requests push this further than small requests. * 下一次請求可以獲取令牌的起始時間 * 由于RateLimiter允許預消費,上次請求預消費令牌后 * 下次請求需要等待相應的時間到nextFreeTicketMicros時刻才可以獲取令牌 */ private long nextFreeTicketMicros = 0L; // could be either in the past or future
從acquire()函數開始
public double acquire() { return acquire(1);//默認取一個令牌 } public double acquire(int permits) { long microsToWait = reserve(permits);//從限流器中獲取指定的令牌,并返回需要等待的時間 stopwatch.sleepMicrosUninterruptibly(microsToWait);//讓“鬧鐘”將當前線程停止睡眠指定時間 return 1.0 * microsToWait / SECONDS.toMicros(1L);//返回等待的時間,單位是秒 } final long reserve(int permits) { checkPermits(permits);//參數校驗 synchronized (mutex()) {//獲取鎖,多個請求到達,需要串行的獲取 return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } }
先來看下加鎖的邏輯
private volatile Object mutexDoNotUseDirectly; private Object mutex() { Object mutex = mutexDoNotUseDirectly; if (mutex == null) { synchronized (this) { mutex = mutexDoNotUseDirectly; if (mutex == null) { mutexDoNotUseDirectly = mutex = new Object(); } } } return mutex; }
典型的雙重檢查單例
接著繼續探索獲取令牌的邏輯代碼
final long reserveAndGetWaitLength(int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros);//獲取token并返回下個請求可以來獲取token的時間 return max(momentAvailable - nowMicros, 0);//計算等待時間 }
關鍵函數一:
abstract long reserveEarliestAvailable(int permits, long nowMicros);
SmoothRateLimiter實現了它,是獲取token、消耗token的主流程
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { resync(nowMicros); long returnValue = nextFreeTicketMicros; double storedPermitsToSpend = min(requiredPermits, this.storedPermits); double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = **加粗文字** storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); this.storedPermits -= storedPermitsToSpend; System.out.println(String.format("storedPermitsToSpend=%s,freshPermits=%s,waitMicros=%s,storedPermits=%s", storedPermitsToSpend, freshPermits, waitMicros, storedPermits)); return returnValue; }
更新令牌桶中的token。
計算下次能獲得令牌的時間
扣除本次所需令牌
storedPermitsToSpend代表需要從storedPermits扣除的token,如果storedPermits已經=0了,那么不會扣除到負數
waitMicros代表此次預消費的令牌需要多少時間來恢復,最終將它加到nextFreeTicketMicros**
那么SmoothBursty是怎么實現預消費的呢?,讓我們先看下更新token的邏輯,即void resync(long nowMicros)
void resync(long nowMicros) { // if nextFreeTicket is in the past, resync to now if (nowMicros > nextFreeTicketMicros) { double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } }
更新流程
如果當前時間大于freeTime,則進入更新操作。
將時間差除以令牌恢復間隔,計算出得到恢復的令牌個數
更新令牌桶令牌的存儲數量和freeTime
SmoothBursty是怎么實現預消費的呢?
其實,只要保證一點就可以進行預消費,即無欠款,無欠款就代表當前時間大于等于nextFreeTime,SmoothBursty就是依靠此原理來處理突發的流量。
先看下示例代碼
public void test_warmingUp(){ RateLimiter rateLimiter = RateLimiter.create(2, 4, TimeUnit.SECONDS); while (true){ System.out.println(rateLimiter.acquire(1)); System.out.println(rateLimiter.acquire(1)); System.out.println(rateLimiter.acquire(1)); System.out.println(rateLimiter.acquire(1)); } }
運行效果
0.0 1.372264 1.117788 0.869905 0.620505 0.496059 0.495301 0.496027 0.495794
SmoothWarmingUp為了預防突然暴增的流量將系統壓垮,很貼心的增加了“預熱”。指定的warmupPeriod就是預熱時間,在“冷狀態”即沒有流量進入時,放入每個token的時間不僅僅是1/permitsPerSecond,還要加上一個預熱時間,類注釋上的圖作了很好的解釋。
SmoothWarmingUp在初始階段與SmoothBursty有點不同,SmoothWarmingUp初始storePermits = maxPermits。一直使用permits直至storePermits減少到thresholdPermits(setRate調用時計算)放入token的時間便穩定下來,到達了“熱狀態”,此時與SmoothBursty是一模一樣。但是如果在warmupPeriod時間內沒有流量進入,則再次會進入“冷狀態“。
在實現上SmoothWarmingUp與SmoothBursty基本相同,唯一不同僅僅只有兩個函數的實現上
coolDownIntervalMicros()返回一個token的冷卻時間,SmoothWarmingUp注釋中有介紹,為了保證在warmUpPeriod時間剛好可以恢復maxPermits個token,因此SmoothWarmingUp此函數返回的是warmupPeriodMicros / maxPermits
storedPermitsToWaitTime(double storedPermits, double permitsToTake)返回消耗permitsToTake個token所需要等待的時間,SmoothBursty則是直接返回0.
SmoothWarmingUp的注釋解釋的很到位,在預熱限流器中,計算token的等待時間就可以轉化計算圖中的面積,大家可以順著注釋推導一下。
總結SmoothBursty:初始token為0,允許預消費,放入token的時間固定為1/permitsPerSecond.(一開始直接上)
SmoothWarmingUp:初始token為MaxPermits,允許預消費,可以指定預熱時間,在與預熱時間過后速率恢復平穩與SmoothBursty一致。(老司機有前戲)
SmoothWarmingUp像了改良版的SmoothBursty,有個預熱時間,系統能更加從容的應付流量的來襲,因此一般可以優先使用SmoothWarmingUp。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/76815.html
摘要:有如下模塊源碼解析源碼解析源碼解析源碼解析源碼解析源碼解析源碼解析源碼解析源碼解析使用和監控和博客從到學習介紹從到學習上搭建環境并構建運行簡單程序入門從到學習配置文件詳解從到學習介紹從到學習如何自 Flink Metrics 有如下模塊: Flink Metrics 源碼解析 —— Flink-metrics-core Flink Metrics 源碼解析 —— Flink-metr...
摘要:機制博客從到學習介紹從到學習上搭建環境并構建運行簡單程序入門從到學習配置文件詳解從到學習介紹從到學習如何自定義從到學習介紹從到學習如何自定義從到學習轉換從到學習介紹中的從到學習中的幾種詳解從到學習讀取數據寫入到從到學習項目如何運行從 Flink Checkpoint 機制 https://t.zsxq.com/ynQNbeM 博客 1、Flink 從0到1學習 —— Apache Fl...
摘要:序列化機制博客從到學習介紹從到學習上搭建環境并構建運行簡單程序入門從到學習配置文件詳解從到學習介紹從到學習如何自定義從到學習介紹從到學習如何自定義從到學習轉換從到學習介紹中的從到學習中的幾種詳解從到學習讀取數據寫入到從到學習項目如何 Flink 序列化機制 https://t.zsxq.com/JaQfeMf 博客 1、Flink 從0到1學習 —— Apache Flink 介紹 2...
摘要:模塊中的類結構如下博客從到學習介紹從到學習上搭建環境并構建運行簡單程序入門從到學習配置文件詳解從到學習介紹從到學習如何自定義從到學習介紹從到學習如何自定義從到學習轉換從到學習介紹中的從到學習中的幾種詳解從到學習讀取數據寫入到從到學 Flink-Client 模塊中的類結構如下: https://t.zsxq.com/IMzNZjY showImg(https://segmentfau...
摘要:模塊中的類結構如下博客從到學習介紹從到學習上搭建環境并構建運行簡單程序入門從到學習配置文件詳解從到學習介紹從到學習如何自定義從到學習介紹從到學習如何自定義從到學習轉換從到學習介紹中的從到學習中的幾種詳解從到學習讀取數據寫入到從到學 Flink-Annotations 模塊中的類結構如下: https://t.zsxq.com/f6eAu3J showImg(https://segme...
閱讀 1818·2023-04-26 02:51
閱讀 2849·2021-09-10 10:50
閱讀 3026·2021-09-01 10:48
閱讀 3594·2019-08-30 15:53
閱讀 1816·2019-08-29 18:40
閱讀 405·2019-08-29 16:16
閱讀 2024·2019-08-29 13:21
閱讀 1816·2019-08-29 11:07