Redis分布式锁进阶源码分析

2023-12-26 19:42:33


根据秒杀场景演示

1、如何写一个商品秒杀代码?

@Autowired
StringRedisTemplate redisTemplate;

public String stock() {
    String key = "stock_01";
    int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
    if (stockNum > 0) {
        redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
    }else {
        return "fail";
    }
    return "success";
}

上面的写法会造成并发问题,多个客户端同时请求此方法,查询到的库存一致,同时扣减,导致超卖。

2、加上Java锁

public synchronized String stock() {
	String key = "stock_01";
	int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
	if (stockNum > 0) {
		redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
	}else {
		return "fail";
	}
	return "success";
}

加上Java锁,会避免此问题,但是,如果是分布式项目,一个节点会部署到多个容器或者在多个Tomcat中运行,Java锁无法解决这种问题

3、使用redis setnx命令获取锁

每次执行扣减库存前,先用setnx命令插入一个标志,标记此线程方法获取到锁,获取成功方能扣减,不成功就返回。执行完扣减后删除标志。

注意:命令setnx key value,将 key 的值设为value,当且仅当key不存在;若给定的key已经存在,则不做任何动作。设置成功,返回1;设置失败,返回0。

public String stock() {
	String key = "stock_01";
	Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(key, "stock");	//setnx
	if(!ifAbsent){
		return "fail";
	}
	int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
	if (stockNum > 0) {
		redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
	} else {
		return "fail";
	}
	redisTemplate.delete(key);	//执行完扣减后删除key
	return "success";
}

上面的代码如果执行完setnx命令后,程序异常报错,锁得不到释放,其他线程无法扣减库存,这时候就有人说了,可以加上try和finally,在finally中删除key这样就可以解决。

4、增加try和finally

 public String stock() {
	String key = "stock_01";
	Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(key, "stock");
	try {
		if(!ifAbsent){
			return "fail";
		}
		int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
		if (stockNum > 0) {
			redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
		} else {
			return "fail";
		}
	}finally {
		redisTemplate.delete(key);	//执行完扣减后删除key
	}
	return "success";
}

如果执行到try中的代码服务器刚好宕机,没有执行finally中的删除key,还是不会释放锁,如何解决?

5、给锁设置过期时间

public String stock() {
	String key = "stock_01";
	Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(key, "stock",10, TimeUnit.SECONDS);//执行setnx,并给key设置过期时间10秒
	try {
		if(!ifAbsent){
			return "fail";
		}
		int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
		if (stockNum > 0) {
			redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
		} else {
			return "fail";
		}
	}finally {
		redisTemplate.delete(key);  //执行完扣减后删除key
	}
	return "success";
}

上面代码还是会有问题,如果扣减代码执行时间大于我们设置的过期时间,redis已经删除了key,其他线程可以获取到锁,并正常执行,但是第一次获取到锁的线程扣减完库存之后,执行了删除key的操作,导致下一个线程丢失锁。可以给这个setnx命令的value设置一个唯一值来区分哪个线程获取到锁

6、增长过期时间,并setnx增加唯一value

public String stock() {
	String key = "stock_01";
	String id = UUID.randomUUID().toString();//增加唯一id,
	Boolean ifAbsent = redisTemplate.opsForValue().setIfAbsent(key, id, 30, TimeUnit.SECONDS);//把id存入到value中
	try {
		if (!ifAbsent) {
			return "fail";
		}
		int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
		if (stockNum > 0) {
			redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
		} else {
			return "fail";
		}
	} finally {
		if (id.equals(redisTemplate.opsForValue().get(key))) {//对比id是否一致,一致才可删除锁,避免所误删
			redisTemplate.delete(key);  //执行完扣减后删除key
		}
	}
	return "success";
}

这时候已经能解决大部分秒杀场景了,虽然已经考虑的足够多的情况了,但是很不幸,上面代码还是会出现问题
a、增长过期时间其实治标不治本,出问题的概率会变小,但是不代表不会出问题,代码执行时间还是会超过过期时间,导致锁丢失
b、执行到finally中的对比id已经执行,而删除key没有执行,过期时间到了,此时第二个线程获取到锁,但是第一个线程又执行了删除,极端情况还是会出现误删锁导致超卖

面临这两个问题如何解决:
a、动态修改时间,即锁续命:开启一个线程执行一个定时任务,去判断执行任务的线程有没有结束,如果没结束就增加过期时间“续命”
b、判断有没有key和删除key的操作要有原子性:Java中没有提供这种操作,但是Lua脚本可以实现

7、使用redisson

a、引入pom:

<dependency>
	<groupId>org.redisson</groupId>
	<artifactId>redisson</artifactId>
	<version>2.7.0</version>
</dependency>

b、增加配置类:

@Configuration
@Slf4j
public class RedissonManager {    
	//集群环境使用-节点信息    
	@Value("${spring.redis.cluster.nodes:default}")
	private String clusterNodes;    
	
	//公共-密码    
	@Value("${spring.redis.password:default}")
	private String password;

	//单机环境使用
	@Value("${spring.redis.host:default}")
	private String host;

	//单机环境使用
	@Value("${spring.redis.port:6379}")
	private String port;

	//单机环境使用
	@Value("${spring.redis.database:0}")
	private int database;
	
	@Bean
	@ConditionalOnProperty(name = "spring.redis.mode", havingValue = "cluster")
	public RedissonClient redissonClient() {
		// 集群环境使用
		Config config = new Config();
		config.useClusterServers()
				.addNodeAddress(clusterNodes.split(","))
				.setPassword(password);
		return Redisson.create(config);
	}

