前言:
如今朋友们对“kafka中的groupid”大致比较珍视,你们都需要知道一些“kafka中的groupid”的相关知识。那么小编同时在网摘上网罗了一些有关“kafka中的groupid””的相关资讯,希望各位老铁们能喜欢,各位老铁们快快来学习一下吧!本文分享自华为云社区《手拉手入门springboot+kafka-云社区-华为云》,作者:QGS。
安装kafka
启动Kafka本地环境需Java 8+以上
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
Kafka启动方式有Zookeeper和Kraft,两种方式只能选择其中一种启动,不能同时使用。
Kafka下载
解压tar -xzf kafka_2.13-3.7.0.tgz
一、Zookeeper启动Kafka(kafka内置zookeeper)
Kafka依赖Zookeeper
1、启动Zookeeper 2、启动Kafka
使用kafka自带Zookeeper启动
./zookeeper-server-start.sh ../config/zookeeper.properties &
./zookeeper-server-stop.sh ../config/zookeeper.properties
./kafka-server-start.sh ../config/server.properties &
./kafka-server-stop.sh ../config/server.properties
二、Zookeeper服务器启动Kafka
Zookeeper服务器安装
tar zxvf apache-zookeeper-3.9.2-bin.tar.gz
配置Zookeeper服务器
cp zoo_sample.cfg zoo.cfg
启动Zookeeper服务器
./zkServer.sh start
修改Zookeeper端口
Zoo.cfg添加内容
admin.serverPort=8099
apache-zookeeper-3.9.2-bin/bin目录下重启Zookeeper
Zookeeper服务器启动kafka
/opt/kafka_2.13-3.7.0/bin目录下
./kafka-server-start.sh ../config/server.properties &
Kafka配置文件server.properties
三、使用KRaft启动Kafka
UUID通用唯一识别码(Universally Unique Identifier)
1、生成Cluster UUID(集群UUID):./kafka-storage.sh random-uuid
2.格式化kafka日志目录:./kafka-storage.sh format -t 3pMJGNJcT0uLIBsZhbucjQ -c ../config/kraft/server.properties
3.启动kafka:./kafka-server-start.sh ../config/kraft/server.properties &
springboot集成kafka
创建topic时,若不指定topic的分区(partition)数量使,则默认为1个分区(partition)
修改server.properties文件
vim server.properties
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.68.133:9092
springboot加入依赖kafka
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
加入spring-kafka依赖后,springboot自动装配好kafkaTemplate的Bean
application.yml配置连接kafka
spring:kafka:bootstrap-servers: 192.168.68.133:9092生产者
发送消息
@Resourceprivate KafkaTemplate<String,String> kafkaTemplate;@Testvoid kafkaSendTest(){kafkaTemplate.send("kafkamsg01","hello kafka");}消费者
接收消息
@Componentpublic class KafkaConsumer {@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")public void consume(String message){System.out.println("接收到消息:"+message);}}
若没有配置groupid
Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.
@Componentpublic class KafkaConsumer {@KafkaListener(topics = {"kafkamsg01","test"},groupId = "123")public void consume(String message){System.out.println("接收到消息:"+message);}}
关注#华为云开发者联盟# 点击下方,第一时间了解华为云新鲜技术~
华为云博客_大数据博客_AI博客_云计算博客_开发者中心-华为云
标签: #kafka中的groupid #kafka集成spring