龙空技术网

SpringBoot 整合 kafka 实现组订阅模式

爱音乐的程序员小新人 457

前言:

如今大家对“kafka spring集成”都比较关注,也都想了解一些“kafka spring集成”的相关资讯。小编在网上搜集了一些关于“kafka spring集成”的相关内容,希望大家喜欢,一起来学习一下吧!

SpringBoot 整合 kafka 实现组订阅模式:

工程结构图

消息生产者pom.xml配置<?xml version="1.0" encoding="UTF-8"?><project xmlns="" xmlns:xsi="" xsi:schemaLocation=" "> <parent> <artifactId>scms-parent</artifactId> <groupId>yj.scms.parent</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <groupId>yj.scms.kafka</groupId> <artifactId>scms-kafka</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--  --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.5.RELEASE</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.2</version> </dependency> <!-- fastjson jar的神奇功能 对象转json json转 对象 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>消息消费者配置:<?xml version="1.0" encoding="UTF-8"?><project xmlns="" xmlns:xsi="" xsi:schemaLocation=" "> <parent> <artifactId>scms-parent</artifactId> <groupId>yj.scms.parent</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <groupId>yj.scms.kafka</groupId> <artifactId>scms-kafka</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> 
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--  --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.5.RELEASE</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.2</version> </dependency> <!-- fastjson jar的神奇功能 对象转json json转 对象 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>
生产者application.properties配置文件#设置端口号server.port=9003#返回页面、数据中文乱码问题spring.http.encoding.force=truespring.http.encoding.charset=UTF-8spring.http.encoding.enabled=trueserver.tomcat.uri-encoding = UTF-8#解决程序读配置文件乱码问题spring.message.encoding = UTF-8#客户端注册进eureka服务列表内#eureka.client.service-url.defaultZone=微服务名称设置在注册中心显示的名称spring.application.name=scms-kafka-send################ 显示ip端口号方式1 ###################################访问路径可以显示IP地址eureka.instance.preferIpAddress=true#微服务名称设置在注册中心显示的名称eureka.instance.instance-id=${spring.cloud.client.ipAddress}:${server.port}################ 显示ip端口号方式2 ###################################访问路径可以显示IP地址#eureka.instance.prefer-ip-address=true#微服务名称设置在注册中心显示的名称#eureka.instance.instance-id=${spring.cloud.client.ip-address}:${server.port}#在eureka微服务info显示内容详细信息info.app.name=user-serviceinfo.company.name=配置文件################################################============== kafka ===================# 指定kafka 代理地址,可以多个spring.kafka.bootstrap-servers=127.0.0.1:9092#=============== provider =======================spring.kafka.producer.retries=0# 每次批量发送消息的数量spring.kafka.producer.batch-size=16384spring.kafka.producer.buffer-memory=33554432# 指定消息key和消息体的编解码方式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#=============== consumer =======================# 指定默认消费者group id#spring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.auto-offset-reset=earliestspring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=100# 
指定消息key和消息体的编解码方式spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer###########################kafak配置结束#######################################################################消费者application.properties配置文件#设置端口号server.port=9002#返回页面、数据中文乱码问题spring.http.encoding.force=truespring.http.encoding.charset=UTF-8spring.http.encoding.enabled=trueserver.tomcat.uri-encoding = UTF-8#解决程序读配置文件乱码问题spring.message.encoding = UTF-8#客户端注册进eureka服务列表内#eureka.client.service-url.defaultZone=微服务名称设置在注册中心显示的名称spring.application.name=scms-kafka################ 显示ip端口号方式1 ###################################访问路径可以显示IP地址eureka.instance.preferIpAddress=true#微服务名称设置在注册中心显示的名称eureka.instance.instance-id=${spring.cloud.client.ipAddress}:${server.port}################ 显示ip端口号方式2 ###################################访问路径可以显示IP地址#eureka.instance.prefer-ip-address=true#微服务名称设置在注册中心显示的名称#eureka.instance.instance-id=${spring.cloud.client.ip-address}:${server.port}#在eureka微服务info显示内容详细信息info.app.name=user-serviceinfo.company.name=配置文件################################################============== kafka ===================# 指定kafka 代理地址,可以多个spring.kafka.bootstrap-servers=127.0.0.1:9092#=============== provider =======================spring.kafka.producer.retries=0# 每次批量发送消息的数量spring.kafka.producer.batch-size=16384spring.kafka.producer.buffer-memory=33554432# 指定消息key和消息体的编解码方式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#=============== consumer =======================# 指定默认消费者group id#spring.kafka.consumer.group-id=test-consumer-groupspring.kafka.consumer.auto-offset-reset=earliestspring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=100# 
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
###########################kafka配置结束#######################################################################
生产者send 的java代码

package yj.scms.kafka.sender;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import yj.scms.kafka.pojo.Message;

import java.util.Date;
import java.util.UUID;

/**
 * Kafka producer: builds a {@link Message}, serializes it to JSON with Gson
 * and publishes it through the injected {@link KafkaTemplate}.
 */
@Component
public class KafkaSender {

    /** Name of the topic every message is published to. */
    private static final String TOPIC = "hhh";

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private final Gson gson = new GsonBuilder().create();

    /**
     * Creates a new message and sends its JSON form to the topic.
     *
     * <p>The message id is the current time in milliseconds, the text is a
     * random UUID string and the send time is the current date.
     */
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());

        // Serialize once so the payload that is logged is exactly the payload that is sent.
        String payload = gson.toJson(message);
        logger.info("+++++++++++++++++++++ message = {}", payload);
        kafkaTemplate.send(TOPIC, payload);
    }
}

消费者组1 java代码

package yj.scms.kafka.receiver;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Kafka consumer that listens on topic {@code hhh} with consumer group id
 * {@code xiaofeng1}.
 */
@Component
public class KafkaReceiver {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Logs each received record whose value is non-null; a record with a
     * {@code null} value is ignored.
     *
     * @param record the record handed to this listener
     */
    @KafkaListener(topics = {"hhh"}, groupId = "xiaofeng1")
    public void listen(ConsumerRecord<?, ?> record) {
        // Placeholders defer string building until INFO is known to be enabled;
        // the format mirrors the previous concatenated message.
        Optional.ofNullable(record.value()).ifPresent(message ->
                logger.info("-xiaofeng1-- test1:{},record:{},message:{},---", record, record, message));
    }
}
消费者组2java代码

package yj.scms.kafka.receiver;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Kafka consumer that listens on topic {@code hhh} with consumer group id
 * {@code xiaofeng2}.
 */
@Component
public class KafkaReceiverTwo {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Logs each received record whose value is non-null; a record with a
     * {@code null} value is ignored.
     *
     * @param record the record handed to this listener
     */
    @KafkaListener(topics = {"hhh"}, groupId = "xiaofeng2")
    public void listen(ConsumerRecord<?, ?> record) {
        // Placeholders defer string building until INFO is known to be enabled;
        // the format mirrors the previous concatenated message.
        Optional.ofNullable(record.value()).ifPresent(message ->
                logger.info("-xiaofeng2-- test2:{},record:{},message:{},---", record, record, message));
    }
}

一个生产者发送消息

两个消费者组同时消费

标签: #kafka spring集成