龙空技术网

Spring boot与kafka集成

一往无前雪碧wr 155

前言:

现在姐妹们对“kafka集成spring boot”都比较看重,咱们都想要知道一些“kafka集成spring boot”的相关知识。那么小编在网摘上搜集了一些有关“kafka集成spring boot””的相关资讯,希望姐妹们能喜欢,各位老铁们一起来了解一下吧!

前言

本次内容关于Spring boot与kafka集成的实战,kafka的基础用法。

环境准备

首先使用IDEA生成spring boot的骨架程序,按向导生成,如下图所示:

修改pom,添加kafa包

 <dependency>            <groupId>org.springframework.kafka</groupId>            <artifactId>spring-kafka</artifactId>  </dependency>

修改配置文件(application.yml)

server:  port: 8080  servlet:    context-path: /demospring:  kafka:    bootstrap-servers: localhost:9092    producer:      retries: 0      batch-size: 16384      buffer-memory: 33554432      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      group-id: spring-boot-demo      # 手动提交      enable-auto-commit: false      auto-offset-reset: latest      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer      properties:        session.timeout.ms: 60000    listener:      log-container-config: false      concurrency: 5      # 手动提交      ack-mode: manual_immediate

添加配置类:

/** * <p> * kafka配置类 */@Configuration@EnableConfigurationProperties({KafkaProperties.class})@EnableKafka@AllArgsConstructorpublic class KafkaConfig {    private final KafkaProperties kafkaProperties;   //     @Bean    public KafkaTemplate<String, String> kafkaTemplate() {        return new KafkaTemplate<>(producerFactory());    }    @Bean    public ProducerFactory<String, String> producerFactory() {        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());    }    @Bean    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);        factory.setBatchListener(true);        factory.getContainerProperties().setPollTimeout(3000);        return factory;    }    @Bean    public ConsumerFactory<String, String> consumerFactory() {        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());    }    @Bean("ackContainerFactory")    public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);        factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);        return factory;    }}

消息处理类

@Component@Slf4jpublic class MessageHandler {    @KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "ackContainerFactory")    public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {        try {            String message = (String) record.value();            log.info("收到消息: {}", message);        } catch (Exception e) {            log.error(e.getMessage(), e);        } finally {            // 手动提交 offset            acknowledgment.acknowledge();        }    }}

测试类

@RunWith(SpringRunner.class)@SpringBootTestpublic class SpringBootDemoMqKafkaApplicationTests {    @Autowired    private KafkaTemplate<String, String> kafkaTemplate;    /**     * 测试发送消息     */    @Test    public void testSend() {        kafkaTemplate.send(KafkaConsts.TOPIC_TEST, "hello,kafka...");    }}

运行

标签: #kafka集成spring boot