跳到主要内容
极客日志极客日志面向AI+效率的开发者社区
首页博客GitHub 精选镜像工具UI配色美学隐私政策关于联系
搜索内容 / 工具 / 仓库 / 镜像...⌘K搜索
注册
博客列表
Javajava

RabbitMQ 消息确认机制详解:自动与手动模式

综述由AI生成RabbitMQ 消息确认机制用于确保消息可靠投递,分为自动确认和手动确认两种模式。自动确认模式下,消息投递即视为成功,适合低可靠性场景;手动确认需消费者显式调用 Ack 方法,异常时可拒绝并重投,保障高可靠性。Spring-AMQP 提供了 NONE、AUTO、MANUAL 三种策略配置。NONE 模式投递即移除,可能丢消息;AUTO 模式默认开启,异常不确认并重投;MANUAL 模式需手动控制确认逻辑,灵活性最高。通过配置 Listener 的 acknowledge-mode 属性及编写相应的监听器代码,可实现不同级别的消息可靠性保障。

修罗发布于 2026/2/8更新于 2026/5/2823 浏览
RabbitMQ 消息确认机制详解:自动与手动模式

文章配图

消息确认机制

RabbitMQ 的消息确认机制

生产者发送消息之后,到达消费端之后,可能会有以下情况:

a. 消息处理成功 b. 消息处理异常

文章配图

RabbitMQ 向消费者发送消息之后,就会把这条消息删掉,那么第二种情况,就会造成消息丢失。那么如何确保消费端已经成功接收了,并正确处理了呢?为了保证消息从队列可靠地到达消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。

消费者在订阅队列时,可以指定 autoAck 参数,根据这个参数设置,消息确认机制分为以下两种:

• 自动确认:当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存 (或者磁盘) 中删除,而不管消费者是否真正地消费到了这些消息。自动确认模式适合对于消息可靠性要求不高的场景。 • 手动确认:当 autoAck 等于 false 时,RabbitMQ 会等待消费者显式地调用 Basic.Ack 命令,回复确认信号后才从内存 (或者磁盘) 中移去消息。这种模式适合对消息可靠性要求比较高的场景。

自动确认

源代码:

/** * Start a non-nolocal, non-exclusive consumer, with * a server-generated consumerTag.
 * @param queue the name of the queue
 * @param autoAck true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements
 * @param callback an interface to the consumer object
 * @return the consumerTag generated by the server
 * @throws java.io.IOException if an error is encountered
 * @see com.rabbitmq.client.AMQP.Basic.Consume
 * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
 * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
 */
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

代码示例:

DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("接收到消息:" + new String(body));
    }
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);

当 autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分:

一是等待投递给消费者的消息。 二是已经投递给消费者,但是还没有收到消费者确认信号的消息。

如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

文章配图

从 RabbitMQ 的 Web 管理平台上,也可以看到当前队列中 Ready 状态和 Unacked 状态的消息数。

Ready: 等待投递给消费者的消息数 Unacked: 已经投递给消费者,但是未收到消费者确认信号的消息数

手动确认

消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过,RabbitMQ 也提供了不同的确认应答的方式,消费者客户端可以调用与其对应的 channel 的相关方法,共有以下三种:

  1. 肯定确认:Channel.basicAck(long deliveryTag, boolean multiple)

RabbitMQ 已知道该消息并且成功的处理消息。可以将其丢弃了。

参数说明:

  1. deliveryTag: 消息的唯一标识,它是一个单调递增的 64 位的长整型值。deliveryTag 是每个通道(Channel)独立维护的,所以在每个通道上都是唯一的。当消费者确认 (ack) 一条消息时,必须使用对应的通道上进行确认。
  2. multiple: 是否批量确认。在某些情况下,为了减少网络流量,可以对一系列连续的 deliveryTag 进行批量确认。值为 true 则会一次性 ack 所有小于或等于指定 deliveryTag 的消息。值为 false,则只确认当前指定 deliveryTag 的消息。

文章配图

deliveryTag 是 RabbitMQ 中消息确认机制的一个重要组成部分,它确保了消息传递的可靠性和顺序性。

  1. 否定确认:Channel.basicReject(long deliveryTag, boolean requeue)

RabbitMQ 在 2.0.0 版本开始引入了 Basic.Reject 这个命令,消费者客户端可以调用 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。

参数说明:

  1. deliveryTag: 参考 channel.basicAck
  2. requeue: 表示拒绝后,这条消息如何处理。如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者。如果 requeue 参数设置为 false,则 RabbitMQ 会把消息从队列中移除,而不会把它发送给新的消费者。
  1. 否定确认:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

