摘要:序狀態轉換閉開在設定的時間窗口內失敗次數達到閾值,由閉開。進入開的同時啟動進入半開狀態的定時器。半開狀態的計數器目前半開狀態沒有使用時間窗口,僅僅使用連續成功次數來計算,一旦失敗,則將斷路器設置為狀態。
序 狀態轉換
閉->開
在設定的時間窗口內失敗次數達到閾值,由閉->開。
開->半開
在處于開的狀態,對目標的調用做失敗返回,進入開的時候,啟動計時器,設定時間過后進入半開狀態。
半開->開
進入半開狀態,會啟動一個計數器,記錄連續成功的調用次數,超過閾值,進入閉狀態。有一次失敗則進入開狀態,同時清零連續成功調用次數。進入開的同時啟動進入半開狀態的定時器。
半開->閉
進入半開狀態,會啟動一個計數器,記錄連續成功的調用次數,超過閾值,進入閉狀態,同時清零連續成功調用次數。
實現要點切到開狀態啟動的定時器
這里如果使用定時線程來做的話,開的線程多,管理比較麻煩,故這里改為維護一個切換到開狀態的時間,在每次方法調用,判斷是開狀態時,判斷是否已經過了這個超時閾值,超過的話,進入半開狀態。
半開狀態的計數器
目前半開狀態沒有使用時間窗口,僅僅使用連續成功次數來計算,一旦失敗,則將斷路器設置為open狀態。如果連續成功次數達到閾值,則進入close狀態。每次進入half-open的狀態時,連續成功的計數器清零。
主要代碼 斷路器狀態public enum CircuitBreakerState { CLOSED, // working normally, calls are transparently passing through OPEN, // method calls are being intercepted and CircuitBreakerExceptions are being thrown instead HALF_OPEN // method calls are passing through; if another blacklisted exception is thrown, reverts back to OPEN }帶時間窗口的計數器
/** * 帶時間窗口的限流計數器 */ public class LimitCounter { private long startTime; private long timeIntervalInMs; private int maxLimit; private AtomicInteger currentCount; public LimitCounter(long timeIntervalInMs, int maxLimit) { super(); this.timeIntervalInMs = timeIntervalInMs; this.maxLimit = maxLimit; startTime = System.currentTimeMillis(); currentCount = new AtomicInteger(0); } public int incrAndGet() { long currentTime = System.currentTimeMillis(); if ((startTime + timeIntervalInMs) < currentTime) { synchronized (this) { if ((startTime + timeIntervalInMs) < currentTime) { startTime = currentTime; currentCount.set(0); } } } return currentCount.incrementAndGet(); } public boolean thresholdReached(){ return currentCount.get() > maxLimit; } public int get(){ return currentCount.get(); } public /*synchronized*/ void reset(){ currentCount.set(0); } }主要配置
public class CircuitBreakerConfig { //closed狀態的失敗次數閾值 private int failThreshold = 5; //closed狀態的失敗計數的時間窗口 private int failCountWindowInMs = 60*1000; //處于open狀態下進入half-open的超時時間 private int open2HalfOpenTimeoutInMs = 5*1000; //half-open狀態下成功次數閾值 private int consecutiveSuccThreshold = 5; private CircuitBreakerConfig(){ } public static CircuitBreakerConfig newDefault(){ CircuitBreakerConfig config = new CircuitBreakerConfig(); return config; } public int getFailThreshold() { return failThreshold; } public void setFailThreshold(int failThreshold) { this.failThreshold = failThreshold; } public int getFailCountWindowInMs() { return failCountWindowInMs; } public void setFailCountWindowInMs(int failCountWindowInMs) { this.failCountWindowInMs = failCountWindowInMs; } public int getOpen2HalfOpenTimeoutInMs() { return open2HalfOpenTimeoutInMs; } public void setOpen2HalfOpenTimeoutInMs(int open2HalfOpenTimeoutInMs) { this.open2HalfOpenTimeoutInMs = open2HalfOpenTimeoutInMs; } public int getConsecutiveSuccThreshold() { return consecutiveSuccThreshold; } public void setConsecutiveSuccThreshold(int consecutiveSuccThreshold) { this.consecutiveSuccThreshold = consecutiveSuccThreshold; } }斷路器
public class CircuitBreaker { private static final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class); private String name; private CircuitBreakerConfig config; private volatile CircuitBreakerState state = CircuitBreakerState.CLOSED; //最近進入open狀態的時間 private volatile long lastOpenedTime; //closed狀態下失敗次數 private LimitCounter failCount ; //half-open狀態的連續成功次數,失敗立即清零 private AtomicInteger consecutiveSuccCount = new AtomicInteger(0); //構造器 public CircuitBreaker(String name,CircuitBreakerConfig config) { this.config = config; this.name = name; failCount = new LimitCounter(config.getFailCountWindowInMs(),config.getFailThreshold()); } //狀態判斷 public boolean isOpen(){ return CircuitBreakerState.OPEN == state; } public boolean isHalfOpen(){ return CircuitBreakerState.HALF_OPEN == state; } public boolean isClosed(){ return CircuitBreakerState.CLOSED == state; } //狀態操作 /** * closed->open | halfopen -> open */ public void open(){ lastOpenedTime = System.currentTimeMillis(); state = CircuitBreakerState.OPEN; logger.debug("circuit open,key:{}",name); } /** * open -> halfopen */ public void openHalf(){ consecutiveSuccCount.set(0); state = CircuitBreakerState.HALF_OPEN; logger.debug("circuit open-half,key:{}",name); } /** * halfopen -> close */ public void close(){ failCount.reset(); state = CircuitBreakerState.CLOSED; logger.debug("circuit close,key:{}",name); } //閾值判斷 /** * 是否應該轉到half open * 前提是 open state * @return */ public boolean isOpen2HalfOpenTimeout(){ return System.currentTimeMillis() - config.getOpen2HalfOpenTimeoutInMs() > lastOpenedTime; } /** * 是否應該從close轉到open * @return */ public boolean isCloseFailThresholdReached(){ return failCount.thresholdReached(); } /** * half-open狀態下是否達到close的閾值 * @return */ public boolean isConsecutiveSuccessThresholdReached(){ return consecutiveSuccCount.get() >= config.getConsecutiveSuccThreshold(); } //getter public void incrFailCount() { int count = failCount.incrAndGet(); logger.debug("incr fail count:{},key:{}",count,name); } public AtomicInteger getConsecutiveSuccCount() { return consecutiveSuccCount; } public CircuitBreakerState getState() { return state; } }斷路器維護的變量
//最近進入open狀態的時間 private volatile long lastOpenedTime; //closed狀態下失敗次數 private LimitCounter failCount ; //half-open狀態的連續成功次數,失敗立即清零 private AtomicInteger consecutiveSuccCount = new AtomicInteger(0);基于jdk代理的攔截
public class CircuitBreakerInvocationHandler implements InvocationHandler{ private static final Logger logger = LoggerFactory.getLogger(CircuitBreakerInvocationHandler.class); private Object target; public CircuitBreakerInvocationHandler(Object target) { this.target = target; } //動態生成代理對象 public Object proxy(){ return Proxy.newProxyInstance(this.target.getClass().getClassLoader(), this.target.getClass().getInterfaces(), this); } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { GuardByCircuitBreaker breakerAnno = method.getAnnotation(GuardByCircuitBreaker.class); if(breakerAnno == null){ return method.invoke(target,args); } Class extends Throwable>[] noTripExs = breakerAnno.noTripExceptions(); int timeout = breakerAnno.timeoutInMs(); int interval = breakerAnno.failCountWindowInMs(); int failThreshold = breakerAnno.failThreshold(); CircuitBreakerConfig cfg = CircuitBreakerConfig.newDefault(); if(interval != -1){ cfg.setFailCountWindowInMs(interval); } if(failThreshold != -1){ cfg.setFailThreshold(failThreshold); } String key = target.getClass().getSimpleName() + method.getName(); CircuitBreaker breaker = CircuitBreakerRegister.get(key); if(breaker == null){ breaker = new CircuitBreaker(key,cfg); CircuitBreakerRegister.putIfAbsent(key,breaker); } Object returnValue = null; logger.debug("breaker state:{},method:{}",breaker.getState(),method.toGenericString()); //breaker state if(breaker.isOpen()){ //判斷是否該進入half open狀態 if(breaker.isOpen2HalfOpenTimeout()){ //進入half open狀態 breaker.openHalf(); logger.debug("method:{} into half open",method.toGenericString()); returnValue = processHalfOpen(breaker,method,args,noTripExs); }else{ throw new CircuitBreakerOpenException(method.toGenericString()); } }else if(breaker.isClosed()){ try{ returnValue = method.invoke(target,args); // 這里看情況是否重置標志 // breaker.close(); }catch (Throwable t){ if(isNoTripException(t,noTripExs)){ throw t; }else{ //增加計數 breaker.incrFailCount(); if(breaker.isCloseFailThresholdReached()){ //觸發閾值,打開 logger.debug("method:{} reached fail threshold, circuit breaker open",method.toGenericString()); breaker.open(); throw new CircuitBreakerOpenException(method.toGenericString()); }else{ throw t; } } } }else if(breaker.isHalfOpen()){ returnValue = processHalfOpen(breaker,method,args,noTripExs); } return returnValue; } private Object processHalfOpen(CircuitBreaker breaker,Method method, Object[] args,Class extends Throwable>[] noTripExs) throws Throwable { try{ Object returnValue = method.invoke(target,args); breaker.getConsecutiveSuccCount().incrementAndGet(); if(breaker.isConsecutiveSuccessThresholdReached()){ //調用成功則進入close狀態 breaker.close(); } return returnValue; }catch (Throwable t){ if(isNoTripException(t,noTripExs)){ breaker.getConsecutiveSuccCount().incrementAndGet(); if(breaker.isConsecutiveSuccessThresholdReached()){ breaker.close(); } throw t; }else{ breaker.open(); throw new CircuitBreakerOpenException(method.toGenericString(), t); } } } private boolean isNoTripException(Throwable t,Class extends Throwable>[] noTripExceptions){ if(noTripExceptions == null || noTripExceptions.length == 0){ return false; } for(Class extends Throwable> ex:noTripExceptions){ //是否是拋出異常t的父類 //t java.lang.reflect.InvocationTargetException if(ex.isAssignableFrom(t.getCause().getClass())){ return true; } } return false; } }
github工程circuit-breaker參考
martinfowler-CircuitBreaker
microsoft-Circuit Breaker Pattern(必讀)
cloud-design-patterns-斷路器模式
HystrixCircuitBreaker
熔斷器設計模式(實現參考)
Creating a circuit breaker with Spring AOP(實現參考)
alenegro81/CircuitBreaker(參考jdk代理實現)
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/65827.html
摘要:序本文主要研究一下的定義了枚舉它還定義了等方法它有兩個實現類分別是實現了接口,它不做任何操作實現了接口其方法會拋出方法首先判斷,如果為,則執行方法如果為則調用,否則調用計算,沒有拋出異常的話,則最后執序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...
摘要:序本文主要研究一下的定義了枚舉它還定義了等方法它有兩個實現類分別是實現了接口,它不做任何操作實現了接口其方法會拋出方法首先判斷,如果為,則執行方法如果為則調用,否則調用計算,沒有拋出異常的話,則最后執序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...
摘要:序本文主要研究一下的定義了枚舉它還定義了等方法它有兩個實現類分別是實現了接口,它不做任何操作實現了接口其方法會拋出方法首先判斷,如果為,則執行方法如果為則調用,否則調用計算,沒有拋出異常的話,則最后執序 本文主要研究一下Elasticsearch的CircuitBreaker CircuitBreaker elasticsearch-7.0.1/server/src/main/java/or...
閱讀 2783·2023-04-25 18:06
閱讀 2576·2021-11-22 09:34
閱讀 1684·2021-11-08 13:16
閱讀 1302·2021-09-24 09:47
閱讀 3049·2019-08-30 15:44
閱讀 2773·2019-08-29 17:24
閱讀 2584·2019-08-23 18:37
閱讀 2433·2019-08-23 16:55