龙空技术网

RabbitMQ集群

空山细兩 500

前言:

现在大家对“centosrabbitmqc”大体比较关心,兄弟们都想要分析一些“centosrabbitmqc”的相关资讯。那么小编也在网络上收集了一些关于“centosrabbitmqc””的相关知识,希望我们能喜欢,各位老铁们快快来学习一下吧!

本部分分两部分:

集群搭建-通过Docker镜像: bitnami/rabbitmq:3.10.8高可用HA

基本集群+镜像队列在被加入的主机上通过命令rabbitmqctl join_cluster可以将一个新的节点加入集群中。如在linux-02主机上执行:rabbitmqctl join_cluster --ram rabbit@linux-01,即是将linux-02主机的rabbitmq加入到节点linux-01上。通过在任意的主机上执行rabbitmqctl cluster_status可以查看集群的状态。如果不设置同步策略,则发布的信息,即使是在集群状态下,消息也只会在一台主机上,所以,必须要设置同步策略后才可以同步消息。通过命令:rabbitmqctl list_policies 可以查看已经生效的策略。规划图示创建数据目录

mkdir /app/mqc/node{1..3}

设置拥有者,根据bitnami的要求,将目录拥有者设置为1001:

chown 1001:1001 /app/mqc/node{1..3} -R

启动脚本本次由于是在一台主机上创建三个节点,所以直接使用docker-compose.yml一次创建三个节点。bitnami/rabbitmq集群创建时,默认只会给第一个节点开启15672商品,所以其他节点也同样使用plugins环境变量,指定启动用的插件。不需要指定用户名的密码,容器启动后,登录进入窗口再创建用户,必须这样做。使用RABBITMQ_ERL_COOKIE指定相同的Cookie值。RABBITMQ_NODE_TYPE=queue-disc|queue-ram可以指定是内存节点或磁盘节点。

version: '3.9'

services:

node1:

image: 'bitnami/rabbitmq:3.10.8'

container_name: node1

hostname: node1

environment:

- RABBITMQ_NODE_TYPE=stats

- RABBITMQ_NODE_NAME=rabbit@node1

- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3

- RABBITMQ_PLUGINS=rabbitmq_management,rabbitmq_mqtt,rabbitmq_stream

ports:

- 5671:5672

- 15671:15672

volumes:

- ${PWD}/node1:/bitnami

node2:

image: 'bitnami/rabbitmq:3.10.8'

container_name: node2

hostname: node2

environment:

- RABBITMQ_NODE_TYPE=queue-disc

- RABBITMQ_NODE_NAME=rabbit@node2

- RABBITMQ_CLUSTER_NODE_NAME=rabbit@node1

- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3

- RABBITMQ_PLUGINS=rabbitmq_management,rabbitmq_mqtt,rabbitmq_stream

ports:

- 5672:5672

- 15672:15672

volumes:

- ${PWD}/node2:/bitnami

node3:

image: 'bitnami/rabbitmq:3.10.8'

container_name: node3

hostname: node3

environment:

- RABBITMQ_NODE_TYPE=queue-ram

- RABBITMQ_NODE_NAME=rabbit@node3

- RABBITMQ_CLUSTER_NODE_NAME=rabbit@node1

- RABBITMQ_ERL_COOKIE=s3cr3tc00ki3

- RABBITMQ_PLUGINS=rabbitmq_management,rabbitmq_mqtt,rabbitmq_stream

ports:

- 5673:5672

- 15673:15672

volumes:

- ${PWD}/node3:/bitnam

截图:

启动后查看容器

root@ubuntu61:/app/mqc# docker compose ps

NAME COMMAND SERVICE STATUS PORTS

node1 "/opt/bitnami/script…" node1 running 0.0.0.0:5671->5672/tcp, 0.0.0.0:15671->15672/tcp, :::5671->5672/tcp, :::15671->15672/tcp

node2 "/opt/bitnami/script…" node2 running 0.0.0.0:5672->5672/tcp, 0.0.0.0:15672->15672/tcp, :::5672->5672/tcp, :::15672->15672/tcp

node3 "/opt/bitnami/script…" node3 running 0.0.0.0:5673->5672/tcp, 0.0.0.0:15673->15672/tcp, :::5673->5672/tcp, :::15673->15672/tcp

查看集群状态

进入容器:

docker compose exec node1 bash

查看集群状态:

@node1:/$ rabbitmqctl cluster_status

Cluster status of node rabbit@node1 ...

Basics

Cluster name: rabbit@node1

Disk Nodes //disk节点

rabbit@node1

rabbit@node2

RAM Nodes //内存节点

rabbit@node3

Running Nodes

rabbit@node1

rabbit@node2

rabbit@node3

Versions

rabbit@node1: RabbitMQ 3.10.8 on Erlang 24.3.4

