Flink Batch Shuffle Blocking vs Hybrid 怎么选?Hash vs Sort 怎么调?一篇把坑点讲透的实战文

1. 为什么 Batch Shuffle 跟 Streaming 的 Pipelined Shuffle 不一样?

Streaming 的 pipelined shuffle 强依赖:上游和下游同时运行、边产边传边算。这对资源要求更高(slot、网络 buffer、并发等)。

Batch 的 blocking/hybrid 主要目标是:

  • 允许上下游不同时运行(尤其 blocking):用更少资源把任务跑完
  • 通过“持久化中间结果”提升稳定性(失败可重读、可恢复)
  • 在资源充足时(尤其 hybrid)尽量缩短整体执行时间

一句话:batch shuffle 关注的是 **“资源效率 + 稳定性 + 总耗时”**三角平衡。

2. Blocking Shuffle:Hash Shuffle vs Sort Shuffle

Blocking Shuffle 有两种实现:

  • Hash Shuffle:Flink 1.14 及以下默认
  • Sort Shuffle:Flink 1.13 引入,Flink 1.15 起成为默认

2.1 Hash Shuffle 原理与三个 IO 机制(file / mmap / auto)

Hash Shuffle 的典型行为是:

  • 每个 upstream task 会为每个 downstream task写一个独立文件(同一上游会写很多小/中等文件)
  • 下游运行时向上游所在 TaskManager 拉取 partition,上游读文件并通过网络发给下游

写读机制(可由 TaskManager 配置选择):

  • file:普通 File IO 写;读时使用 Netty FileRegion(依赖 sendfile)减少拷贝与内存消耗
  • mmap:读写使用 mmap 系统调用
  • auto:写用 File IO;读在 32 位退回 file,在 64 位使用 mmap(规避 32 位 Java mmap 文件大小限制)
Hash Shuffle 的关键坑点(非常“生产级”)
  • SSL 开启时FileRegion 不能用,只能用非池化 buffer 缓存再传,可能引发 direct memory OOM
  • 同步文件读可能阻塞 Netty 线程:需要增大 SSL handshake timeout,否则可能 connection reset
  • mmap 的内存占用不计入 Flink 配置的内存限制,但像 YARN 这类资源管理器可能会统计并在阈值超限时杀 container
  • 大规模作业会产生海量文件,需要更大的写 buffer,同时也容易把 inode / FD 打爆
  • HDD 场景:多个下游并发拉取时更容易触发 随机 IO,性能雪崩

结论:Hash Shuffle 在“小规模 + SSD”还能凑合,一旦规模大、磁盘是 HDD 或开启 SSL,就容易把稳定性问题放大。

2.2 Sort Shuffle:为什么 1.15 以后默认切它?

Sort Shuffle 的核心改动是:

  • 每个 result partition 只写一个文件
  • 多个下游并发读取同一个 partition 时:文件只打开一次,多读共享
    结果就是:inode/FD 更少,稳定性更好
  • 读尽量顺序化,尤其 HDD 上比 Hash Shuffle 更友好
  • Sort Shuffle 用额外的 managed memory 当读 buffer,不依赖 sendfile/mmap,因此 对 SSL 友好
重要说明:Sort Shuffle 目前“sort”的对象不是记录内容本身,而是按 partition index 做聚簇(数据聚集算法意义上的 sort)。
使用 Sort Shuffle 时通常需要关注的两个内存相关参数
  1. 写 buffer(来自 network memory)
  • taskmanager.network.sort-shuffle.min-buffers
    控制写入缓冲大小。大规模作业常需要加大,通常“几百 MB”量级就够(视并行度与数据量而定)
  • 增大它往往需要同步调大 network memory:
    • taskmanager.memory.network.fraction
    • taskmanager.memory.network.min
    • taskmanager.memory.network.max
      否则容易出现 Insufficient number of network buffers
  1. 读 buffer(来自 framework off-heap)
  • taskmanager.memory.framework.off-heap.batch-shuffle.size
    控制读共享内存缓冲。大规模作业建议 256M/512M 起步(视吞吐与并发读而定)
  • 这块内存从 framework off-heap 切出来,所以要同步增大:
    • taskmanager.memory.framework.off-heap.size
      否则可能 direct memory OOM

3. Blocking Shuffle 的选择策略:一张“决策表”就够

你可以按下面思路快速决策:

  • 小规模 + SSD:Hash/Sort 都能跑,优先考虑版本默认值
  • 大规模 或 HDD:Sort Shuffle 更适合(更稳定、顺序 IO 更友好、FD/inode 压力更小)
  • 开启 SSL:优先 Sort(Hash 的 FileRegion 受限,direct memory 风险上升)

