Preface
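This article is a hands-on walkthrough of integrating Kafka with Spring Boot, covering basic Kafka usage.
Environment setup
First, use IntelliJ IDEA to generate a Spring Boot skeleton project by following the new-project wizard.
Edit pom.xml and add the Kafka dependency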
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Edit the configuration file (application.yml)
server:
  port: 8080
  servlet:
    context-path: /demo
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: spring-boot-demo
      # commit offsets manually
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 60000
    listener:
      log-container-config: false
      concurrency: 5
      # commit offsets manually
      ack-mode: manual_immediate
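The two producer sizes in the configuration above are byte counts, which makes them hard to read at a glance. As a quick sanity check, here is a small plain-Java sketch that converts them to larger units. The class and method names are made up for illustration and are not part of the article's project.

```java
/** Converts the byte-count producer settings from application.yml into readable units. */
class ProducerSizes {
    // Values copied from spring.kafka.producer.batch-size and buffer-memory.
    static final long BATCH_SIZE_BYTES = 16_384L;
    static final long BUFFER_MEMORY_BYTES = 33_554_432L;

    /** Whole kibibytes in the given number of bytes. */
    static long toKiB(long bytes) {
        return bytes / 1024;
    }

    /** Whole mebibytes in the given number of bytes. */
    static long toMiB(long bytes) {
        return bytes / (1024 * 1024);
    }

    /** How many full batches of the configured size fit into the buffer. */
    static long fullBatchesInBuffer() {
        return BUFFER_MEMORY_BYTES / BATCH_SIZE_BYTES;
    }

    public static void main(String[] args) {
        System.out.println("batch-size    = " + toKiB(BATCH_SIZE_BYTES) + " KiB");
        System.out.println("buffer-memory = " + toMiB(BUFFER_MEMORY_BYTES) + " MiB");
        System.out.println("full batches  = " + fullBatchesInBuffer());
    }
}
```

In other words, the producer batches up to 16 KiB per partition and may buffer up to 32 MiB in total before sends start to block.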
Add a configuration class:
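import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;

/**
 * Kafka configuration class.
 * KafkaConsts (topic name and partition count constants) is not shown in this article.
 */
@Configuration
@EnableConfigurationProperties({KafkaProperties.class})
@EnableKafka
@AllArgsConstructor
public class KafkaConfig {
    private final KafkaProperties kafkaProperties;

    // Template used to send messages.
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Producer factory built from the spring.kafka.producer settings.
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    }

    // Default listener container factory: batch listener with a 3 second poll timeout.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    // Consumer factory built from the spring.kafka.consumer settings.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    // Listener container factory whose listeners commit offsets manually.
    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
        return factory;
    }
}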
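The configuration class refers to KafkaConsts.DEFAULT_PARTITION_NUM and, further on, the listener and test refer to KafkaConsts.TOPIC_TEST, but the article never shows that class. A minimal sketch of what it could look like follows. The values 3 and "test" are assumptions chosen for illustration, not values taken from the article, so adjust them to match the topic you actually create.

```java
/**
 * Hypothetical constants holder for the names used by KafkaConfig, the listener and the test.
 * Declare it public if it lives in a different package from the classes that use it.
 */
final class KafkaConsts {
    /** Assumed partition count, used as the listener concurrency. */
    static final int DEFAULT_PARTITION_NUM = 3;

    /** Assumed topic name for the demo messages. */
    static final String TOPIC_TEST = "test";

    private KafkaConsts() {
        // constants only, not meant to be instantiated
    }
}
```

Keeping the listener concurrency equal to the partition count is a reasonable default, because consumer threads beyond the number of partitions would sit idle.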
Message handler class
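import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageHandler {

    // Uses the manual-ack container factory defined in KafkaConfig.
    @KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "ackContainerFactory")
    public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            String message = record.value();
            log.info("Received message: {}", message);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            // Commit the offset manually, whether or not processing succeeded
            acknowledgment.acknowledge();
        }
    }
}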
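Because the handler acknowledges in a finally block, the offset is committed even when processing throws, so a failed message is logged but not redelivered. The plain-Java sketch below illustrates that control flow without any Kafka dependency. The Ack interface is a hypothetical stand-in for spring-kafka's Acknowledgment, and the "empty message fails" rule is invented purely to trigger the error path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration (no Kafka involved) of acknowledging in a finally block. */
class AckInFinallyDemo {

    /** Hypothetical stand-in for spring-kafka's Acknowledgment, reduced to one method. */
    interface Ack {
        void acknowledge();
    }

    /** Mirrors the handler's structure: process, log failures, always acknowledge. */
    static void handle(String message, Ack ack, List<String> processed) {
        try {
            if (message.isEmpty()) {
                // Invented failure condition, only here to exercise the catch branch.
                throw new IllegalArgumentException("empty message");
            }
            processed.add(message);
        } catch (Exception e) {
            System.err.println("processing failed: " + e.getMessage());
        } finally {
            ack.acknowledge(); // runs on success and on failure alike
        }
    }

    /** Feeds the messages through handle() and returns {processedCount, ackCount}. */
    static int[] run(List<String> messages) {
        List<String> processed = new ArrayList<>();
        int[] acks = {0};
        for (String m : messages) {
            handle(m, () -> acks[0]++, processed);
        }
        return new int[] {processed.size(), acks[0]};
    }

    public static void main(String[] args) {
        int[] result = run(Arrays.asList("a", "", "b"));
        System.out.println("processed=" + result[0] + ", acknowledged=" + result[1]);
    }
}
```

With the three sample messages, two are processed but all three are acknowledged. If failed messages should be retried instead, one option is to acknowledge only after successful processing and let the exception propagate to the container.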
Test class
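import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootDemoMqKafkaApplicationTests {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Test sending a message.
     */
    @Test
    public void testSend() {
        kafkaTemplate.send(KafkaConsts.TOPIC_TEST, "hello,kafka...");
    }
}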
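Run
Start a Kafka broker on localhost:9092 (the address set in application.yml), then run testSend. While the application and its listener are running, MessageHandler should log the message it receives.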