【缓存】一、Redis的基本使用与Redisson分布式锁
缓存
缓存技术是一种可以大幅度提高系统性能的技术,我们可以在某些适用的场景下使用缓存来大幅度的提高系统性能
读缓存的基本流程:
请求向缓存中查数据
if (命中) {
返回缓存中的数据
} else {
从数据库中取出数据
将该数据在缓存中再存储一份
返回缓存中的数据
}
本地缓存
我们在单体系统应用中,可以使用本地缓存来进行系统的缓存需求,我们可以在模块中自定义一个HashMap,将所需要的信息以键值对的方式存储进去,按照缓存的查找逻辑进行操作,也可以有很高的性能,甚至于说,这种方式减少了一层中间件的IO,甚至比使用Redis的效率还要高,但问题就在于,本地缓存非常不适合于在分布式系统中实现,因为分布式系统中有几个节点,就需要几个本地缓存,而我们也不确定我们会被负载均衡到哪个节点中。
Redis
Redis是一种缓存中间件,其解决了分布式系统难以使用本地缓存的问题:
<!-- 引入redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
在application.yml中进行配置:
spring:
redis:
host: xxx.xxx.xxx.xxx
必须要配置的是redis的地址,其默认端口是6379,若配置了密码,也必须对密码进行配置
SpringBoot帮助我们对redis的使用进行了十分简化的配置,其提供了两种实现方式:
- 返回Map<Object, Object>的RedisTemplate
- 返回Map<String, Object>的StringRedisTemplate
使用测试:
@Autowired
StringRedisTemplate stringRedisTemplate;
@Test
public void testStringRedisTemplate() {
ValueOperations<String, String> testValue = stringRedisTemplate.opsForValue();
testValue.set("test", "Hello World_" + UUID.randomUUID().toString());
String test = testValue.get("test");
System.out.println(test);
// 输出:Hello World_6ebc06c7-245f-4698-a5dd-c5e832ba7b7f
}
真实业务中的使用:
注意,我们使用String作为值进行存储的情况下,我们会把JSON类型的字符串作为Value,故我们在使用时要注意Json与对象的转换,这样做是为了方便我们在全平台进行操作。(序列化与反序列化)
// 真正的业务逻辑
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {
// 试图获取一下缓存
String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
// 如果缓存中不存在数据
if (StringUtils.isEmpty(catalogJSON)) {
// 从数据库中取出数据
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
// 序列化
catalogJSON = JSON.toJSONString(catalogJsonFromDb);
// 将数据放入缓存
redisTemplate.opsForValue().set("catalogJSON", catalogJSON);
return catalogJsonFromDb;
}
// 反序列化
// TypeReference是我们要转换的类型
Map<String, List<Catelog2Vo>> stringListMap = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
return stringListMap;
}
// 从数据库中读取数据的逻辑
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDb(){...}
注意,我们在较老版本的letture中,可能会出现DriectOutOfMemory异常,这个异常是由于老版本的letture在与Netty交互时不能有效的释放内存,我们必须使用更高版本的letture或者使用jedis客户端来避免这个问题
注意,Redis默认的内存会使用Xmx的内存,我们也不能一味的只增大这个容量,因为只要内存不能有效释放,这个早晚会满
在使用Redis时,甚至可以得到1300+的TPS,而没使用时,只有7TPS
Redis三大问题
缓存穿透
一直查询一个在数据库中不存在的数据,导致每次查询都进入数据库,不经过缓存,使用大量的查询很有可能导致数据库崩溃
解决方案:将null存入缓存,并加入短暂的过期时间,致使不存在的数据也会被缓存拦截,但要注意这个null的时间必须短暂,不然会导致查不到存在数据的情况。
缓存雪崩
Redis中的数据在同一时间大量过时(失效),导致大量的数据进入数据库,致使数据库崩溃
解决方案:我们应该给缓存的过期时间再加一个1-5分钟的随机数,让他们不会在同一时间失效
缓存击穿
一个高频,热点Key失效后,被同时超高频率的访问,导致大量数据同时进入数据库,致使数据库崩溃
解决方案:加锁,让请求一个一个来
三大Redis问题的解决方案
使用synchronized加锁避免缓存击穿的发生
// 从数据库中读取数据的逻辑
// 使用sychronized进行加锁,避免缓存击穿
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDb() {
List<CategoryEntity> level1Categories = getLevel1Categories();
// 使用当前对象作为锁,意为只有拿到当前对象的线程才可以执行下面的代码
// 若缓存中已经存储了,则直接从缓存中取出(避免大量线程直接进来,没走上面Redis的情况)
synchronized (this) {
String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
if (!StringUtils.isEmpty(catalogJSON)) {
Map<String, List<Catelog2Vo>> stringListMap = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
return stringListMap;
}
System.out.println("查询了数据库");
List<CategoryEntity> categoryEntities = baseMapper.selectList(null);
Map<String, List<Catelog2Vo>> parentCid = level1Categories.stream().collect(Collectors.toMap(
key -> key.getCatId().toString(),
value -> {
List<CategoryEntity> level2Categories = getParentCid(categoryEntities, value.getCatId());
List<Catelog2Vo> catelogVos = null;
if (level2Categories != null) {
catelogVos = level2Categories.stream().map(item -> {
List<CategoryEntity> level3Categories = getParentCid(categoryEntities, item.getCatId());
List<Catelog2Vo.Catelog3Vo> level3Vos = null;
if (level3Categories != null) {
level3Vos = level3Categories.stream().map(level3Item -> {
Catelog2Vo.Catelog3Vo catelog3Vo = new Catelog2Vo.Catelog3Vo(item.getCatId().toString(), level3Item.getCatId().toString(), level3Item.getName());
return catelog3Vo;
}).collect(Collectors.toList());
}
Catelog2Vo catelog2Vo = new Catelog2Vo(value.getCatId().toString(), level3Vos, item.getCatId().toString(), item.getName());
return catelog2Vo;
}).collect(Collectors.toList());
}
return catelogVos;
}));
return parentCid;
}
}
但注意,上面这种处理逻辑,在数据库查完之后就释放锁,但此时Redis中还没有存储,故下一个线程仍然会查数据库,造成锁失效
我们解决这种情况的方式,就是将存储Redis的操作也放在synchronized代码块中
类似于下面这样:
synchronized(this) {
....................
// 序列化
catalogJSON = JSON.toJSONString(parentCid);
// 将数据放入缓存
redisTemplate.opsForValue().set("catalogJSON", catalogJSON);
return parentCid;
}
此处还要注意,我们这里使用synchronized做的是本地锁,不适用于分布式系统(多个模块不共享锁(每个模块都有一个this对象)),若要在分布式系统下进行开发,还需要进一步使用分布式锁
分布式锁
分布式锁的基本原理:我们在redis中存储一对键值对,我们让所有的服务都向Redis中Set这个键值对,哪个服务Set成功了那个键值对,就代表这个服务获取到了锁
Redis的Set操作可以在后面携带参数:NX(意思为,如果该key不存在,我们会进行存储,如果存在,则不存储)
// 使用分布式锁
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
List<CategoryEntity> level1Categories = getLevel1Categories();
/**
* 令程序试图向Redis中添加数据,若添加成功,则继续进行后续的查数据库操作
* 若添加失败,则证明已经有其他线程拿到了锁,我们必须进行重试再继续试图拿到锁,然后向下进行(这个时候就有缓存了)
*/
Boolean lock = redisTemplate.opsForValue().setIfPresent("lock", "111");
if (lock) {
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
// 业务执行结束后,释放锁
redisTemplate.delete("lock");
return catalogJsonFromDb;
} else {
// 若没有拿到锁,重试取锁
Map<String, List<Catelog2Vo>> catalogJsonFromDbWithRedisLock = getCatalogJsonFromDbWithRedisLock();
return catalogJsonFromDbWithRedisLock;
}
}
另外,若我们在delete之前发生了断电,系统崩溃等情况,就会导致死锁,
解决死锁的方式一般为:给我们的lock键设置一个过期时间(注意这个过期时间的设置不能分两行写,而应该在一行里以一个原子操作完成)
将加锁方法修改为:
Boolean lock = redisTemplate.opsForValue().setIfPresent("lock", "111", 300, TimeUnit.SECONDS);
这样就实现了原子操作,令加锁不会被中断,也添加了过期时间
但是这样还是有问题,我们在加锁后结束了线程时,我们的锁会被其他线程删除,此时我们还有解决方案:
给我们添加锁的方法添加的Value添加为UUID,让我们每个线程都只能删除自己对应的锁
String uuidString = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfPresent("lock", uuidString, 300, TimeUnit.SECONDS);
if (lock) {
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
// 业务执行结束后,释放锁
if (uuidString.equals(redisTemplate.opsForValue().get("key"))) {
redisTemplate.delete("lock");
}
这样就可以做到尽可能的删除自己的锁了,但是,还有问题:
因为我们设置了数据过期时间,而数据很有可能在我们获取了key之后过期,这样你删除的就又是别人的Key了
根本原因就是,我们的判断和删锁操作必须是一个原子操作,不可以中断
我们可以使用Redis的lua脚本实现这个操作:
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
// 这里需要传入一个他的默认RedisScript,这个RedisScript的泛型就是这个脚本执行的返回值类型,这里是int型
// 这个对象的内容传入脚本和返回值类型的反射类型
// 第二个参数传我们Key的集合,这里使用Arrays.asList()进行转换,
// 第三个参数传入我们要对比的uuidString
// 这样我们就通过lua脚本把Redis的对比和删除操作设置为原子操作了
redisTemplate.execute(new DefaultRedisScript<Integer>(script, Integer.class), Arrays.asList("lock"), uuidString);
使用自定义锁的完整操作举例如下:
// 真正的业务逻辑
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {
// 试图获取一下缓存
String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
// 如果缓存中不存在数据
// String catalogJSON = null;
if (StringUtils.isEmpty(catalogJSON)) {
System.out.println("缓存不命中,查询数据库..............");
// 从数据库中取出数据
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDbWithRedisLock();
return catalogJsonFromDb;
}
System.out.println("缓存命中,直接返回...........");
// 反序列化
// TypeReference是我们要转换的类型
Map<String, List<Catelog2Vo>> stringListMap = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
return stringListMap;
}
// 使用分布式锁
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLock() {
List<CategoryEntity> level1Categories = getLevel1Categories();
/**
* 令程序试图向Redis中添加数据,若添加成功,则继续进行后续的查数据库操作
* 若添加失败,则证明已经有其他线程拿到了锁,我们必须进行重试再继续试图拿到锁,然后向下进行(这个时候就有缓存了)
*/
String uuidString = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuidString, 300, TimeUnit.SECONDS);
if (lock) {
System.out.println("获取分布式锁成功.......");
// 业务执行结束后,释放锁
// if (uuidString.equals(redisTemplate.opsForValue().get("key"))) {
// redisTemplate.delete("lock");
// }
Map<String, List<Catelog2Vo>> catalogJsonFromDb;
// 使用try-finally块来保证锁的释放
try {
catalogJsonFromDb = getCatalogJsonFromDb();
} finally {
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
// 这里需要传入一个他的默认RedisScript,这个RedisScript的泛型就是这个脚本执行的返回值类型,这里是int型
// 这个对象的内容传入脚本和返回值类型的反射类型
// 第二个参数传我们Key的集合,这里使用Arrays.asList()进行转换,
// 第三个参数传入我们要对比的uuidString
// 这样我们就通过lua脚本把Redis的对比和删除操作设置为原子操作了
Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuidString);
}
return catalogJsonFromDb;
} else {
System.out.println("获取分布式锁失败.......等待重试");
// 若没有拿到锁,重试取锁
Map<String, List<Catelog2Vo>> catalogJsonFromDbWithRedisLock = getCatalogJsonFromDbWithRedisLock();
return catalogJsonFromDbWithRedisLock;
}
}
Redisson
我们在使用分布式锁的过程中,不可避免的要对各种各样的场景进行判断操作,而我们使用原生的Redis进行分布式锁的处理,其一有可能会发生我们某种情况没有处理好,系统上线后发生严重错误的情况,其二这种原生操作会对我们的开发效率有很大的限制,故,我们引入一个Redis中推荐的第三方技术来处理分布式锁相关的问题:
Redisson是一种操作Redis的客户端,和letture和jedis类似
引入:引入Redisson的依赖(注意Redisson和SpringBoot依赖有版本关系):
<!-- 引入redission-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.23.3</version>
</dependency>
我们可以使用配置文件或者对象文件(使用Config获取JSON或YAML文件)、Config进行配置(程序化配置)操作:
这里使用Config进行配置操作:
创建一个Config类进行配置:
@Configuration
public class MyRedissonConfig {
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() throws IOException {
Config config = new Config();
// 集群模式的配置方式
// config.useClusterServers().addNodeAddress("127.0.0.1:7001", "127.0.0.1:7002");
// 单点模式的配置方式
config.useSingleServer().setAddress("redis://192.168.202.142:6379");
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
进行一点点测试:
@Autowired
RedissonClient redissonClient;
@Test
public void testRedisson() {
System.out.println(redissonClient);
}
可重入锁
Redisson锁最终都继承了JUC锁,在单体系统中,JUC可以完全替代Redisson,换句话说,Redisson就是分布式系统中的JUC
一个简单的示例:
@ResponseBody
@GetMapping("/hello")
public String hello() {
// 创建一个锁
RLock lock = redisson.getLock("my-lock");
// 加锁,阻塞式
lock.lock();
try {
System.out.println("加锁成功,执行业务........." + Thread.currentThread().getId());
Thread.sleep(30000);
}catch (Exception e){
} finally {
lock.unlock();
System.out.println("释放锁.........." + Thread.currentThread().getId());
}
return "hello";
}
注意在这里是不会出现死锁问题的,因为redisson给锁设置了过期时间,这个过期时间是30S,如果业务超长,Redisson的看门狗机制会给锁自动续期,
阻塞式加锁会执行一个while(true)死循环,在循环中想要出去就只能获取到锁,同时,Redisson也支持在加锁时自动添加一个过期时间:lock.lock(10, TimeUnit.SECONDS)
,这样在10秒之后,不管业务是否执行,都会让锁过期
如果我们没有指定过期时间,Redisson会使用看门狗的默认时间:30 * 1000ms
而我们看门狗机制的默认续期时间是 WatchDog / 3 也就是看门狗时间的三分之一,在20s的时候进行续期
另外一种方式是:
boolean res = lock.trylock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
} finally {
lock.unlock();
}
}
这种方式的作用是尝试进行加锁,如果加锁成功,返回true
公平锁
公平锁是指我们的线程按照进入等待的顺序获取锁,谁先来的,就让谁先获得锁
RLock fairLock = redisson.getFairLock("any-lock");
fairLock.lock();
读写锁
我们在进行读写操作的时候,一般是允许同时读,但不允许同时写和边写边读,故这里读写锁就诞生了:
@GetMapping("/write")
@ResponseBody
public String writeValue() {
String s = "";
// 创建读写锁
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
// 获取这个读写锁的写锁
RLock rLock = lock.writeLock();
rLock.lock();
try {
s = UUID.randomUUID().toString();
Thread.sleep(30000);
redisTemplate.opsForValue().set("writeValue", s);
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
@GetMapping("/read")
@ResponseBody
public String readValue() {
String s = "";
RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
RLock rLock = lock.readLock();
rLock.lock();
try {
s = redisTemplate.opsForValue().get("writeValue");
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return s;
}
最关键的问题就是,如果写锁存在,读锁就必须等待,这也就意味着,如果写操作没有完成,读操作就无法获取到数据,这也就保证了我们每次读取获得的都是最新的数据
- 写 + 写:阻塞式等待
- 写 + 读:读等待写
- 读 + 写:写也要等待读
- 读 + 读:共享锁(相当于无锁)
信号量
Redisson也可以获取信号量,这个操作会向Redis中插入一个数值,在这个数值耗尽之前,都是允许进入的。
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
// 声明一个信号量
RSemaphore park = redisson.getSemaphore("park");
// 加上信号量锁
park.acquire();
return "ok => ";
}
@GetMapping("/go")
@ResponseBody
public String go() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.release();
return "ok => ";
}
信号量最常用的操作就是分布式限流,通过设置信号量大小、以及tryAcquire()操作来进行限流,获取到是true再进行操作,获取不到就直接跳转一个提示页面或者流量过大的提示。
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
// 声明一个信号量
RSemaphore park = redisson.getSemaphore("park");
// 加上信号量锁
// .tryAcquire()方法会尝试获取锁,若成功会返回true,不成功直接返回false
boolean b = park.tryAcquire();
if (b) {
return "ok => " + b;
}
return "ok => " + b;
}
@GetMapping("/go")
@ResponseBody
public String go() throws InterruptedException {
RSemaphore park = redisson.getSemaphore("park");
park.release();
return "ok => ";
}
闭锁
闭锁机制是一种要求另一个程序调用一定次数之后,才允许另一个程序进行调用的锁,其实现类似于:一个班全部的学生走完之后才允许锁门。实现方式:
@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.trySetCount(5);
// 要求闭锁全部完成之后再进行
door.await();
return "放假了......";
}
@GetMapping("/gogogo/{id}")
@ResponseBody
public String gogogo(@PathVariable("id") Long id) throws InterruptedException {
RCountDownLatch door = redisson.getCountDownLatch("door");
door.countDown();
// 要求闭锁全部完成之后再进行
return id + "班的人都走了";
}
总结
Redisson通过看门狗机制和LUA脚本保证了对Redis的操作是一个原子操作,以及我们系统中断时也能保证锁的释放,同时也能保证自己的锁不会去解别人的锁,这就避免了死锁问题的出现
锁的粒度问题:
一般来讲,我们以每一条数据作为一把锁,例如:product-11-lock
这样的每一条数据一把锁,这样可以保证我们一个业务只会影响自己的业务,不会影响到其他内容,否则就可能出现 A 请求被 B 阻塞的问题,就很抽象
实际业务改写就变成下面的样子:
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedissonLock() {
List<CategoryEntity> level1Categories = getLevel1Categories();
RLock lock = redisson.getLock("catalogJson-lock");
lock.lock();
Map<String, List<Catelog2Vo>> catalogJsonFromDb;
// 使用try-finally块来保证锁的释放
try {
catalogJsonFromDb = getCatalogJsonFromDb();
} finally {
lock.unlock();
}
return catalogJsonFromDb;
}
Redis数据一致性问题
我们为了数据的读取速度,使用Redis存储一些热点数据,但是问题也随之出现,若我们取出数据并存入Redis之后,数据库中进行了是数据的修改,我们的Redis和数据库的数据就不一致了,这会导致我们无法读取到正确的数据。
对于Redis的与数据库的一致性问题,我们引出两种解决方式:
- 双写模式:我们每次修改数据库都对Redis进行一次重新写入
- 失效模式:我们在修改数据库后对Redis中对应的键进行删除,等待下次读数据的时候再重新写入
但这两种模式都有一定的问题:
-
双写模式:
我们的线程如果在写缓存的过程中发生了切换,很有可能导致脏数据的出现
A — 写数据库 —(线程切换) B — 写数据库 — B写缓存 — A写缓存
我们本该在缓存中读取到B的数据,但在这种情况下却读取到了A的数据
但这种问题也只是暂时的不一致问题,从整体上来看,我们的数据还是一致的,因为Redis中的数据我们会给他设置一个过期时间,当数据过期之后,再被查询的时候,还是会查到最新的数据,也就是说,其也会最终显示正确的数据,也可以保证最终一致性,但其无法保证实时一致性(若对实时一致性有强烈要求,我们就需要使用加锁的方式将一个线程锁在一起使用)
-
失效模式:
也是发生了线程切换的场景:
A写数据库写了一半 — (线程切换)B读缓存 — B读数据库(尚未更新缓存) — (线程切换)— A继续写数据库 — A删缓存 — (线程切换)B更新缓存
这样就又出现了脏数据情况,不过这也是暂时不一致的情况,其也还是能保证最终一致性
对于这个问题,我们首先要考虑以下的场景:
- 我们要优先考虑业务场景,对于用户维度的情况,其是否可能会发生同时读写的情况(修改个人信息和读个人信息会同时发生吗)
- 我们的业务是否真的一定需要解决实时的一致性问题
- 另外我们对于基础数据,也可以使用canal订阅binlog的方式进行处理
- 实在不行我们就加一个读写锁(最受不了的解决方案)
简单介绍一下Cannal:Cannal是一种中间件,其会把自己伪装成MySql的一个从数据库,MySql在更新的时候,会把更新的信息传递给从数据库,Cannal就把存储在binlog中的更新日志进行更新,并进一步的再对Redis进行修改
-
工作情况,最近在干嘛,为什么减弱项目方向,加强技术方向(项目相关的太低效、唐杰哥那边无法提供高效的技能提升)
-
云视讯问题,很多会无法参加(表达很想参加各种会,但是都参加不了(包括GIS、软评、今天的智慧xxx都错过了))
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!