rabbit@node2: RabbitMQ 3.10.8 on Erlang 24.3.4

rabbit@node3: RabbitMQ 3.10.8 on Erlang 24.3.4

Maintenance status

Node: rabbit@node1, status: not under maintenance

Node: rabbit@node2, status: not under maintenance

Node: rabbit@node3, status: not under maintenance

截图:

创建用户

登录任意节点容器内部,执行以下命令:

创建账户:

rabbitmqctl add_user admin 123456

创建角色:

rabbitmqctl set_user_tags admin administrator

设置用户权限:

rabbitmqctl set_permissions -p “/” admin “.*” “.*” “.*”

登录ui

由于每一台主机,都已打开了managerment:15672端口,所以访问任何一个影射的端口都可以查看ui:

以下显示了集群的状态:

创建镜像(mirror)策略name:任意名称即可。pattern:正则表达式,用于指定哪些交换机和队列即被镜像。这儿设置了以prefix-开始的。ha-mode:exactlyha-mode: 指明镜像队列的模式,有效取值范围为all/exactly/nodes。all:表示在集群所有的代理上进行镜像。exactly:表示在指定个数的代理上进行镜像,代理的个数由ha-params指定。nodes:表示在指定的代理上进行镜像,代理名称通过ha-params指定。ha-params: ha-mode模式需要用到的参数。ha-sync-mode: 表示镜像队列中消息的同步方式,有效取值范围为:automatic,manually。automatic:表示自动向master同步数据。manually:表示手动向master同步数据。Priority: 可选参数, policy的优先级。测试创建交换机

在任意主机创建一个以prefix开始的交换机,注意,创建好的交换机,features中包含一个copy 标签。

创建队列注意,队列以名称prefix-开始。注意创建成功后,会出现一个+1的标签,即为已经镜像到其他的节点。代码开发(Java项目)

现在我们通过连接node1,并向node1发送信息,然后连接其他任意 的节点消费消息。

开发生产者设置任意一个ip地址,或全部都设置上也可以。由于镜像策略设置的prefix-开始的交换机和队列会被镜像,所以只能声明prefix-开始的交换机和队列。发布消息正常发布即可。

package wj.rabbitmq.cluster;

import com.rabbitmq.client.*;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;

import java.util.List;

@Slf4j

public class ClusterSender {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("admin");

factory.setPassword("123456");

List<Address> list = new ArrayList<>();

//以下节点,至少指定一个,以下只使用node2,node3

//list.add(new Address("192.168.56.61",5671));

list.add(new Address("192.168.56.61",5672));

list.add(new Address("192.168.56.61",5673));

Connection con = factory.newConnection(list);

Channel channel = con.createChannel();

AMQP.BasicProperties prop = MessageProperties.PERSISTENT_TEXT_PLAIN;

channel.basicPublish(

"prefix-exchange",//指定之前在Ui上创建的交换机

"prefix-routing",//指定路由key,此值可以路由到prefix-queue队列

prop,//持久化

"Hello Cluster Msg".getBytes());

log.info("消息发送完成");

}

}

截图:

运行生产者

21:08:46.110 [main] 消息发送完成

消费者代码

package wj.rabbitmq.cluster;

import com.rabbitmq.client.*;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

@Slf4j

public class ClusterReceiver {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("admin");

factory.setPassword("123456");

List<Address> list = new ArrayList<>();

//以下节点,可以任意指定至少一个,如消费者这儿只使用node3

//list.add(new Address("192.168.56.61",5671));

//list.add(new Address("192.168.56.61",5672));

list.add(new Address("192.168.56.61",5673));

Connection con = factory.newConnection(list);

Channel channel = con.createChannel();

channel.basicConsume(

"prefix-queue",//指定需要消费的queue

true,//auto ack true

new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag,

Envelope envelope,

AMQP.BasicProperties properties,

byte[] body) throws IOException {

log.info("消费消息:{}",new String(body));

}

});

}

}

截图:

运行消费者输出:

21:10:56.526 [pool-2-thread-4] 消费消息:Hello Cluster Msg

代码开发(SpringBoot项目)springboot配置文件添加addresses后,默认的host,port配置将被ignore。其他配置保持不变。

server.port=8888

spring.application.name=helloworld

# 配置mq

#spring.rabbitmq.host=192.168.56.61

#spring.rabbitmq.port=5672

spring.rabbitmq.addresses=192.168.56.61:5671,192.168.56.61:5672,192.168.56.61:5673

spring.rabbitmq.username=admin

spring.rabbitmq.password=123456

spring.rabbitmq.listener.simple.prefetch=1

# 配置日志

logging.level.root=INFO

logging.file.name=./logs/log.log

截图:

开发配置类

类似于纯java代码,就是声明以prefix-开头的交换机及绑定关系。

