Client-Side Load Balancing: Spring Cloud Ribbon
Spring Cloud Ribbon is a client-side load-balancing tool for HTTP and TCP, built on Netflix Ribbon.
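Load-balancing strategies
AbstractLoadBalancerRule
The abstract base class for load-balancing strategies. It holds a reference to the load balancer, an ILoadBalancer object. When a concrete strategy chooses a server, it can read the information the load balancer maintains, such as its server lists and statistics, and use it as the basis for allocation. This lets each strategy implement an algorithm that is efficient for a particular scenario.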
```java
package com.netflix.loadbalancer;

import com.netflix.client.IClientConfigAware;

public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {

    // The load balancer whose state (server lists, statistics) concrete rules consult
    private ILoadBalancer lb;

    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        this.lb = lb;
    }

    @Override
    public ILoadBalancer getLoadBalancer() {
        return lb;
    }
}
```
RandomRule
```java
package com.netflix.loadbalancer;

import java.util.List;
import java.util.Random;

import com.netflix.client.config.IClientConfig;

public class RandomRule extends AbstractLoadBalancerRule {
    Random rand;

    public RandomRule() {
        rand = new Random();
    }

    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            if (Thread.interrupted()) {
                return null;
            }
            List<Server> upList = lb.getReachableServers();
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();
            if (serverCount == 0) {
                /*
                 * No servers. End regardless of pass, because subsequent passes
                 * only get more restrictive.
                 */
                return null;
            }

            int index = rand.nextInt(serverCount);
            server = upList.get(index);

            if (server == null) {
                /*
                 * The only time this should happen is if the server list were
                 * somehow trimmed. This is a transient condition. Retry after
                 * yielding.
                 */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Shouldn't actually happen.. but must be transient or a bug.
            server = null;
            Thread.yield();
        }

        return server;
    }

    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        // TODO Auto-generated method stub
    }
}
```
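The source shows that RandomRule implements the IRule method Server choose(Object key) by delegating to Server choose(ILoadBalancer lb, Object key) in the same class, which takes the load balancer as an extra parameter. That method asks the load balancer for the list of reachable instances, upList, and the list of all instances, allList. It then draws a random number with rand.nextInt(serverCount) and uses it as an index into upList to return an instance. Note that serverCount is the size of allList, not upList: when some instances are unreachable, upList is shorter and upList.get(index) can throw an IndexOutOfBoundsException. The selection logic runs inside a while (server == null) loop. By design, each iteration should normally yield an instance, so if the loop spins forever without obtaining one, a concurrency bug is the likely cause.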
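The random-pick-with-retry idea can be sketched outside Ribbon as follows. This is a minimal illustration, not Ribbon's code. RandomPick and its nested Server class are hypothetical stand-ins for the real types. Unlike the excerpt above, the sketch bounds the random index by the size of the list it actually indexes, and it caps the number of attempts instead of looping until a server is found.

```java
import java.util.List;
import java.util.Random;

// Minimal sketch of random selection with retry; not Ribbon's code.
// Server is a hypothetical stand-in for com.netflix.loadbalancer.Server.
class RandomPick {
    static final class Server {
        final String id;
        final boolean alive;

        Server(String id, boolean alive) {
            this.id = id;
            this.alive = alive;
        }
    }

    private final Random rand;

    RandomPick(Random rand) {
        this.rand = rand;
    }

    // Returns a random alive server from upList, or null after maxTries failed attempts.
    Server choose(List<Server> upList, int maxTries) {
        for (int attempt = 0; attempt < maxTries; attempt++) {
            if (upList.isEmpty()) {
                return null; // nothing reachable: retrying cannot help
            }
            // The bound is the size of the list we index into, so the index is always valid.
            Server s = upList.get(rand.nextInt(upList.size()));
            if (s.alive) {
                return s;
            }
            Thread.yield(); // treat a dead pick as transient and try again
        }
        return null;
    }

    public static void main(String[] args) {
        List<Server> up = List.of(new Server("a", true), new Server("b", false));
        Server picked = new RandomPick(new Random()).choose(up, 10);
        System.out.println(picked == null ? "none" : picked.id);
    }
}
```

Capping the attempts trades the excerpt's "keep trying until something is found" behavior for a guaranteed exit. A caller that gets null can then fall back or report an error.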
RoundRobinRule
```java
package com.netflix.loadbalancer;

import com.netflix.client.config.IClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinRule extends AbstractLoadBalancerRule {

    private AtomicInteger nextServerCyclicCounter;
    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);

    public RoundRobinRule() {
        nextServerCyclicCounter = new AtomicInteger(0);
    }

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        // Give up after 10 attempts instead of looping indefinitely
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            // Next index of the cyclic counter over the full server list
            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: " + lb);
        }
        return server;
    }

    /**
     * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
     *
     * @param modulo The modulo to bound the value of the counter.
     * @return The next value.
     */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }

    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
```
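RoundRobinRule is implemented much like RandomRule, but the loop condition and the way an instance is picked differ. The loop condition adds a counter, count, which is incremented on every iteration. If no server has been obtained after 10 attempts, the loop ends and the warning "No available alive servers after 10 tries from load balancer: ..." is logged. Each attempt does not use a random index into the reachable list. Instead, it takes the next index of a cyclic counter over allServers and accepts the instance only if both isAlive() and isReadyToServe() return true.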
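The linear round-robin order comes from the AtomicInteger nextServerCyclicCounter. Every selection calls int incrementAndGetModulo(int modulo). That method reads the counter, computes (current + 1) % modulo, and publishes the result with compareAndSet, retrying if another thread updated the counter first. This keeps the counter thread-safe without locks.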
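The CAS loop can be tried out on its own. The sketch below copies the logic of incrementAndGetModulo into a hypothetical CyclicCounter class. It shows that the counter starts at 0, so the first call returns index 1 rather than 0.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Standalone copy of the incrementAndGetModulo pattern; CyclicCounter is a hypothetical name.
class CyclicCounter {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Atomically advances the counter by one, wrapping at modulo, and returns the new value.
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % modulo;
            // compareAndSet fails if another thread changed the counter since get(); then retry.
            if (counter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        CyclicCounter c = new CyclicCounter();
        for (int i = 0; i < 5; i++) {
            System.out.print(c.incrementAndGetModulo(3) + " ");
        }
        // prints: 1 2 0 1 2
    }
}
```

Because the modulo is applied to the stored value itself, the counter stays in range even when the server count changes between calls.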
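RetryRule
This strategy selects instances with a retry mechanism. The source shows that it holds an internal IRule object, a RoundRobinRule instance by default. Its choose method repeatedly tries that inner rule. If an instance is obtained in the meantime, it is returned. If none is obtained before the deadline, it returns null. The deadline is the timestamp at which choose started plus maxRetryMillis, which defaults to 500 ms.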
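The retry-until-deadline idea can be sketched independently of Ribbon. This is not Ribbon's code. DeadlineRetry is a hypothetical name, the Supplier stands in for subRule.choose(key), and the Predicate stands in for the isAlive() check.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Sketch of RetryRule's retry-until-deadline idea; not Ribbon's code.
// inner stands in for subRule.choose(key); usable stands in for the isAlive() check.
class DeadlineRetry {
    static <T> T chooseWithRetry(Supplier<T> inner, Predicate<T> usable, long maxRetryMillis) {
        long deadline = System.currentTimeMillis() + maxRetryMillis;
        T answer = inner.get();
        // Keep retrying while the answer is unusable and time remains.
        while ((answer == null || !usable.test(answer))
                && System.currentTimeMillis() < deadline) {
            Thread.yield(); // brief pause, hoping the failure is transient
            answer = inner.get();
        }
        // Like RetryRule, never hand back an unusable answer.
        return (answer == null || !usable.test(answer)) ? null : answer;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice, then succeeds.
        String result = chooseWithRetry(() -> ++calls[0] < 3 ? null : "server-1", s -> true, 500);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

The sketch relies only on the clock check in the loop condition. RetryRule also schedules an InterruptTask, so the thread's interrupt flag ends the loop at the deadline. In both versions one slow inner call can overshoot the deadline.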
```java
package com.netflix.loadbalancer;

import com.netflix.client.config.IClientConfig;

public class RetryRule extends AbstractLoadBalancerRule {
    IRule subRule = new RoundRobinRule(); // the rule that is retried
    long maxRetryMillis = 500;            // retry budget in milliseconds

    // Propagate the load balancer to the inner rule, which subRule.choose(key) relies on
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        subRule.setLoadBalancer(lb);
    }

    /*
     * Loop if necessary. Note that the time CAN be exceeded depending on the
     * subRule, because we're not spawning additional threads and returning
     * early.
     */
    public Server choose(ILoadBalancer lb, Object key) {
        long requestTime = System.currentTimeMillis();
        long deadline = requestTime + maxRetryMillis;

        Server answer = null;

        answer = subRule.choose(key);

        if (((answer == null) || (!answer.isAlive()))
                && (System.currentTimeMillis() < deadline)) {

            // Interrupts this thread when the deadline is reached, which ends the loop below
            InterruptTask task = new InterruptTask(deadline
                    - System.currentTimeMillis());

            while (!Thread.interrupted()) {
                answer = subRule.choose(key);

                if (((answer == null) || (!answer.isAlive()))
                        && (System.currentTimeMillis() < deadline)) {
                    /* pause and retry hoping it's transient */
                    Thread.yield();
                } else {
                    break;
                }
            }

            task.cancel();
        }

        if ((answer == null) || (!answer.isAlive())) {
            return null;
        } else {
            return answer;
        }
    }

    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
```
WeightedResponseTimeRule
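Scheduled task
When WeightedResponseTimeRule is initialized, it calls serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval). This starts a scheduled task that computes a weight for each service instance. By default the task runs every 30 seconds.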
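The scheduling pattern itself can be sketched with java.util.Timer. The sketch assumes serverWeightTimer is a java.util.Timer, which the schedule(task, 0, interval) call shape suggests. startWeightTask and the recompute callback are hypothetical names that stand in for DynamicServerWeightTask.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of the scheduling pattern only; recompute is a hypothetical callback standing in
// for DynamicServerWeightTask.
class WeightTimerSketch {
    static Timer startWeightTask(Runnable recompute, long intervalMillis) {
        // Daemon timer thread, so it does not keep the JVM alive on its own.
        Timer timer = new Timer("weight-timer", true);
        // delay 0: first run immediately; afterwards every intervalMillis (fixed delay).
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                recompute.run();
            }
        }, 0, intervalMillis);
        return timer;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch firstRun = new CountDownLatch(1);
        Timer timer = startWeightTask(firstRun::countDown, 30_000); // 30 s interval
        System.out.println("first run happened: " + firstRun.await(1, TimeUnit.SECONDS));
        timer.cancel(); // stop further executions
    }
}
```

The zero initial delay means the weights are computed right away, so the rule does not have to wait a full interval before weighted selection can start.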
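Weight calculation
The object that stores the weights is easy to find in the source: private volatile List<Double> accumulatedWeights. Each weight's position in this list matches the position of the corresponding instance in the load balancer's list of all instances. The weights are recomputed by maintainWeights():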
```java
public void maintainWeights() {
    ILoadBalancer lb = getLoadBalancer();
    if (lb == null) {
        return;
    }

    // Skip this round if another recomputation is already running
    if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
        return;
    }

    try {
        logger.info("Weight adjusting job started");
        AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
        LoadBalancerStats stats = nlb.getLoadBalancerStats();
        if (stats == null) {
            // no statistics, nothing to do
            return;
        }
        double totalResponseTime = 0;
        // find maximal 95% response time
        // (in fact, this loop sums the average response times of all servers)
        for (Server server : nlb.getAllServers()) {
            // this will automatically load the stats if not in cache
            ServerStats ss = stats.getSingleServerStat(server);
            totalResponseTime += ss.getResponseTimeAvg();
        }
        // weight for each server is (sum of responseTime of all servers - responseTime)
        // so that the longer the response time, the less the weight and the less likely to be chosen
        Double weightSoFar = 0.0;

        // create new list and hot swap the reference
        List<Double> finalWeights = new ArrayList<Double>();
        for (Server server : nlb.getAllServers()) {
            ServerStats ss = stats.getSingleServerStat(server);
            double weight = totalResponseTime - ss.getResponseTimeAvg();
            weightSoFar += weight;
            finalWeights.add(weightSoFar);
        }
        setWeights(finalWeights);
    } catch (Exception e) {
        logger.error("Error calculating server weights", e);
    } finally {
        serverWeightAssignmentInProgress.set(false);
    }
}
```
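The method computes a weight for each instance in the load balancer's list in order, starting from the first. The rule is weightSoFar + totalResponseTime - the instance's average response time. Here totalResponseTime is the sum of all instances' average response times, and weightSoFar starts at 0. Each newly computed weight is accumulated into weightSoFar for use in the next step.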
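The formula can be checked on plain numbers. The sketch below repeats the arithmetic of maintainWeights() on an array of average response times instead of Ribbon's ServerStats. CumulativeWeights is a hypothetical name, and the response times in main are made-up example values.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the weight formula in maintainWeights(), operating on plain average response
// times (ms) instead of Ribbon's ServerStats. Index i in the result matches server i.
class CumulativeWeights {
    static List<Double> compute(double[] avgResponseTimes) {
        double total = 0;
        for (double t : avgResponseTimes) {
            total += t; // sum of all average response times
        }
        List<Double> weights = new ArrayList<>();
        double weightSoFar = 0.0;
        for (double t : avgResponseTimes) {
            weightSoFar += total - t; // slower server => smaller increment
            weights.add(weightSoFar); // stored value is the running (cumulative) weight
        }
        return weights;
    }

    public static void main(String[] args) {
        // Hypothetical instances A, B, C with average response times 10, 40, 80 ms.
        System.out.println(compute(new double[] {10, 40, 80}));
        // prints: [120.0, 210.0, 260.0]
    }
}
```

With a total of 130 ms, A adds 120, B adds 90 and C adds 50. The slowest instance receives the smallest increment.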
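Because of this accumulation, each value computed above only marks the upper bound of that instance's weight interval. The figure below is excerpted from Spring Cloud 微服務實戰 (Spring Cloud Microservices in Action).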
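The interval reading can be made precise. Write $t_i$ for the average response time of instance $i$ out of $n$ instances and $T$ for their sum. maintainWeights() then stores the cumulative values

$$
w_i = \sum_{j=1}^{i} \left(T - t_j\right), \qquad T = \sum_{j=1}^{n} t_j, \qquad w_0 = 0 .
$$

The selection step draws $r$ uniformly from $[0, w_n)$ and takes the first $i$ with $w_i \ge r$. Instance $i$ therefore owns the interval $(w_{i-1}, w_i]$ (the first interval also contains 0). Since $w_n = nT - T = (n-1)T$, instance $i$ is picked with probability

$$
P(i) = \frac{w_i - w_{i-1}}{w_n} = \frac{T - t_i}{(n-1)\,T} .
$$

A slower instance, with a larger $t_i$, gets a proportionally smaller share. The derivation assumes $n \ge 2$. With a single instance, $w_1 = 0$, and the rule falls back to round robin through its maxTotalWeight < 0.001 check.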
Next, let's look at how Server choose(ILoadBalancer lb, Object key) selects a server:
```java
public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        return null;
    }
    Server server = null;

    while (server == null) {
        // get hold of the current reference in case it is changed from the other thread
        List<Double> currentWeights = accumulatedWeights;
        if (Thread.interrupted()) {
            return null;
        }
        List<Server> allList = lb.getAllServers();

        int serverCount = allList.size();

        if (serverCount == 0) {
            return null;
        }

        int serverIndex = 0;

        // last one in the list is the sum of all weights
        double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
        // No server has been hit yet and total weight is not initialized
        // fallback to use round robin
        if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
            server = super.choose(getLoadBalancer(), key);
            if (server == null) {
                return server;
            }
        } else {
            // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
            double randomWeight = random.nextDouble() * maxTotalWeight;
            // pick the server index based on the randomIndex
            int n = 0;
            for (Double d : currentWeights) {
                if (d >= randomWeight) {
                    serverIndex = n;
                    break;
                } else {
                    n++;
                }
            }

            server = allList.get(serverIndex);
        }

        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive()) {
            return (server);
        }

        // Next.
        server = null;
    }
    return server;
}
```
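The core of the weighted branch fits in a small pure function. In the sketch below, WeightedPick is a hypothetical name, and the random draw is passed in as r01, which plays the role of random.nextDouble(), so the result is deterministic. The function returns -1 wherever the excerpt would fall back to round robin.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Sketch of the weighted pick in WeightedResponseTimeRule.choose(), on plain lists.
// r01 plays the role of random.nextDouble() and is expected in [0, 1).
class WeightedPick {
    // Returns the chosen index, or -1 when the caller should fall back to round robin.
    static int pickIndex(List<Double> cumulativeWeights, int serverCount, double r01) {
        // The last cumulative weight is the sum of all weights.
        double maxTotalWeight = cumulativeWeights.isEmpty()
                ? 0 : cumulativeWeights.get(cumulativeWeights.size() - 1);
        // Same fallback test as the excerpt: weights missing or out of sync with the server list.
        if (maxTotalWeight < 0.001d || serverCount != cumulativeWeights.size()) {
            return -1;
        }
        double randomWeight = r01 * maxTotalWeight;
        // The first interval whose upper bound reaches randomWeight wins.
        for (int i = 0; i < cumulativeWeights.size(); i++) {
            if (cumulativeWeights.get(i) >= randomWeight) {
                return i;
            }
        }
        return 0; // not reached for r01 in [0, 1); mirrors serverIndex's initial value
    }

    public static void main(String[] args) {
        List<Double> weights = List.of(120.0, 210.0, 260.0); // hypothetical cumulative weights
        Random random = new Random();
        int[] hits = new int[weights.size()];
        for (int i = 0; i < 26_000; i++) {
            hits[pickIndex(weights, weights.size(), random.nextDouble())]++;
        }
        // Expect counts roughly in the ratio 120 : 90 : 50.
        System.out.println(Arrays.toString(hits));
    }
}
```

The linear scan is O(n) per pick. That is fine for the short server lists a client usually sees; a binary search over the cumulative list would be the usual alternative for long lists.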
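PredicateBasedRule
This is an abstract strategy that extends ClientConfigEnabledRoundRobinRule. As the name suggests, it is built on a Predicate, the condition interface that Google Guava's collection utilities use to filter collections. Its choose method first filters the server list with the predicate returned by getPredicate(), then picks among the remaining servers in round-robin order: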
```java
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
    // Filter with this rule's predicate, then pick round-robin among the eligible servers
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }
}
```
The filtering itself is done in AbstractServerPredicate.getEligibleServers:
```java
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
    if (loadBalancerKey == null) {
        return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
    } else {
        List<Server> results = Lists.newArrayList();
        for (Server server : servers) {
            if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                results.add(server);
            }
        }
        return results;
    }
}
```
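AbstractServerPredicate implements the com.google.common.base.Predicate interface. apply is the method defined by that interface, and it holds the filter's decision logic. Its argument carries the information the filter needs; in the source above this is new PredicateKey(loadBalancerKey, server). That object wraps the server instance and the key passed in by the load balancer's selection algorithm.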
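The "filter first, then round-robin over the survivors" pattern can be sketched without Guava. The sketch below substitutes java.util.function.Predicate for Guava's Predicate and uses plain strings as servers. FilterThenRoundRobin and its methods are hypothetical names, not Ribbon APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Sketch of "filter, then round-robin", the pattern behind chooseRoundRobinAfterFiltering.
// java.util.function.Predicate replaces Guava's Predicate here.
class FilterThenRoundRobin<T> {
    private final Predicate<T> predicate;
    private final AtomicInteger counter = new AtomicInteger(0);

    FilterThenRoundRobin(Predicate<T> predicate) {
        this.predicate = predicate;
    }

    // Counterpart of getEligibleServers: keep only the servers the predicate accepts.
    List<T> eligible(List<T> servers) {
        List<T> result = new ArrayList<>();
        for (T s : servers) {
            if (predicate.test(s)) {
                result.add(s);
            }
        }
        return result;
    }

    // Returns null (like the choose() excerpt) when no server passes the filter.
    T choose(List<T> servers) {
        List<T> candidates = eligible(servers);
        if (candidates.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
        return candidates.get(index);
    }

    public static void main(String[] args) {
        FilterThenRoundRobin<String> rule = new FilterThenRoundRobin<>(s -> !s.startsWith("down-"));
        List<String> servers = List.of("a", "down-b", "c");
        for (int i = 0; i < 4; i++) {
            System.out.print(rule.choose(servers) + " ");
        }
        // prints: a c a c
    }
}
```

Because the round-robin index is taken over the filtered list, a filtered-out server never receives a turn. The remaining servers share the traffic evenly.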
AvailabilityPredicate is one concrete subclass. It filters out instances that are currently unavailable:
```java
package com.netflix.loadbalancer;

import javax.annotation.Nullable;

import com.netflix.client.config.IClientConfig;
import com.netflix.config.ChainedDynamicProperty;
import com.netflix.config.DynamicBooleanProperty;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;

public class AvailabilityPredicate extends AbstractServerPredicate {

    // Field declarations (CIRCUIT_BREAKER_FILTERING, activeConnectionsLimit) and
    // constructors are omitted from this excerpt.

    @Override
    public boolean apply(@Nullable PredicateKey input) {
        LoadBalancerStats stats = getLBStats();
        if (stats == null) {
            return true; // no statistics: keep every server
        }
        return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
    }

    private boolean shouldSkipServer(ServerStats stats) {
        if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
                || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
            return true;
        }
        return false;
    }
}
```
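As the source above shows, the main filtering logic lives in boolean shouldSkipServer(ServerStats stats). That method checks two things about an instance and skips it if either holds:
- circuit-breaker filtering is enabled (CIRCUIT_BREAKER_FILTERING) and the instance's circuit breaker has tripped;
- the instance's number of concurrent (active) requests is greater than or equal to the threshold activeConnectionsLimit, whose default is Integer.MAX_VALUE (2^31 - 1). This threshold can be configured via the parameter …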
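The two checks can be isolated as a pure function. In the sketch below, ServerStats and the dynamic properties are replaced by plain parameters, and AvailabilityCheck is a hypothetical name. It shows that a server is skipped as soon as it reaches the limit, not only when it exceeds it.

```java
// Sketch of the two checks made by AvailabilityPredicate.shouldSkipServer, with
// ServerStats and the dynamic properties replaced by plain parameters (an assumption).
class AvailabilityCheck {
    static boolean shouldSkip(boolean circuitBreakerFiltering, boolean circuitBreakerTripped,
                              int activeRequests, int activeConnectionsLimit) {
        // Skip if the breaker is open (when that filter is on) or the instance is at its limit.
        return (circuitBreakerFiltering && circuitBreakerTripped)
                || activeRequests >= activeConnectionsLimit;
    }

    public static void main(String[] args) {
        int defaultLimit = Integer.MAX_VALUE; // 2^31 - 1
        System.out.println(shouldSkip(true, true, 0, defaultLimit));  // true: breaker tripped
        System.out.println(shouldSkip(false, true, 0, defaultLimit)); // false: breaker filter off
        System.out.println(shouldSkip(true, false, 5, 5));            // true: limit reached
    }
}
```

With the default limit of Integer.MAX_VALUE, the concurrency check effectively never fires, so in practice only the circuit-breaker check filters instances unless the limit is lowered.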
CompositePredicate combines a primary predicate (delegate) with a list of fallback predicates (fallbacks):
```java
package com.netflix.loadbalancer;

import java.util.Iterator;
import java.util.List;

import javax.annotation.Nullable;

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Lists;

public class CompositePredicate extends AbstractServerPredicate {

    private AbstractServerPredicate delegate;

    private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();

    private int minimalFilteredServers = 1;

    private float minimalFilteredPercentage = 0;

    @Override
    public boolean apply(@Nullable PredicateKey input) {
        return delegate.apply(input);
    }

    @Override
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        // Filter with the primary predicate first
        List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
        Iterator<AbstractServerPredicate> i = fallbacks.iterator();
        // While too few servers survive, re-filter the original list with the next fallback
        while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
                && i.hasNext()) {
            AbstractServerPredicate predicate = i.next();
            result = predicate.getEligibleServers(servers, loadBalancerKey);
        }
        return result;
    }
}
```
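Before each fallback (secondary) predicate is applied, two conditions are checked on the current result. The first is that the number of filtered instances is at least the minimum, minimalFilteredServers (default 1). The second is that the number of filtered instances is greater than the total instance count multiplied by the minimum percentage, minimalFilteredPercentage (default 0); the product is truncated to an int. If both hold, filtering stops and the current result is returned. If either fails, the next fallback predicate re-filters the original instance list. When the fallbacks run out, the last result is returned. With the defaults, a fallback is used only when the previous filter left no instances at all.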
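The fallback loop can be sketched outside Ribbon. In the sketch below, java.util.function.Predicate replaces AbstractServerPredicate and servers are plain strings. FallbackFilter and the zone-style names are hypothetical. The for loop with an early break is equivalent to the while condition in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Sketch of the fallback loop in CompositePredicate.getEligibleServers; not Ribbon's code.
// java.util.function.Predicate replaces Ribbon's AbstractServerPredicate here.
class FallbackFilter {
    static <T> List<T> filter(List<T> servers, Predicate<T> primary, List<Predicate<T>> fallbacks,
                              int minimalFilteredServers, float minimalFilteredPercentage) {
        List<T> result = keep(servers, primary);
        for (Predicate<T> fallback : fallbacks) {
            boolean enough = result.size() >= minimalFilteredServers
                    && result.size() > (int) (servers.size() * minimalFilteredPercentage);
            if (enough) {
                break; // both conditions hold: stop and keep the current result
            }
            // Each fallback re-filters the ORIGINAL list, not the previous result.
            result = keep(servers, fallback);
        }
        return result;
    }

    private static <T> List<T> keep(List<T> servers, Predicate<T> p) {
        List<T> out = new ArrayList<>();
        for (T s : servers) {
            if (p.test(s)) {
                out.add(s);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> servers = List.of("zoneA-1", "zoneA-2", "zoneB-1");
        Predicate<String> zoneC = s -> s.startsWith("zoneC"); // hypothetical: matches nothing
        Predicate<String> any = s -> true;
        // Defaults from the text: minimalFilteredServers = 1, minimalFilteredPercentage = 0
        System.out.println(filter(servers, zoneC, List.of(any), 1, 0f));
        // prints: [zoneA-1, zoneA-2, zoneB-1]
    }
}
```

Raising minimalFilteredPercentage makes the composite less willing to accept a heavily filtered list. For example, with 0.8 and three servers, two survivors are not enough, because 2 is not greater than (int) 2.4 = 2.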
A later post will cover how to configure Spring Cloud Ribbon. Stay tuned!
