Preface:
When working with Kafka, you often find a large backlog of messages waiting in a topic. Consuming them one at a time is inefficient, and this is where batch consumption comes in.
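Batch listener
Starting with version 1.1 of Spring for Apache Kafka, a @KafkaListener method can be configured to receive the whole batch of records returned by a consumer poll. To make the listener container factory create batch listeners, set its batchListener property to true, as follows: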
// Fragment of a @Configuration class; groupId, autoOffsetReset and
// bootstrapServers are fields of that class.
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
    factory.setBatchListener(true); // enable batch listening
    return factory;
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Maximum number of records returned by a single poll, i.e. the upper bound on batch size
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
    // Applies only when enable.auto.commit is true
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}
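To make the effect of max.poll.records concrete, here is a minimal plain-Java sketch (not Kafka code) of how a backlog reaches a batch listener in chunks no larger than that limit. The class PollBatchSketch and the method toBatches are hypothetical names introduced only for illustration. A real consumer may also return fewer records per poll, depending on fetch sizes and timing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: models the upper bound that
// max.poll.records places on the size of each delivered batch.
final class PollBatchSketch {

    // Splits the backlog into consecutive batches of at most maxPollRecords items.
    static <T> List<List<T>> toBatches(List<T> backlog, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < backlog.size(); start += maxPollRecords) {
            int end = Math.min(start + maxPollRecords, backlog.size());
            // Copy the slice so each batch is independent of the backlog list.
            batches.add(new ArrayList<>(backlog.subList(start, end)));
        }
        return batches;
    }
}
```

With a backlog of 250 records and a limit of 100, toBatches yields batches of 100, 100 and 50 records, so in this simplified model the listener would be invoked three times instead of 250.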
Receiving in batches
In the @KafkaListener annotation, set containerFactory to the name of the batchFactory bean.
@KafkaListener(topics = "teemo", id = "consumer", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<?, ?>> list) {
    List<String> messages = new ArrayList<>();
    for (ConsumerRecord<?, ?> record : list) {
        // A record's value may be null, so only non-null values are collected
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        kafkaMessage.ifPresent(o -> messages.add(o.toString()));
    }
    if (!messages.isEmpty()) {
        // Update the index
        updateES(messages);
    }
}
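The null check in the listener matters because a Kafka record can carry a null value (for example, a tombstone on a compacted topic). The filtering step can be sketched in plain Java, independent of Kafka. BatchValueSketch and nonNullValues are hypothetical names, and the input list merely stands in for the values of the received records.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical illustration only: mirrors the listener's
// "skip null values, stringify the rest" step without any Kafka types.
final class BatchValueSketch {

    // Returns the string form of every non-null value, preserving order.
    static List<String> nonNullValues(List<?> recordValues) {
        return recordValues.stream()
                .filter(Objects::nonNull)
                .map(Object::toString)
                .collect(Collectors.toList());
    }
}
```

Passing the values "a", null and 42 returns a list containing "a" and "42". An empty result means there is nothing to index, which is why the listener skips the update in that case.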
Complete example
package org.fiend.kafka.config;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Optional;

/**
 * @author langpf 2020/2/25
 */
@Component
public class KafkaReceiver {
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiver.class);

    /**
     * Receives a single message.
     * @param record the consumed record
     */
    // Alternative listener declarations:
    // @KafkaListener(id = "hades", autoStartup = "${listener.auto.startup}", topics = {"oop", "pui", "que"}, concurrency = "2")
    // @KafkaListener(id = "hades", autoStartup = "false", topics = {"oop", "pui", "que"}, concurrency = "2")
    // @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = Constants.TOPIC_NAME, partitions = { "1" }) })
    @KafkaListener(topics = {Constants.TOPIC_NAME})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record = {}", record);
            log.info("------------------ message = {}", message);
        }
    }

    /**
     * Receives messages in batches.
     * A List parameter requires the batch-enabled container factory.
     * @param records the batch of consumed records
     */
    @KafkaListener(topics = {Constants.TOPIC_NAME}, containerFactory = "batchFactory")
    public void batchListen(List<ConsumerRecord<?, ?>> records) {
        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                Object message = kafkaMessage.get();
                log.info("----------------- record = {}", record);
                log.info("------------------ message = {}", message);
            }
        }
    }

    /**
     * Receives Kafka messages in batches, from partition 0 only.
     * @param records the batch of consumed records
     */
    @KafkaListener(id = "id0", containerFactory = "batchFactory",
            topicPartitions = {@TopicPartition(topic = Constants.TOPIC_NAME, partitions = {"0"})})
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: {}", Thread.currentThread().getId());
        log.info("Id0 records size {}", records.size());
        for (ConsumerRecord<?, ?> record : records) {
            log.info("Received: {}", record);
            // Log the payload only when the record has a non-null value
            if (record.value() != null) {
                log.info("p0 Received message={}", record.value());
            }
        }
    }

    /**
     * Receives Kafka messages in batches, from partition 1 only.
     * @param records the batch of consumed records
     */
    @KafkaListener(id = "id1", containerFactory = "batchFactory",
            topicPartitions = {@TopicPartition(topic = Constants.TOPIC_NAME, partitions = {"1"})})
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: {}", Thread.currentThread().getId());
        log.info("Id1 records size {}", records.size());
        for (ConsumerRecord<?, ?> record : records) {
            log.info("Received: {}", record);
            // Log the payload only when the record has a non-null value
            if (record.value() != null) {
                log.info("p1 Received message={}", record.value());
            }
        }
    }
}