【Java 开发日记】我们来说一下消息的可靠性投递

【Java 开发日记】我们来说一下消息的可靠性投递

目录

1. 核心概念

2. 面临的挑战

3. 关键实现机制

3.1 生产端保证

3.2 Broker端保证

3.3 消费端保证

4. 完整可靠性方案

4.1 事务消息方案(如RocketMQ)

4.2 最大努力投递方案

4.3 本地消息表方案(经典)

5. 高级特性与优化

5.1 顺序性保证

5.2 批量消息可靠性

5.3 监控与对账

6. 不同MQ的实现差异

7. 实践建议

总结

面试回答


1. 核心概念

可靠性投递(Reliable Delivery)是指确保消息从生产者成功到达消费者,即使面对网络故障、系统崩溃等异常情况也能保证不丢失、不重复、按顺序(部分场景)传递。

2. 面临的挑战

  • 网络不可靠:丢包、延迟、分区
  • 节点故障:生产者/消费者/中间件宕机
  • 重复消费:确认机制可能引发重复
  • 顺序保证:分布式环境下消息乱序

3. 关键实现机制

3.1 生产端保证
// 伪代码示例:生产端确认模式 public void sendWithConfirm(Message msg) { // 1. 持久化到本地数据库(防丢失) messageDao.save(msg); // 2. 发送到消息队列 String msgId = rabbitTemplate.convertAndSend(msg); // 3. 等待Broker确认 boolean ack = waitForAck(msgId, TIMEOUT); // 4. 失败重试(指数退避) if (!ack) { retryWithBackoff(msg); } // 5. 最终记录投递状态 updateDeliveryStatus(msgId, ack); }

技术要点

  • 事务机制:同步方式,性能差(不推荐)
  • 确认机制(Confirm)
    • 普通确认(每消息确认)
    • 批量确认(提高吞吐)
    • 异步监听(最佳实践)
  • 本地消息表:事务消息的替代方案
  • 消息持久化:设置delivery_mode=2
3.2 Broker端保证
消息处理流程: Producer → Broker接收 → 持久化存储 → 推送给Consumer → 等待ACK → 删除/重投

持久化策略

  • 队列持久化durable=true
  • 消息持久化delivery_mode=2
  • 镜像队列:多副本冗余(RabbitMQ)
  • 高可用集群:主从切换时不丢消息
3.3 消费端保证
// 消费端保证示例 @RabbitListener(queues = "order.queue") public void handleOrder(OrderMessage order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { try { // 1. 业务处理 orderService.process(order); // 2. 手动确认(成功才ACK) channel.basicAck(tag, false); // 3. 更新消费记录 consumeRecordService.markConsumed(order.getId()); } catch (Exception e) { // 4. 失败处理:重试或进入死信队列 if (retryCount < MAX_RETRY) { channel.basicNack(tag, false, true); // 重入队列 } else { channel.basicNack(tag, false, false); // 进入死信队列 alarmService.notifyAdmin(order, e); } } }

消费端关键点

  • 手动ACK:避免自动确认导致消息丢失
  • 幂等性设计
