龙空技术网

一文了解Zookeeper如何实现分布式锁

马小聪 106

前言:

而今你们对“分布式锁zookeeper实现方式”大概比较关切,姐妹们都想要知道一些“分布式锁zookeeper实现方式”的相关文章。那么小编也在网摘上汇集了一些有关“分布式锁zookeeper实现方式””的相关内容,希望看官们能喜欢,我们快快来了解一下吧!

在Java中使用多线程编程,需要考虑多线程环境下程序执行结果的正确性,是否达到预期效果,因此需要在操作共享资源时引入锁,共享资源同一时刻只能由一个线程进行操作。 Java提供了多种本地线程锁。例如synchronized锁,JUC包下提供的可重入锁ReentrantLock、读写锁ReentrantReadWriteLock等; Java本地锁适用于单机环境。在分布式环境下,存在多台服务器同时操作同一共享资源的场景时,服务器之间无法感知到Java本地锁的加锁状态,因此需要通过分布式锁来保证集群环境下执行任务的正确性;

常见分布式锁介绍MySQL数据库中添加version字段实现乐观锁;Redis的set命令(存在单点问题,若redis集群中某台机器宕机,可能引发加解锁混乱);Redisson开源框架中实现的RedLock(解决了set方式实现引发的单点问题);通过Zookeeper官方API自主实现分布式锁;Curator开源框架实现的Zookeeper分布式锁InterProcessMutex等;

本文根据Zookeeper官方API实现分布式锁,带大家了解Zookeeper的强大之处,后续各种锁的实现及原理也会带大家一一了解;

Zookeeper实现方式

Zookeeper中数据节点znode分为四种类型,实现分布式锁主要利用临时顺序节点。其特性具体介绍可见【】。

实现思路

客户端中的线程需要加锁时,首先获取持久化锁节点路径下所有临时顺序节点,若当前线程创建的临时顺序节点为最小节点,则表示当前线程加锁成功; 若不是最小节点,则当前线程创建的节点监听比它小的最大节点,阻塞等待被监听节点的删除通知,待前置节点删除后,重新判断当前线程创建的节点是否为最小节点,若是,则加锁成功 若不是最小节点,则重复1、2步的操作,直到加锁成功;

示例及分析

如下图所示,三个客户端线程分别对锁名为“test4”加锁,创建对应的三个临时顺序节点:client0000000000、client0000000001、client0000000002;

首先client0000000000获取锁,client0000000001监听client0000000000,client0000000002监听client0000000001; client0000000000节点删除后,通知client0000000001尝试获取锁; client0000000001节点删除后,通知client0000000002尝试获取锁;

异常情况:若client0000000000持有锁时,client0000000001节点异常消失,那么client0000000002节点检测到client0000000000仍存在,则要监听client0000000000节点;

临时顺序节点创建情况

代码实现(可直接使用,拿走不谢)

