摘要:之分布式鎖的實現方案如何優雅地實現分布式鎖博客地址分布式鎖關鍵詞分布式鎖是控制分布式系統之間同步訪問共享資源的一種方式。
關鍵詞Redis之分布式鎖的實現方案 - 如何優雅地實現分布式鎖(JAVA)
博客地址 https://blog.piaoruiqing.cn/2019/05/19/redis分布式鎖/
分布式鎖: 是控制分布式系統之間同步訪問共享資源的一種方式。
spring-data-redis: Spring針對redis的封裝, 配置簡單, 提供了與Redis存儲交互的抽象封裝, 十分優雅, 也極具擴展性, 推薦讀一讀源碼
Lua: Lua 是一種輕量小巧的腳本語言, 可在redis執行.
前言本文闡述了Redis分布式鎖的一種簡單JAVA實現及優化進階, 實現了自動解鎖、自定義異常、重試、注解鎖等功能, 嘗試用更優雅簡潔的代碼完成分布式鎖.
需求互斥性: 在分布式系統環境下, 一個鎖只能被一個線程持有.
高可用: 不會發生死鎖、即使客戶端崩潰也可超時釋放鎖.
非阻塞: 獲取鎖失敗即返回.
方案Redis具有極高的性能, 且其命令對分布式鎖支持友好, 借助SET命令即可實現加鎖處理.
實現SET
EX seconds -- Set the specified expire time, in seconds.
PX milliseconds -- Set the specified expire time, in milliseconds.
NX -- Only set the key if it does not already exist.
XX -- Only set the key if it already exist.
簡單實現
做法為set if not exist(如果不存在則賦值), redis命令為原子操作, 所以多帶帶使用set命令時不用擔心并發導致異常.
具體代碼實現如下: (spring-data-redis:2.1.6)
依賴引入<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-data-redisartifactId>
<version>2.1.4.RELEASEversion>
dependency>
配置RedisTemplate
@Bean
@ConditionalOnMissingBean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
StringRedisSerializer keySerializer = new StringRedisSerializer();
RedisSerializer<");new StringRedisSerializer();
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(factory);
template.setKeySerializer(keySerializer);
template.setHashKeySerializer(keySerializer);
template.setValueSerializer(serializer);
template.setHashValueSerializer(serializer);
template.afterPropertiesSet();
return template;
}
簡單的分布式鎖實現
/**
* try lock
* @author piaoruiqing
*
* @param key lock key
* @param value value
* @param timeout timeout
* @param unit time unit
* @return
*/
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}
以上代碼即完成了一個簡單的分布式鎖功能:
其中redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit); 即為執行redis命令:
redis> set dlock:test-try-lock a EX 10 NX
OK
redis> set dlock:test-try-lock a EX 10 NX
null
早期版本spring-data-redis分布式鎖實現及注意事項
方法Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit);是在2.1版本中新增的, 早期版本中setIfAbsent無法同時指定過期時間, 若先使用setIfAbsent再設置key的過期時間, 會存在產生死鎖的風險, 故舊版本中需要使用另外的寫法進行實現. 以spring-data-redis:1.8.20為例
/**
* try lock
* @author piaoruiqing
*
* @param key lock key
* @param value value
* @param timeout timeout
* @param unit time unit
* @return
*/
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
return redisTemplate.execute(new RedisCallback() {
@Override
public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
JedisCommands commands = (JedisCommands)connection.getNativeConnection();
String result = commands.set(key, value, "NX", "PX", unit.toMillis(timeout));
return "OK".equals(result);
}
});
}
spring-data-redis:1.8.20默認redis客戶端為jedis, 可通過getNativeConnection直接調用jedis方法進行操作. 新舊版本實現方式最終效果相同.
優化進階優化一 (自動解鎖及重試)基于AOP實現分布式鎖注解工具 - 不僅能用, 而且好用
自動解鎖、重試: 上一節針對分布式鎖的簡單實現可滿足基本需求, 但仍有較多可優化改進之處, 本小節將針對分布式鎖自動解鎖及重試進行優化
實現AutoCloseable接口, 可使用try-with-resource方便地完成自動解鎖.
/**
* distributed lock
* @author piaoruiqing
*
* @since JDK 1.8
*/
abstract public class DistributedLock implements AutoCloseable {
private final Logger LOGGER = LoggerFactory.getLogger(getClass());
/**
* release lock
* @author piaoruiqing
*/
abstract public void release();
/*
* (non-Javadoc)
* @see java.lang.AutoCloseable#close()
*/
@Override
public void close() throws Exception {
LOGGER.debug("distributed lock close , {}", this.toString());
this.unlock();
}
}
RedisDistributedLock是Redis分布式鎖的抽象, 繼承了DistributedLock并實現了unlock接口.
/**
* redis distributed lock
*
* @author piaoruiqing
* @date: 2019/01/12 23:20
*
* @since JDK 1.8
*/
public class RedisDistributedLock extends DistributedLock {
private RedisOperations operations;
private String key;
private String value;
private static final String COMPARE_AND_DELETE = // (一)
"if redis.call("get",KEYS[1]) == ARGV[1]
" +
"then
" +
" return redis.call("del",KEYS[1])
" +
"else
" +
" return 0
" +
"end";
/**
* @param operations
* @param key
* @param value
*/
public RedisDistributedLock(RedisOperations operations, String key, String value) {
this.operations = operations;
this.key = key;
this.value = value;
}
/*
* (non-Javadoc)
* @see com.piaoruiqing.demo.distributed.lock.DistributedLock#release()
*/
@Override
public void release() { // (二)
List keys = Collections.singletonList(key);
operations.execute(new DefaultRedisScript(COMPARE_AND_DELETE), keys, value);
}
/*
* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "RedisDistributedLock [key=" + key + ", value=" + value + "]";
}
}
(一): 通過Lua腳本進行解鎖, 使對比鎖的值+刪除成為原子操作, 確保解鎖操作的正確性. 簡單來說就是防止刪了別人的鎖. 例如: 線程A方法未執行完畢時鎖超時了, 隨后B線程也獲取到了該鎖(key相同), 但此時如果A線程方法執行完畢嘗試解鎖, 如果不比對value, 那么A將刪掉B的鎖, 這時候C線程又能加鎖, 業務將產生更嚴重的混亂.(不要過分依賴分布式鎖, 在數據一致性要求較高的情況下, 數據庫層面也要進行一定的處理, 例如唯一鍵約束、事務等來確保數據的正確)
(二): 使用RedisOperations執行Lua腳本進行解鎖操作.
可參閱redis官方文檔
/**
* @author piaoruiqing
* @param key lock key
* @param timeout timeout
* @param retries number of retries
* @param waitingTime retry interval
* @return
* @throws InterruptedException
*/
public DistributedLock acquire(String key, long timeout, int retries, long waitingTime) throws InterruptedException {
final String value
= RandomStringUtils.randomAlphanumeric(4) + System.currentTimeMillis(); // (一)
do {
Boolean result
= stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS); // (二)
if (result) {
return new RedisDistributedLock(stringRedisTemplate, key, value);
}
if (retries > NumberUtils.INTEGER_ZERO) {
TimeUnit.MILLISECONDS.sleep(waitingTime);
}
if(Thread.currentThread().isInterrupted()){
break;
}
} while (retries-- > NumberUtils.INTEGER_ZERO);
return null;
}
(一): 鎖值要保證唯一, 使用4位隨機字符串+時間戳基本可滿足需求 注: UUID.randomUUID()在高并發情況下性能不佳.
(二): 嘗試加鎖, 代碼中是2.1版本的做法, 早起版本參考上一節的實現.
此代碼已經可以滿足自動解鎖和重試的需求了, 使用方法:
// 根據key加鎖, 超時時間10000ms, 重試2次, 重試間隔500ms
try(DistributedLock lock = redisLockService.acquire(key, 10000, 2, 500);){
// do something
}
但還可以再優雅一點, 將模板代碼封裝起來, 可支持Lambda表達式:
/**
* lock handler
* @author piaoruiqing
*
* @since JDK 1.8
*/
@FunctionalInterface // (一)
public interface LockHandler<T> {
/**
* the logic you want to execute
*
* @author piaoruiqing
*
* @return
* @throws Throwable
*/
T handle() throws Throwable; // (二)
}
(一): 定義函數式接口, 將業務邏輯放入Lambda表達式使代碼更加簡潔.
(二): 業務中的異常不建議在分布式鎖中處理, 直接拋出來更合理.
使用LockHandler完成加鎖的實現:
public T tryLock(String key, LockHandler handler, long timeout, boolean autoUnlock, int retries, long waitingTime) throws Throwable {
try (DistributedLock lock = this.acquire(key, timeout, retries, waitingTime);) {
if (lock != null) {
LOGGER.debug("get lock success, key: {}", key);
return handler.handle();
}
LOGGER.debug("get lock fail, key: {}", key);
return null;
}
}
此時可以通過比較優雅的方式使用分布式鎖來完成編碼:
@Test
public void testTryLock() throws Throwable {
final String key = "dlock:test-try-lock";
AnyObject anyObject = redisLockService.tryLock(key, () -> {
// do something
return new AnyObject();
}, 10000, true, 0, 0);
}
自定義異常: 前文中針對分布式鎖的封裝可滿足多數業務場景, 但是考慮這樣一種情況, 如果業務本身會返回NULL當前的實現方式可能會存在錯誤的處理, 因為獲取鎖失敗也會返回NULL. 避免返回NULL固然是一種解決方式, 但無法滿足所有的場景, 此時支持自定義異常或許是個不錯的選擇.
實現起來很容易, 在原代碼的基礎之上增加onFailure參數, 如果鎖為空直接拋出異常即可.
public T tryLock(String key, LockHandler handler, long timeout, boolean autoUnlock, int retries, long waitingTime, Class<"); throws Throwable { // (一)
try (DistributedLock lock = this.getLock(key, timeout, retries, waitingTime);) {
if (lock != null) {
LOGGER.debug("get lock success, key: {}", key);
return handler.handle();
}
LOGGER.debug("get lock fail, key: {}", key);
if (null != onFailure) {
throw onFailure.newInstance(); // (二)
}
return null;
}
}
(一): Class<");限定onFailure必須是RuntimeException或其子類. 筆者認為使用RuntimeException在語義上更容易理解. 如有需要使用其他異常也未嘗不可(如獲取鎖失敗需要統一處理等情況).
(二): 反射
優化三 (優雅地使用注解)結合APO優雅地使用注解完成分布式鎖:
為了減小篇幅折疊部分注釋
/**
* distributed lock
* @author piaoruiqing
* @date: 2019/01/12 23:15
*
* @since JDK 1.8
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedLockable {
/** timeout of the lock */
long timeout() default 5L;
/** time unit */
TimeUnit unit() default TimeUnit.MILLISECONDS;
/** number of retries */
int retries() default 0;
/** interval of each retry */
long waitingTime() default 0L;
/** key prefix */
String prefix() default "";
/** parameters that construct a key */
String[] argNames() default {};
/** construct a key with parameters */
boolean argsAssociated() default true;
/** whether unlock when completed */
boolean autoUnlock() default true;
/** throw an runtime exception while fail to get lock */
Class<");default NoException.class;
/** no exception */
public static final class NoException extends RuntimeException {
private static final long serialVersionUID = -7821936618527445658L;
}
}
timeout: 超時時間
unit: 時間單位
retries: 重試次數
waitingTime: 重試間隔時間
prefix: key前綴, 默認為包名+類名+方法名
argNames: 組成key的參數
注解可使用在方法上, 需要注意的是, 本文注解通過spring AOP實現, 故對象內部方法間調用將無效.
/**
* distributed lock aspect
* @author piaoruiqing
* @date: 2019/02/02 22:35
*
* @since JDK 1.8
*/
@Aspect
@Order(10) // (一)
public class DistributedLockableAspect implements KeyGenerator { // (二)
private final Logger LOGGER = LoggerFactory.getLogger(getClass());
@Resource
private RedisLockClient redisLockClient;
/**
* {@link DistributedLockable}
* @author piaoruiqing
*/
@Pointcut(value = "execution(* *(..)) && @annotation(com.github.piaoruiqing.dlock.annotation.DistributedLockable)")
public void distributedLockable() {}
/**
* @author piaoruiqing
*
* @param joinPoint
* @param lockable
* @return
* @throws Throwable
*/
@Around(value = "distributedLockable() && @annotation(lockable)")
public Object around(ProceedingJoinPoint joinPoint, DistributedLockable lockable) throws Throwable {
long start = System.nanoTime();
final String key = this.generate(joinPoint, lockable.prefix(), lockable.argNames(), lockable.argsAssociated()).toString();
Object result = redisLockClient.tryLock(
key, () -> {
return joinPoint.proceed();
},
lockable.unit().toMillis(lockable.timeout()), lockable.autoUnlock(),
lockable.retries(), lockable.unit().toMillis(lockable.waitingTime()),
lockable.onFailure()
);
long end = System.nanoTime();
LOGGER.debug("distributed lockable cost: {} ns", end - start);
return result;
}
}
(一): 切面優先級
(二): KeyGenerator為自定義的key生成策略, 使用 prefix+argName+arg作為key, 具體實現見源碼.
此時可以通過注解的方式使用分布式鎖, 這種方式對代碼入侵較小, 且簡潔.
@DistributedLockable(
argNames = {"anyObject.id", "anyObject.name", "param1"},
timeout = 20, unit = TimeUnit.SECONDS,
onFailure = RuntimeException.class
)
public Long distributedLockableOnFaiFailure(AnyObject anyObject, String param1, Object param2, Long timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
LOGGER.info("distributed-lockable: " + System.nanoTime());
} catch (InterruptedException e) {
}
return System.nanoTime();
}
擴展
分布式鎖的實現有多種方式, 可根據實際場景和需求選擇不同的介質進行實現:
Redis: 性能高, 對分布式鎖支持友好, 實現簡單, 多數場景下表現較好.
Zookeeper: 可靠性較高, 對分布式鎖支持友好, 實現較復雜但有現成的實現可以使用.
數據庫: 實現簡單, 可使用樂觀鎖/悲觀鎖實現, 性能一般, 高并發場景下不推薦
結語本文闡述了Redis分布式鎖的JAVA實現, 完成了自動解鎖、自定義異常、重試、注解鎖等功能, 源碼見地址.
本實現還有諸多可以優化之處, 如:
重入鎖的實現
優化重試策略為訂閱Redis事件: 訂閱Redis事件可以進一步優化鎖的性能, 可通過wait+notifyAll來替代文中的sleep.
篇幅有限, 后續再行闡述.
參考文獻redis.io/topics/dist…
martin.kleppmann.com/2016/02/08/…
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/7228.html
摘要:集群實現分布式鎖上面的討論中我們有一個非常重要的假設是單點的。但是其實這已經超出了實現分布式鎖的范圍,單純用沒有命令來實現生成。這個問題用實現分布式鎖暫時無解。結論并不能實現嚴格意義上的分布式鎖。 關于Redis實現分布式鎖的問題,網絡上很多,但是很多人的討論基本就是把原來博主的貼過來,甚至很多面試官也是一知半解經不起推敲就來面候選人,最近結合我自己的學習和資料查閱,整理一下用Redi...
摘要:分布式鎖的作用在單機環境下,有個秒殺商品的活動,在短時間內,服務器壓力和流量會陡然上升。分布式集群業務業務場景下,每臺服務器是獨立存在的。這里就用到了分布式鎖這里簡單介紹一下,以的事務機制來延生。 Redis 分布式鎖的作用 在單機環境下,有個秒殺商品的活動,在短時間內,服務器壓力和流量會陡然上升。這個就會存在并發的問題。想要解決并發需要解決以下問題 1、提高系統吞吐率也就是qps 每...
閱讀 1206·2021-11-24 09:39
閱讀 2129·2021-11-22 13:54
閱讀 2111·2021-09-08 10:45
閱讀 1443·2021-08-09 13:43
閱讀 2985·2019-08-30 15:52
閱讀 3083·2019-08-29 15:38
閱讀 2848·2019-08-26 13:44
閱讀 3055·2019-08-26 13:30