Spark Datafusion Comet 向量化Rust Native--Native算子ScanExec以及涉及到的Selection Vectors

背景

Apache Datafusion Comet 是苹果公司开源的加速Spark运行的向量化项目。
本项目采用了 Spark插件化 + Protobuf + Arrow + DataFusion 架构形式
其中

  • Spark插件是 利用 SparkPlugin 插件,其中分为 DriverPlugin 和 ExecutorPlugin ,这两个插件在driver和 Executor启动的时候就会调用
  • Protobuf 是用来序列化 spark对应的表达式以及计划,用来传递给 native 引擎去执行,利用了 体积小,速度快的特性
  • Arrow 是用来 spark 和 native 引擎进行高效的数据交换(native执行的结果或者spark执行的数据结果),主要在JNI中利用Arrow IPC 列式存储以及零拷贝等特点进行进程间数据交换
  • DataFusion 主要是利用Rust native以及Arrow内存格式实现的向量化执行引擎,Spark中主要offload对应的算子到该引擎中去执行

本文基于 datafusion comet 截止到2026年1月13号的main分支的最新代码(对应的commit为 eef5f28a0727d9aef043fa2b87d6747ff68b827a)
主要分析Rust Native的Spark Datafusion Comet 向量化Rust Native–执行Datafusion计划中ScanExec以及涉及到的Selection Vectors

Selection Vectors

什么是Selection Vectors

Selection Vectors 是向量化查询执行引擎过滤操作中的一种表达,还有另一种表达是 Bitmap :

  1. Bitmap 表达是: 用 BitMap 来标记哪些数据是被过滤选中的
  2. Selection Vectors表达是:用 vector 存储被命中的数据的下标
    两者的区别是Bitmap表达会记录所有的数据,只不过是用不同的0/1代表存在与否,而 Selection Vectors 只记录命中的数据
    具体相关的论文可以参考Filter Representation in Vectorized Query Execution
    针对这两种过滤算子的表达,可以衍生出三种执行策略:
  • BMFull:总是对所有数据处理,未选中的数据的值未定义,优势是能充分发挥向量化的优势
  • BMPartial:只对选中的数据进行处理,无法利用向量化,依然需要遍历所有下标
  • SVPartial:只需要遍历选中的下标,无法利用向量化