import org.apache.commons.lang3.StringUtils;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.InitializingBean;import org.springframework.stereotype.Service; import java.util.ArrayList;import java.util.Iterator;import java.util.List;import java.util.TreeSet;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit; @Servicepublic class ZkLockDemo implements InitializingBean, Watcher {    private static Logger logger = LoggerFactory.getLogger(ZkLockDemo.class);    private static volatile ZooKeeper zk;    static String zkAddress = "127.0.0.1:2181";    /**     * 根节点     */    private String root = "/locksNode";     /**     * 存储当前线程创建的锁(临时顺序节点的全路径)     */    private ThreadLocal<List<String>> nodePathList = new ThreadLocal<>();     public ZkLockDemo() {    }     @Override    public void afterPropertiesSet() {        createRootNode();    }     /**     * 创建锁的持久化根节点     */    private void createRootNode() {        try {            if (StringUtils.isBlank(zkAddress)) {                throw new NullPointerException("zooKeeper address conf error");            }            CountDownLatch countDownLatch = new CountDownLatch(1);            //建立zk连接            logger.info("开始连接zk", root);            zk = new ZooKeeper(zkAddress, 10000, new Watcher() {                @Override                public void process(WatchedEvent event) {                    if (event.getState() == Event.KeeperState.SyncConnected) {                        countDownLatch.countDown();                    }                }            });            //等待锁连接成功            countDownLatch.await(10, TimeUnit.SECONDS);            if (zk == null) {                throw new NullPointerException("zooKeeper connect failure");            }            Stat stat = zk.exists(root, true);            if (stat == null) {                //创建持久化根节点                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);                logger.info("根节点{}创建完成", root);            } else {                logger.info("根节点{}已存在,直接使用", root);            }        } catch (Exception e) {            e.printStackTrace();        }    }     /**     * 监听zk是否需要重连接     * @param watchedEvent     */    @Override    public void process(WatchedEvent watchedEvent) {        try {            //zk的session过期时,重新创建连接            if (watchedEvent.getState() == Event.KeeperState.Expired) {                logger.info("zk连接过期,重新创建连接");                zk.close();                zk = null;                createRootNode();            }        } catch (InterruptedException e) {            e.printStackTrace();        }    }     /**     * 创建具体的锁节点     *     * @param lockPath     */    private void createLockNode(String lockPath) {        try {            //判断指定锁路径是否存在,若不存在则创建            Stat stat = zk.exists(lockPath, true);            if (stat == null) {                //创建持久化锁节点                zk.create(lockPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);                logger.info("锁路径创建成功:{}", lockPath);            } else {                logger.info("锁路径已经存在:{}", lockPath);            }        } catch (KeeperException.NodeExistsException e) {            logger.error("node节点已经存在,本次创建失败:{}", e.getMessage());        } catch (Exception e) {            e.printStackTrace();        }    }     /**     * 阻塞锁     * @param lockName 锁名     */    public void lock(String lockName) {        try {            //创建锁目录            String lockPath = root + "/" + lockName;            createLockNode(lockPath);             //当前线程创建的临时顺序节点            String clientLockNode = zk.create(lockPath + "/client", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);             //获取当前临时顺序节点的前一个节点,若获取的前置节点为null,则表示当前节点获取到锁            String preNode=getPreNode(lockPath,clientLockNode);            CountDownLatch latch = new CountDownLatch(1);            if(preNode!=null){                //注册监听                Stat lockStat = zk.exists(preNode, new LockWatcher(latch,lockPath,clientLockNode));                if (lockStat != null) {                    // 等待                    latch.await();                    latch = null;                    addLock(clientLockNode);                    logger.info("阻塞线程锁获取成功,锁路径为:{}", clientLockNode);                }            }        } catch (Exception e) {            throw new RuntimeException(e);        }    }     /**     * 获取当前线程创建的临时顺序节点的前一个节点     * @param lockPath 锁路径     * @param clientLockNode 当前线程创建的临时顺序节点     * @return 前一个临时顺序节点     */    private String getPreNode(String lockPath,String clientLockNode){        String preNode=null;        try {            // 取出lockPath下所有子节点            List<String> subNodes = zk.getChildren(lockPath, true);            TreeSet<String> sortedNodes = new TreeSet<>();            for (String node : subNodes) {                sortedNodes.add(lockPath + "/" + node);            }            //获取最小临时顺序节点            String minNode = sortedNodes.first();            // 如果当前节点是最小节点,则表示取得锁            if (clientLockNode.equals(minNode)) {                addLock(clientLockNode);                logger.info("锁获取成功,锁路径为:{}", clientLockNode);            }else{                //获取比当前节点小的最大节点进行监听                preNode = sortedNodes.lower(clientLockNode);                logger.info("阻塞等待获取锁,锁路径为:{},监听的前置节点为:{}", clientLockNode, preNode);            }        }catch (Exception e){         }        return preNode;    }     /**     * 监听临时顺序节点是否被删除     */    class LockWatcher implements Watcher {        private CountDownLatch latch = null;        private String lockPath = null;        private String clientLockNode = null;        public LockWatcher(CountDownLatch latch,String lockPath,String clientLockNode) {            this.latch = latch;            this.lockPath=lockPath;            this.clientLockNode=clientLockNode;        }        @Override        public void process(WatchedEvent event) {            if (event.getType() == Event.EventType.NodeDeleted) {                //若当前节点的前置节点被删除,需重新判断当前节点是否还存在前置节点                //正常情况下前置节点删除,则表示当前节点获取锁                //当前置节点没有获取锁,但是异常断连时,当前节点则需监听剩余的最大前置节点                String preNode=getPreNode(lockPath,clientLockNode);                if(preNode==null){                    latch.countDown();                }else{                    try {                        zk.exists(preNode, new LockWatcher(latch,lockPath,clientLockNode));                    }catch (Exception e){                     }                }            }        }    }     /**     * 尝试获取锁     * @param lockName     * @return     */    public boolean tryLock(String lockName) {        try {            //创建锁目录            String lockPath = root + "/" + lockName;            createLockNode(lockPath);            //当前线程创建的临时顺序节点            String clientLockNode = zk.create(lockPath + "/client", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);            String preNode = getPreNode(lockPath, clientLockNode);            // 如果当前节点是最小节点,则表示取得锁            addLock(clientLockNode);            if (preNode == null) {                logger.info("锁获取成功,锁路径为:{}", clientLockNode);                return true;            }else{                unlock(lockName);            }        } catch (Exception e) {            throw new RuntimeException(e);        }        return false;    }     /**     * 存储本次线程中添加的锁     * @param lockPath     */    private void addLock(String lockPath) {        List<String> list = nodePathList.get();        if (list == null) {            list = new ArrayList<>();        }        list.add(lockPath);        nodePathList.set(list);    }     /**     * 删除锁     */    public void unlock(String lockName) {        try {            String lockPathPrefix = root + "/" + lockName;            String lockPath = "";            List<String> list = nodePathList.get();            if (list != null && list.size() > 0) {                Iterator<String> iterator = list.iterator();                while (iterator.hasNext()) {                    String lockWholePath = iterator.next();                    if (lockWholePath.contains(lockPathPrefix)) {                        lockPath = lockWholePath;                        iterator.remove();                        break;                    }                }                if (StringUtils.isNotBlank(lockPath)) {                    Stat stat = zk.exists(lockPath, true);                    if (stat != null) {                        zk.delete(lockPath, -1);                        logger.info("锁释放成功,锁路径为:{}", lockPath);                    }                }            }        } catch (Exception e) {            e.printStackTrace();        }    }}
优点性能较好,可用性高,可以很方便的实现阻塞锁;客户端宕机等异常情况下,当前客户端持有的锁可实时释放;依据Zookeeper官方API自定义实现,有问题方便排查;缺点Zookeeper官方API抛出的各种异常需手动处理;Zookeeper连接管理,session失效管理需手动处理;Watch只生效一次,再使用时需重新注册;不适用场景:一个线程中先添加A锁再添加B锁,同时另一个线程先添加B锁再添加A锁,该种死锁问题无法解决;

结束

标签: #分布式锁zookeeper实现方式