1. 外部资源框架到底做了什么
整体就两件事:
1)改写资源请求(Resource Request)
- 你在 Flink 配置里声明要什么资源、要多少
- Flink 会把这些外部资源需求映射进底层资源管理系统(Kubernetes/YARN)的容器或 Pod 资源请求中
- 最终确保 TaskManager 所在的容器/Pod 真的带着你要的外部资源启动
2)把'可用资源信息'提供给算子(Operator)
Flink 外部资源框架实现作业对 GPU、FPGA 等硬件资源的原生申请与分配。框架负责向底层资源管理系统(如 Kubernetes、YARN)请求容器资源,并向算子提供可用资源信息。支持 GPU 推理、特征工程及 FPGA 加速场景。配置需包含插件加载、资源声明及驱动工厂设置。在 Standalone 模式下需注意多 TaskManager 间的资源协调,避免冲突。生产环境推荐结合 K8s 或 YARN 使用,并遵循设备亲和策略进行算子绑定。
整体就两件事:
1)改写资源请求(Resource Request)
2)把'可用资源信息'提供给算子(Operator)
一句话:框架负责'申请 + 告知',至于'怎么用'取决于具体插件。
适合的典型场景
当前边界(很关键)
如果你希望'每个算子/每个 subtask 独占一张卡',需要你在算子内部做更严格的绑定策略,或用脚本协调模式避免同机多 TM 抢同一 GPU(后面讲)。
外部资源通过 Flink 插件机制加载,你需要把对应 jar 放到 Flink 的 plugins/ 目录下的某个子目录中,例如:
plugins/external-resource-gpu/(或你自定义目录,但要保证 jar 能被加载)plugins/fpga/,把你打出来的 jar 放进去插件隔离非常重要:每个 plugin 目录是独立 classloader,避免依赖冲突;同时 SPI(ServiceLoader)文件必须保留(META-INF/services)。
核心配置有两层:
A)先声明启用哪些资源(白名单)
external-resources: gpu;fpga
只有这里列出来的 <resource_name> 才会生效。
B)为每个资源配置 amount、k8s/yarn 映射、driver、driver 参数 常见配置项含义:
external-resource.<name>.amount:每个 TaskManager 需要的资源数量external-resource.<name>.yarn.config-key:YARN 容器资源 profile 的映射 key(可选)external-resource.<name>.kubernetes.config-key:K8s 容器 resources.requests/limits 的 key(可选)external-resource.<name>.driver-factory.class:驱动工厂(可选但强烈建议)
external-resource.<name>.param.<param>:传给驱动工厂的自定义参数(插件自定义解释)一个包含 GPU+FPGA 的示例:
external-resources: gpu;fpga
external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
external-resource.gpu.amount: 2
external-resource.gpu.param.discovery-script.args: --enable-coordination-mode
external-resource.fpga.driver-factory.class: org.apache.flink.externalresource.fpga.FPGADriverFactory
external-resource.fpga.amount: 1
external-resource.fpga.yarn.config-key: yarn.io/fpga
算子侧用法非常直接:
public class ExternalResourceMapFunction extends RichMapFunction<String, String> {
private static final String RESOURCE_NAME = "gpu";
@Override
public String map(String value) throws Exception {
Set<ExternalResourceInfo> infos = getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME);
List<String> indices = new ArrayList<>();
for (ExternalResourceInfo info : infos) {
info.getProperty("index").ifPresent(indices::add); // GPU 插件常用属性 key:index
}
// 这里用 indices 做设备绑定,比如选择一张卡 set CUDA_VISIBLE_DEVICES 或初始化推理引擎
return value;
}
}
每个 ExternalResourceInfo 里有哪些 key,取决于插件实现。你可以用 ExternalResourceInfo#getKeys() 获取完整键集合。
kubernetes.config-key 写入 TaskManager 主容器的:
resources.requests.<config-key>resources.limits.<config-key>GPU 的常见 key:
nvidia.com/gpuamd.com/gpu(但 Flink 默认 discovery 脚本是 NVIDIA 的,AMD 需要你自己写脚本)external-resource.<name>.yarn.config-key 把 amount 写进 container resource profileGPU(YARN)常见 key:
yarn.io/gpu(注意:YARN 当前通常仅支持 NVIDIA GPU 的调度)Flink 目前官方提供的一方外部资源插件就是 GPU 插件。
external-resources: gpu
external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
external-resource.gpu.amount: 2
# Kubernetes
external-resource.gpu.kubernetes.config-key: nvidia.com/gpu
# YARN
external-resource.gpu.yarn.config-key: yarn.io/gpu
GPUDriver 会调用 discovery script 来发现可用 GPU,并生成 ExternalResourceInfo,其中关键属性是:
index(GPU 设备 index)默认脚本路径(NVIDIA):
external-resource.gpu.param.discovery-script.path: plugins/external-resource-gpu/nvidia-gpu-discovery.sh
自定义脚本路径(例如 AMD)也可以配置同一个 key。
脚本参数:
external-resource.gpu.param.discovery-script.args: --enable-coordination-mode
amount 作为第一个参数传给脚本discovery-script.args 会拼在后面0,1Standalone 下经常同机部署多个 TaskManager,此时所有 TM 默认都能看到同一批 GPU(nvidia-smi 可见),很容易'多进程抢同一张卡'。
默认脚本提供 coordination mode:
--enable-coordination-mode:启用协调--coordination-file <path>:协调文件路径(默认 /var/tmp/flink-gpu-coordination)它能保证:同一个 Flink 集群内,同机多个 TaskManager 不会分到同一张 GPU。
但要注意两点:
你需要实现三件套:
1)ExternalResourceDriver
retrieveResourceInfo(long amount):返回 ExternalResourceInfo 集合(你定义的资源维度)2)ExternalResourceDriverFactory
createExternalResourceDriver(Configuration config):从配置创建 driver3)SPI 服务声明(非常关键)
META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactoryyour.domain.FPGADriverFactory示例骨架:
public class FPGAInfo implements ExternalResourceInfo {
@Override
public Optional<String> getProperty(String key) {
// 根据 key 返回属性,比如 "device", "pci", "address" 等
return Optional.empty();
}
@Override
public Collection<String> getKeys() {
return List.of("device", "pci", "address");
}
}
public class FPGADriver implements ExternalResourceDriver {
@Override
public Set<ExternalResourceInfo> retrieveResourceInfo(long amount) {
// 发现并返回 FPGA 信息集合
return Set.of(/* ... */);
}
}
public class FPGADriverFactory implements ExternalResourceDriverFactory {
@Override
public ExternalResourceDriver createExternalResourceDriver(Configuration config) {
return new FPGADriver();
}
}
打包成 jar 丢到 plugins/fpga/,然后在 flink-conf.yaml 里按 <resource_name> 配置启用即可。
1)external-resources 没写 gpu(或资源名拼错)
2)插件 jar 没放到 plugins/ 下正确目录(或目录里缺依赖)
3)没配置 driver-factory.class,导致算子侧拿不到 ExternalResourceInfo
4)K8s 没装 NVIDIA device plugin(Pod 根本拿不到 GPU)
5)discovery script 不可执行 / 路径不对 / 返回非 0
6)Standalone 同机多 TM 没开协调模式,导致资源冲突看似'有卡但不可用'
index 决定 CUDA_VISIBLE_DEVICES 或引擎初始化参数
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
查找任何按下的键的javascript键代码、代码、位置和修饰符。 在线工具,Keycode 信息在线工具,online
JavaScript 字符串转义/反转义;Java 风格 \uXXXX(Native2Ascii)编码与解码。 在线工具,Escape 与 Native 编解码在线工具,online
使用 Prettier 在浏览器内格式化 JavaScript 或 HTML 片段。 在线工具,JavaScript / HTML 格式化在线工具,online
Terser 压缩、变量名混淆,或 javascript-obfuscator 高强度混淆(体积会增大)。 在线工具,JavaScript 压缩与混淆在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online