Redis分布式锁进阶源码分析
Redis分布式锁进阶源码分析
根据秒杀场景演示
1、如何写一个商品秒杀代码?
@Autowired
StringRedisTemplate redisTemplate;
public String stock() {
String key = "stock_01";
int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
if (stockNum > 0) {
redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
}else {
return "fail";
}
return "success";
}
上面的写法会造成并发问题,多个客户端同时请求此方法,查询到的库存一致,同时扣减,导致超卖。
2、加上Java锁
public synchronized String stock() {
String key = "stock_01";
int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
if (stockNum > 0) {
redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
}else {
return "fail";
}
return "success";
}
加上Java锁,会避免此问题,但是,如果是分布式项目,一个节点会部署到多个容器或者在多个Tomcat中运行,Java锁无法解决这种问题
3、使用redis setnx命令获取锁
每次执行扣减库存前,先用setnx命令插入一个标志,标记此线程方法获取到锁,获取成功方能扣减,不成功就返回。执行完扣减后删除标志。
注意:命令setnx key value,将 key 的值设为value,当且仅当key不存在;若给定的key已经存在,则不做任何动作。设置成功,返回1;设置失败,返回0。
public String stock() {
String key = "stock_01";
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(key, "stock"); //setnx
if(!ifAbsent){
return "fail";
}
int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
if (stockNum > 0) {
redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
} else {
return "fail";
}
redisTemplate.delete(key); //执行完扣减后删除key
return "success";
}
上面的代码如果执行完setnx命令后,程序异常报错,锁得不到释放,其他线程无法扣减库存,这时候就有人说了,可以加上try和finally,在finally中删除key这样就可以解决。
4、增加try和finally
public String stock() {
String key = "stock_01";
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(key, "stock");
try {
if(!ifAbsent){
return "fail";
}
int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
if (stockNum > 0) {
redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
} else {
return "fail";
}
}finally {
redisTemplate.delete(key); //执行完扣减后删除key
}
return "success";
}
如果执行到try中的代码服务器刚好宕机,没有执行finally中的删除key,还是不会释放锁,如何解决?
5、给锁设置过期时间
public String stock() {
String key = "stock_01";
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(key, "stock",10, TimeUnit.SECONDS);//执行setnx,并给key设置过期时间10秒
try {
if(!ifAbsent){
return "fail";
}
int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
if (stockNum > 0) {
redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
} else {
return "fail";
}
}finally {
redisTemplate.delete(key); //执行完扣减后删除key
}
return "success";
}
上面代码还是会有问题,如果扣减代码执行时间大于我们设置的过期时间,redis已经删除了key,其他线程可以获取到锁,并正常执行,但是第一次获取到锁的线程扣减完库存之后,执行了删除key的操作,导致下一个线程丢失锁。可以给这个setnx命令的value设置一个唯一值来区分哪个线程获取到锁
6、增长过期时间,并setnx增加唯一value
public String stock() {
String key = "stock_01";
String id = UUID.randomUUID().toString();//增加唯一id,
Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(key, id, 30, TimeUnit.SECONDS);//把id存入到value中
try {
if (!ifAbsent) {
return "fail";
}
int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
if (stockNum > 0) {
redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
} else {
return "fail";
}
} finally {
if (id.equals(redisTemplate.opsForValue().get(key))) {//对比id是否一致,一致才可删除锁,避免所误删
redisTemplate.delete(key); //执行完扣减后删除key
}
}
return "success";
}
这时候已经能解决大部分秒杀场景了,虽然已经考虑的足够多的情况了,但是很不幸,上面代码还是会出现问题
a、增长过期时间其实治标不治本,出问题的概率会变小,但是不代表不会出问题,代码执行时间还是会超过过期时间,导致锁丢失
b、执行到finally中的对比id已经执行,而删除key没有执行,过期时间到了,此时第二个线程获取到锁,但是第一个线程又执行了删除,极端情况还是会出现误删锁导致超卖
面临这两个问题如何解决:
a、动态修改时间,即锁续命:开启一个线程执行一个定时任务,去判断执行任务的线程有没有结束,如果没结束就增加过期时间“续命”
b、判断有没有key和删除key的操作要有原子性:Java中没有提供这种操作,但是Lua脚本可以实现
7、使用redisson
a、引入pom:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.7.0</version>
</dependency>
b、增加配置类:
@Configuration
@Slf4j
public class RedissonManager {
//集群环境使用-节点信息
@Value("${spring.redis.cluster.nodes:default}")
private String clusterNodes;
//公共-密码
@Value("${spring.redis.password:default}")
private String password;
//单机环境使用
@Value("${spring.redis.host:default}")
private String host;
//单机环境使用
@Value("${spring.redis.port:6379}")
private String port;
//单机环境使用
@Value("${spring.redis.database:0}")
private int database;
@Bean
@ConditionalOnProperty(name = "spring.redis.mode", havingValue = "cluster")
public RedissonClient redissonClient() {
// 集群环境使用
Config config = new Config();
config.useClusterServers()
.addNodeAddress(clusterNodes.split(","))
.setPassword(password);
return Redisson.create(config);
}
@Bean
@ConditionalOnProperty(name = "spring.redis.mode", havingValue = "singleton", matchIfMissing = true)
public RedissonClient redissonSingletonClient() {
// 单机打包使用
Config config = new Config();
config.useSingleServer().setAddress(host + ":" + port).setPassword(password).setDatabase(database);
return Redisson.create(config);
}
}
c、代码如下
@Autowired
StringRedisTemplate redisTemplate;
@Autowired
Redisson redisson;
public String stock() {
String key = "stock_01";
RLock lock = redisson.getLock(key);
lock.lock();
try {
int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
if (stockNum > 0) {
redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
} else {
return "fail";
}
} finally {
lock.unlock();
return "fail";
}
return "success";
}
8、源码分析
a、RedissonLock.tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
//锁续命的执行周期,默认30秒,this.internalLockLeaseTime = java.util.concurrent.TimeUnit.SECONDS.toMillis(30L);
this.internalLockLeaseTime = unit.toMillis(leaseTime);
//执行Lua脚本
return this.commandExecutor.evalWriteAsync(
this.getName(), LongCodec.INSTANCE, command,
//redis.call,用于执行Redis脚本。这个命令会将脚本中的Redis命令调用转化为Lua数据类型,并执行这个脚本。
//edis.call('exists', key),用于检查指定的键是否存在,如果键存在,则返回1;键不存在,则返回0。
"if (redis.call('exists', KEYS[1]) == 0) then " +//判断key不存在
// 保存到Hash(哈希表) 中
// hset:指定要执行的Redis命令为hset,hset key field value:将哈希表key中的域field的值设为value
// KEYS[1]:哈希表的键名,为this.getName()也就是代码中传过来的key
// ARGV[2]:指定要设置的字段名,为this.getLockName(threadId),也就是value为当前线程id
// 1:指定要将字段设置为的值
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// 设置过期时间
// ARGV[1]为this.internalLockLeaseTime,默认30秒
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) " +//如果key存在,锁重入
// hincrby:指定要执行的Redis命令为hincrby,hincrby key field increment:为哈希表key中的域field的值加上增量increment
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 重置过期时间为30秒
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//以毫秒为单位返回key的剩余时间
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
b、RedissonLock.tryAcquireAsync
//此方法异步地尝试获取锁,它不会阻塞锁的线程
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1L) {
//没有获取到锁,返回失败
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//获取到锁
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(30L, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
//注册一个回调方法,这个方法在异步方法执行完成执行
ttlRemainingFuture.addListener(new FutureListener<Long>() {
public void operationComplete(Future<Long> future) throws Exception {
//判断监听方法是否执行完成
if (future.isSuccess()) {
//执行完成获取结果
Long ttlRemaining = (Long)future.getNow();
if (ttlRemaining == null) {
//scheduleExpirationRenewal会每隔10秒给锁刷新过期时间,默认置为30秒,直到这个锁获取不到
RedissonLock.this.scheduleExpirationRenewal(threadId);
}
}
}
});
return ttlRemainingFuture;
}
}
9、Redisson分布式锁的源码分析结果
- 锁标识:Redisson使用Hash数据结构来表示锁。在这个Hash中,key为锁的名字,field为当前竞争锁成功的线程的唯一标识,value为重入次数。
- 队列:所有竞争锁失败的线程,会被放入一个队列中,等待锁的释放。这些线程会订阅当前锁的解锁事件,一旦锁被释放,就会唤醒队列中的一个线程来尝试获取锁。这个机制是通过Semaphore来实现的线程的挂起和唤醒。
- 加锁:加锁的核心源码在tryLockInnerAsync方法中。这个方法首先会将锁的租约时间转换为毫秒,然后执行一个Lua脚本尝试获取锁。如果获取锁成功,就会设置一个定时任务来续期锁的租约时间,避免锁因为超时而被自动释放。如果获取锁失败,就会将当前线程放入等待队列中,等待锁的释放。
- 解锁:解锁的核心源码在unlockInnerAsync方法中。这个方法会执行一个Lua脚本来释放锁。如果释放锁成功,就会唤醒等待队列中的一个线程来尝试获取锁。
Redisson分布式锁的实现原理主要基于Redis的单线程特性和Lua脚本的原子性。通过使用Lua脚本,可以保证加锁和解锁的操作是原子的,不会被其他操作打断。同时,通过定时任务来续期锁的租约时间,可以避免因为网络延迟等原因导致锁被提前释放。
总的来说,Redisson分布式锁的实现提供了一种高效、可靠的分布式锁解决方案,可以很好地满足分布式系统中的并发控制需求。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!