RabbitMQ 发布确认模式
概述
发布确认模式用于确保消息已经被正确地发送到 RabbitMQ 服务器,并被成功接收和持久化。通过使用发布确认,生产者可以获得对消息的可靠性保证,避免消息丢失。这一机制基于通道(Channel)级别,通过两个阶段的确认来保证消息的可靠性。

消息丢失问题
作为消息中间件,都会面临消息丢失的问题。消息丢失大概分为三种情况:
- 生产者问题。因为应用程序故障、网络抖动等各种原因,生产者没有成功向 Broker 发送消息。
- 消息中间件自身问题。生产者成功发送给了 Broker,但是 Broker 没有把消息保存好,导致消息丢失。
- 消费者问题。Broker 发送消息到消费者,消费者在消费消息时,因为没有处理好,导致 Broker 将消费失败的消息从队列中删除了。

RabbitMQ 也对上述问题给出了相应的解决方案。问题 2 可通过持久化机制解决。问题 3 可采用消息应答机制。针对问题 1,可采用发布确认(Publisher Confirms)机制实现。
发布确认的三种模式
RabbitMQ 的发布确认模式主要有三种形式:单条确认、批量确认和异步确认。
单条确认(Single Publisher Confirm)
特点:在发布一条消息后,等待服务器确认该消息是否成功接收。 优点:实现简单,每条消息的确认状态清晰。 缺点:性能开销较大,特别是在高并发的场景下,因为每条消息都需要等待服务器的确认。
批量确认(Batch Publisher Confirm)
特点:允许在一次性确认多个消息是否成功被服务器接收。 优点:在大量消息的场景中可以提高效率,因为可以减少确认消息的数量。 缺点:当一批消息中有一条消息发送失败时,整个批量确认失败。此时需要重新发送整批消息,但不知道是哪条消息发送失败,增加了调试和处理的难度。
异步确认(Asynchronous Confirm)
特点:通过回调函数处理消息的确认和未确认事件,更加灵活。 优点:在异步场景中能够更好地处理消息的状态,提高了系统的并发性能和响应速度。 缺点:实现相对复杂,需要处理回调函数的逻辑和状态管理。
实现步骤
- 设置通道为发布确认模式:在生产者发送消息之前,需要将通道设置为发布确认模式。这可以通过调用 channel.confirmSelect() 方法来实现。
- 发送消息并等待确认:生产者发送消息时,每条消息都会分配一个唯一的、递增的整数 ID(DeliveryTag)。生产者可以通过调用 channel.waitForConfirms() 方法来等待所有已发送消息的确认,或者通过其他方式处理确认回调。
- 处理确认回调:为了处理确认回调,需要创建一个 ConfirmListener 接口的实现。在实现的 handleAck() 方法中,可以处理成功接收到确认的消息的逻辑;在 handleNack() 方法中,可以处理未成功接收到确认的消息的逻辑。






