RabbitMQ 简介
消息队列(Message Queue,简称 MQ)本质上是一个先进先出的队列,只不过存储的是消息。消息可以是文本、JSON 或内嵌对象等,通常用于分布式系统之间的通信。
它的主要作用包括接收和转发消息,具体价值体现在以下几个方面:
- 异步解耦:业务流程中某些耗时操作无需即时返回结果。借助 MQ 将其异步化,例如用户注册后发送通知短信或邮件,可作为后台任务处理,不必等待完成才告知用户成功。
- 流量削峰:面对访问量剧增的突发场景,应用仍需保持可用。若按峰值投入资源会造成浪费。使用 MQ 可将请求排队,系统根据自身处理能力逐步消费,避免崩溃。例如秒杀活动。
- 消息分发:当多个系统需对同一数据响应时,可通过 MQ 分发。如支付成功后,支付系统发消息,其他系统订阅即可,无需轮询数据库。
- 延迟通知:利用延迟消息功能,在特定时间后发送通知。例如电商下单超时未支付自动取消订单。
RabbitMQ 采用 Erlang 语言实现 AMQP(高级消息队列协议),是业界广泛使用的消息中间件。
Linux 下安装 RabbitMQ
Ubuntu 环境安装
-
安装 Erlang
sudo apt-get update sudo apt-get install erlang查看版本:
erl,退出命令:halt(). -
安装 RabbitMQ
sudo apt-get update sudo apt-get install rabbitmq-server systemctl status rabbitmq-server -
启用管理界面插件
rabbitmq-plugins enable rabbitmq_management -
配置用户与权限
- 添加用户:
rabbitmqctl add_user <账号> <密码> - 设置标签:
rabbitmqctl set_user_tags <账号> administrator
注意:角色分为 Administrator、Monitoring、Policymaker、Management 等。普通生产者和消费者通常设为
None或仅赋予 Management 权限以便登录控制台。 - 添加用户:
-
重启服务
sudo systemctl restart rabbitmq-server默认通过 IP:15672 访问管理界面。
-
卸载 停止服务后,使用
apt-get purge --auto-remove rabbitmq-server及erlang相关包清理。
CentOS 安装
-
安装 Erlang
cat /etc/redhat-release wget --content-disposition "https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.11-1.el7.x86_64.rpm/download.rpm?distro_version_id=140" yum localinstall erlang-23.3.4.11-1.el7.x86_64.rpm -
安装 RabbitMQ
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc yum localinstall rabbitmq-server-3.8.30-1.el7.noarch.rpm -
启用管理插件并重启
rabbitmq-plugins enable rabbitmq_management sudo service rabbitmq-server restart
工作流程与核心概念
RabbitMQ 的核心组件围绕 Broker(服务器)、Virtual Host(虚拟主机)、Exchange(交换机)、Queue(队列)、Connection(连接)和 Channel(信道)展开。
消息流转逻辑
- Producer 建立 TCP Connection,并在其上创建 Channel。
- 通过 Channel 将消息发送给 Exchange,指定 routing_key。
- Exchange 根据类型(direct/topic/fanout/headers)和 Binding 规则,将消息路由到 0~N 个 Queue。
- Queue 按 FIFO 缓冲消息;若开启持久化则落盘。
- Consumer 建立 Connection/Channel,订阅或拉取 Queue 中的消息。
- Consumer 处理消息后回复 ack,RabbitMQ 收到后删除消息;若未收到 ack 且断开,消息重新入队。
关键概念详解
- Virtual Host (vhost):逻辑隔离单元,类似数据库中的库。不同 vhost 间的 Exchange 和 Queue 完全隔离。
- Exchange:消息入口,负责路由。不直接存储消息,而是根据规则转发。
- Queue:真正存储消息的内部对象。
- Connection & Channel:Connection 是 TCP 长连接,Channel 是其上的轻量级子连接。复用同一个 TCP 连接可显著减少开销,提高性能。
- Producer & Consumer:客户端角色,分别负责发布和接收消息。
Java 快速上手示例
以下示例基于 Java 客户端,展示从依赖引入到收发消息的完整流程。
1. 引入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.18.0</version>
</dependency>
2. 生产者实现
建立连接与信道
我们需要配置 IP、端口、账号密码及虚拟主机。实际开发中建议从配置文件读取,切勿硬编码敏感信息。
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost"); // 替换为实际服务器 IP
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
声明队列与发送消息
简单模式下,可使用默认交换机(空字符串)。注意队列参数的含义:
durable: 是否持久化exclusive: 是否独占autoDelete: 无消费者时是否自动删除
// 声明队列
channel.queueDeclare("hello", true, false, true, null);
// 发送消息
String msg = "Hello RabbitMQ";
channel.basicPublish("", "hello", null, msg.getBytes());
System.out.println("[x] Sent '" + msg + "'");
释放资源
务必关闭信道和连接,防止资源泄漏。
channel.close();
connection.close();
完整代码参考:
package org.example.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", true, false, true, null);
String msg = "Hello RabbitMQ";
channel.basicPublish("", "hello", null, msg.getBytes());
channel.close();
connection.close();
}
}
3. 消费者实现
消费者需要监听队列,接收到消息后执行回调。
package org.example.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", true, false, true, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("[x] Received: '" + message + "'");
// 业务逻辑处理...
}
};
// autoAck 设为 true 表示自动确认,生产环境建议手动 ack
channel.basicConsume("hello", true, consumer);
}
}
运行消费者后,控制台应能打印出生产者发送的消息。至此,一个基础的点对点消息传递链路已打通。在实际生产中,还需关注异常处理、事务机制以及集群部署策略。