ScanExec读取以及涉及到的Select Vectors

    • values 为一列中的所有原始值
    • selectionIndices为选中的数据的下标
      • 假如存在则获取每一列的Selection Vector,否则返回None
        • 首先用对于每一列值构造一个Vec(FFI_ArrowArray)Vec(FFI_ArrowSchema)类型的数组以及初始化数组,并创建的 FFI_ArrowArrayFFI_ArrowSchema对应的地址插入到该数组中
        • 使用JNIEnv.new_long_array创建Java Long型数组
        • 使用JNIEnv.set_long_array_region新创建的Java Long型数组(也就是对应的FFIArray/Schema地址)赋值给该数组
        • 调用ArrayData::from_spark方法将Spark 端通过 Arrow C Data Interface 传递过来的内存地址转换为 Rust 端Arrow ArrayData 对象
          这里主要使用了from_ffi方法,从这些裸指针(Raw Pointers)重建出 Rust 的 ArrayData 结构,这个过程是零拷贝的(Zero-copy),直接复用 Spark 分配的内存;并调用 align_buffers() 确保数据在 Rust 端能被正确、安全地访问(例如 SIMD 操作对内存对齐有要求)
    • allocate_and_fetch_batch方法 及后续说明
    • 调用allocate_and_fetch_batch方法从Java端获取一批数据并赋值到传入的FFI_ArrowArray和FFI_ArrowSchema中
      对于java端的处理和之前的exportSingleVector方法处理一样,主要是使用Data.exportVector方法来进行数据回传
    • 如果存在Selection Vector,则使用Arrow take方法获取到真正的值,否则就是原值
      其中对&Arc使用 &**操作是将类型由引用 &Arc 依次转换为 Arc、T,最后再取引用 &T,用于不转移所有权地读取数据。
    • 如果存在Selection Vector,则返回真正的行数
      最后组装成InputBatch::new(inputs, Some(actual_num_rows)返回

JNI调用JVM的exportSelectionIndices方法

 let _exported_count: i32 = unsafe { jni_call!(env, comet_batch_iterator(iter).export_selection_indices( JValueGen::Object(JObject::from(indices_array_obj).as_ref()), JValueGen::Object(JObject::from(indices_schema_obj).as_ref()) ) -> i32)? }; 

这里用到的as_ref方法通常用于将高级包装类型(如JObject, JString等)转换为对底层JNI指针(jobject)的共享引用。它使得在不改变对象所有权的情况下,可以安全地将对象传递给JNIEnv函数进行后续操作,
在Java侧的话,主要是NativeUtil.exportSingleVector的调用:

 def exportSingleVector(vector: CometVector, arrayAddr: Long, schemaAddr: Long): Unit = { val valueVector = vector.getValueVector val provider = if (valueVector.getField.getDictionary != null) { vector.getDictionaryProvider } else { null } val arrowSchema = ArrowSchema.wrap(schemaAddr) val arrowArray = ArrowArray.wrap(arrayAddr) Data.exportVector( allocator, getFieldVector(valueVector, "export"), provider, arrowArray, arrowSchema) } 

Data.exportVector 使用这个方法使Selection Vector回传到Rust端,

判断是否存在Selection Vectors,通过JNI调用java侧方法hasSelectionVectors

 jni_call!(env, comet_batch_iterator(iter).has_selection_vectors() -> jni::sys::jboolean 

get_selection_indices方法说明

 fn get_selection_indices( env: &mut jni::JNIEnv, iter: &JObject, num_cols: usize, ) -> Result<Option<Vec<ArrayRef>>, CometError> { // Check if all columns have selection vectors let has_selection_vectors_result: jni::sys::jboolean = unsafe { jni_call!(env, comet_batch_iterator(iter).has_selection_vectors() -> jni::sys::jboolean)? }; let has_selection_vectors = has_selection_vectors_result != 0; let selection_indices_arrays = if has_selection_vectors { // Allocate arrays for selection indices export (one per column) let mut indices_array_addrs = Vec::with_capacity(num_cols); let mut indices_schema_addrs = Vec::with_capacity(num_cols); for _ in 0..num_cols { let arrow_array = Rc::new(FFI_ArrowArray::empty()); let arrow_schema = Rc::new(FFI_ArrowSchema::empty()); indices_array_addrs.push(Rc::into_raw(arrow_array) as i64); indices_schema_addrs.push(Rc::into_raw(arrow_schema) as i64); } // Prepare JNI arrays for the export call let indices_array_obj = env.new_long_array(num_cols as jsize)?; let indices_schema_obj = env.new_long_array(num_cols as jsize)?; env.set_long_array_region(&indices_array_obj, 0, &indices_array_addrs)?; env.set_long_array_region(&indices_schema_obj, 0, &indices_schema_addrs)?; // Export selection indices from JVM let _exported_count: i32 = unsafe { jni_call!(env, comet_batch_iterator(iter).export_selection_indices( JValueGen::Object(JObject::from(indices_array_obj).as_ref()), JValueGen::Object(JObject::from(indices_schema_obj).as_ref()) ) -> i32)? }; // Convert to ArrayRef for easier handling let mut selection_arrays = Vec::with_capacity(num_cols); for i in 0..num_cols { let array_data = ArrayData::from_spark((indices_array_addrs[i], indices_schema_addrs[i]))?; selection_arrays.push(make_array(array_data)); // Drop the references to the FFI arrays unsafe { Rc::from_raw(indices_array_addrs[i] as *const FFI_ArrowArray); Rc::from_raw(indices_schema_addrs[i] as *const FFI_ArrowSchema); } } Some(selection_arrays) } else { None }; Ok(selection_indices_arrays) } 

Rust侧

 let (num_rows, array_addrs, schema_addrs) = Self::allocate_and_fetch_batch(&mut env, iter, num_cols)?; let mut inputs: Vec<ArrayRef> = Vec::with_capacity(num_cols); // Process each column for i in 0..num_cols { let array_ptr = array_addrs[i]; let schema_ptr = schema_addrs[i]; let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?; // TODO: validate array input data // array_data.validate_full()?; let array = make_array(array_data); // Apply selection if selection vectors exist (applies to all columns) let array = if let Some(ref selection_arrays) = selection_indices_arrays { let indices = &selection_arrays[i]; // Apply the selection using Arrow's take kernel match take(&*array, &**indices, None) { Ok(selected_array) => selected_array, Err(e) => { return Err(CometError::from(ExecutionError::ArrowError(format!( "Failed to apply selection for column {i}: {e}", )))); } } } else { array }; let array = if arrow_ffi_safe { // ownership of this array has been transferred to native // but we still need to unpack dictionary arrays copy_or_unpack_array(&array, &CopyMode::UnpackOrClone)? } else { // it is necessary to copy the array because the contents may be // overwritten on the JVM side in the future copy_array(&array) }; inputs.push(array); // Drop the Arcs to avoid memory leak unsafe { Rc::from_raw(array_ptr as *const FFI_ArrowArray); Rc::from_raw(schema_ptr as *const FFI_ArrowSchema); } } // If selection was applied, determine the actual row count from the selected arrays let actual_num_rows = if let Some(ref selection_arrays) = selection_indices_arrays { if !selection_arrays.is_empty() { // Use the length of the first selection array as the actual row count selection_arrays[0].len() } else { num_rows as usize } } else { num_rows as usize }; Ok(InputBatch::new(inputs, Some(actual_num_rows))) 

indices为 java数组selectionIndicesArrow vector的表示,便于其他语言能够以零拷贝的方式访问这些数据,后续会被传递给 Native(Rust) 层

 this.indices = CometVector.getVector(indicesVector, values.useDecimal128, values.getDictionaryProvider()); 

Java侧

 public boolean hasSelectionVectors() { if (currentBatch == null) { return false; } // Check if all columns are CometSelectionVector instances for (int i = 0; i < currentBatch.numCols(); i++) { if (!(currentBatch.column(i) instanceof CometSelectionVector)) { return false; } } return true; } 

这其中的 CometSelectionVector就是对应上文中说到的selection vector,具体的代码如下:

 public class CometSelectionVector extends CometVector { /** The original vector containing all values */ private final CometVector values; /** * The valid indices in the values vector. This array is converted into an Arrow vector so we can * transfer the data to native in one JNI call. This is used to represent the rowid mapping used * by Iceberg */ private final int[] selectionIndices; /** * The indices vector containing selection indices. This is currently allocated by the JVM side * unlike the values vector which is allocated on the native side */ private final CometVector indices; /** 

参考

Read more

【算法】BFS解决最短路径问题

【算法】BFS解决最短路径问题

📢博客主页:https://blog.ZEEKLOG.net/2301_779549673 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正! 📢本文由 JohnKi 原创,首发于 ZEEKLOG🙉 📢未来很长,值得我们全力奔赴更美好的生活✨ 文章目录 * 📢前言 * 🏳️‍🌈一、概念 * 🏳️‍🌈二、问题描述 * 🏳️‍🌈三、求解思路 * 🏳️‍🌈四、代码实现 * 🏳️‍🌈例题分析 * ❤️1926. 迷宫中离入口最近的出口 * 🧡433. 最小基因变化 * 👥总结 📢前言 🏳️‍🌈一、概念 **BFS(广度优先搜索)**在图论算法中有着广泛的应用,尤其是在解决最短路径问题上表现出色。本文将详细介绍如何使用 C++ 实现 BFS 来解决最短路径问题。 广度优先搜索是一种用于图遍历的算法,它从起始节点开始,逐步探索其相邻节点,然后再探索相邻节点的相邻节点,

By Ne0inhk
HDFS副本数管理完全指南:如何动态调整副本并评估性能影响

HDFS副本数管理完全指南:如何动态调整副本并评估性能影响

HDFS副本数管理完全指南:如何动态调整副本并评估性能影响 * 引言 * 一、HDFS副本数调整的两种方式 * 1.1 针对已有文件:使用`-setrep`命令(立即生效) * 1.2 修改默认副本因子:配置文件方式(对新文件生效) * 二、副本调整的底层原理与流程 * 2.1 副本调整的整体流程 * 2.2 增加副本时的行为分析 * 2.3 减少副本时的行为分析 * 2.4 等待机制:`-w`参数的作用 * 三、调整副本数对性能的多维影响 * 3.1 影响矩阵总览 * 3.2 详细影响分析 * 增加副本数的正面影响 * 增加副本数的负面影响 * 减少副本数的影响 * 3.3 副本数与集群规模的关系 * 四、不同场景下的副本数配置建议 * 4.

By Ne0inhk
【优选算法 | 优先级队列】从堆实现到解题框架:彻底搞懂优先级队列

【优选算法 | 优先级队列】从堆实现到解题框架:彻底搞懂优先级队列

算法相关知识点可以通过点击以下链接进行学习一起加油!双指针滑动窗口二分查找前缀和位运算模拟链表哈希表字符串模拟栈模拟(非单调栈) 优先级队列(Priority Queue),本质上是一个支持动态插入与按优先级弹出操作的堆结构,是处理这类问题的强力工具。 本文将从底层的堆实现出发,逐步构建出优先级队列的完整解题框架,并结合高频 题目,帮助你真正掌握它在算法实战中的运用。 🌈个人主页:是店小二呀 🌈C/C++专栏:C语言\ C++ 🌈初/高阶数据结构专栏: 初阶数据结构\ 高阶数据结构 🌈Linux专栏: Linux 🌈算法专栏:算法 🌈Mysql专栏:Mysql 🌈你可知:无人扶我青云志 我自踏雪至山巅 文章目录 * 一、铺垫知识 * 1.1 堆排序(Heap Sort) * 1.2 快速选择(QuickSelect)算法解决 Top K 问题 * 3.

By Ne0inhk
【算法通关指南:算法基础篇】高精度专题:一篇破除超数运算问题

【算法通关指南:算法基础篇】高精度专题:一篇破除超数运算问题

🔥小龙报:个人主页 🎬作者简介:C++研发,嵌入式,机器人方向学习者 ❄️个人专栏:《算法通关指南》 ✨ 永远相信美好的事情即将发生 文章目录 * 前言 * 一、高精度 * 二、高精度加法 * 2.1【模板】加法 * 2.1.1题目 * 2.1.2 算法原理 * 2.2.3代码 * 三、高精度减法 * 3.1【模板】减法 * 3.1.1题目 * 3.1.2 算法原理 * 3.2.3代码 * 四、高精度乘法 * 4.1【

By Ne0inhk