RabbitMQ 简介与快速上手
一、MQ 简介
MQ(Message Queue):本质是一个先进先出的队列,只不过里面储存的是消息而已。消息可以是文本,JSON,也可以是内嵌对象等等。通常用于分布式系统之间的系统通信。
本文介绍了消息队列(MQ)的基本概念及其在分布式系统中的作用,包括异步解耦、流量削峰、消息分发和延迟通知。重点讲解了 RabbitMQ 作为基于 Erlang 实现的 AMQP 协议中间件的架构特点。内容涵盖 Linux 环境下(Ubuntu 和 CentOS)的安装部署步骤、RabbitMQ 的核心概念(如 Virtual Host、Exchange、Queue、Connection、Channel 等)、工作流程以及基于 Java 语言的快速上手示例,包括生产者和消费者的代码实现。

MQ(Message Queue):本质是一个先进先出的队列,只不过里面储存的是消息而已。消息可以是文本,JSON,也可以是内嵌对象等等。通常用于分布式系统之间的系统通信。
MQ 的作用:接收和转发消息。
RabbitMQ:采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议) 的消息中间件。
# 更新软件包
sudo apt-get update
# 安装 erlang
sudo apt-get install erlang
查看 erlang 版本:erl
退出命令:halt().
# 更新软件包
sudo apt-get update
# 安装 rabbitmq
sudo apt-get install rabbitmq-server
# 确认安装结果
systemctl status rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl add_user ${账号} ${密码}rabbitmqctl set_user_tags ${账号} ${角色名称}RabbitMQ 用户角色分为 Administrator、Monitoring、Policymaker、Management、Impersonator、None 共六种角色
- Administrator 超级管理员,可登陆管理控制台 (启用 management plugin 的情况下),可查看所有的信息,并且可以对用户,策略 (policy) 进行操作
- Monitoring 监控者,可登陆管理控制台 (启用 management plugin 的情况下),同时可以查看 rabbitmq 节点的相关信息 (进程数,内存使用情况,磁盘使用情况等)
- Policymaker 策略制定者,可登陆管理控制台 (启用 management plugin 的情况下),同时可以对 policy 进行管理。但无法查看节点的相关信息
- Management 普通管理者,仅可登陆管理控制台 (启用 management plugin 的情况下),无法看到节点信息,也无法对策略进行管理
- Impersonator 模拟者,无法登录管理控制台
- None 其他用户,无法登陆管理控制台,通常就是普通的生产者和消费者
sudo systemctl restart rabbitmq-server通过 IP:port 访问界面,默认端口号 15672
sudo systemctl stop rabbitmq-server
9.2. 查找 RabbitMQ 安装情况 dpkg -l | grep rabbitmq
9.3. 卸载 rabbitmq 已安装的相关内容 sudo apt-get purge --auto-remove rabbitmq-server
9.4. 卸载 Erlang
9.4.1. 查看 erlang 安装的相关列表:dpkg -l | grep erlang
9.4.2. 卸载 erlang 已安装的相关内容 sudo apt-get purge --auto-remove erlang安装 Erlang
1.1. 查看系统版本:cat /etc/redhat-release
1.2. 下载 Erlang 的 rpm 包:wget --content-disposition "https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.11-1.el7.x86_64.rpm/download.rpm?distro_version_id=140"
1.3. 安装 Erlang : yum localinstall erlang-23.3.4.11-1.el7.x86_64.rpm
安装 RabbitMQ
2.1. 下载 RabbitMQ 客户端 wget --content-disposition "https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmqserver-3.8.30-1.el7.noarch.rpm/download.rpm?distro_version_id=140"
2.2. 安装 RabbitMQ 客户端:导入签名秘钥:rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc,用 yum 进行安装 yum localinstall rabbitmq-server-3.8.30-1.el7.noarch.rpm
安装 RabbitMQ 管理界面:rabbitmq-plugins enable rabbitmq_management
重启 RabbitMQ:sudo service rabbitmq-server restart
服务操作:
卸载 RabbitMQ:
6.1. 停止 RabbitMQ 服务:service rabbitmq-server stop
6.2. 查看 RabbitMQ 安装列表:yum list|grep rabbitmq
6.3 卸载 rabbitmq 已安装的相关内容 yum -y remove rabbitmq-server.noarch
6.4. 删除 RabbitMQ 相关文件 rm -rf /var/lib/rabbitmq/ rm -rf /usr/local/rabbitmq
6.5. 卸载 Erlang
6.5.1. 查看 erlang 安装的相关列表 yum list | grep erlang
6.5.2. 卸载 erlang 已安装的相关内容 yum -y remove erlang.x86_64
6.5.3. 删除 Erlang 相关文件 rm -rf /usr/lib64/erlang/ rm -rf /usr/local/erlang
Exchange Exchange: 交换机。message 到达 broker 的第一站,它负责接收生产者发送的消息,并根据特定的规则把这些消息路由到一个或多个 Queue 列中。 Exchange 起到了消息路由的作用,它根据类型和规则来确定如何转发接收到的消息.
Queue Queue: 队列,是 RabbitMQ 的内部对象,用于存储消息.
Connection 和 Channel Connection: 连接。是客户端和 RabbitMQ 服务器之间的一个 TCP 连接。这个连接是建立消息传递的基础,它负责传输客户端和服务器之间的所有数据和控制信息. Channel: 通道,信道。Channel 是在 Connection 之上的一个抽象层。在 RabbitMQ 中,一个 TCP 连接可以有多个 Channel,每个 Channel 都是独立的虚拟连接。消息的发送和接收都是基于 Channel 的。 通道的主要作用是将消息的读写操作复用到同一个 TCP 连接上,这样可以减少建立和关闭连接的开销,提高性能.
Producer 和 Consumer Producer: 生产者,是 RabbitMQ Server 的客户端,向 RabbitMQ 发送消息 Consumer: 消费者,也是 RabbitMQ Server 的客户端,从 RabbitMQ 接收消息 Broker:其实就是 RabbitMQ Server,主要是接收和收发消息
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.18.0</version>
</dependency>
建立连接需要:IP,端口号,账号、密码、虚拟主机。
我们使用 com.rabbitmq.client 包下的 ConnectionFactory 连接工厂类设置需要的配置,并且创建连接。
// 连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost"); // ip 地址
connectionFactory.setPort(5672); // 默认端口号
connectionFactory.setUsername("${username}"); // 用户
connectionFactory.setPassword("${password}"); // 用户密码
connectionFactory.setVirtualHost("${virtualHost}"); // 虚拟主机
// 获取连接
Connection connection = connectionFactory.newConnection();
// 开启信道
Channel channel = connection.createChannel();
交换机我们使用 RabbitMQ 自带的,
声明队列使用 Channel 类的 queueDeclare 方法。queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数说明:
// 声明队列
channel.queueDeclare("hello", true, false, true, null);
通过 Channel 类的 basicPublish 方法发送消息到队列中 basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
参数说明:
// 发送消息
String msg = "hello rabbitMQ";
channel.basicPublish("", "hello", null, msg.getBytes());
释放信道和连接。
// 资源释放
channel.close();
connection.close();
总代码:
package org.example.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
// 连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost"); // ip 地址
connectionFactory.setPort(5672); // 默认端口号
connectionFactory.setUsername("${username}"); // 用户
connectionFactory.setPassword("${password}"); // 用户密码
connectionFactory.setVirtualHost("${virtualHost}"); // 虚拟主机
// 获取连接
Connection connection = connectionFactory.newConnection();
// 开启信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("hello", true, false, true, null);
// 发送消息
String msg = "hello rabbitMQ";
channel.basicPublish("", "hello", null, msg.getBytes());
// 资源释放
channel.close();
connection.close();
}
}
步骤:
消费消息:使用 Channel 类的 basicConsume 方法 basicConsume(String queue, boolean autoAck, Consumer callback)
参数说明:
Consumer 类说明: Consumer 用于定义消息消费者的行为。当我们需要从 RabbitMQ 接收消息时,需要提供一個实现了 Consumer 接口的对象。
DefaultConsumer 类 是 RabbitMQ 提供的一个默认消费者,实现了 Consumer 接口。
核心方法:
handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) : 从队列接收到消息时,会自动调用该方法。
在这个方法中,我们可以定义如何处理接收到的消息,例如打印消息内容,处理业务逻辑或者将消息存储到数据库等。
参数说明如下:
总代码:
package org.example.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost"); // ip 地址
connectionFactory.setPort(5672); // 默认端口号
connectionFactory.setUsername("${username}"); // 用户
connectionFactory.setPassword("${password}"); // 用户密码
connectionFactory.setVirtualHost("${virtualHost}"); // 虚拟主机
// 创建连接
Connection connection = connectionFactory.newConnection();
// 开启信道
// 创建信道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("hello", true, false, true, null);
// 消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body));
}
};
channel.basicConsume("hello", true, consumer);
// 释放资源
channel.close();
connection.close();
}
}
运行:

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online