龙空技术网

在项目系统中使用 Spring Boot 集成 Kafka,实现业务消息的生产与消费实例

不断进步中 170

前言:

现在大家对“bus集成apachekafka”都比较关注,也有不少朋友想深入了解“bus集成apachekafka”的相关知识。小编在网上搜集整理了一些关于“bus集成apachekafka”的资料,希望大家喜欢,一起来了解一下吧!

本文整理记录日常 Spring Boot 开发中集成 Kafka 所涉及的文件:1、QiDongManagerApp.java 启动类;2、application.yml 配置文件;3、TestCommandBusProduce.java 生产者接口;4、TestCommandBusProduceImpl.java 生产者接口实现类;5、TestCommandBusConsumer.java 总线消费者;6、TestCommandBusListener.java 业务场景监听处理逻辑;7、Maven 的 pom.xml 中集成 Kafka 依赖包。

pom.xml文件

<!-- Kafka client library. -->
<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>2.8.1</version>

</dependency>

<!--
  NOTE(review): this entry repeats the groupId/artifactId of the dependency above
  with a different version (0.8.2.0 vs 2.8.1). Maven expects each dependency to be
  declared once per POM; confirm which version is intended (or whether a different
  artifact was meant here) and remove the other declaration.
-->
<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-clients</artifactId>

<version>0.8.2.0</version>

</dependency>

application.yml 配置文件

# Event-bus Kafka settings. The keys below are nested under app.kafka.event-bus so that
# they match the @ConfigurationProperties(prefix = "app.kafka.event-bus") binding onto
# EventBusKafkaConf; with the nesting flattened, every key would be a top-level property
# and none of them would be bound.
app:
  kafka:
    event-bus:
      cluster-name: teste_kcz8px7x01
      # NOTE(review): empty in the original; presumably supplied per environment - confirm.
      system-url:
      # Topic used both for producing and for consuming in the classes of this article.
      topic: Test_COMMAND_BUS_PRODUCE
      product-topic-tokens: COMMAND_BUS_PRODUCE:test
      product-poolsize: 5
      consume-thread-count: 1
      consume-message-group-size: 20
      consume-topic-tokens: COMMAND_BUS_MANAGER:test01

启动类处理

QiDongManagerApp.java

/**
 * Registers the event-bus Kafka settings holder as a bean.
 *
 * <p>The {@code @ConfigurationProperties} annotation asks Spring Boot to bind the
 * properties under {@code app.kafka.event-bus} onto the returned instance.
 *
 * @return a fresh, not yet populated {@link EventBusKafkaConf}
 */
@Bean
@ConfigurationProperties(prefix = "app.kafka.event-bus")
public EventBusKafkaConf eventBusKafkaConf() {
    EventBusKafkaConf conf = new EventBusKafkaConf();
    return conf;
}

/**
 * Registers the command-bus producer; only active for the {@code sit} and {@code prod} profiles.
 *
 * @param eventBusKafkaConf event-bus Kafka settings handed to the producer
 * @return the producer implementation backed by the given settings
 */
@Bean
@Profile({"sit", "prod"})
public TestCommandBusProduce TestCommandBusProduce(EventBusKafkaConf eventBusKafkaConf) {
    TestCommandBusProduce producer = new TestCommandBusProduceImpl(eventBusKafkaConf);
    return producer;
}

/**
 * Registers the command-bus consumer; only active for the {@code sit} and {@code prod} profiles.
 *
 * @param eventBusKafkaConf event-bus Kafka settings handed to the consumer
 * @param ConsumeListener   listener that receives the consumed string messages
 * @return the consumer wired with the given settings and listener
 */
@Bean
@Profile({"sit", "prod"})
public TestCommandBusConsumer TestCommandBusConsumer(
        EventBusKafkaConf eventBusKafkaConf,
        IStringMessageConsumeListener ConsumeListener) {
    TestCommandBusConsumer consumer = new TestCommandBusConsumer(eventBusKafkaConf, ConsumeListener);
    return consumer;
}

监听事件配置(事件总线 Kafka 配置类)

EventBusKafkaConf.java

/**
 * Settings holder for the Kafka event bus.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}. The producer and consumer
 * classes read these values through the generated getters.
 */
@Data

