龙空技术网

ActiveMQ消息持久化到mysql与设置消息有效期

程序员界的彭于晏 1152

前言:

现在咱们对“mysql数据过期时间”大概比较重视,看官们都需要了解一些“mysql数据过期时间”的相关文章。那么小编在网摘上收集了一些关于“mysql数据过期时间””的相关文章,希望姐妹们能喜欢,同学们一起来了解一下吧!

在生产环境中,手工签收的方式比较合适,因为某个消息在消费端没有成功处理的情况下,可以不给ActiveMQ消息中间件发送针对这个消息的确认签收。同时,记录相关信息到日志文件或数据库中,以便后续做相应处理。在默认情况下,消息在ActiveMQ消息中间件中是不会过期的,可以根据实际的项目需要去设置消息的过期时间,单位毫秒。

消息优先级总共十个,即0-9。其中,0-4是普通消息,5-9是加急消息,默认优先级为4。加急消息理论上优先于普通消息被消费。消息的优先级并不能确保消息发送和消费的先后顺序,如果项目需要的话,可以在ActiveMQ中间件与消费者之间添加一个排队系统,来保证消费者顺序消费消息。

ActiveMQ的消息持久化机制

ActiveMQ的持久化机制包含JDBC,KahaDB、LevelDB

在activemq.xml中查看默认的broker持久化机制。

这里,将持久化方式改为JDBC

<persistenceAdapter>

<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->

<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />

</persistenceAdapter>

createTablesOnStartup默认值是true,即每次ActiveMQ启动的时候都重新创建数据表,一般是首次启动设置为true,之后设置为false。

同时,在broker标签外设置bean

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">

<property name="driverClassName" value="com.mysql.jdbc.Driver"/>

<property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq?relaxAutoCommit=true"/>

<property name="username" value="root"/>

<property name="password" value="root"/>

<property name="maxActive" value="200"/>

<property name="poolPreparedStatements" value="true"/>

</bean>

将mysql的jdbc jar包放置到activemq的lib目录下。同时,将commons-pool.jar、commons-dbcp.jar和commons-collections.jar放置到activemq的lib目录下。

重新启动ActiveMQ,查看启动的命令行输出信息和登录ActiveMQ的默认控制台。若ActiveMQ正常启动和运行,则表示其JDBC持久化机制设置没有问题。

执行Producer

package com.cb;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

public static void main(String[] args) throws Exception{

//1.创建ConnectionFactory对象

ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(

"cb",

"123456",

"tcp://localhost:61616");

//2.创建一个Connection并开启

Connection connection=connectionFactory.createConnection();

connection.start();

//3.创建Session会话,用来接收消息,通过参数可以设置:是否启用事务、消息签收模式

//参数设置生产者使用事务、客户端(消费者)签收方式

Session session=connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);

//4.创建Destination对象。在点对点模式中,该对象被称为Queue;在发布订阅模式中,该对象被称为Topic

Destination destination=session.createQueue("queue1");

//5.创建消息的生产者

MessageProducer messageProducer=session.createProducer(null);

//6.设置生产者的消息持久化与非持久化特性

//messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

//7.选择需要的JMS消息格式,创建并发送消息,此处选择的是TextMessage字符串对象

TextMessage textMessage=session.createTextMessage();

textMessage.setText("生产者"+"activemq消息测试");

//messageProducer.send(textMessage);

//第3个参数:是否持久化;第4个参数:优先级(0~4普通 5~9加急);第5个参数:消息在ActiveMQ中间件中存放的有效期

messageProducer.send(destination, textMessage,DeliveryMode.PERSISTENT, 4, 1000*60*10);

//使用事务,必须有commit操作

session.commit();

//8.释放Connection

if(null!=connection){

connection.close();

}

}

}

查看控制台

因为使用了JDBC持久化方式,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock。activemq_msgs用于存储消息,Queue和Topic都存储在这个表中。

执行Consumer

package com.cb;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.MessageConsumer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

public static void main(String[] args) throws Exception{

//1.创建ConnectionFactory对象

ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(

"cb",

"123456",

"tcp://localhost:61616");

//2.创建一个Connection并开启

Connection connection=connectionFactory.createConnection();

connection.start();

//3.创建Session会话,用来接收消息,通过参数可以设置:是否启用事务、消息签收模式

Session session=connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);

//4.创建Destination对象。在点对点模式中,该对象被称为Queue;在发布订阅模式中,该对象被称为Topic

Destination destination=session.createQueue("queue1");

//5.创建消息的消费者

MessageConsumer messageConsumer=session.createConsumer(destination);

//6.消费者从消息中间件的Queue获取消息

while(true){

TextMessage textMessage=(TextMessage) messageConsumer.receive();

if(null==textMessage){

break;

}

System.out.println("消费者接收到的内容:"+textMessage.getText());

//手动确认签收

textMessage.acknowledge();

}

//7.释放Connection

if(null!=connection){

connection.close();

}

}

}

控制台输出

因为消息已经被消费掉,再次查看mysql数据库中的activemq_msgs表,发现消息已经不存在了。

ActiveMQ设置消息的有效期

在上述的Producer类中,将消息的有效期设置为10分钟,若这条消息发送到了ActiveMQ消息中间件但一直未被消费,直到10分钟的时间到,消息则过期。

执行Producer,但是不执行Consumer。查看ActiveMQ的管控台

10分钟之后

标签: #mysql数据过期时间