Basic.Reject 命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用 Basic.Nack 这个命令。消费者客户端可以调用 channel.basicNack 方法来实现。

multiple 参数设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息。

Spring-AMQP 的消息确认机制

下面我们基于 SpringBoot 来演示消息确认机制,不过和 RabbitMQ Java Client 库有一定差异。

Spring-AMQP 对消息确认机制提供了三种策略。

文章配图

消息确认机制的三种策略的解读:

1. AcknowledgeMode.NONE 这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ 就会自动确认消息,从 RabbitMQ 队列中移除消息。如果消费者处理消息失败,消息可能会丢失。 2. AcknowledgeMode.AUTO(默认) 这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息。 3. AcknowledgeMode.MANUAL 手动确认模式下,消费者必须在成功处理消息后显式调用 basicAck 方法来确认消息。如果消息未被确认,RabbitMQ 会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息,这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理。

代码演示
常量类
public class Constants {
    public static final String ACK_QUEUE = "ack.queue";
    public static final String ACK_EXCHANGE = "ack.exchange";
}
声明队列和交换机并绑定二者关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.example.demo.constant.Constants;

@Configuration
public class RabbitMQConfig {
    //消息确认
    @Bean("ackQueue")
    public Queue ackQueue() {
        return QueueBuilder.durable(Constants.ACK_QUEUE).build();
    }

    @Bean("directExchange")
    public DirectExchange directExchange() {
        return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();
    }

    @Bean("ackBinding")
    public Binding ackBinding(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("ackQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(directExchange).with("ack");
    }
}
声明 RabbitTemplate
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTemplateConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}
编写生产消息代码
import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.demo.constant.Constants;

@RequestMapping("/producer")
@RestController
public class ProducerController {
    @Resource(name = "rabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/ack")
    public String ack() {
        rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "consumer ack mode test...");
        return "消息发送成功";
    }
}
AcknowledgeMode.NONE(演示)

添加配置

spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    addresses: amqp://localhost:5672/
  listener:
    simple:
      acknowledge-mode: none
编写消费消息代码 1
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel) throws UnsupportedEncodingException {
        //消费者逻辑
        System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
        //进行业务逻辑处理
        System.out.println("业务逻辑处理");
        System.out.println("业务处理完成");
    }
}

发送消息

文章配图

消费消息

文章配图

编写消费消息代码 2
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel)throws UnsupportedEncodingException {
        //消费者逻辑
        System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
        //进行业务逻辑处理
        System.out.println("业务逻辑处理");
        int num = 3/0;
        System.out.println("业务处理完成");
    }
}

发送消息

文章配图

消费消息

文章配图

文章配图

文章配图

此时我们可以看到,消息虽然被消费者收到了,但是因为消费时发生异常导致消息没有被正常消费,而且队列中的消息也已经没有了。

AcknowledgeMode.AUTO(演示)

添加配置

spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    addresses: amqp://localhost:5672/
  listener:
    simple:
      acknowledge-mode: auto
编写消费消息代码 1
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel) throws UnsupportedEncodingException {
        //消费者逻辑
        System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
        //进行业务逻辑处理
        System.out.println("业务逻辑处理");
        System.out.println("业务处理完成");
    }
}

生产消息

文章配图

消费消息

文章配图

编写消费消息代码 2
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel)throws UnsupportedEncodingException {
        //消费者逻辑
        System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
        //进行业务逻辑处理
        System.out.println("业务逻辑处理");
        int num = 3/0;
        System.out.println("业务处理完成");
    }
}

生产消息

文章配图

消费消息

文章配图

文章配图

文章配图

文章配图

此时我们可以看到,消费消息时发生异常,导致消息没有被正常消费,而且此时控制台不断地打印日志,deliveryTag 的值不断增加,观察控制界面可以看到队列中始终有一条消息,是因为 AUTO 机制下,如果消费消息时发生异常,此时消费方不会确认消息,此时消息就会重新入队,直到消费方能够正常消费消息而不发生异常。

AcknowledgeMode.MANUAL(演示)

添加配置

spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    addresses: amqp://localhost:5672/
  listener:
    simple:
      acknowledge-mode: manual
编写消费消息代码 1
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //消费者逻辑
        System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
        //进行业务逻辑处理
        System.out.println("业务逻辑处理");
        System.out.println("业务处理完成");
    }
}

生产消息

文章配图

消费消息

文章配图

编写消费消息代码 2
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //消费者逻辑
        System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
        //进行业务逻辑处理
        System.out.println("业务逻辑处理");
        int num = 3/0;
        System.out.println("业务处理完成");
    }
}

发送消息

文章配图

消费消息

文章配图

文章配图

文章配图

