前言:
现在大家对“centosrabbitmqc”大体比较关心,兄弟们都想要分析一些“centosrabbitmqc”的相关资讯。那么小编也在网络上收集了一些关于“centosrabbitmqc””的相关知识,希望我们能喜欢,各位老铁们快快来学习一下吧!本部分分两部分:
集群搭建-通过Docker镜像: bitnami/rabbitmq:3.10.8高可用HA
基本集群+镜像队列在被加入的主机上通过命令rabbitmqctl join_cluster可以将一个新的节点加入集群中。如在linux-02主机上执行:rabbitmqctl join_cluster --ram rabbit@linux-01,即是将linux-02主机的rabbitmq加入到节点linux-01上。通过在任意的主机上执行rabbitmqctl cluster_status可以查看集群的状态。如果不设置同步策略,则发布的信息,即使是在集群状态下,消息也只会在一台主机上,所以,必须要设置同步策略后才可以同步消息。通过命令:rabbitmqctl list_policies 可以查看已经生效的策略。规划图示创建数据目录
mkdir /app/mqc/node{1..3}
设置拥有者,根据bitnami的要求,将目录拥有者设置为1001:
chown 1001:1001 /app/mqc/node{1..3} -R
启动脚本本次由于是在一台主机上创建三个节点,所以直接使用docker-compose.yml一次创建三个节点。bitnami/rabbitmq集群创建时,默认只会给第一个节点开启15672商品,所以其他节点也同样使用plugins环境变量,指定启动用的插件。不需要指定用户名的密码,容器启动后,登录进入窗口再创建用户,必须这样做。使用RABBITMQ_ERL_COOKIE指定相同的Cookie值。RABBITMQ_NODE_TYPE=queue-disc|queue-ram可以指定是内存节点或磁盘节点。
version: '3.9'
services:
node1:
image: 'bitnami/rabbitmq:3.10.8'
container_name: node1
hostname: node1
environment:
- RABBITMQ_NODE_TYPE=stats
- RABBITMQ_NODE_NAME=rabbit@node1
- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3
- RABBITMQ_PLUGINS=rabbitmq_management,rabbitmq_mqtt,rabbitmq_stream
ports:
- 5671:5672
- 15671:15672
volumes:
- ${PWD}/node1:/bitnami
node2:
image: 'bitnami/rabbitmq:3.10.8'
container_name: node2
hostname: node2
environment:
- RABBITMQ_NODE_TYPE=queue-disc
- RABBITMQ_NODE_NAME=rabbit@node2
- RABBITMQ_CLUSTER_NODE_NAME=rabbit@node1
- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3
- RABBITMQ_PLUGINS=rabbitmq_management,rabbitmq_mqtt,rabbitmq_stream
ports:
- 5672:5672
- 15672:15672
volumes:
- ${PWD}/node2:/bitnami
node3:
image: 'bitnami/rabbitmq:3.10.8'
container_name: node3
hostname: node3
environment:
- RABBITMQ_NODE_TYPE=queue-ram
- RABBITMQ_NODE_NAME=rabbit@node3
- RABBITMQ_CLUSTER_NODE_NAME=rabbit@node1
- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3
- RABBITMQ_PLUGINS=rabbitmq_management,rabbitmq_mqtt,rabbitmq_stream
ports:
- 5673:5672
- 15673:15672
volumes:
- ${PWD}/node3:/bitnam
截图:
启动后查看容器
root@ubuntu61:/app/mqc# docker compose ps
NAME COMMAND SERVICE STATUS PORTS
node1 "/opt/bitnami/script…" node1 running 0.0.0.0:5671->5672/tcp, 0.0.0.0:15671->15672/tcp, :::5671->5672/tcp, :::15671->15672/tcp
node2 "/opt/bitnami/script…" node2 running 0.0.0.0:5672->5672/tcp, 0.0.0.0:15672->15672/tcp, :::5672->5672/tcp, :::15672->15672/tcp
node3 "/opt/bitnami/script…" node3 running 0.0.0.0:5673->5672/tcp, 0.0.0.0:15673->15672/tcp, :::5673->5672/tcp, :::15673->15672/tcp
查看集群状态
进入容器:
docker compose exec node1 bash
查看集群状态:
@node1:/$ rabbitmqctl cluster_status
Cluster status of node rabbit@node1 ...
Basics
Cluster name: rabbit@node1
Disk Nodes //disk节点
rabbit@node1
rabbit@node2
RAM Nodes //内存节点
rabbit@node3
Running Nodes
rabbit@node1
rabbit@node2
rabbit@node3
Versions
rabbit@node1: RabbitMQ 3.10.8 on Erlang 24.3.4
rabbit@node2: RabbitMQ 3.10.8 on Erlang 24.3.4
rabbit@node3: RabbitMQ 3.10.8 on Erlang 24.3.4
Maintenance status
Node: rabbit@node1, status: not under maintenance
Node: rabbit@node2, status: not under maintenance
Node: rabbit@node3, status: not under maintenance
截图:
创建用户
登录任意节点容器内部,执行以下命令:
创建账户:
rabbitmqctl add_user admin 123456
创建角色:
rabbitmqctl set_user_tags admin administrator
设置用户权限:
rabbitmqctl set_permissions -p “/” admin “.*” “.*” “.*”
登录ui
由于每一台主机,都已打开了managerment:15672端口,所以访问任何一个影射的端口都可以查看ui:
以下显示了集群的状态:
创建镜像(mirror)策略name:任意名称即可。pattern:正则表达式,用于指定哪些交换机和队列即被镜像。这儿设置了以prefix-开始的。ha-mode:exactlyha-mode: 指明镜像队列的模式,有效取值范围为all/exactly/nodes。all:表示在集群所有的代理上进行镜像。exactly:表示在指定个数的代理上进行镜像,代理的个数由ha-params指定。nodes:表示在指定的代理上进行镜像,代理名称通过ha-params指定。ha-params: ha-mode模式需要用到的参数。ha-sync-mode: 表示镜像队列中消息的同步方式,有效取值范围为:automatic,manually。automatic:表示自动向master同步数据。manually:表示手动向master同步数据。Priority: 可选参数, policy的优先级。测试创建交换机
在任意主机创建一个以prefix开始的交换机,注意,创建好的交换机,features中包含一个copy 标签。
创建队列注意,队列以名称prefix-开始。注意创建成功后,会出现一个+1的标签,即为已经镜像到其他的节点。代码开发(Java项目)
现在我们通过连接node1,并向node1发送信息,然后连接其他任意 的节点消费消息。
开发生产者设置任意一个ip地址,或全部都设置上也可以。由于镜像策略设置的prefix-开始的交换机和队列会被镜像,所以只能声明prefix-开始的交换机和队列。发布消息正常发布即可。
package wj.rabbitmq.cluster;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class ClusterSender {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("123456");
List<Address> list = new ArrayList<>();
//以下节点,至少指定一个,以下只使用node2,node3
//list.add(new Address("192.168.56.61",5671));
list.add(new Address("192.168.56.61",5672));
list.add(new Address("192.168.56.61",5673));
Connection con = factory.newConnection(list);
Channel channel = con.createChannel();
AMQP.BasicProperties prop = MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish(
"prefix-exchange",//指定之前在Ui上创建的交换机
"prefix-routing",//指定路由key,此值可以路由到prefix-queue队列
prop,//持久化
"Hello Cluster Msg".getBytes());
log.info("消息发送完成");
}
}
截图:
运行生产者
21:08:46.110 [main] 消息发送完成
消费者代码
package wj.rabbitmq.cluster;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class ClusterReceiver {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("123456");
List<Address> list = new ArrayList<>();
//以下节点,可以任意指定至少一个,如消费者这儿只使用node3
//list.add(new Address("192.168.56.61",5671));
//list.add(new Address("192.168.56.61",5672));
list.add(new Address("192.168.56.61",5673));
Connection con = factory.newConnection(list);
Channel channel = con.createChannel();
channel.basicConsume(
"prefix-queue",//指定需要消费的queue
true,//auto ack true
new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("消费消息:{}",new String(body));
}
});
}
}
截图:
运行消费者输出:
21:10:56.526 [pool-2-thread-4] 消费消息:Hello Cluster Msg
代码开发(SpringBoot项目)springboot配置文件添加addresses后,默认的host,port配置将被ignore。其他配置保持不变。
server.port=8888
spring.application.name=helloworld
# 配置mq
#spring.rabbitmq.host=192.168.56.61
#spring.rabbitmq.port=5672
spring.rabbitmq.addresses=192.168.56.61:5671,192.168.56.61:5672,192.168.56.61:5673
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.listener.simple.prefetch=1
# 配置日志
logging.level.root=INFO
logging.file.name=./logs/log.log
截图:
开发配置类
类似于纯java代码,就是声明以prefix-开头的交换机及绑定关系。
package wj.mq.config.cluster;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ClusterConfig {
@Bean
public DirectExchange prefixExchange(){
DirectExchange exchange =
new DirectExchange("prefix-exchange",//name
true,//durable?
false);//auto delete?
return exchange;
}
@Bean
public Queue prefixQueue(){
Queue queue = new Queue("prefix-queue",//name of queue
true,//durable?
false,//exclusive?
false);//auto delete?
return queue;
}
@Bean
public Binding prefixBinding(@Qualifier("prefixExchange") DirectExchange prefixExchange,
@Qualifier("prefixQueue") Queue prefixQueue){
Binding binding =
BindingBuilder.bind(prefixQueue)//
.to(prefixExchange)//
.with("prefix-routing");
return binding;
}
}
截图:
开发消费者
package wj.mq.rabbitmq.cluster;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
public class ClusterReceiver {
@RabbitListener(queues = "prefix-queue", ackMode = "MANUAL")
public void process(@Payload() String message,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
Channel channel) throws IOException {
log.info("消费消息->{},id={}", message, deliveryTag);
try {
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag,false,true);
e.printStackTrace();
}
}
}
截图:
开发生产者
以下直接使用测试发布一条消息。
package wj.mq;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@Slf4j
@SpringBootTest
public class Demo01_Cluster {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void send(){
String msg = "From SpringBoot";
rabbitTemplate.convertAndSend("prefix-exchange",
"prefix-routing",
msg);
log.info("消息发送完成");
}
}
截图:
运行测试
直接运行测试,控制台,输出以下效果:
2022-10-04 14:14:19.369 : 消息发送完成
2022-10-04 14:14:19.386: 消费消息->From SpringBoot,id=1
到此, 基于java项目和基于SpringBoot连接RabbitMQ集群的测试,都已经可以正常运行了。
宕机测试(测试符合要求-测试通过)
由于要做宕机测试,所以,这儿将上述代码连接的三个节点全部放开。
宕机node1节点
# docker compose stop node1
查看集群状态,node1节点已经宕机
发布信息虽然node1宕机,但我们的连接节点中包含node1可能发布成功。
package wj.rabbitmq.cluster;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class ClusterSender {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("123456");
List<Address> list = new ArrayList<>();
//以下节点,至少指定一个,以下只使用node2,node3
list.add(new Address("192.168.56.61",5671));
list.add(new Address("192.168.56.61",5672));
list.add(new Address("192.168.56.61",5673));
Connection con = factory.newConnection(list);
Channel channel = con.createChannel();
AMQP.BasicProperties prop = MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish(
"prefix-exchange",//指定之前在Ui上创建的交换机
"prefix-routing",//指定路由key,此值可以路由到prefix-queue队列
prop,//持久化
"Hello Cluster Msg".getBytes());
log.info("消息发送完成");
}
}
截图:
运行输出
21:13:43.214 [main] 消息发送完成
发布以后,查看node2界面,信息已经存入队列
消费消息
package wj.rabbitmq.cluster;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class ClusterReceiver {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("123456");
List<Address> list = new ArrayList<>();
//以下节点,可以任意指定至少一个,如消费者这儿只使用node3
list.add(new Address("192.168.56.61",5671));
list.add(new Address("192.168.56.61",5672));
list.add(new Address("192.168.56.61",5673));
Connection con = factory.newConnection(list);
Channel channel = con.createChannel();
channel.basicConsume(
"prefix-queue",//指定需要消费的queue
true,//auto ack true
new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("消费消息:{}",new String(body));
}
});
}
}
截图:
消费输出
21:16:31.047 [pool-2-thread-4] 消费消息:Hello Cluster Msg
重新启动node1
# docker compose start node1
启动后,多次刷新,查看集群状态:
OK.
标签: #centosrabbitmqc