3.1 如何切换 Hash 与 Sort:一个参数搞定

  • taskmanager.network.sort-shuffle.min-parallelism

它按下游并行度决定用哪种实现:

  • downstream parallelism < 该值:用 Hash Shuffle
  • 否则:用 Sort Shuffle

版本默认值差异非常关键:

  • Flink < 1.15:默认 Integer.MAX_VALUE,因此基本总是 Hash
  • Flink ≥ 1.15:默认 1,因此基本总是 Sort

想在 1.14 及以下强制 Sort:把它设成 1

4. Hybrid Shuffle:更快,但要读懂“实验性”的代价

Hybrid Shuffle 的定位是:下一代 batch exchange,结合 blocking 与 pipelined 的优点:

  • 像 blocking:不要求上下游必须同时运行(资源占用可控)
  • 像 pipelined:允许边产边消费(资源足时总耗时更短)
  • 通过 spilling 策略在“落盘更少”与“失败重启更少”之间做权衡

4.1 如何开启 Hybrid Shuffle

配置:

  • execution.batch-shuffle-mode = ALL_EXCHANGES_HYBRID_FULL
  • execution.batch-shuffle-mode = ALL_EXCHANGES_HYBRID_SELECTIVE

两种 spilling 策略:

  • Selective Spilling(SELECTIVE)
    下游消费不及时才落盘 → 落盘更少、可能更快
    代价:失败时上游可能需要重启以重算完整中间结果
  • Full Spilling(FULL)
    无论下游是否及时消费都落盘 → 更像“可边消费的 blocking”
    优点:失败时可直接复用已持久化的完整中间结果,不必重启上游
    代价:落盘更多

一个实用建议(偏生产落地):

  • 你想“先稳定跑起来”:从 FULL 开始
  • 你追求“极致吞吐/更少落盘”:再评估 SELECTIVE

4.2 数据消费约束:决定“到底有多 pipelined”

参数:

  • jobmanager.partition.hybrid.partition-data-consume-constraint

三种模式:

  • ALL_PRODUCERS_FINISHED:所有 producer 都结束后才允许消费(更接近 blocking)
  • ONLY_FINISHED_PRODUCERS:只能消费已结束 producer 的数据
  • UNFINISHED_PRODUCERS:允许消费未结束 producer 的数据(最像 pipelined)

对不同调度/特性有默认偏好:

  • AdaptiveBatchScheduler:默认 UNFINISHED_PRODUCERS(追求 pipelined-like)
    • 如果你改成 ALL_PRODUCERS_FINISHEDONLY_FINISHED_PRODUCERS,性能可能下降
  • 启用 SpeculativeExecution:默认 ONLY_FINISHED_PRODUCERS
    • 这样相比 blocking 仍有收益,但可能产生更多 speculative tasks,failover 成本也会上升
    • 想完全退回 blocking 行为:设成 ALL_PRODUCERS_FINISHED
    • 注意:该模式下 不支持UNFINISHED_PRODUCERS

4.3 Hybrid Shuffle 支持远端存储(OSS/HDFS/S3…)

配置:

  • taskmanager.network.hybrid-shuffle.remote.path

这在“本地磁盘不够、希望更弹性或更可靠的中间数据存储”时很有用,但也意味着你要评估远端吞吐与成本(尤其是大规模 shuffle 数据)。

4.4 Hybrid Shuffle 已知限制(别踩)

  • 不支持 Slot Sharing:hybrid 模式下当前强制每个 task 独占 slot;显式配置 slot sharing 会报错
  • 动态图(auto-parallelism)下无法 pipelined 执行:Adaptive Batch Scheduler 需要等上游结束才能决定下游并行度,hybrid 会“有效退化”为 blocking(ALL_PRODUCERS_FINISHED 行为)

5. 性能调优:给你一套“可直接抄作业”的配置思路

5.1 Blocking Shuffle 调优要点(尤其大规模 batch)

  1. HDD 场景强烈建议 Sort Shuffle
  • Flink ≥ 1.15 默认就是 Sort
  • Flink ≤ 1.14:taskmanager.network.sort-shuffle.min-parallelism: 1
  1. 启用压缩(数据好压缩时收益明显)
  • Flink ≥ 1.15:默认已启用
  • Flink ≤ 1.14:需要手动启用(具体键因版本而异,按你实际版本文档为准)
  1. Sort Shuffle 下的 network buffer 策略:让它更“抗大并行度”
  • 建议(Flink 1.14+ 的经验配置):
    • taskmanager.network.memory.buffers-per-channel: 0
    • taskmanager.network.memory.floating-buffers-per-gate: 4096(示例值,可按规模调整)