此时我们看到,MANUAL 消息确认机制下,消费方消费消息时发生异常,没有手动确认,会导致消息在队列中的状态变为 Unacked 状态。

编写消费消息代码 3
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //消费者逻辑
            System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
            //进行业务逻辑处理
            System.out.println("业务逻辑处理");
            System.out.println("业务处理完成");
            //肯定确认
            channel.basicAck(deliveryTag,false);
        } catch (Exception e) {
            //否定确认
            channel.basicNack(deliveryTag, false, true);
        }
    }
}

生产消息

文章配图

消费消息

文章配图

此时我们可以看到,在 MANUAL 消息确认机制下,如果消息手动确认了,就没问题了。

编写消费消息代码 4
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.example.demo.constant.Constants;

@Component
public class AckListener {
    @RabbitListener(queues = Constants.ACK_QUEUE)
    public void handMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //消费者逻辑
            System.out.printf("接收到消息:%s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
            //进行业务逻辑处理
            System.out.println("业务逻辑处理");
            int num = 3/0;
            System.out.println("业务处理完成");
            //肯定确认
            channel.basicAck(deliveryTag,false);
        } catch (Exception e) {
            //否定确认
            channel.basicNack(deliveryTag, false, true);
        }
    }
}

生产消息

文章配图

消费消息

文章配图

文章配图

此时我们可以看到,当消费消息时,因为发生异常,程序会运行到拒绝消息,但是因为设置了重新入队,会导致控制台不停地打印消息,deliveryTag 的值不断增加。

目录

  1. 消息确认机制
  2. RabbitMQ 的消息确认机制
  3. 自动确认
  4. 手动确认
  5. Spring-AMQP 的消息确认机制
  6. 代码演示
  7. 常量类
  8. 声明队列和交换机并绑定二者关系
  9. 声明 RabbitTemplate
  10. 编写生产消息代码
  11. AcknowledgeMode.NONE(演示)
  12. 编写消费消息代码 1
  13. 编写消费消息代码 2
  14. AcknowledgeMode.AUTO(演示)
  15. 编写消费消息代码 1
  16. 编写消费消息代码 2
  17. AcknowledgeMode.MANUAL(演示)
  18. 编写消费消息代码 1
  19. 编写消费消息代码 2
  20. 编写消费消息代码 3
  21. 编写消费消息代码 4
  • 💰 8折买阿里云服务器限时8折了解详情
  • Magick API 一键接入全球大模型注册送1000万token查看
  • 🤖 一键搭建Deepseek满血版了解详情
  • 一键打造专属AI 智能体了解详情
极客日志微信公众号二维码

微信扫一扫,关注极客日志

微信公众号「极客日志V2」,在微信中扫描左侧二维码关注。展示文案:极客日志V2 zeeklog

更多推荐文章

查看全部
  • AI 大模型的掌握与运用技巧
  • 8 篇必读的大模型前沿论文
  • GR-RL:基于离线 RL 与在线微调的机器人系鞋带 VLA 策略
  • 前端 Word 文档生成实战:DOCX.js 完整使用指南
  • Ling Studio 实测:Ring-2.5-1T 如何优化 AI 开发工作流
  • 使用 Bright Data Web Scraper API + Python 高效抓取 Glassdoor 数据:从配置到结构化输出全流程实战
  • Windows 环境下 Git 的安装与基础配置
  • 荣耀 MWC 2026 展示 Robot Phone 与人形机器人,布局 AI 硬件生态
  • Arduino BLDC 机器人 IMU 角度读取、PID 控制与互补滤波
  • GitHub 学生开发者包认证全流程指南
  • SPA 单页应用无刷新更新部署方案与实战优化
  • 基于三省六部制的 AI Agent 协作架构 Edict 解析
  • 网络安全攻防:黑客攻击简要流程
  • iOS 26 开发兼容适配:UITabBar 液态玻璃效果与 WiFi SSID 获取
  • Python 使用 Turtle 库绘制动态樱花树及飘落效果
  • Rokid JSAR 实战指南:Web 技术栈 AR 开发环境搭建与 3D 时钟项目详解
  • C++ 模板进阶:特化、萃取与可变参数模板
  • C++ 内存管理:new/delete 原理及与 malloc/free 对比
  • 基于 SpringBoot 与 Vue3 的桂林旅游导游平台系统设计
  • 产品经理理解前端后端数据库——用产品语言解析技术架构

相关免费在线工具

  • Keycode 信息

    查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online

  • Escape 与 Native 编解码

    JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online

  • JavaScript / HTML 格式化

    使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online

  • JavaScript 压缩与混淆

    Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online

  • Base64 字符串编码/解码

    将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online

  • Base64 文件转换器

    将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online