摘要:當為空時,會使用線性輪詢策略,當有時,會通過遍歷負載均衡器中維護的所有服務實例,會過濾掉故障的實例,并找出并發請求數最小的一個。
客戶端負載均衡Spring Cloud Ribbon
?Spring Cloud Ribbon是一個基于HTTP和TCP的客戶端負載均衡工具,基于Netflix Ribbon實現。
目錄客戶端負載均衡
源碼分析
負載均衡器
負載均衡策略(本文重點)
配置詳解
自動化配置
負載均衡器?負載均衡器相關內容見上一篇文章
負載均衡策略 AbstractLoadBalancerRule?負載均衡策略的抽象類,在該抽象類中定義了負載均衡器ILoadBalancer對象,該對象能夠在具體實現選擇服務策略時,獲取到一些負載均衡器中維護的信息作為分配依據,并以此設計一些算法來實現針對特定場景的高效策略。
package com.netflix.loadbalancer; import com.netflix.client.IClientConfigAware; public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware { private ILoadBalancer lb; @Override public void setLoadBalancer(ILoadBalancer lb){ this.lb = lb; } @Override public ILoadBalancer getLoadBalancer(){ return lb; } }RandomRule
?該策略實現了從服務實例清單中隨機選擇一個服務實例的功能。下面先看一下源碼:
package com.netflix.loadbalancer; import java.util.List; import java.util.Random; import com.netflix.client.config.IClientConfig; public class RandomRule extends AbstractLoadBalancerRule { Random rand; public RandomRule() { rand = new Random(); } @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE") public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } Server server = null; while (server == null) { if (Thread.interrupted()) { return null; } ListupList = lb.getReachableServers(); List allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { /* * No servers. End regardless of pass, because subsequent passes * only get more restrictive. */ return null; } int index = rand.nextInt(serverCount); server = upList.get(index); if (server == null) { /* * The only time this should happen is if the server list were * somehow trimmed. This is a transient condition. Retry after * yielding. */ Thread.yield(); continue; } if (server.isAlive()) { return (server); } // Shouldn"t actually happen.. but must be transient or a bug. server = null; Thread.yield(); } return server; } @Override public Server choose(Object key) { return choose(getLoadBalancer(), key); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { // TODO Auto-generated method stub } }
?分析源碼可以看出,IRule接口中Server choose(Object key)函數的實現委托給了該類中的Server choose(ILoadBalancer lb, Object key)函數,該方法增加了一個負載均衡器參數。從具體的實現可以看出,它會使用負載均衡器來獲得可用實例列表upList和所有的實例列表allList,并且使用rand.nextInt(serverCount)函數來獲取一個隨機數,并將該隨機數作為upList的索引值來返回具體實例。同時,具體的選擇邏輯在一個while (server == null)循環之內,而根據選擇邏輯的實現,正常情況下每次都應該選出一個服務實例,如果出現死循環獲取不到服務實例時,則很有可能存在并發的Bug。
RoundRobinRule?該策略實現了按照線性輪詢的方式依次選擇每個服務實例的功能。下面看一下源碼:
package com.netflix.loadbalancer; import com.netflix.client.config.IClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class RoundRobinRule extends AbstractLoadBalancerRule { private AtomicInteger nextServerCyclicCounter; public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; } Server server = null; int count = 0; while (server == null && count++ < 10) { ListreachableServers = lb.getReachableServers(); List allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } int nextServerIndex = incrementAndGetModulo(serverCount); server = allServers.get(nextServerIndex); if (server == null) { /* Transient. */ Thread.yield(); continue; } if (server.isAlive() && (server.isReadyToServe())) { return (server); } // Next. server = null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; } /** * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}. * * @param modulo The modulo to bound the value of the counter. * @return The next value. */ private int incrementAndGetModulo(int modulo) { for (;;) { int current = nextServerCyclicCounter.get(); int next = (current + 1) % modulo; if (nextServerCyclicCounter.compareAndSet(current, next)) return next; } } }
?RoundRobinRule具體實現和RandomRule類似,但是循環條件和從可用列表獲取實例的邏輯不同。循環條件中增加了一個count計數變量,該變量會在每次循環之后累加,如果循環10次還沒獲取到Server,就會結束,并打印一個警告信息No available alive servers after 10 tries from load balancer:...。
?線性輪詢的實現是通過AtomicInteger nextServerCyclicCounter對象實現,每次進行實例選擇時通過調用int incrementAndGetModulo(int modulo)方法來實現。
RetryRule?該策略實現了一個具備重試機制的實例選擇功能。從源碼中可以看出,內部定義了一個IRule對象,默認是RoundRobinRule實例,choose方法中則實現了對內部定義的策略進行反復嘗試的策略,若期間能夠選擇到具體的服務實例就返回,若選擇不到并且超過設置的嘗試結束時間(maxRetryMillis參數定義的值 + choose方法開始執行的時間戳)就返回null。
package com.netflix.loadbalancer; import com.netflix.client.config.IClientConfig; public class RetryRule extends AbstractLoadBalancerRule { IRule subRule = new RoundRobinRule(); long maxRetryMillis = 500; /* * Loop if necessary. Note that the time CAN be exceeded depending on the * subRule, because we"re not spawning additional threads and returning * early. */ public Server choose(ILoadBalancer lb, Object key) { long requestTime = System.currentTimeMillis(); long deadline = requestTime + maxRetryMillis; Server answer = null; answer = subRule.choose(key); if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) { InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis()); while (!Thread.interrupted()) { answer = subRule.choose(key); if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) { /* pause and retry hoping it"s transient */ Thread.yield(); } else { break; } } task.cancel(); } if ((answer == null) || (!answer.isAlive())) { return null; } else { return answer; } } }WeightedResponseTimeRule
?該策略是對RoundRobinRule的擴展,增加了根據實例的運行情況來計算權重,并根據權重來挑選實例,以達到更優的分配效果。它的實現主要有三個核心內容。
定時任務?WeightedResponseTimeRule策略在初始化的時候會通過serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval)啟動一個定時任務,用來為每個服務實例計算權重,該任務默認30s執行一次。
權重計算?在源碼中我們可以輕松找到用于存儲權重的對象private volatile List
public void maintainWeights() { ILoadBalancer lb = getLoadBalancer(); if (lb == null) { return; } if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) { return; } try { logger.info("Weight adjusting job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb; LoadBalancerStats stats = nlb.getLoadBalancerStats(); if (stats == null) { // no statistics, nothing to do return; } double totalResponseTime = 0; // find maximal 95% response time for (Server server : nlb.getAllServers()) { // this will automatically load the stats if not in cache ServerStats ss = stats.getSingleServerStat(server); totalResponseTime += ss.getResponseTimeAvg(); } // weight for each server is (sum of responseTime of all servers - responseTime) // so that the longer the response time, the less the weight and the less likely to be chosen Double weightSoFar = 0.0; // create new list and hot swap the reference ListfinalWeights = new ArrayList (); for (Server server : nlb.getAllServers()) { ServerStats ss = stats.getSingleServerStat(server); double weight = totalResponseTime - ss.getResponseTimeAvg(); weightSoFar += weight; finalWeights.add(weightSoFar); } setWeights(finalWeights); } catch (Exception e) { logger.error("Error calculating server weights", e); } finally { serverWeightAssignmentInProgress.set(false); } }
?該方法的實現主要分為兩個步驟:
根據LoadBalancerStats中記錄的每個實例的統計信息,累加所有實例的平均響應時間,得到總平均響應時間totalResponseTime,該值會用于后續的計算。
為負載均衡器中維護的實例清單逐個計算權重(從第一個開始),計算規則為weightSoFar + totalResponseTime - 實例的平均響應時間,其中weightSoFar初始化為0,并且每計算好一個權重需要累加到weightSoFar上供下一次計算使用。
?通過概算計算出來的權重值只是代表了各實例權重區間的上限。下面圖節選自Spring Cloud 微服務實戰。
?下面看一下Server choose(ILoadBalancer lb, Object key)如何選擇Server的
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } Server server = null; while (server == null) { // get hold of the current reference in case it is changed from the other thread ListcurrentWeights = accumulatedWeights; if (Thread.interrupted()) { return null; } List allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int serverIndex = 0; // last one in the list is the sum of all weights double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); // No server has been hit yet and total weight is not initialized // fallback to use round robin if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) { server = super.choose(getLoadBalancer(), key); if(server == null) { return server; } } else { // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive) double randomWeight = random.nextDouble() * maxTotalWeight; // pick the server index based on the randomIndex int n = 0; for (Double d : currentWeights) { if (d >= randomWeight) { serverIndex = n; break; } else { n++; } } server = allList.get(serverIndex); } if (server == null) { /* Transient. */ Thread.yield(); continue; } if (server.isAlive()) { return (server); } // Next. server = null; } return server; }
?下面我們看一下源碼的主要步驟有:
首先先獲取accumulatedWeights中最后一個權重,如果該權重小于0.001或者實例的數量不等于權重列表的數量,就采用父類的線性輪詢策略
如果滿足條件,就先產生一個[0,最大權重值)區間內的隨機數
遍歷權重列表,比較權重值與隨機數的大小,如果權重值大于等于隨機數,就拿當前權重列表的索引值去服務實例列表獲取具體的實例。
?細心的可能會發現第一個服務實例的權重區間是雙閉,最后一個服務實例的權重區間是雙開,其他服務實例的區間都是左開右閉。這是因為隨機數的最小值可以為0,所以第一個實例下限是閉區間,同時隨機數的最大值取不到最大權重值,所以最后一個實例的上限是開區間。
ClientConfigEnabledRoundRobinRule?該策略比較特殊,一般不直接使用它。因為他本身并沒有實現特殊的處理邏輯,在他內部定義了一個RoundRobinRule策略,choose函數的實現其實就是采用了RoundRobinRule的線性輪詢機制。
?在實際開發中,我們并不會直接使用該策略,而是基于它做高級策略擴展。
BestAvailableRule?該策略繼承自ClientConfigEnabledRoundRobinRule,在實現中它注入了負載均衡器的統計對象LoadBalancerStats,同時在choose方法中利用LoadBalancerStats保存的實例統計信息來選擇滿足要求的服務實例。
?當LoadBalancerStats為空時,會使用RoundRobinRule線性輪詢策略,當有LoadBalancerStats時,會通過遍歷負載均衡器中維護的所有服務實例,會過濾掉故障的實例,并找出并發請求數最小的一個。
?該策略的特性是可以選出最空閑的服務實例。
PredicateBasedRule?這是一個抽象策略,它繼承了ClientConfigEnabledRoundRobinRule,從命名中可以猜出這是一個基于Predicate實現的策略,Predicate是Google Guava Collection工具對集合進行過濾的條件接口。
public Server choose(Object key) { ILoadBalancer lb = getLoadBalancer(); Optionalserver = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get(); } else { return null; } }
?在該源碼中,它定義了一個抽象函數getPredicate來獲取AbstractServerPredicate對象的實現,在choose方法中,通過AbstractServerPredicate的chooseRoundRobinAfterFiltering函數來選擇具體的服務實例。從該方法的命名我們可以看出大致的邏輯:首先通過子類中實現的Predicate邏輯來過濾一部分服務實例,然后再以線性輪詢的方式從過濾后的實例清單中選出一個。
?在上面choose函數中調用的chooseRoundRobinAfterFiltering方法先通過內部定義的getEligibleServers函數來獲取備選的實例清單(實現了過濾),如果返回的清單為空,則用Optional.absent來表示不存在,反之則以線性輪詢的方式從備選清單中獲取一個實例。
?下面看一下getEligibleServers方法的源碼
public ListgetEligibleServers(List servers, Object loadBalancerKey) { if (loadBalancerKey == null) { return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate())); } else { List results = Lists.newArrayList(); for (Server server: servers) { if (this.apply(new PredicateKey(loadBalancerKey, server))) { results.add(server); } } return results; } }
?上述源碼的大致邏輯是遍歷服務清單,使用this.apply方法來判斷實例是否需要保留,如果是就添加到結果列表中。
?實際上,AbstractServerPredicate實現了com.google.common.base.Predicate接口,apply方法是接口中的定義,主要用來實現過濾條件的判斷邏輯,它輸入的參數則是過濾條件需要用到的一些信息(比如源碼中的new PredicateKey(loadBalancerKey, server)),傳入了關于實例的統計信息和負載均衡器的選擇算法傳遞過來的key。
?AbstractServerPredicate沒有apply的實現,所以這里的chooseRoundRobinAfterFiltering方法只是定義了一個模板策略:先過濾清單,再輪詢選擇。
?對于如何過濾,需要在AbstractServerPredicate的子類中實現apply方法來確定具體的過濾策略。
AvailabilityFilteringRule&emsps;該類繼承自PredicateBasedRule,遵循了先過濾清單,再輪詢選擇的基本處理邏輯,其中過濾條件使用了AvailabilityPredicate,下面看一下AvailabilityPredicate的源碼:
package com.netflix.loadbalancer; import javax.annotation.Nullable; import com.netflix.client.config.IClientConfig; import com.netflix.config.ChainedDynamicProperty; import com.netflix.config.DynamicBooleanProperty; import com.netflix.config.DynamicIntProperty; import com.netflix.config.DynamicPropertyFactory; public class AvailabilityPredicate extends AbstractServerPredicate { @Override public boolean apply(@Nullable PredicateKey input) { LoadBalancerStats stats = getLBStats(); if (stats == null) { return true; } return !shouldSkipServer(stats.getSingleServerStat(input.getServer())); } private boolean shouldSkipServer(ServerStats stats) { if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) { return true; } return false; } }
?從上面的源碼可以看出,主要過的過濾邏輯都是在boolean shouldSkipServer(ServerStats stats)方法中實現,該方法主要判斷服務實例的兩項內容:
是否故障,即斷路由器是否生效已斷開
實例的并發請求數大于閥值,默認值2^32 - 1,該配置可以通過參數
?上面兩項只要滿足一項,apply方法就返回false,代表該服務實例可能存在故障或負載過高,都不滿足就返回true。
?在AvailabilityFilteringRule進行實例選擇時做了小小的優化,它并沒有向父類一樣先遍歷所有的節點進行過濾,然后在過濾后的集合中選擇實例。而是先以線性的方式選擇一個實例,接著使用過濾條件來判斷該實例是否滿足要求,若滿足就直接使用該實例,若不滿足要求就再選擇下一個實例,檢查是否滿足要求,這個過程循環10次如果還沒有找到合適的服務實例,就采用父類的實現方案。
?該策略通過線性輪詢的方式直接嘗試尋找可用且比較空閑的實例來用,優化了每次都要遍歷所有實例的開銷。
ZoneAvoidanceRule?該類也是PredicateBasedRule的子類,它的實現是通過組合過濾條件CompositePredicate,以ZoneAvoidancePredicate為主過濾條件,以AvailabilityPredicate為次過濾條件。
?ZoneAvoidanceRule的實現并沒有像AvailabilityFilteringRule重寫choose函數來優化,所以它遵循了先過濾清單再輪詢選擇的基本邏輯。
?下面看一下CompositePredicate的源碼
package com.netflix.loadbalancer; import java.util.Iterator; import java.util.List; import javax.annotation.Nullable; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Lists; public class CompositePredicate extends AbstractServerPredicate { private AbstractServerPredicate delegate; private Listfallbacks = Lists.newArrayList(); private int minimalFilteredServers = 1; private float minimalFilteredPercentage = 0; @Override public boolean apply(@Nullable PredicateKey input) { return delegate.apply(input); } @Override public List getEligibleServers(List servers, Object loadBalancerKey) { List result = super.getEligibleServers(servers, loadBalancerKey); Iterator i = fallbacks.iterator(); while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage)) && i.hasNext()) { AbstractServerPredicate predicate = i.next(); result = predicate.getEligibleServers(servers, loadBalancerKey); } return result; } }
?從源碼中可以看出,CompositePredicate定義了一個主過濾條件delegate和一組過濾條件列表fallbacks,次過濾條件的過濾順序是按存儲順序執行的。
?在獲取結果的getEligibleServers函數中的主要邏輯是:
使用主過濾條件對所有實例過濾并返回過濾后的實例清單
每次使用次過濾條件過濾前,都要判斷兩個條件,一個是過濾后的實例總數 >= 最小過濾實例數(minimalFilteredServers,默認值為1),另一個是過濾后的實例比例 > 最小過濾百分比(minimalFilteredPercentage,默認為0),只要有一個不符合就不再進行過濾,將當前服務實例列表返回
依次使用次過濾條件列表中的過濾條件對主過濾條件的過濾結果進行過濾。
后續后面會介紹Spring Cloud Ribbon配置方式,請持續關注!!!
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/76744.html
摘要:中的名稱,一定要是服務中注冊的名稱。添加這個的注解,主要是因為定義的時候報錯,也就是說明沒有被實例化。采用隨機分配的策略。添加訪問層添加電影微服務啟動類電影微服務,使用定制化在客戶端進行負載均衡,使用不同服務不同配置策略。 SpringCloud(第 007 篇)電影微服務,使用定制化 Ribbon 在客戶端進行負載均衡,使用 RibbonClient 不同服務不同配置策略 - 一、大...
摘要:概要什么是實戰整合實現負載均衡是什么是一個客戶端負載均衡的組件什么是負載均衡負載均衡就是分發請求流量到不同的服務器目前的實現有軟件和硬件負載均衡分為兩種服務器端負載均衡如上圖所示服務器端負載均衡是對客戶透明的用戶請求到服務器真正的服務器是由 概要 什么是Spring Cloud Netflix Ribbon? 實戰:整合Ribbon實現負載均衡 Spring Cloud Netfl...
摘要:代碼如下定義了用來存儲負載均衡器各服務實例屬性和統計信息的對象。下面看一下負載均衡器增加了哪些內容。 客戶端負載均衡Spring Cloud Ribbon ?Spring Cloud Ribbon是一個基于HTTP和TCP的客戶端負載均衡工具,基于Netflix Ribbon實現。 目錄 客戶端負載均衡 源碼分析 負載均衡器(本文重點) 負載均衡策略 配置詳解 自動化配置 客戶端負...
摘要:第篇電影微服務,使用在客戶端進行負載均衡一大致介紹是發布的云中間層服務開源項目,主要功能是提供客戶端負載均衡算法。而被注解后,能過用負載均衡,主要是維護了一個被注解的列表,并給列表中的添加攔截器,進而交給負載均衡器去處理。 SpringCloud(第 006 篇)電影微服務,使用 Ribbon 在客戶端進行負載均衡 - 一、大致介紹 1、Ribbon 是 Netflix 發布的云中間層...
摘要:客戶端負載均衡需要客戶端自己維護自己要訪問的服務實例清單,這些服務清單來源于注冊中心在使用進行服務治理時。使用從負載均衡器中挑選出的服務實例來執行請求內容。 客戶端負載均衡Spring Cloud Ribbon ?Spring Cloud Ribbon是一個基于HTTP和TCP的客戶端負載均衡工具,基于Netflix Ribbon實現。 目錄 客戶端負載均衡(本文重點) 源碼分析(本...
摘要:客戶端負載均衡器是一個客戶端負載均衡器,可以讓你對和客戶端的行為進行大量控制,已經使用了,因此,如果你使用,此部分也適用。 客戶端負載均衡器:Ribbon Ribbon是一個客戶端負載均衡器,可以讓你對HTTP和TCP客戶端的行為進行大量控制,Feign已經使用了Ribbon,因此,如果你使用@FeignClient,此部分也適用。 Ribbon中的一個核心概念是命名客戶端,每個負載均...
閱讀 2030·2023-04-26 02:15
閱讀 2306·2021-11-19 09:40
閱讀 1044·2021-10-27 14:13
閱讀 3313·2021-08-23 09:44
閱讀 3614·2019-12-27 12:24
閱讀 657·2019-08-30 15:53
閱讀 1169·2019-08-30 10:53
閱讀 2163·2019-08-26 12:14