分布式锁3: zk实现分布式锁4 使用临时顺序节点+watch监听+可重入(threadLocal)

2024-01-07 19:24:47

一? zk实现分布式锁的可重入性

1.1 使用ThreadLocal属性

引入ThreadLocal线程局部变量保证zk分布式锁的可重入性。

1.2 关键代码说明

1.3?代码

1.3.1 初始化客户端

1.3.2 分布式锁代码

package com.atguigu.distributed.lock.config;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @ClassName: ZkDistriubtedTHLock
 * @Description: TODO
 * @Author: admin
 * @Date: 2024/01/06?16:07:04?
 * @Version: V1.0
 **/
public class ZkDistriubtedTHLock  implements Lock {
    private static final String ROOT_PATH = "/d-zk";
    private String path;
    private ZooKeeper zooKeeper;
    private static final ThreadLocal<Integer> THREAD_LOCAL = new ThreadLocal<>();
    public ZkDistriubtedTHLock(ZooKeeper zooKeeper,String lockName) {
        try {
            this.zooKeeper = zooKeeper;
            if (THREAD_LOCAL.get() == null || THREAD_LOCAL.get() == 0){
                this.path = zooKeeper.create(ROOT_PATH + "/" + lockName + "-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void lock() {
        Integer flag = THREAD_LOCAL.get();
        if (flag != null && flag > 0) {
            THREAD_LOCAL.set(flag + 1);
            return;
        }
        else{
            try {
                String preNode = getPreNode(path);
                // 如果该节点没有前一个节点,说明该节点时最小节点,放行执行业务逻辑
                if (StringUtils.isEmpty(preNode)){
                    THREAD_LOCAL.set(1);
                    return ;
                } else {
                    CountDownLatch countDownLatch = new CountDownLatch(1);
                    if (this.zooKeeper.exists(ROOT_PATH + "/" + preNode, new Watcher() {
                        @Override
                        public void process(WatchedEvent event) {
                            System.out.println("当前节点=="+path+" 前一个节点:"+ROOT_PATH + "/" + preNode);
                            countDownLatch.countDown();
                        }
                    }) == null) {
                        System.out.println("监听。。。");
                        THREAD_LOCAL.set(1);
                        return;
                    }
                    // 阻塞。。。。
                    countDownLatch.await();
                    System.out.println("wait。。。");
                    THREAD_LOCAL.set(1);
                    return;
                }


            } catch (Exception e) {
                e.printStackTrace();
                // 重新检查。是否获取到锁
                try {
                    Thread.sleep(200);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
                lock();
            }
        }

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }
    /**
     * 获取指定节点的前节点
     * @param path
     * @return
     */
    private String getPreNode(String path){

        try {
            // 获取当前节点的序列化号
            Long curSerial = Long.valueOf(StringUtils.substringAfterLast(path, "-"));
            // 获取根路径下的所有序列化子节点
            List<String> nodes = this.zooKeeper.getChildren(ROOT_PATH, false);

            // 判空
            if (CollectionUtils.isEmpty(nodes)){
                return null;
            }

            // 获取前一个节点
            Long flag = 0L;
            String preNode = null;
            for (String node : nodes) {
                // 获取每个节点的序列化号
                Long serial = Long.valueOf(StringUtils.substringAfterLast(node, "-"));
                if (serial < curSerial && serial > flag){
                    flag = serial;
                    preNode = node;
                }
            }

            return preNode;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
    @Override
    public void unlock() {
        try {
            THREAD_LOCAL.set(THREAD_LOCAL.get() - 1);
            if (THREAD_LOCAL.get() == 0) {
                this.zooKeeper.delete(path, 0);
                THREAD_LOCAL.remove();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

1.3.3 service

1.3.4 controller

1.4 测试

1.4.1 nginx多节点代理

1.服务

2.nginx

1.4.2?jemeter测试

3.jemeter

4.数据库前后

5.服务节点日志

?

文章来源:https://blog.csdn.net/u011066470/article/details/135429014
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。