龙空技术网

Java开发大型互联网深入理解Zookeeper安装部署之分布式队列实现

图灵学院诸葛老师 51

前言:

此时我们对“分布式队列如何实现”大约比较注意,各位老铁们都想要剖析一些“分布式队列如何实现”的相关资讯。那么小编在网上搜集了一些关于“分布式队列如何实现””的相关文章,希望同学们能喜欢,大家一起来学习一下吧!

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

ZooKeeper包含一个简单的原语集,提供Java和C的接口。

ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

Zookeeper部署

ZooKeeper的部署方式主要有三种,单机模式、伪集群模式、集群模式。其实剩下的两种模式都是集群模式的特殊情况。

系统环境

系统环境

5.2 下载Zookeeper

mkdir /home/taomk/zk 创建文件夹

cd /home/taomk/zk; wget 下载

tar xvf zookeeper-3.4.6.tar.gz 解压缩

mv zookeeper-3.4.6 zookeeper346 重命名

cd zookeeper346; ls -l

image.png

配置环境变量

export ZOOKEEPER_HOME=/home/taomk/zk/zookeeper346

export PATH=$PATH:$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf

ZooKeeper的单机模式部署

ZooKeeper的单机模式通常是用来快速测试客户端应用程序的,在实际过程中不可能是单机模式。单机模式的配置也比较简单。

编写配置文件zoo.cfg:ZooKeeper的运行默认是读取zoo.cfg文件里面的内容的。

~mkdir /home/taomk/zk/zoo/zk0

~cp conf/zoo_sample.cfg conf/zk0.cfg

~vim conf/zk0.cfg

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/home/conan/zoo/zk0

clientPort=2181

在zk0.cfg这个文件中,我们需要指定 dataDir 的值,它指向了一个目录,这个目录在开始的时候需要为空。下面是每个参数的含义:

tickTime :基本事件单元,以毫秒为单位。这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。

dataDir :存储内存中数据库快照的位置,顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。

clientPort :这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。

使用单机模式时用户需要注意:这种配置方式下没有 ZooKeeper 副本,所以如果 ZooKeeper 服务器出现故障, ZooKeeper 服务将会停止。

启动Zookeeper

~bin/zkServer.sh start zk0.cfg

image.png

zk的服务显示为QuorumPeerMain:

~jps

5321 QuorumPeerMain

5338 Jps

查看运行状态:

~bin/zkServer.sh status zk0.cfg

JMX enabled by default

Using config: /home/taomk/zk/zookeeper346/bin/../conf/zk0.cfg

Mode: standalone

单节点的时,Mode会显示为standalone。

停止ZooKeeper服务

~ bin/zkServer.sh stop zk0.cfg

JMX enabled by default

Using config: /home/taomk/zk/zookeeper346/bin/../conf/zk0.cfg

Stopping zookeeper ... STOPPED

分布式队列是通用的数据结构,为了在 Zookeeper 中实现分布式队列,首先需要指定一个 Znode 节点作为队列节点(queue node), 各个分布式客户端通过调用 create() 函数向队列中放入数据,调用create()时节点路径名带"qn-"结尾,并设置顺序(sequence)节点标志。 由于设置了节点的顺序标志,新的路径名具有以下字符串模式:"_path-to-queue-node_/qn-X",X 是唯一自增号。需要从队列中获取数据/移除数据的客户端首先调用 getChildren() 函数,有数据则获取(获取数据后可以删除也可以不删),没有则在队列节点(queue node)上将 watch 设置为 true,等待触发并处理最小序号的节点(即从序号最小的节点中取数据)。

实现步骤基本如下:

前提:需要一个队列root节点dir

入队:使用create()创建节点,将共享数据data放在该节点上,节点类型为PERSISTENT_SEQUENTIAL,永久顺序性的(也可以设置为临时的,看需求)。

出队:因为队列可能为空,2种方式处理:一种如果为空则wait等待,一种返回异常。

等待方式:这里使用了CountDownLatch的等待和Watcher的通知机制,使用了TreeMap的排序获取节点顺序最小的数据(FIFO)。

抛出异常:getChildren()获取队列数据时,如果size==0则抛出异常。

一个分布式Queue的实现,详细代码:

View Code

Apache Curator

Curator是一个封装Zookeeper操作的库,使用这个库的好处是Curator帮你管理和Zookeeper的连接,当连接有问题时会自动重试(retry)。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)

CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);

client.start();

Curator已经封装了一些常用的Recipes

Distributed Lock

InterProcessMutex lock = new InterProcessMutex(client, lockPath);

if ( lock.acquire(maxWait, waitUnit) )

{

try

{

// do some work inside of the critical section here

}

finally

{

lock.release();

}

}

Leader Election

LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()

{

public void takeLeadership(CuratorFramework client) throws Exception

{

// this callback will get called when you are the leader

// do whatever leader work you need to and only exit

// this method when you want to relinquish leadership

}

}

LeaderSelector selector = new LeaderSelector(client, path, listener);

selector.autoRequeue(); // not required, but this is behavior that you will probably expect

selector.start();

总结

以 上就是我对Java开发大型互联网深入理解Zookeeper安装部署之分布式队列实现问题及其优化总结,分享给大家,觉得收获的话可以点个关注收藏转发一波喔,谢谢大佬们支持!

最后,每一位读到这里的网友,感谢你们能耐心地看完。希望在成为一名更优秀的Java程序员的道路上,我们可以一起学习、一起进步!都能赢取白富美,走向架构师的人生巅峰!

想了解学习Java方面的技术内容以及Java技术视频的内容可加群:722040762 验证码:头条(06 必过)欢迎大家的加入哟!

标签: #分布式队列如何实现