核心要点
- 场景:支付/充值等需要最终一致性的链路,用 RabbitMQ 做异步解耦但必须可追责不丢单。
- 结论:Confirm 解决'Broker 收到',mandatory+Return 解决'路由失败可见',持久化 + 幂等兜底'宕机/重投/重复'。
- 产出:同步 Confirm、批量 Confirm、异步 Confirm 三套 Java 模板 + 一张常见故障速查卡。
RabbitMQ 高级特性
消息可靠性
一般我们使用支付宝或者微信转账的时候,都是扫码支付,然后立刻得到结果。支付平台必须保证数据正确性,保证数据并发安全性,保证数据最终一致性。
分布式锁
操作某条数据的时候先锁定,可以用 Redis、ZooKeeper 等来实现。在我们修改订单的时候,先锁定该账单,如果该账单有并发操作,后面的操作只有等待上一个操作的锁释放后再依次执行。
- 优点:能够保证数据的强一致性。
- 缺点:高并发场景下可能有性能问题。
消息队列
消息队列是为了保证最终一致性,我们需要确保消息队列有 ACK 机制,客户端收到消息并消费处理之后,发送 ACK 给中间件,如果中间件超过指定时间还没有收到 ACK 消息,则定时去重发消息。
比如我们在用户充值完成之后,会发送充值消息给账户系统,账户系统再去更改账户余额。 优点:异步、高并发 缺点:有一定延时,数据弱一致性,并且必须能够保证该业务操作肯定能够成功,不能失败。
如何保证
我们从以下几个方面来保证消息的可靠性:
- 客户端代码中的异常捕获,包括生产者和消费者
- AMQP、RabbitMQ 的事务机制
- 发送端确认机制
- 消息持久化机制
- Broker 端的高可用集群
- 消费者确认机制
- 消费端限流
- 消息幂等性
异常捕获
先执行业务操作,业务操作成功后执行消息发送,消息发送的过程用 try catch 进行捕获,在一场处理的代码中执行回滚业务或者执行重复操作等,这是一种最大努力的确保方式,并无法保证 100% 绝对可靠,因为这里没有异常并不代表消息就一定投递成功。
另外,可以通过 spring.rabbitmq.template.retry.enabled=true 配置开启发送端的重试。
事务机制
AMQP、RabbitMQ 的事务机制,没有捕获到异常不能代表消息就一定投递成功了。一直到事务提交之后都没有异常,说明消息确实投递成功了,但是,这种方式在性能方面开销比较大,一般也不推荐使用。
发送确认
RabbitMQ 后来引入了一种轻量级的方式,叫发送方确认(publisher confirm)机制。生产者将信道设置为 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上发布的消息都会被拍成一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么确认消息也会在消息持久化后发出),RabbitMQ 会发送一个确认 Basic.Ack 给生产者,包含消息的唯一 ID,这样生产者就知道消息已经正确的送达了。
RabbitMQ 回传给生产者的确认消息中的 deliveryTag 字段包含了确认消息的序号,另外,通过设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息是否都已经得到了处理。 生产者投递消息后并不需要一直阻塞着,可以继续投递下一个消息并通过回调的方式处理 ACK 响应。 如果 RabbitMQ 因为自身内部错误导致消息丢失等异常情况发生,就会响应一条(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该 Nack 命令。
/**
* Publisher Confirms(同步等待确认)示例
*
* 语义:
* 1) confirmSelect():把 Channel 切换到 confirm 模式(broker 会对 publish 做 ack/nack)
* 2) waitForConfirmsOrDie(timeout):阻塞等待本批消息的 confirm
* - 全部 ack:正常返回
* - 任意 nack / channel 关闭:抛 IOException
* - 超时:抛 TimeoutException
*
* 注意:
* - Confirm 只保证'broker 收到了并落到交换机层面',不保证'路由到了队列'
* - 要保证'路由不到队列也能感知',需要 basicPublish 的 mandatory=true + ReturnListener/ReturnCallback
*/
public class {
;
;
QUEUE;
Exception {
();
factory.setHost();
factory.setVirtualHost();
factory.setUsername();
factory.setPassword();
factory.setPort();
( factory.newConnection();
connection.createChannel()) {
channel.confirmSelect();
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, , , );
channel.queueDeclare(QUEUE, , , , );
channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
(body, StandardCharsets.UTF_8);
System.err.printf(, replyCode, replyText, exchange, routingKey, returned);
});
;
channel.basicPublish(EXCHANGE, ROUTING_KEY, , , message.getBytes(StandardCharsets.UTF_8));
{
channel.waitForConfirmsOrDie();
System.out.println( + message);
} (TimeoutException e) {
System.err.println( + message);
e;
} (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println( + message);
e;
} (IOException e) {
System.err.println( + message);
e;
}
}
}
}


