详解RabbitMQ工作模式之发布确认模式

详解RabbitMQ工作模式之发布确认模式

​​​​​​​

目录

发布确认模式

概述

消息丢失问题

发布确认的三种模式

实现步骤

应用场景

代码案例

引入依赖

常量类

单条确认

运行代码

批量确认

运行代码

异步确认

运行代码

对比批量确认和异步确认模式


发布确认模式

概述
发布确认模式用于确保消息已经被正确地发送到RabbitMQ服务器,并被成功接收和持久化。通过使用发布确认,生产者可以获得对消息的可靠性保证,避免消息丢失。这一机制基于通道(Channel)级别,通过两个阶段的确认来保证消息的可靠性。
消息丢失问题

作为消息中间件, 都会⾯临消息丢失的问题.
消息丢失⼤概分为三种情况:

1. ⽣产者问题. 因为应⽤程序故障, ⽹络抖动等各种原因, ⽣产者没有成功向broker发送消息.
2. 消息中间件⾃⾝问题. ⽣产者成功发送给了Broker, 但是Broker没有把消息保存好, 导致消息丢失.
3. 消费者问题. Broker 发送消息到消费者, 消费者在消费消息时, 因为没有处理好, 导致broker将消费失败的消息从队列中删除了。

RabbitMQ也对上述问题给出了相应的解决⽅案. 问题2可以通过持久化机制. 问题3可以采⽤消息应答机制.
针对问题1, 可以采⽤发布确认(Publisher Confirms)机制实现. 

发布确认的三种模式

RabbitMQ的发布确认模式主要有三种形式:单条确认、批量确认和异步确认。

单条确认(Single Publisher Confirm)

特点:在发布一条消息后,等待服务器确认该消息是否成功接收。
优点:实现简单,每条消息的确认状态清晰。
缺点:性能开销较大,特别是在高并发的场景下,因为每条消息都需要等待服务器的确认。

批量确认(Batch Publisher Confirm)

特点:允许在一次性确认多个消息是否成功被服务器接收。
优点:在大量消息的场景中可以提高效率,因为可以减少确认消息的数量。
缺点:当一批消息中有一条消息发送失败时,整个批量确认失败。此时需要重新发送整批消息,但不知道是哪条消息发送失败,增加了调试和处理的难度。

异步确认(Asynchronous Confirm)