好处:

  • network memory 不再随并行度线性膨胀,降低 “Insufficient number of network buffers”
  • buffer 按需在 channel 间分配,提高利用率与性能
  1. 增大 network memory 总量
    默认 network memory 往往偏保守,大规模 batch 建议:
  • taskmanager.memory.network.fraction: 0.2(至少)
    并结合 min/max 做上下界约束,避免被切得太小
  1. 增大 shuffle 写 buffer(Sort Shuffle)
  • 增大 taskmanager.network.sort-shuffle.min-buffers
  • 同步增大 network memory,否则你加 buffer 反而更容易触发 buffers 不足
  1. 增大 shuffle 读 buffer(Sort Shuffle)
  • taskmanager.memory.framework.off-heap.batch-shuffle.size: 256m512m
  • 同步增大 taskmanager.memory.framework.off-heap.size(至少同等增量)

5.2 一份“示例配置片段”(按需改)

# 让 1.14 及以下也默认走 Sort Shuffletaskmanager.network.sort-shuffle.min-parallelism:1# Sort Shuffle 写 buffer(示例:具体要按并行度/数据量调)taskmanager.network.sort-shuffle.min-buffers: 512mb # 读 buffer(共享读内存)taskmanager.memory.framework.off-heap.batch-shuffle.size: 512mb taskmanager.memory.framework.off-heap.size: 1024mb # 网络内存总量(示例)taskmanager.memory.network.fraction:0.2taskmanager.memory.network.min: 512mb taskmanager.memory.network.max: 4096mb # Sort Shuffle 推荐的 buffer 分配策略(1.14+)taskmanager.network.memory.buffers-per-channel:0taskmanager.network.memory.floating-buffers-per-gate:4096

Hybrid Shuffle 示例(按需):

execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL jobmanager.partition.hybrid.partition-data-consume-constraint: UNFINISHED_PRODUCERS taskmanager.network.hybrid-shuffle.remote.path: s3://your-bucket/flink/hybrid-shuffle/ 

6. 排障清单:看到这些异常,先按这张表走

6.1 Blocking Shuffle 常见异常与解法

  • Insufficient number of network buffers
    • 本质:network memory 不够
    • 解法:增大 taskmanager.memory.network.fraction/min/max
    • 1.15 升级后更常见:因为 Sort 默认可能比之前更吃 network memory(某些场景)
  • Too many open files
    • Hash Shuffle 优先切 Sort
    • 若已 Sort:提高系统 FD 上限,并排查用户代码是否泄漏 FD
  • Connection reset by peer
    • 多见于网络不稳/负载重
    • Hash Shuffle 场景优先切 Sort
    • 若已 Sort:可考虑增大 network backlog;若有 SSL,关注 handshake timeout
  • Network connection timeout / Socket read/write timeout
    • 多为网络慢/拥塞
    • 解法:增大网络超时/启用重试;必要时调大 send/recv buffer
    • K8s 场景:某些情况下 hostNetwork 能缓解(需结合你的集群策略评估)
  • Read buffer request timeout(Sort Shuffle 特有)
    • 本质:shuffle 读共享内存争用太激烈
    • 解法:增大 taskmanager.memory.framework.off-heap.batch-shuffle.size
    • 同步增大 taskmanager.memory.framework.off-heap.size
  • No space left on device
    • 磁盘容量或 inode 耗尽
    • 解法:扩容/清理;Hash 更容易造成 inode 风暴,建议切 Sort
  • Out of memory error
    • Hash:优先切 Sort
    • Sort:按类型加内存
      • heap:taskmanager.memory.task.heap.size
      • direct/off-heap:taskmanager.memory.task.off-heap.size 或 framework off-heap
  • Container killed by external resource manager(例如 YARN)
    • Hash Shuffle 的 mmap/直接内存等可能导致资源管理器统计超限
    • 解法:优先切 Sort;再结合 Flink log 与 RM log 找根因

7. 最后的“选型口诀”

  • 批处理默认 Blocking 没问题,大规模/HDD/SSL:优先 Sort
  • 你要更短总耗时、并且能接受实验性限制:评估 Hybrid
  • Hybrid 想先稳:FULL spilling;想少落盘:再试 SELECTIVE
  • 看到 buffers/FD/inode/SSL 相关问题:第一反应别硬扛,先把 shuffle 模式与内存模型调顺