public class EventBusKafkaConf {

/** Cluster name. */

private String clusterName;

/** Address (passed to the producer/consumer configuration as the system URL). */

private String systemUrl;

/** Command bus: topic. */

private String topic;

/** Topic tokens handed to the producer configuration. NOTE(review): token format is not visible here - confirm. */
private String productTopicTokens;

/** Pool size handed to the producer configuration (original comment: thread pool size). */

private int productPoolsize;

/** Value applied as the consumer's message group size. */
private int consumeMessageGroupSize;

/** Thread count handed to the consumer configuration. */
private int consumeThreadCount;

/** Topic tokens handed to the consumer configuration. NOTE(review): token format is not visible here - confirm. */
private String consumeTopicTokens;

}

TestCommandBusProduce.java

/**
 * Producer-side contract of the command bus.
 *
 * <p>Extends the Spring lifecycle callbacks so that implementations can set up their
 * Kafka producer after property injection and release it on shutdown.
 */
public interface TestCommandBusProduce extends InitializingBean, DisposableBean {

    /**
     * Sends a single message.
     *
     * @param id      unique message id
     * @param type    message type
     * @param message message data
     */
    void sendSingleString(String id, String type, String message);

    /**
     * Sends a single message to the partition selected by the given key.
     *
     * @param id      unique message id
     * @param type    message type
     * @param key     partition key
     * @param message message data
     */
    void sendSingleStringWithKey(String id, String type, String key, String message);

    /**
     * Sends a batch of messages.
     *
     * @param id   unique id associated with the batch
     * @param type message type associated with the batch
     * @param list messages to send
     */
    void batchSendString(String id, String type, List<String> list);

    /**
     * Sends a batch of messages without id/type information.
     *
     * @param list messages to send
     */
    void batchSendList(List<String> list);

    /**
     * Sends a single, already formatted message.
     *
     * @param message message to send as-is
     */
    void sendSingleString(String message);
}

TestCommandBusProduceImpl.java

/**
 * Command-bus producer that publishes string messages to the topic configured in
 * {@link EventBusKafkaConf}.
 *
 * <p>The underlying {@link IKafkaProducer} is created in {@link #afterPropertiesSet()} and
 * closed in {@link #destroy()}. Sending before initialisation fails because the producer is
 * still {@code null}.
 */
@Slf4j
public class TestCommandBusProduceImpl implements InitializingBean, DisposableBean, TestCommandBusProduce {

    /** Partition key used when no business-specific key can be derived from a message. */
    private static final String DEFAULT_PARTITION_KEY = "Test-event-stream";

    private IKafkaProducer kafkaProducer;

    private final EventBusKafkaConf kafkaConf;

    /**
     * @param kafkaConf event-bus settings (pool size, system URL, cluster name, tokens, topic)
     */
    public TestCommandBusProduceImpl(EventBusKafkaConf kafkaConf) {
        this.kafkaConf = kafkaConf;
    }

    /**
     * Creates the producer pool using the topic tokens from the configuration.
     *
     * @throws Exception if the producer cannot be created
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        kafkaProducer = createProducer(kafkaConf.getProductTopicTokens());
    }

    /**
     * Re-creates the producer pool with the given topic tokens instead of the configured ones.
     *
     * <p>The previously created producer, if any, is closed after the new one has been built,
     * so that it is not leaked and so that a failure to build the new producer leaves the old
     * one in place.
     *
     * @param topicTokens topic tokens to use for the new producer
     * @throws Exception if the new producer cannot be created or the old one cannot be closed
     */
    public void setProperties(String topicTokens) throws Exception {
        IKafkaProducer previous = kafkaProducer;
        kafkaProducer = createProducer(topicTokens);
        if (previous != null) {
            previous.close();
        }
    }

    /**
     * Closes the producer if it was created.
     *
     * @throws Exception if closing fails
     */
    @Override
    public void destroy() throws Exception {
        if (kafkaProducer != null) {
            kafkaProducer.close();
        }
    }

    /**
     * Wraps the message in a JSON envelope ({@code id}, {@code type}, {@code eventTime},
     * {@code data}) and sends it to the configured topic.
     *
     * @param id      unique message id
     * @param type    message type
     * @param message message data, stored under {@code data}
     */
    @Override
    public void sendSingleString(String id, String type, String message) {
        JSONObject jsonObject = buildEnvelope(id, type, message);
        log.info("sendSingleString jsonObject={}", jsonObject);
        sendSingleString(kafkaConf.getTopic(), jsonObject.toString());
    }

