前言:
而今我们对“centos下的m4”大概比较着重,各位老铁们都想要学习一些“centos下的m4”的相关文章。那么小编同时在网络上汇集了一些对于“centos下的m4””的相关知识,希望兄弟们能喜欢,朋友们一起来了解一下吧!它是一个分布式消息系统,由linkedin使用scala编写,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。具有高水平扩展和高吞吐量。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。
§Kafka与其他主流分布式消息系统对比
对比
§基本概念
消费者:(Consumer):从消息队列中请求消息的客户端应用程序生产者:(Producer) :向broker发布消息的应用程序AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例
kafka支持的客户端语言:Kafka客户端支持当前大部分主流语言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
§Kafka架构
架构
kafka集群中的消息,是通过Topic(主题)来进行组织的。
§Topic
topic
一些基本的概念:
1、主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。2、分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。
§集群工作图
工作图
副本(Replication):为了保证分布式可靠性,kafka0.8开始对每个分区的数据进行备份(不同的Broker上),防止其中一个Broker宕机造成分区上的数据不可用。
无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
§Kafka安装配置
主机IP角色服务端口centos-vm2192.168.134.112slavekafaka,zookeeper9092,8080,2181,2888,3888centos-vm3192.168.134.113masterkafaka,zookeeper9092,8080,2181,2888,3888centos-vm4192.168.134.114slavekafaka,zookeeper9092,8080,2181,2888,3888
端口解析:
9092: kafka服务端口8080: kafka管理端口2181: zookeeper服务端口2888: zookeeper集群通讯端口3888:zookeeper集群选举端口
软件版本:
kafka: 2.11.2-2.1.0zookeeper: 3.4.12
§1. Zookeeper安装
见zookeeper安装文档
§2. kafka安装
§2.1 下载安装kafka
下载地址
解压安装
tar -zxvf kafka_2.11-2.1.0.tgz -C /opt
ln -s /opt/kafka_2.11-2.1.0 /opt/kafka
§2.2 配置kafka
配置文件/opt/kafka/config/server.properties
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=9092 #当前kafka对外提供服务的端口默认是9092
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/logs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大字节数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.134.112:2181,192.168.134.113:2181,192.168.134.114:2181 #设置zookeeper的连接端口
zookeeper.connection.timeout.ms=6000 #zookeeper连接超时时间
主要修改以下几个参数
broker.id=0
port=9092
log.dirs=/opt/kafka/logs/
zookeeper.connect=centos-vm2:2181,centos-vm3:2181,centos-vm4:2181
§2.3 启动kafka
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
§2.4 systemd支持
/usr/lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=
Requires=network.target
After=network.target zookeeper.service
[Service]
Type=simple
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/java/jdk1.8.0_102
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
[Install]
WantedBy=multi-user.target
启动kafka
systemctl daemon-reload
ststemctl enable kafaka
ststemctl start kafaka
§2.5 kafka集群验证
创建Topic
./kafka-topics.sh --create --zookeeper centos-vm2:2181 --replication-factor 2 --partitions 1 --topic test
解释:
–replication-factor 2 #复制两份–partitions 1 #创建1个分区–topic #主题为test
在一台服务器上创建一个发布者
./kafka-console-producer.sh --broker-list centos-vm4:9092 --topic test #控制台等待输入信息
在另一台服务器上创建一个订阅者
./kafka-console-consumer.sh --bootstrap-server centos-vm2:9092 --topic test --from-beginning #控制台等待接收发布者的信息
若成功接受到消息,说明集群搭建成功
§Kafka-manager安装配置
kafka manager是管理kafka集群的web界面工具
下载地址
§1. 解压安装
unzip kafka-manager-master.zip
cd kafka-manager-master
./sbt clean dist # 需要配置sbt国内源,且整个执行过程比较耗费内存资源
生成的zip包位置在kafka-manager-master/target/universal目录下,此处假设生成的包名为kafka-manager-1.3.2.1.zip
解压安装
unzip kafka-manager-1.3.2.1.zip
§2. 配置
修改kafka-manager-1.3.2.1/conf/application.conf
kafka-manager.zkhosts="centos-vm2:2181,centos-vm3:2181,centos-vm4:2181"
basicAuthentication.enabled=true
basicAuthentication.username="admin"
basicAuthentication.password="admin"
添加systemd支持文件:/usr/lib/systemd/system/kafka-manager.service
[Unit]
Description=Kafka manager server
Documentation=
Requires=network.target
After=network.target kafka.service
[Service]
Type=simple
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/java/jdk1.8.0_102
ExecStart=/opt/kafka-manager-1.3.2.1/bin/kafka-manager -Dconfig.file=/opt/kafka-manager-1.3.2.1/conf/application.conf -Dhttp.port=8090
ExecStop=/opt/kafka-manager-1.3.2.1/bin/kafka-manager stop
[Install]
WantedBy=multi-user.target
§3. 启动
systemctl daemon-reload
systemctl enable kafka-manager
systemctl start kafka-manager
访问:
标签: #centos下的m4