前言:
目前兄弟们对“kafka环境搭建及demo”大概比较讲究,同学们都需要剖析一些“kafka环境搭建及demo”的相关内容。那么小编在网摘上汇集了一些对于“kafka环境搭建及demo””的相关资讯,希望大家能喜欢,姐妹们快快来学习一下吧!KafKa:削峰,异步,解耦
废话不多说,直接撸起袖子来一套!
Linux中 找个地方直接wget 下载
sudo wget
解压
sudo tar zxf kafka_2.11-2.3.0.tgz
进入到bin目录
cd kafka_2.11-2.3.0/bin
修改server.properties:
打开红框中2段注释第1段注释打开即可,无需修改第2段注释填写本机(服务器)IP即可保存
先启动kafka自带的zookeeper
sudo nohup ./zookeeper-server-start.sh ../config/zookeeper.properties &
查看是否启动成功
sudo netstat -lnp |grep 2181
再启动kafka
sudo nohup ./kafka-server-start.sh ../config/server.properties &
查看是否启动成功
sudo netstat -lnp |grep 2181
到这里呢,kafka就已经在我的ubuntu上部署完成了,接下来我们进行SpringBoot的整合。
整合SpringBoot 2
导入Pom依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId></dependency>
配置yml或properties 这里我演示yml
spring: kafka: bootstrap-servers: kafka所在服务器IP:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: test enable-auto-commit: true auto-commit-interval: 1000 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
无需配置类,无需配置类,无需配置类!
编写消费者:
package com.example.demo.queue;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Service;@Servicepublic class KafkaConsumer { public static Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @KafkaListener(topics = {"001"}) public void listen(String message) { logger.info("接受到消息:{}", message); }}
编写生产者:
/** * @Author: ChangYu * @Date: 7/17/2019 2:22 PM * @Version 1.0 */@Api(tags = "测试接口")@RestController@RequestMapping(value = "/api/test")public class TestController { public static Logger logger = LoggerFactory.getLogger(TestController.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @ApiOperation(value = "消息发送") @RequestMapping(value = "/produceMessage", method = RequestMethod.POST) public GeneralResponseDto produceMessage(@RequestParam("message") String message) throws IOException { ListenableFuture futureCallback = kafkaTemplate.send("001", message); futureCallback.addCallback((new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { throw new ServiceException("发送失败:" + message); } @Override public void onSuccess(Object result) { logger.info("发送成功:{}", message); } })); return GeneralResponseDto.addSuccess("发送成功", message); }}
GeneralResponseDto是统一报文返回类;
ServiceException 是自定义异常类;
同学们自己建一个就好了。
到这里呢,SpringBoot2的整合就已完成。
测试走一波
这里直接用整合好的Swagger测试一波
看图就行
服务端消费者收到了 频道为001 的订阅消息!
So,同学们你们学会了吗?
标签: #kafka环境搭建及demo