    /**
     * Sends an already formatted message to the configured topic.
     *
     * @param message message to send as-is
     */
    @Override
    public void sendSingleString(String message) {
        log.info("sendSingleString jsonObject={}", message);
        sendSingleString(kafkaConf.getTopic(), message);
    }

    /**
     * Sends one keyed message. The key is the business key derived from the message when
     * one is available, otherwise {@link #DEFAULT_PARTITION_KEY}.
     */
    private void sendSingleString(String topic, String message) {
        String batchId = getBatchId(message);
        String key = StringUtils.isNotEmpty(batchId) ? batchId : DEFAULT_PARTITION_KEY;
        // Keyed send: intended to preserve message order (per the original comment).
        kafkaProducer.sendKeyedString(topic, new KeyedString(key, message));
    }

    /**
     * Derives the business partition key from a JSON envelope.
     *
     * <p>The envelope's {@code type} selects which field of the {@code data} payload is used.
     * The payload is only parsed when the type actually has a key field.
     *
     * @param message message text, expected to be a JSON envelope
     * @return the key, or {@code null} when the message is not a JSON envelope, the type has
     *         no business key, or the payload is absent
     */
    private String getBatchId(String message) {
        try {
            JSONObject msg = JSONObject.parseObject(message);
            if (msg == null) {
                return null;
            }
            String keyField = partitionKeyField(msg.getString("type"));
            if (keyField == null) {
                return null;
            }
            JSONObject data = JSONObject.parseObject(msg.getString("data"));
            return data == null ? null : data.getString(keyField);
        } catch (RuntimeException e) {
            // Raw (non-JSON) messages are accepted by sendSingleString(String); the caller
            // falls back to the default partition key in that case.
            log.warn("getBatchId could not parse message, using default partition key", e);
            return null;
        }
    }

    /**
     * Maps a message type to the payload field that carries its partition key.
     *
     * @return the field name, or {@code null} when the type has no business key
     */
    private static String partitionKeyField(String type) {
        // Check, calculation and withholding scenarios are all keyed by batch id.
        if (TaskInfoConstants.SEND_CONTACT_CHECK.equals(type)
                || TaskInfoConstants.SEND_CONTACT_CHECK_RESULT.equals(type)
                || TaskInfoConstants.SEND_CONTACT_BILL.equals(type)
                || TaskInfoConstants.SEND_CONTACT_BILL_RESULT.equals(type)
                || TaskInfoConstants.SEND_BILL_CALCSTATUS.equals(type)
                || TaskInfoConstants.WITHHOLD_CONTACT.equals(type)
                || TaskInfoConstants.WITHHOLD_CONTACT_RESULT.equals(type)
                || TaskInfoConstants.WITHHOLD_REPORT_RESULT.equals(type)) {
            return "batchId";
        }
        if (TaskInfoConstants.ICS_CONTRACT.equals(type)) {
            return "contractNo";
        }
        if (TaskInfoConstants.ZH_SALARY.equals(type)) {
            return "taskId";
        }
        return null;
    }

    /**
     * Sends every message of the list to the configured topic under the default partition key.
     *
     * @param id   id associated with the batch; only logged
     * @param type type associated with the batch; only logged
     * @param list messages to send
     */
    @Override
    public void batchSendString(String id, String type, List<String> list) {
        log.info("batchSendString id={},type={},list={}", id, type, list);
        batchSendString(kafkaConf.getTopic(), list);
    }

    /** Wraps each message with the default partition key and sends the batch. */
    private void batchSendString(String topic, List<String> list) {
        List<KeyedString> keyedStringList = Lists.newArrayList();
        for (String message : list) {
            keyedStringList.add(new KeyedString(DEFAULT_PARTITION_KEY, message));
        }
        // Keyed send: intended to preserve message order (per the original comment).
        kafkaProducer.batchSendKeyedString(topic, keyedStringList);
    }