	@Bean
	@ConditionalOnProperty(name = "spring.redis.mode", havingValue = "singleton", matchIfMissing = true)
	public RedissonClient redissonSingletonClient() {
		// 单机打包使用
		Config config = new Config();
		config.useSingleServer().setAddress(host + ":" + port).setPassword(password).setDatabase(database);
		return Redisson.create(config);
	}
}

c、代码如下

@Autowired
StringRedisTemplate redisTemplate;
@Autowired
Redisson redisson;

public String stock() {
	String key = "stock_01";
	RLock lock = redisson.getLock(key);
	lock.lock();
	try {
		int stockNum = Integer.parseInt(redisTemplate.opsForValue().get(key));
		if (stockNum > 0) {
			redisTemplate.opsForValue().set(key, (stockNum - 1) + "");
		} else {
			return "fail";
		}
	} finally {
		lock.unlock();
		return "fail";
	}
	return "success";
}

8、源码分析

a、RedissonLock.tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
	//锁续命的执行周期,默认30秒,this.internalLockLeaseTime = java.util.concurrent.TimeUnit.SECONDS.toMillis(30L);
	this.internalLockLeaseTime = unit.toMillis(leaseTime);
	//执行Lua脚本
	return this.commandExecutor.evalWriteAsync(
			this.getName(), LongCodec.INSTANCE, command,
			//redis.call,用于执行Redis脚本。这个命令会将脚本中的Redis命令调用转化为Lua数据类型,并执行这个脚本。
			//edis.call('exists', key),用于检查指定的键是否存在,如果键存在,则返回1;键不存在,则返回0。
			"if (redis.call('exists', KEYS[1]) == 0) then " +//判断key不存在
				// 保存到Hash(哈希表) 中
				// hset:指定要执行的Redis命令为hset,hset key field value:将哈希表key中的域field的值设为value
				// KEYS[1]:哈希表的键名,为this.getName()也就是代码中传过来的key
				// ARGV[2]:指定要设置的字段名,为this.getLockName(threadId),也就是value为当前线程id
				// 1:指定要将字段设置为的值
				"redis.call('hset', KEYS[1], ARGV[2], 1); " +
				// 设置过期时间
				// ARGV[1]为this.internalLockLeaseTime,默认30秒
				"redis.call('pexpire', KEYS[1], ARGV[1]); " +
				"return nil; " +
			"end; " +
			"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) " +//如果key存在,锁重入
				// hincrby:指定要执行的Redis命令为hincrby,hincrby key field increment:为哈希表key中的域field的值加上增量increment
				"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
				// 重置过期时间为30秒
				"redis.call('pexpire', KEYS[1], ARGV[1]); " +
				"return nil; " +
			"end; " +
			//以毫秒为单位返回key的剩余时间
			"return redis.call('pttl', KEYS[1]);",
			Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}

b、RedissonLock.tryAcquireAsync

//此方法异步地尝试获取锁,它不会阻塞锁的线程
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
	if (leaseTime != -1L) {
		//没有获取到锁,返回失败
		return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
	} else {
		//获取到锁
		RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(30L, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
		//注册一个回调方法,这个方法在异步方法执行完成执行
		ttlRemainingFuture.addListener(new FutureListener<Long>() {
			public void operationComplete(Future<Long> future) throws Exception {
				//判断监听方法是否执行完成
				if (future.isSuccess()) {
					//执行完成获取结果
					Long ttlRemaining = (Long)future.getNow();
					if (ttlRemaining == null) {
						//scheduleExpirationRenewal会每隔10秒给锁刷新过期时间,默认置为30秒,直到这个锁获取不到
						RedissonLock.this.scheduleExpirationRenewal(threadId);
					}
				}
			}
		});
		return ttlRemainingFuture;
	}
}

9、Redisson分布式锁的源码分析结果

  • 锁标识:Redisson使用Hash数据结构来表示锁。在这个Hash中,key为锁的名字,field为当前竞争锁成功的线程的唯一标识,value为重入次数。
  • 队列:所有竞争锁失败的线程,会被放入一个队列中,等待锁的释放。这些线程会订阅当前锁的解锁事件,一旦锁被释放,就会唤醒队列中的一个线程来尝试获取锁。这个机制是通过Semaphore来实现的线程的挂起和唤醒。
  • 加锁:加锁的核心源码在tryLockInnerAsync方法中。这个方法首先会将锁的租约时间转换为毫秒,然后执行一个Lua脚本尝试获取锁。如果获取锁成功,就会设置一个定时任务来续期锁的租约时间,避免锁因为超时而被自动释放。如果获取锁失败,就会将当前线程放入等待队列中,等待锁的释放。
  • 解锁:解锁的核心源码在unlockInnerAsync方法中。这个方法会执行一个Lua脚本来释放锁。如果释放锁成功,就会唤醒等待队列中的一个线程来尝试获取锁。

Redisson分布式锁的实现原理主要基于Redis的单线程特性和Lua脚本的原子性。通过使用Lua脚本,可以保证加锁和解锁的操作是原子的,不会被其他操作打断。同时,通过定时任务来续期锁的租约时间,可以避免因为网络延迟等原因导致锁被提前释放。
总的来说,Redisson分布式锁的实现提供了一种高效、可靠的分布式锁解决方案,可以很好地满足分布式系统中的并发控制需求。

文章来源:https://blog.csdn.net/projectNo/article/details/135228401
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。