特点:通过回调函数处理消息的确认和未确认事件,更加灵活。
优点:在异步场景中能够更好地处理消息的状态,提高了系统的并发性能和响应速度。
缺点:实现相对复杂,需要处理回调函数的逻辑和状态管理。
实现步骤
1.设置通道为发布确认模式:在生产者发送消息之前,需要将通道设置为发布确认模式。这可以通过调用channel.confirmSelect()方法来实现。
2.发送消息并等待确认:生产者发送消息时,每条消息都会分配一个唯一的、递增的整数ID(DeliveryTag)。生产者可以通过调用channel.waitForConfirms()方法来等待所有已发送消息的确认,或者通过其他方式处理确认回调。
3.处理确认回调:为了处理确认回调,需要创建一个ConfirmCallback接口的实现。在实现的handleAck()方法中,可以处理成功接收到确认的消息的逻辑;在handleNack()方法中,可以处理未成功接收到确认的消息的逻辑。
应用场景
发布确认模式适用于对数据安全性要求较高的场景,如金融交易、订单处理等。在这些场景中,消息的丢失或重复都可能导致严重的业务问题。通过使用发布确认模式,可以确保消息被正确地发送到RabbitMQ服务器,并被成功接收和持久化,从而提高了系统的可靠性和稳定性。
代码案例
引入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.21.0</version> </dependency>
常量类
public class Constants { public static final String HOST = "47.98.109.138"; public static final int PORT = 5672; public static final String USER_NAME = "study"; public static final String PASSWORD = "study"; public static final String VIRTUAL_HOST = "aaa"; //publisher confirms public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1"; public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2"; public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3"; }
单条确认
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; public class PublisherConfirms { private static final Integer MESSAGE_COUNT = 100; static Connection createConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 return connectionFactory.newConnection(); } public static void main(String[] args) throws Exception { //Strategy #1: Publishing Messages Individually //单独确认 publishingMessagesIndividually(); } /** * 单独确认 */ private static void publishingMessagesIndividually() throws Exception { try(Connection connection = createConnection()) { //1. 开启信道 Channel channel = connection.createChannel(); //2. 设置信道为confirm模式 channel.confirmSelect(); //3. 声明队列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null); //4. 发送消息, 并等待确认 long start = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms"+i; channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes()); //等待确认 channel.waitForConfirmsOrDie(5000); } long end = System.currentTimeMillis(); System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start); } } }
运行代码

我们可以看到,以发送消息条数为100条为例,单条确认模式是非常耗时的。 

批量确认
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; public class PublisherConfirms { private static final Integer MESSAGE_COUNT = 10000; static Connection createConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 return connectionFactory.newConnection(); } public static void main(String[] args) throws Exception { //Strategy #2: Publishing Messages in Batches //批量确认 publishingMessagesInBatches(); } /** * 批量确认 * @throws Exception */ private static void publishingMessagesInBatches() throws Exception{ try(Connection connection = createConnection()) { //1. 开启信道 Channel channel = connection.createChannel(); //2. 设置信道为confirm模式 channel.confirmSelect(); //3. 声明队列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null); //4. 发送消息, 并进行确认 long start = System.currentTimeMillis(); int batchSize = 100; int outstandingMessageCount = 0; for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms"+i; channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes()); outstandingMessageCount++; if (outstandingMessageCount==batchSize){ channel.waitForConfirmsOrDie(5000); outstandingMessageCount = 0; } } if (outstandingMessageCount>0){ channel.waitForConfirmsOrDie(5000); } long end = System.currentTimeMillis(); System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start); } } }
运行代码

我们可以看到,以发送消息条数为10000条为例,单条确认模式是比较快的。 

异步确认
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import rabbitmq.constant.Constants; import java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; public class PublisherConfirms { private static final Integer MESSAGE_COUNT = 10000; static Connection createConnection() throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); //需要提前开放端口号 connectionFactory.setUsername(Constants.USER_NAME);//账号 connectionFactory.setPassword(Constants.PASSWORD); //密码 connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机 return connectionFactory.newConnection(); } public static void main(String[] args) throws Exception { //Strategy #3: Handling Publisher Confirms Asynchronously //异步确认 handlingPublisherConfirmsAsynchronously(); } /** * 异步确认 */ private static void handlingPublisherConfirmsAsynchronously() throws Exception{ try (Connection connection = createConnection()){ //1. 开启信道 Channel channel = connection.createChannel(); //2. 设置信道为confirm模式 channel.confirmSelect(); //3. 声明队列 channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null); //4. 监听confirm //集合中存储的是未确认的消息ID long start = System.currentTimeMillis(); SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>()); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple){ confirmSeqNo.headSet(deliveryTag+1).clear(); }else { confirmSeqNo.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { if (multiple){ confirmSeqNo.headSet(deliveryTag+1).clear(); }else { confirmSeqNo.remove(deliveryTag); } //业务需要根据实际场景进行处理, 比如重发, 此处代码省略 } }); //5. 发送消息 for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "hello publisher confirms"+i; long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes()); confirmSeqNo.add(seqNo); } while (!confirmSeqNo.isEmpty()){ Thread.sleep(10); } long end = System.currentTimeMillis(); System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start); } } }
运行代码

我们可以看到,以发送消息条数为10000条为例,单条确认模式是非常快的。 

对比批量确认和异步确认模式

我们可以看到,异步确认模式是比批量确认模式快很多的。

Read more

VLM Unlearning 有关论文阅读总结与梳理

VLM Unlearning 有关论文阅读总结与梳理

文章目录 目录 前言 一、什么是 Unlearning 二、AUVIC 三、Neuron Pruning 四、 Neuron Path Editing 五、 MLLM Eraser 前言 本文整理了当前多模态大模型(VLM)中常见的 Unlearning 技术路线,主要包括: * AUVIC * Neuron Pruning * Neuron Path Editing * MLLM Eraser 这些方法的核心目标都是: 让模型“遗忘”指定知识,同时尽量不影响其它知识。 一、什么是 Unlearning 在多模态大模型(Vision-Language Model / VLA)中,我们经常需要: * 删除隐私数据 * 移除不安全知识 * 删除特定人物或敏感概念

By Ne0inhk
20 万星开源神器 OpenClaw 全解析:程序员 + 视频博主双视角实战体验

20 万星开源神器 OpenClaw 全解析:程序员 + 视频博主双视角实战体验

2026 年初,AI 圈最大的黑马非OpenClaw莫属。这个从 Clawdbot、Moltbot 迭代而来的开源项目,在 GitHub 上星标狂飙至 21.7 万,成为现象级 AI Agent 框架。作为一名拥有 7 年大数据开发经验的程序员,同时也是正在转型视频剪辑的博主,我深度体验了这款工具近一个月,发现它不仅能解放开发者的双手,更能为内容创作带来革命性的效率提升。本文将从技术架构、核心功能、安装部署、双身份实战体验四个维度,带你全面解锁 OpenClaw 的奥秘。 一、核心定位与起源:从 “聊天 AI” 到 “能干活的数字员工” 1. 精准定义 一句话概括:OpenClaw 是本地可自托管、多渠道交互、具备强执行能力的开源 AI Agent 执行引擎。它打破了传统

By Ne0inhk

Vitis AI推理加速实战:从零实现FPGA部署完整指南

从模型到硬件:Vitis AI 实战部署指南,让 FPGA 真正跑起深度学习 你有没有遇到过这样的场景?训练好的 PyTorch 模型准确率高达95%,信心满满地准备上板推理——结果在嵌入式 CPU 上一跑, 一张图要300毫秒 ,帧率不到4 FPS。别说实时检测了,连基本交互都卡顿。 这正是我在做工业缺陷检测项目时踩过的坑。后来我们换了一条路:把模型交给 FPGA + Vitis AI ,最终实现 每张图仅需12ms 的惊人加速。整个系统功耗还从5W降到2.5W,彻底告别风扇散热。 今天我就带你走一遍这条“少有人走却极高效”的路径—— 如何用 Xilinx 的 Vitis AI 工具链,把一个标准 PyTorch/TensorFlow 模型真正部署到 Zynq 或 Versal 芯片上,实现低延迟、

By Ne0inhk
QGIS:Maxar Open Data全球高分辨率遥感影像(0.3-0.5米)14TB免费获取

QGIS:Maxar Open Data全球高分辨率遥感影像(0.3-0.5米)14TB免费获取

今天给大家介绍一个插件Maxar Open Data QGIS Plugin,它是一个用于浏览、可视化和下载 Maxar 开放数据卫星图像的 QGIS 插件,用于灾害事件。 简介 Maxar Open Data 计划是全球领先的卫星影像服务商 Maxar Technologies 针对重大突发灾害发起的公益性数据共享项目。该项目旨在通过提供亚米级高分辨率的光学卫星影像,帮助人道主义组织、政府机构和一线救援人员更有效地进行灾情评估、资源调度及灾后重建工作。这些数据通常涵盖地震、洪水、野火及飓风等突发性自然灾害前后的对比图,为全球范围内的灾害响应提供关键的时间序列地理空间支持。 该数据集主要由 Maxar 旗下的高分辨率星座(如 WorldView-1/2/3 和 GeoEye-1)捕获,其空间分辨率可达 30cm 至 50cm,能够清晰识别建筑物损毁、道路阻塞及难民营规模。在灾害发生后的极短时间内,Maxar 会迅速将受灾区域的影像处理为分析就绪数据(ARD),并发布在专用的云平台上,供全球用户免费下载和使用。

By Ne0inhk