public boolean processWithIdempotent(String msgId) { // 基于消息ID去重 if (redis.exists("processed:" + msgId)) { return true; // 已处理过 } // 业务处理 boolean success = doBusinessLogic(); // 记录处理状态 if (success) { redis.setex("processed:" + msgId, 24h, "1"); } return success; }
  • 死信队列(DLQ):处理无法消费的消息
  • 消费重试策略
    • 立即重试(瞬时故障)
    • 延迟重试(业务依赖)
    • 指数退避(防止雪崩)

4. 完整可靠性方案

4.1 事务消息方案(如RocketMQ)
两阶段提交: 1. 发送Half Message(预备消息) 2. 执行本地事务 3. 根据本地事务结果Commit/Rollback 4. Broker检查事务状态并投递/丢弃
4.2 最大努力投递方案
# 补偿机制实现 def reliable_delivery(message): max_retries = 5 for attempt in range(max_retries): try: # 尝试投递 result = mq_client.send(message) if result.confirmed: log_delivery_success(message.id) return True except Exception as e: log_failure(attempt, e) if attempt == max_retries - 1: # 最终失败,人工介入 send_alert_to_admin(message) save_to_compensation_table(message) return False # 等待后重试 sleep(backoff_time(attempt)) return False
4.3 本地消息表方案(经典)
-- 本地消息表结构 CREATE TABLE local_message ( id BIGINT PRIMARY KEY, biz_id VARCHAR(64), -- 业务ID content TEXT, -- 消息内容 status TINYINT, -- 0:待发送, 1:已发送, 2:已确认 retry_count INT, next_retry_time DATETIME, created_at TIMESTAMP );

工作流程

  1. 业务数据+消息记录原子性写入本地DB
  2. 定时任务扫描待发送消息
  3. 调用MQ发送,成功后更新状态
  4. 消费者处理完成后反向确认
  5. 对账程序定期校验数据一致性

5. 高级特性与优化

5.1 顺序性保证
  • 全局有序:单队列单消费者(性能低)
  • 局部有序:相同sharding key的消息发到同一队列
  • 牺牲场景:重试队列可能破坏顺序
5.2 批量消息可靠性
// 批量消息的可靠性处理 public class BatchMessageReliableSender { public void sendBatch(List<Message> batch) { // 1. 批量持久化到本地 batchMessageDao.saveAll(batch); // 2. 设置批次ID String batchId = generateBatchId(); // 3. 发送批次消息 boolean success = mqTemplate.sendBatch(batchId, batch); // 4. 批次确认(或单条补偿) if (success) { markBatchDelivered(batchId); } else { // 逐条重试或记录异常 compensateFailedMessages(batch); } } }
5.3 监控与对账
  • 实时监控
    • 堆积情况监控
    • 消费延迟报警
    • 失败率统计
  • 定期对账:
-- 消息对账SQL示例 SELECT DATE(create_time) as day, COUNT(*) as total_sent, SUM(CASE WHEN status=2 THEN 1 ELSE 0 END) as confirmed, SUM(CASE WHEN status=1 THEN 1 ELSE 0 END) as pending FROM message_record GROUP BY DATE(create_time) HAVING total_sent != confirmed;

6. 不同MQ的实现差异

特性RabbitMQKafkaRocketMQ
可靠性机制确认+持久化+镜像队列副本机制+ACK+Exactly-Once事务消息+本地存储
顺序性单队列保证Partition内有序Queue内有序
事务支持轻量级事务(性能差)支持Exactly-Once语义完整事务消息
最佳适用场景业务消息、高可靠要求日志流、大数据场景金融交易、订单业务

7. 实践建议

  1. 分级可靠性策略
    • 关键业务:事务消息+本地表+对账
    • 普通业务:确认机制+重试+死信队列
    • 日志类:最多一次投递即可
  2. 性能与可靠性的平衡
    • 同步刷盘 vs 异步刷盘
    • 同步复制 vs 异步复制
    • 根据业务重要性选择配置
  3. 灾难恢复设计:
# 配置示例:多级降级 mq: primary: url: "amqp://primary" timeout: 1000ms secondary: url: "amqp://secondary" timeout: 2000ms fallback-to-db: true # 最终降级到数据库

总结

消息的可靠性投递是一个系统工程,需要在生产端、Broker端、消费端协同设计,结合业务场景、性能要求、成本约束做出合适的选择。没有"银弹"方案,只有最适合的方案。建议从简单方案开始,随着业务复杂度增加逐步引入更完善的可靠性机制。

面试回答

首先,消息可靠性投递指的是:
一个消息从发送到被消费者成功处理,过程中不会丢失或重复,保证最终数据的一致性。在实际系统里,消息可能因为网络问题、服务重启等原因丢失或重复,所以我们需要一套机制来确保可靠。

为什么需要它呢?
比如在订单系统中,用户支付成功后要通知物流系统,如果消息丢了,物流就不会触发,用户体验就受损;如果消息重复,可能重复发货,造成损失。所以像金融、交易这些场景,可靠性特别重要。

常见的实现方式,我了解的有几种:

  1. 生产者确认机制
    生产者发消息后,MQ(比如RabbitMQ)会返回一个确认(ACK),如果没收到ACK,生产者可以重发。这样可以防止消息在发送阶段丢失。
  2. 消息持久化
    消息保存到磁盘,而不是只放在内存。这样即使MQ重启,消息也不会丢。
  3. 消费者手动ACK
    消费者处理完消息后,手动告诉MQ“我已经处理完了”,MQ才删除消息;如果处理失败,MQ可以把消息重新投递给其他消费者。避免消息在处理阶段丢失。
  4. 事务消息(比如RocketMQ)
    先发一个“半消息”,等本地事务执行成功,再确认投递;如果失败,就回滚。这适用于分布式事务场景。
  5. 消息去重
    为了避免重复消费,可以在消费端做幂等性设计。比如在数据库里记录消息ID,每次处理前先查一下是否已经处理过。

实际中我们一般会结合业务来设计。
比如一个订单状态同步的场景,我可能会用:生产者确认 + 消息持久化 + 消费者手动ACK + 消费端幂等性。这样基本能覆盖发送、存储、消费各个环节的可靠性。

当然,可靠性和性能之间需要权衡,比如持久化会降低吞吐量,手动ACK会增加延迟。所以要根据业务需求来选择合适的方案。

追加:遇到过消息丢失或重复的问题,你是怎么排查和解决的?
追加:是否了解最终一致性、最大努力通知等模式 ?

如果小假的内容对你有帮助,请点赞评论收藏。创作不易,大家的支持就是我坚持下去的动力!

Read more

【Java 转运营】Day06:抖音直播间标签判定与优化全指南

【Java 转运营】Day06:抖音直播间标签判定与优化全指南

课程视频:夸克网盘 个人随笔:https://wangkay.top 核心逻辑:账号标签直接决定流量精准度与转化效率,需通过多维度监测确认标签匹配度,结合产品、价格、风格动态调整,避免标签偏移影响运营效果 一、账号标签的核心意义 1. 关键作用 * 产品适配:判断产品款式、风格是否与账号定位匹配,避免选品跑偏 * 价格过渡:指导低价起号后的阶梯涨价策略(如9.9→19.9→59.9),直接跳转高客单价转化率下降70%以上 * 风格延续:监测标签固化风险(例:去年爆款风格今年转化率下降40%),及时调整产品线 * 人群精准:反映核心用户画像(如31-40岁女性占比55%),决定选品方向与营销策略有效性 2. 重点监测场景 * 低价起号/高反运营阶段 * 风格转换期(如娱乐号转带货号,成功率<20%) * 粉丝量&

By Ne0inhk

【2025】JDK 22下载安装教程(附安装包)Java开发环境保姆级配置步骤

文章目录 * 前言 * JDK 22安装前的准备工作 * JDK 22安装包免费下载 * JDK 22安装教程(超详细) * Java环境变量配置常见问题 * Java开发入门必备知识 前言 准备开始Java编程之旅了吗?本文将手把手教您完成JDK 22的下载和安装过程,这是Java开发的第一步也是最关键的一步。无论您是初学Java的新手还是需要升级开发环境的老手,这份保姆级教程都能帮您轻松配置好Java环境。开发者必备的JDK安装其实很简单,跟着我的步骤来,绝对不会出错! JDK 22安装前的准备工作 在开始下载和安装JDK之前,我们需要确保电脑满足以下基本要求(这点超级重要!): * 确保您的电脑至少有500MB的可用空间 * 关闭可能干扰安装的杀毒软件 * 最好准备管理员权限,避免安装过程中的权限问题 JDK 22安装包免费下载 https://pan.quark.cn/s/89c02f7768ea JDK 22安装教程(超详细) ① 下载好安装包后,解压文件,打开文件夹后鼠标右击安装程序,选择"以管理员身份运行"。 ② 在安

By Ne0inhk

Java 后端开发(从菜鸟到起飞)

以下是针对 Java 后端开发 的最新学习路线,结合当前技术趋势与企业需求,分为 基础、进阶、实战、架构 四个阶段,帮助你系统性掌握技术栈并提升工程化能力: 阶段一:Java 核心基础(2-3个月) 目标:掌握 Java 核心语法与编程思想,熟悉常用工具链 1. 语法基础 * 数据类型、流程控制、异常处理 * 面向对象:封装/继承/多态、接口/抽象类、内部类 * 重点:集合框架(源码级理解 HashMap、ConcurrentHashMap)、IO/NIO、多线程(JUC 包) * 新特性:Lambda 表达式、Stream API、模块化(Java

By Ne0inhk
基于SpringBoot+Vue的Web教师个人成果管理系统管理系统设计与实现【Java+MySQL+MyBatis完整源码】

基于SpringBoot+Vue的Web教师个人成果管理系统管理系统设计与实现【Java+MySQL+MyBatis完整源码】

摘要 随着教育信息化的快速发展,教师个人成果管理已成为高校和科研机构的重要需求。传统的纸质档案管理方式效率低下,难以满足教师科研成果的动态更新和高效检索需求。教师个人成果管理系统通过数字化手段整合教学、科研、论文、专利等各类成果,实现数据的统一管理与可视化展示,有助于提升教师个人职业发展效率,同时为学校管理层提供数据支持。该系统能够有效解决传统管理方式中存在的分散存储、检索困难、统计不便等问题,为教师和学校提供便捷的管理工具。关键词:教师成果管理、信息化、数字化、动态更新、高效检索。 本系统采用前后端分离架构,前端基于Vue.js框架实现响应式用户界面,后端采用SpringBoot框架提供RESTful API接口,数据库使用MySQL存储数据,并通过MyBatis实现数据持久化操作。系统功能模块包括用户管理、成果录入、成果分类、数据统计与分析、权限控制等,支持教师上传论文、项目、专利等成果信息,并按照时间、类别等多维度展示。系统采用JWT进行身份认证,确保数据安全性,同时结合ECharts实现数据可视化,便于教师和管理员直观查看成果分布与趋势。关键词:SpringBoot、Vue

By Ne0inhk