分布式锁3: zk实现分布式锁4 使用临时顺序节点+watch监听+可重入(threadLocal)
2024-01-07 19:24:47
一? zk实现分布式锁的可重入性
1.1 使用ThreadLocal属性
引入ThreadLocal线程局部变量保证zk分布式锁的可重入性。
1.2 关键代码说明
1.3?代码
1.3.1 初始化客户端
1.3.2 分布式锁代码
package com.atguigu.distributed.lock.config;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @ClassName: ZkDistriubtedTHLock
* @Description: TODO
* @Author: admin
* @Date: 2024/01/06?16:07:04?
* @Version: V1.0
**/
public class ZkDistriubtedTHLock implements Lock {
private static final String ROOT_PATH = "/d-zk";
private String path;
private ZooKeeper zooKeeper;
private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();
public ZkDistriubtedTHLock(ZooKeeper zooKeeper,String lockName) {
try {
this.zooKeeper = zooKeeper;
if (THREAD_LOCAL.get() == null || THREAD_LOCAL.get() == 0){
this.path = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void lock() {
Integer flag = THREAD_LOCAL.get();
if (flag != null && flag > 0) {
THREAD_LOCAL.set(flag + 1);
return;
}
else{
try {
String preNode = getPreNode(path);
// 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑
if (StringUtils.isEmpty(preNode)){
THREAD_LOCAL.set(1);
return ;
} else {
CountDownLatch countDownLatch = new CountDownLatch(1);
if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("当前节点=="+path+" 前一个节点:"+ROOT_PATH + "/" + preNode);
countDownLatch.countDown();
}
}) == null) {
System.out.println("监听。。。");
THREAD_LOCAL.set(1);
return;
}
// 阻塞。。。。
countDownLatch.await();
System.out.println("wait。。。");
THREAD_LOCAL.set(1);
return;
}
} catch (Exception e) {
e.printStackTrace();
// 重新检查。是否获取到锁
try {
Thread.sleep(200);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
lock();
}
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
/**
* 获取指定节点的前节点
* @param path
* @return
*/
private String getPreNode(String path){
try {
// 获取当前节点的序列化号
Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));
// 获取根路径下的所有序列化子节点
List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);
// 判空
if (CollectionUtils.isEmpty(nodes)){
return null;
}
// 获取前一个节点
Long flag = 0L;
String preNode = null;
for (String node : nodes) {
// 获取每个节点的序列化号
Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));
if (serial < curSerial && serial > flag){
flag = serial;
preNode = node;
}
}
return preNode;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
@Override
public void unlock() {
try {
THREAD_LOCAL.set(THREAD_LOCAL.get() - 1);
if (THREAD_LOCAL.get() == 0) {
this.zooKeeper.delete(path, 0);
THREAD_LOCAL.remove();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
}
1.3.3 service
1.3.4 controller
1.4 测试
1.4.1 nginx多节点代理
1.服务
2.nginx
1.4.2?jemeter测试
3.jemeter
4.数据库前后
5.服务节点日志
?
文章来源:https://blog.csdn.net/u011066470/article/details/135429014
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!