龙空技术网

快速实用--消息队列Kafka单机搭建并整合SpringBoot Demo

好学阿宇 158

前言:

目前兄弟们对“kafka环境搭建及demo”大概比较讲究,同学们都需要剖析一些“kafka环境搭建及demo”的相关内容。那么小编在网摘上汇集了一些对于“kafka环境搭建及demo””的相关资讯,希望大家能喜欢,姐妹们快快来学习一下吧!

KafKa:削峰,异步,解耦

废话不多说,直接撸起袖子来一套!

Linux中 找个地方直接wget 下载

sudo wget 

解压

sudo tar zxf kafka_2.11-2.3.0.tgz

进入到bin目录

 cd kafka_2.11-2.3.0/bin

修改server.properties:

打开红框中2段注释第1段注释打开即可,无需修改第2段注释填写本机(服务器)IP即可保存

先启动kafka自带的zookeeper

sudo nohup ./zookeeper-server-start.sh ../config/zookeeper.properties &

查看是否启动成功

sudo netstat -lnp |grep 2181

zookeeper启动成功

再启动kafka

 sudo nohup ./kafka-server-start.sh ../config/server.properties &

查看是否启动成功

sudo netstat -lnp |grep 2181

kafka启动成功

到这里呢,kafka就已经在我的ubuntu上部署完成了,接下来我们进行SpringBoot的整合。

整合SpringBoot 2

导入Pom依赖

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId></dependency>

配置yml或properties 这里我演示yml

spring: kafka: bootstrap-servers: kafka所在服务器IP:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: test enable-auto-commit: true auto-commit-interval: 1000 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

无需配置类,无需配置类,无需配置类!

编写消费者:

package com.example.demo.queue;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Service;@Servicepublic class KafkaConsumer { public static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @KafkaListener(topics = {"001"}) public void listen(String message) { logger.info("接受到消息:{}", message); }}

编写生产者:

/** * @Author: ChangYu * @Date: 7/17/2019 2:22 PM * @Version 1.0 */@Api(tags = "测试接口")@RestController@RequestMapping(value = "/api/test")public class TestController { public static Logger logger = LoggerFactory.getLogger(TestController.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @ApiOperation(value = "消息发送") @RequestMapping(value = "/produceMessage", method = RequestMethod.POST) public GeneralResponseDto produceMessage(@RequestParam("message") String message) throws IOException { ListenableFuture futureCallback = kafkaTemplate.send("001", message); futureCallback.addCallback((new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { throw new ServiceException("发送失败:" + message); } @Override public void onSuccess(Object result) { logger.info("发送成功:{}", message); } })); return GeneralResponseDto.addSuccess("发送成功", message); }}

GeneralResponseDto是统一报文返回类;

ServiceException 是自定义异常类;

同学们自己建一个就好了。

到这里呢,SpringBoot2的整合就已完成。

测试走一波

这里直接用整合好的Swagger测试一波

看图就行

服务端消费者收到了 频道为001 的订阅消息!

So,同学们你们学会了吗?

标签: #kafka环境搭建及demo