龙空技术网

Spring Boot 集成 Kafka Stream

攻城狮漆正 97

前言:

目前小伙伴们对“kafka spring集成”大体比较注重,我们都需要了解一些“kafka spring集成”的相关资讯。那么小编也在网上汇集了一些对于“kafka spring集成””的相关内容,希望大家能喜欢,看官们快快来了解一下吧!

Kafka 从0.10版本开始支持流处理,我们可以使用 Kafka Streams 来开发实时应用程序。本章介绍 Spring Boot 集成 Kafka Streams 进行流式计算。

Spring Boot 集成 Kafka 的基本配置和用法在“Spring Boot 集成 Kafka”有介绍,这里不再详述。

依赖

使用 Kafka Streams 流处理,在集成 Spring Kafka 的基础下,还需要引入:

<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-streams</artifactId></dependency>
配置在 application.yml 配置
spring:  kafka:    streams:      application-id: test-kafka-stream # 默认取springboot应用名      bootstrap-servers: ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092} # 会覆盖 spring.kafka.bootstrap-servers 配置#      auto-startup: true      properties:        default:          key:            serde: org.apache.kafka.common.serialization.Serdes$StringSerde # 序列化key          value:            serde: org.springframework.kafka.support.serializer.JsonSerde # 序列化value          timestamp:            extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor        spring:          json:            trusted:              packages: com.engrz.lab.* # 允许json反序列化的包

流处理相关配置:spring.kafka.streams.*

更多配置参考:Spring Boot Integration Properties

在 Java 代码中配置(与 application.yml 配置二选一)

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration kStreamsConfigs() {    Map<String, Object> props = new HashMap<>();    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());    return new KafkaStreamsConfiguration(props);}

值使用 JsonSerde 序列化,需要配置信任包,否则 Spring 会报出:If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).

使用创建流

使用 @EnableKafkaStreams 注解装配

@Configuration@EnableKafkaStreamspublic class KafkaStreamsConfiguration {        @Bean    public KStream<String, Object> kStream(StreamsBuilder streamsBuilder) {        KStream<String, Object> stream = streamsBuilder.stream("streamTopic");        stream.map((k, v) -> new KeyValue<>(k, v)).to("myTopic", Produced.with(Serdes.String(), new JsonSerde<>()));        return stream;    }}

可以指定多个topic,把接收的内容存到myTopic中

流计算

本章只讲 Spring Boot 集成,关于 Kafka Streams 流计算会放在 Kafka 专题介绍。以下给出一个应用场景示例:

定义一个订单model类

/** * @author Engr-Z * @since 2021/1/29 */@Datapublic class OrderModel implements Serializable {    /**     * 用户id     */    private Integer userId;    /**     * 订单号     */    private String orderNo;    /**     * 订单时间     */    private LocalDateTime orderTime;    /**     * 订单金额     */    private BigDecimal orderAmt;    /**     * 订单状态     */    private String orderStatus;}
找出交易小于1元的订单,发送到 orderTopic
@Beanpublic KStream<String, OrderModel> kStream(StreamsBuilder streamsBuilder) {    KStream<String, OrderModel> stream = streamsBuilder.stream("streamTopic");    stream.map((k, v) -> new KeyValue<>(k, v)).to("tableTopic", Produced.with(Serdes.String(), new JsonSerde<>()));    stream.filter((k, v) -> {        BigDecimal orderAmt = v.getOrderAmt();        return orderAmt.compareTo(new BigDecimal(1)) < 0;    }).to("orderTopic", Produced.with(Serdes.String(), new JsonSerde<>()));}

通过实时计算,我们可以解决很多业务问题。如:实时数仓,实时风控等。

除非注明,否则均为"攻城狮·正"原创文章,转载请注明出处。

本文链接:

标签: #kafka spring集成