    /**
     * Sends the list to the configured topic without partition keys.
     *
     * <p>NOTE(review): the original comment here claimed ordering is guaranteed, but unlike
     * the other send paths this one passes no key - confirm the ordering expectation.
     *
     * @param list messages to send
     */
    @Override
    public void batchSendList(List<String> list) {
        log.info("batchSendList list={}", list);
        kafkaProducer.batchSendString(kafkaConf.getTopic(), list);
    }

    /**
     * Wraps the message in a JSON envelope and sends it with the caller-supplied partition key.
     *
     * @param id      unique message id
     * @param type    message type
     * @param key     partition key
     * @param message message data, stored under {@code data}
     */
    @Override
    public void sendSingleStringWithKey(String id, String type, String key, String message) {
        JSONObject jsonObject = buildEnvelope(id, type, message);
        log.info("sendSingleStringWithKey key={}, jsonObject={}", key, jsonObject);
        KeyedString keyedMessage = new KeyedString(key, jsonObject.toString());
        // Keyed send: intended to preserve message order (per the original comment).
        kafkaProducer.sendKeyedString(kafkaConf.getTopic(), keyedMessage);
    }

    /** Builds the JSON envelope shared by the single-message send methods. */
    private static JSONObject buildEnvelope(String id, String type, String message) {
        JSONObject envelope = new JSONObject();
        envelope.put("id", id);
        envelope.put("type", type);
        envelope.put("eventTime", new Date());
        envelope.put("data", message);
        return envelope;
    }

    /** Builds a producer pool from the configured connection settings and the given tokens. */
    private IKafkaProducer createProducer(String topicTokens) throws Exception {
        int poolSize = kafkaConf.getProductPoolsize();
        String systemUrl = kafkaConf.getSystemUrl();
        String clusterName = kafkaConf.getClusterName();
        ProduceConfig produceConfig = new ProduceConfig(poolSize, systemUrl, clusterName, topicTokens);
        return new ProducerPool(produceConfig);
    }
}

TestCommandBusProduce.java

/**
 * Producer-side contract of the command bus.
 *
 * <p>Extends the Spring lifecycle callbacks so that implementations can set up their
 * Kafka producer after property injection and release it on shutdown.
 */
public interface TestCommandBusProduce extends InitializingBean, DisposableBean {

    /**
     * Sends a single message.
     *
     * @param id      unique message id
     * @param type    message type
     * @param message message data
     */
    void sendSingleString(String id, String type, String message);

    /**
     * Sends a single message to the partition selected by the given key.
     *
     * @param id      unique message id
     * @param type    message type
     * @param key     partition key
     * @param message message data
     */
    void sendSingleStringWithKey(String id, String type, String key, String message);

    /**
     * Sends a batch of messages.
     *
     * @param id   unique id associated with the batch
     * @param type message type associated with the batch
     * @param list messages to send
     */
    void batchSendString(String id, String type, List<String> list);

    /**
     * Sends a batch of messages without id/type information.
     *
     * @param list messages to send
     */
    void batchSendList(List<String> list);

    /**
     * Sends a single, already formatted message.
     *
     * @param message message to send as-is
     */
    void sendSingleString(String message);
}

TestCommandBusConsumer.java

/**
 * Registers a string-message consumer for the configured command-bus topic once the bean's
 * properties have been set.
 */
public class TestCommandBusConsumer implements InitializingBean {

    private final EventBusKafkaConf kafkaConf;

    private final IStringMessageConsumeListener consumeListener;

    /**
     * @param kafkaConf       event-bus settings used to build the consumer configuration
     * @param ConsumeListener listener that will receive the consumed messages
     */
    public TestCommandBusConsumer(EventBusKafkaConf kafkaConf, IStringMessageConsumeListener ConsumeListener) {
        this.kafkaConf = kafkaConf;
        this.consumeListener = ConsumeListener;
    }

    /** Triggers the consumer registration as part of bean initialisation. */
    @Override
    public void afterPropertiesSet() throws Exception {
        initKafkaConfig();
    }