Read more

你真的理解Java SPI吗?从源码到实战的深度思考 [特殊字符]

你真的理解Java SPI吗?从源码到实战的深度思考 [特殊字符]

目录 * 前言:重新认识SPI * 核心思考一:SPI的本质是什么? * 核心思考二:ServiceLoader的优与劣 * 核心思考三:Dubbo如何优化SPI? * 核心思考四:实战中的坑与最佳实践 * 总结与后续计划 前言:重新认识SPI 这篇文章《Java SPI机制初探》来自得物技术团队,系统介绍了Java SPI的概念、原理以及在JDBC、Spring、Dubbo等框架中的应用。文章从SPI的基础概念出发,深入分析了ServiceLoader的源码实现,并结合实际场景讲解了SPI的优缺点和解决方案。 说实话,SPI这个名词一直出现在我耳边,但从未真正了解过。这次正好借着这篇文章来学习一下,看看和自己印象中的是否一致。看完之后,发现SPI其实没有我想象中那么复杂,但背后的设计思想确实值得深入思考。 核心思考一:SPI的本质是什么? API vs SPI:控制权的反转 文章开篇就对比了API和SPI的区别,这个对比让我对SPI有了更清晰的认识: * API:接口实现方同时负责接口定义和接口实现,接口控制权在服务提供方 * SPI:服务调用方负

By Ne0inhk
Java外功基础(1)——Spring Web MVC

Java外功基础(1)——Spring Web MVC

1.前置知识 1.1 Tomcat 定义:Tomcat是一个开源的轻量级Web(Http)服务器和Servlet容器。它实现了Java Servlet等Java EE规范的核心功能,常用于部署和运行Java Web应用程序 。换言之,Tomcat就是一个严格遵循Servlet规范开发出来的、可以独立安装和运行的Java Web服务器/Servlet容器核心功能:Servlet容器:支持Servlet的执行,处理HTTP请求和响应Web服务器:提供静态资源(如HTML)的访问能力,支持基本的HTTP服务安装与版本对应: tomcat官网:Apache Tomcat®目录结构:bin:存放可执行文件,如startup.batconf:存放配置文件lib:存放Tomcat运行所需的jar文件logs:存储日志文件temp:存放临时文件,如上传的文件或缓存数据webapps:默认web应用部署目录work:服务器的工作目录,存放运行时生成的临时文件(编译文件) 1.2 Servlet 1.2.1 定义

By Ne0inhk
Java 大视界 -- Java 大数据机器学习模型在自然语言处理中的少样本学习与迁移学习融合

Java 大视界 -- Java 大数据机器学习模型在自然语言处理中的少样本学习与迁移学习融合

Java 大视界 -- Java 大数据机器学习模型在自然语言处理中的少样本学习与迁移学习融合 * 引言:从虚拟偶像情感计算到语言智能的 “显微镜” 革命 * 正文:从理论架构到工业落地的全链条创新 * 一、NLP 领域的 “数据贫困” 困境与破局逻辑 * 1.1 少样本场景的核心挑战 * 1.2 Java 大数据的 “三维穿透” 技术架构 * 二、工业级融合模型的技术实现与代码解析 * 2.1 预训练模型迁移优化(BERT 医疗领域深度微调) * 2.2 原型网络(Prototypical Network)少样本分类 * 三、实战案例:从医疗语义分析到跨境电商智能客服 * 3.1 医疗场景:罕见病实体识别的 “样本逆袭” * 3.2 跨境电商:阿拉伯语商品类目分类的

By Ne0inhk
【C++笔记】STL详解:vector容器的实现

【C++笔记】STL详解:vector容器的实现

前言:         在学习了vector类的基本使用的前提下,本文将重点分析vector类的常用接口及其应用实现。          一、vector成员变量          vector本质上是一个动态数组,通过原生指针来实现底层维护,为了使得STL接口调用的统一性,我们需要将原生指针重命名为迭代器。          其核心目的是:将数据结构(容器)与操作(算法)分离,并通过一种统一的接口(迭代器)将它们粘合在一起。          成员变量分析 template <class T> class vector { public: // 将原生指针重命名为迭代器,实现接口统一 typedef T* iterator; typedef const T* const_iterator; private: iterator _start; // 指向目前使用空间的头 iterator _finish; // 指向目前使用空间的尾 iterator _end_of_storage; // 指向目前可用空间的尾 };          成员变量分析:

By Ne0inhk