fnget_selection_indices(
env: &mut jni::JNIEnv,
iter: &JObject,
num_cols: usize,
) ->Result<Option<Vec<ArrayRef>>, CometError> {
// Check if all columns have selection vectorslethas_selection_vectors_result: jni::sys::jboolean = unsafe {
jni_call!(env, comet_batch_iterator(iter).has_selection_vectors() -> jni::sys::jboolean)?
};
lethas_selection_vectors = has_selection_vectors_result != 0;
letselection_indices_arrays = if has_selection_vectors {
// Allocate arrays for selection indices export (one per column)letmut indices_array_addrs = Vec::with_capacity(num_cols);
letmut indices_schema_addrs = Vec::with_capacity(num_cols);
for_in0..num_cols {
letarrow_array = Rc::new(FFI_ArrowArray::empty());
letarrow_schema = Rc::new(FFI_ArrowSchema::empty());
indices_array_addrs.push(Rc::into_raw(arrow_array) asi64);
indices_schema_addrs.push(Rc::into_raw(arrow_schema) asi64);
}
// Prepare JNI arrays for the export callletindices_array_obj = env.new_long_array(num_cols as jsize)?;
letindices_schema_obj = env.new_long_array(num_cols as jsize)?;
env.set_long_array_region(&indices_array_obj, 0, &indices_array_addrs)?;
env.set_long_array_region(&indices_schema_obj, 0, &indices_schema_addrs)?;
// Export selection indices from JVMlet_exported_count: i32 = unsafe {
jni_call!(env, comet_batch_iterator(iter).export_selection_indices(
JValueGen::Object(JObject::from(indices_array_obj).as_ref()),
JValueGen::Object(JObject::from(indices_schema_obj).as_ref())
) ->i32)?
};
// Convert to ArrayRef for easier handlingletmut selection_arrays = Vec::with_capacity(num_cols);
foriin0..num_cols {
letarray_data = ArrayData::from_spark((indices_array_addrs[i], indices_schema_addrs[i]))?;
selection_arrays.push(make_array(array_data));
// Drop the references to the FFI arraysunsafe {
Rc::from_raw(indices_array_addrs[i] as *const FFI_ArrowArray);
Rc::from_raw(indices_schema_addrs[i] as *const FFI_ArrowSchema);
}
}
Some(selection_arrays)
} else {
None
};
Ok(selection_indices_arrays)
}
Rust-side processing logic
let (num_rows, array_addrs, schema_addrs) = Self::allocate_and_fetch_batch(&mut env, iter, num_cols)?;
letmut inputs: Vec<ArrayRef> = Vec::with_capacity(num_cols);
// Process each columnforiin0..num_cols {
letarray_ptr = array_addrs[i];
letschema_ptr = schema_addrs[i];
letarray_data = ArrayData::from_spark((array_ptr, schema_ptr))?;
// TODO: validate array input data// array_data.validate_full()?;letarray = make_array(array_data);
// Apply selection if selection vectors exist (applies to all columns)letarray = ifletSome(ref selection_arrays) = selection_indices_arrays {
letindices = &selection_arrays[i];
// Apply the selection using Arrow's take kernelmatchtake(&*array, &**indices, None) {
Ok(selected_array) => selected_array,
Err(e) => {
returnErr(CometError::from(ExecutionError::ArrowError(format!(
"Failed to apply selection for column {}: {}",
i, e
))));
}
}
} else {
array
};
letarray = if arrow_ffi_safe {
// ownership of this array has been transferred to native// but we still need to unpack dictionary arrayscopy_or_unpack_array(&array, &CopyMode::UnpackOrClone)?
} else {
// it is necessary to copy the array because the contents may be// overwritten on the JVM side in the futurecopy_array(&array)
};
inputs.push(array);
// Drop the Arcs to avoid memory leakunsafe {
Rc::from_raw(array_ptr as *const FFI_ArrowArray);
Rc::from_raw(schema_ptr as *const FFI_ArrowSchema);
}
}
// If selection was applied, determine the actual row count from the selected arraysletactual_num_rows = ifletSome(ref selection_arrays) = selection_indices_arrays {
if !selection_arrays.is_empty() {
// Use the length of the first selection array as the actual row count
selection_arrays[0].len()
} else {
num_rows asusize
}
} else {
num_rows asusize
};
Ok(InputBatch::new(inputs, Some(actual_num_rows)))
publicclassCometSelectionVectorextendsCometVector {
/** The original vector containing all values */privatefinal CometVector values;
/**
* The valid indices in the values vector. This array is converted into an Arrow vector so we can
* transfer the data to native in one JNI call. This is used to represent the rowid mapping used
* by Iceberg
*/privatefinalint[] selectionIndices;
/**
* The indices vector containing selection indices. This is currently allocated by the JVM side
* unlike the values vector which is allocated on the native side
*/privatefinal CometVector indices;
// ...
}