    /**
     * Builds the optional and mandatory consumer settings from the configuration and
     * registers the listener for the configured topic.
     *
     * @throws KafkaException if the registration fails
     */
    public void initKafkaConfig() throws KafkaException {
        ConsumeOptionalConfig options = new ConsumeOptionalConfig();
        options.setMessageGroupSize(kafkaConf.getConsumeMessageGroupSize());
        options.setAutoOffsetReset(AutoOffsetReset.NOW);

        ConsumeConfig consumeConfig = new ConsumeConfig(
                kafkaConf.getConsumeTopicTokens(),
                kafkaConf.getSystemUrl(),
                kafkaConf.getClusterName(),
                kafkaConf.getTopic(),
                kafkaConf.getConsumeThreadCount());

        KafkaConsumerRegister.registerStringConsumer(consumeConfig, consumeListener, options);
    }
}

TestCommandBusListener.java

@Slf4j

@Component

public class TestCommandBusListener implements IStringMessageConsumeListener {

@Resource

private ContactAreaConfigService contactAreaConfigService;

@Resource

private IAgContactInfoSnapshotService iAgContactInfoSnapshotService;

@Resource

private BillDispatcherService billDispatcherService;

@Resource

private YtBillDispatcherService ytBillDispatcherService;

@Resource

private IFileSysDateService iFileSysDateService;

@Autowired

private ApplicationEventPublisher publisher;

@Resource

private IAgContactInfoBatchService iAgContactInfoBatchService;

@Resource

private ContactAreaJtInfoService contactAreaJtInfoService;

@Resource

ExceptionNotifier exceptionNotifier;

@Autowired

private MessageReceiver messageReceiver;

@Override

public void onMessage(List list) throws KafkaConsumeRetryException {

try {

DataPermissionHodler.disablePermissionFilter();

// 剥离消息日志记录过程与消息处理过程

for (int i = 0; i < list.size(); i++) {

JSONObject jsonObject = JSON

.parseObject(list.get(i).toString());

publisher.publishEvent(new KafkaEvent(jsonObject

.getString("id"), jsonObject.getString("refId"),

jsonObject.getString("type"), jsonObject

.getDate("eventTime"), jsonObject

.getString("data")));

}

//消息异步处理分发

for (int i = 0; i < list.size(); i++) {

JSONObject jsonObject = JSON.parseObject(list.get(i).toString());

Message msg=new KafkaEvent(jsonObject.getString("id"),jsonObject.getString("refId"),

jsonObject.getString("type"),jsonObject.getDate("eventTime"),jsonObject.getString("data"));

messageReceiver.pushMessage(msg);

}

for (int i = 0; i < list.size(); i++) {

JSONObject jsonObject = JSON

.parseObject(list.get(i).toString());

String type = jsonObject.getString("type");

if(TaskInfoConstants.SEND_CONTACT_CHECK_RESULT.equals(type)){

billDispatcherService.csAndJsCheck(jsonObject

.getJSONObject("data"));

}

if(TaskInfoConstants.SEND_CONTACT_BILL_RESULT.equals(type)){

billDispatcherService.csAndJsJs(jsonObject

.getJSONObject("data"));

}

if(TaskInfoConstants.SEND_CONTRACT_CHANGED.equals(type)){

contactAreaConfigService.sysnContractConf(jsonObject

.getJSONObject("data"));

contactAreaJtInfoService.sysnContractConfJtInfo(jsonObject

.getJSONObject("data"));

}

// if(TaskInfoConstants.RETURN_BILL_DATA.equals(type)){

// iFileSysDateService.updatesysFileAttbill(jsonObject);

// }

if(TaskInfoConstants.WITHHOLD_CONTACT_RESULT.equals(type)){

ytBillDispatcherService.csAndJsJs(jsonObject

.getJSONObject("data"));

}

if(TaskInfoConstants.WITHHOLD_REPORT_RESULT.equals(type)){

ytBillDispatcherService.csAndYtJsEndJs(jsonObject

.getJSONObject("data"));

}

if(TaskInfoConstants.SEND_BILL_CALCULATE_INFO.equals(type)){

BillCalculateParam billCalculateParam = JSON

.parseObject(JSON.toJSONString(jsonObject

.getJSONObject("data")),

BillCalculateParam.class);

iAgContactInfoBatchService

.deleteBillCalculateData(billCalculateParam);

}

}

} catch (Exception e) {

exceptionNotifier.notify(e);

log.info("ContactAccountListener list={}", JsonUtil.toJson(list));

log.error(e.getMessage(), e);

}

}

}

标签: #bus集成apachekafka