package wj.mq.config.cluster;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class ClusterConfig {

@Bean

public DirectExchange prefixExchange(){

DirectExchange exchange =

new DirectExchange("prefix-exchange",//name

true,//durable?

false);//auto delete?

return exchange;

}

@Bean

public Queue prefixQueue(){

Queue queue = new Queue("prefix-queue",//name of queue

true,//durable?

false,//exclusive?

false);//auto delete?

return queue;

}

@Bean

public Binding prefixBinding(@Qualifier("prefixExchange") DirectExchange prefixExchange,

@Qualifier("prefixQueue") Queue prefixQueue){

Binding binding =

BindingBuilder.bind(prefixQueue)//

.to(prefixExchange)//

.with("prefix-routing");

return binding;

}

}

截图:

开发消费者

package wj.mq.rabbitmq.cluster;

import com.rabbitmq.client.Channel;

import lombok.extern.slf4j.Slf4j;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.amqp.support.AmqpHeaders;

import org.springframework.messaging.handler.annotation.Header;

import org.springframework.messaging.handler.annotation.Payload;

import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j

@Component

public class ClusterReceiver {

@RabbitListener(queues = "prefix-queue", ackMode = "MANUAL")

public void process(@Payload() String message,

@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,

Channel channel) throws IOException {

log.info("消费消息->{},id={}", message, deliveryTag);

try {

channel.basicAck(deliveryTag, false);

} catch (Exception e) {

channel.basicNack(deliveryTag,false,true);

e.printStackTrace();

}

}

}

截图:

开发生产者

以下直接使用测试发布一条消息。

package wj.mq;

import lombok.extern.slf4j.Slf4j;

import org.junit.jupiter.api.Test;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

@Slf4j

@SpringBootTest

public class Demo01_Cluster {

@Autowired

private RabbitTemplate rabbitTemplate;

@Test

public void send(){

String msg = "From SpringBoot";

rabbitTemplate.convertAndSend("prefix-exchange",

"prefix-routing",

msg);

log.info("消息发送完成");

}

}

截图:

运行测试

直接运行测试,控制台,输出以下效果:

2022-10-04 14:14:19.369 : 消息发送完成

2022-10-04 14:14:19.386: 消费消息->From SpringBoot,id=1

到此, 基于java项目和基于SpringBoot连接RabbitMQ集群的测试,都已经可以正常运行了。

宕机测试(测试符合要求-测试通过)

由于要做宕机测试,所以,这儿将上述代码连接的三个节点全部放开。

宕机node1节点

# docker compose stop node1

查看集群状态,node1节点已经宕机

发布信息虽然node1宕机,但我们的连接节点中包含node1可能发布成功。

package wj.rabbitmq.cluster;

import com.rabbitmq.client.*;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;

import java.util.List;

@Slf4j

public class ClusterSender {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("admin");

factory.setPassword("123456");

List<Address> list = new ArrayList<>();

//以下节点,至少指定一个,以下只使用node2,node3

list.add(new Address("192.168.56.61",5671));

list.add(new Address("192.168.56.61",5672));

list.add(new Address("192.168.56.61",5673));

Connection con = factory.newConnection(list);

Channel channel = con.createChannel();

AMQP.BasicProperties prop = MessageProperties.PERSISTENT_TEXT_PLAIN;

channel.basicPublish(

"prefix-exchange",//指定之前在Ui上创建的交换机

"prefix-routing",//指定路由key,此值可以路由到prefix-queue队列

prop,//持久化

"Hello Cluster Msg".getBytes());

log.info("消息发送完成");

}

}

截图:

运行输出

21:13:43.214 [main] 消息发送完成

发布以后,查看node2界面,信息已经存入队列

消费消息

package wj.rabbitmq.cluster;

import com.rabbitmq.client.*;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

@Slf4j

public class ClusterReceiver {

public static void main(String[] args) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("admin");

factory.setPassword("123456");

List<Address> list = new ArrayList<>();

//以下节点,可以任意指定至少一个,如消费者这儿只使用node3

list.add(new Address("192.168.56.61",5671));

list.add(new Address("192.168.56.61",5672));

list.add(new Address("192.168.56.61",5673));

Connection con = factory.newConnection(list);

Channel channel = con.createChannel();

channel.basicConsume(

"prefix-queue",//指定需要消费的queue

true,//auto ack true

new DefaultConsumer(channel){

@Override

public void handleDelivery(String consumerTag,

Envelope envelope,

AMQP.BasicProperties properties,

byte[] body) throws IOException {

log.info("消费消息:{}",new String(body));

}

});

}

}

截图:

消费输出

21:16:31.047 [pool-2-thread-4] 消费消息:Hello Cluster Msg

重新启动node1

# docker compose start node1

启动后,多次刷新,查看集群状态:

OK.

标签: #centosrabbitmqc