龙空技术网

Kafka集群搭建

木纳哥 519

前言:

而今我们对“centos下的m4”大概比较着重,各位老铁们都想要学习一些“centos下的m4”的相关文章。那么小编同时在网络上汇集了一些对于“centos下的m4””的相关知识,希望兄弟们能喜欢,朋友们一起来了解一下吧!

它是一个分布式消息系统,由linkedin使用scala编写,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。具有高水平扩展和高吞吐量。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。

§Kafka与其他主流分布式消息系统对比

对比

§基本概念

消费者:(Consumer):从消息队列中请求消息的客户端应用程序生产者:(Producer) :向broker发布消息的应用程序AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例

kafka支持的客户端语言:Kafka客户端支持当前大部分主流语言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript

§Kafka架构

架构

kafka集群中的消息,是通过Topic(主题)来进行组织的。

§Topic

topic

一些基本的概念:

1、主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。2、分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。

§集群工作图

工作图

副本(Replication):为了保证分布式可靠性,kafka0.8开始对每个分区的数据进行备份(不同的Broker上),防止其中一个Broker宕机造成分区上的数据不可用。

无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。

§Kafka安装配置

主机IP角色服务端口centos-vm2192.168.134.112slavekafaka,zookeeper9092,8080,2181,2888,3888centos-vm3192.168.134.113masterkafaka,zookeeper9092,8080,2181,2888,3888centos-vm4192.168.134.114slavekafaka,zookeeper9092,8080,2181,2888,3888

端口解析:

9092: kafka服务端口8080: kafka管理端口2181: zookeeper服务端口2888: zookeeper集群通讯端口3888:zookeeper集群选举端口

软件版本:

kafka: 2.11.2-2.1.0zookeeper: 3.4.12

§1. Zookeeper安装

见zookeeper安装文档

§2. kafka安装

§2.1 下载安装kafka

下载地址

解压安装

tar -zxvf kafka_2.11-2.1.0.tgz -C /opt

ln -s /opt/kafka_2.11-2.1.0 /opt/kafka

§2.2 配置kafka

配置文件/opt/kafka/config/server.properties

broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样

port=9092 #当前kafka对外提供服务的端口默认是9092

num.network.threads=3 #这个是borker进行网络处理的线程数

num.io.threads=8 #这个是borker进行I/O处理的线程数

log.dirs=/opt/kafka/logs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个

socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能

socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘

socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小

num.partitions=1 #默认的分区数,一个topic默认1个分区数

log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天

message.max.byte=5242880 #消息保存的最大值5M

default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务

replica.fetch.max.bytes=5242880 #取消息的最大字节数

log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件

log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除

log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能

zookeeper.connect=192.168.134.112:2181,192.168.134.113:2181,192.168.134.114:2181 #设置zookeeper的连接端口

zookeeper.connection.timeout.ms=6000 #zookeeper连接超时时间

主要修改以下几个参数

broker.id=0

port=9092

log.dirs=/opt/kafka/logs/

zookeeper.connect=centos-vm2:2181,centos-vm3:2181,centos-vm4:2181

§2.3 启动kafka

/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties

§2.4 systemd支持

/usr/lib/systemd/system/kafka.service

[Unit]

Description=Apache Kafka server (broker)

Documentation=

Requires=network.target

After=network.target zookeeper.service

[Service]

Type=simple

User=kafka

Group=kafka

Environment=JAVA_HOME=/usr/java/jdk1.8.0_102

ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

ExecStop=/opt/kafka/bin/kafka-server-stop.sh

[Install]

WantedBy=multi-user.target

启动kafka

systemctl daemon-reload

ststemctl enable kafaka

ststemctl start kafaka

§2.5 kafka集群验证

创建Topic

./kafka-topics.sh --create --zookeeper centos-vm2:2181 --replication-factor 2 --partitions 1 --topic test

解释:

–replication-factor 2 #复制两份–partitions 1 #创建1个分区–topic #主题为test

在一台服务器上创建一个发布者

./kafka-console-producer.sh --broker-list centos-vm4:9092 --topic test #控制台等待输入信息

在另一台服务器上创建一个订阅者

./kafka-console-consumer.sh --bootstrap-server centos-vm2:9092 --topic test --from-beginning #控制台等待接收发布者的信息

若成功接受到消息,说明集群搭建成功

§Kafka-manager安装配置

kafka manager是管理kafka集群的web界面工具

下载地址

§1. 解压安装

unzip kafka-manager-master.zip

cd kafka-manager-master

./sbt clean dist # 需要配置sbt国内源,且整个执行过程比较耗费内存资源

生成的zip包位置在kafka-manager-master/target/universal目录下,此处假设生成的包名为kafka-manager-1.3.2.1.zip

解压安装

unzip kafka-manager-1.3.2.1.zip

§2. 配置

修改kafka-manager-1.3.2.1/conf/application.conf

kafka-manager.zkhosts="centos-vm2:2181,centos-vm3:2181,centos-vm4:2181"

basicAuthentication.enabled=true

basicAuthentication.username="admin"

basicAuthentication.password="admin"

添加systemd支持文件:/usr/lib/systemd/system/kafka-manager.service

[Unit]

Description=Kafka manager server

Documentation=

Requires=network.target

After=network.target kafka.service

[Service]

Type=simple

User=kafka

Group=kafka

Environment=JAVA_HOME=/usr/java/jdk1.8.0_102

ExecStart=/opt/kafka-manager-1.3.2.1/bin/kafka-manager -Dconfig.file=/opt/kafka-manager-1.3.2.1/conf/application.conf -Dhttp.port=8090

ExecStop=/opt/kafka-manager-1.3.2.1/bin/kafka-manager stop

[Install]

WantedBy=multi-user.target

§3. 启动

systemctl daemon-reload

systemctl enable kafka-manager

systemctl start kafka-manager

访问:

标签: #centos下的m4