前言:
目前小伙伴们对“kafka spring集成”大体比较注重,我们都需要了解一些“kafka spring集成”的相关资讯。那么小编也在网上汇集了一些对于“kafka spring集成””的相关内容,希望大家能喜欢,看官们快快来了解一下吧!Kafka 从0.10版本开始支持流处理,我们可以使用 Kafka Streams 来开发实时应用程序。本章介绍 Spring Boot 集成 Kafka Streams 进行流式计算。
Spring Boot 集成 Kafka 的基本配置和用法在“Spring Boot 集成 Kafka”有介绍,这里不再详述。
依赖
使用 Kafka Streams 流处理,在集成 Spring Kafka 的基础下,还需要引入:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId></dependency>配置在 application.yml 配置
spring: kafka: streams: application-id: test-kafka-stream # 默认取springboot应用名 bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # 会覆盖 spring.kafka.bootstrap-servers 配置# auto-startup: true properties: default: key: serde: org.apache.kafka.common.serialization.Serdes$StringSerde # 序列化key value: serde: org.springframework.kafka.support.serializer.JsonSerde # 序列化value timestamp: extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor spring: json: trusted: packages: com.engrz.lab.* # 允许json反序列化的包
流处理相关配置:spring.kafka.streams.*
更多配置参考:Spring Boot Integration Properties
在 Java 代码中配置(与 application.yml 配置二选一)
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration kStreamsConfigs() { Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName()); return new KafkaStreamsConfiguration(props);}
值使用 JsonSerde 序列化,需要配置信任包,否则 Spring 会报出:If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
使用创建流
使用 @EnableKafkaStreams 注解装配
@Configuration@EnableKafkaStreamspublic class KafkaStreamsConfiguration { @Bean public KStream<String, Object> kStream(StreamsBuilder streamsBuilder) { KStream<String, Object> stream = streamsBuilder.stream("streamTopic"); stream.map((k, v) -> new KeyValue<>(k, v)).to("myTopic", Produced.with(Serdes.String(), new JsonSerde<>())); return stream; }}
可以指定多个topic,把接收的内容存到myTopic中
流计算
本章只讲 Spring Boot 集成,关于 Kafka Streams 流计算会放在 Kafka 专题介绍。以下给出一个应用场景示例:
定义一个订单model类
/** * @author Engr-Z * @since 2021/1/29 */@Datapublic class OrderModel implements Serializable { /** * 用户id */ private Integer userId; /** * 订单号 */ private String orderNo; /** * 订单时间 */ private LocalDateTime orderTime; /** * 订单金额 */ private BigDecimal orderAmt; /** * 订单状态 */ private String orderStatus;}找出交易小于1元的订单,发送到 orderTopic
@Beanpublic KStream<String, OrderModel> kStream(StreamsBuilder streamsBuilder) { KStream<String, OrderModel> stream = streamsBuilder.stream("streamTopic"); stream.map((k, v) -> new KeyValue<>(k, v)).to("tableTopic", Produced.with(Serdes.String(), new JsonSerde<>())); stream.filter((k, v) -> { BigDecimal orderAmt = v.getOrderAmt(); return orderAmt.compareTo(new BigDecimal(1)) < 0; }).to("orderTopic", Produced.with(Serdes.String(), new JsonSerde<>()));}
通过实时计算,我们可以解决很多业务问题。如:实时数仓,实时风控等。
除非注明,否则均为"攻城狮·正"原创文章,转载请注明出处。
本文链接:
标签: #kafka spring集成