From f212fb052a2720c462538ca85e46b5ca7b9f43e9 Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Fri, 19 Jun 2026 17:29:25 +0100 Subject: [PATCH] initial Signed-off-by: Mikhail Kot --- encodings/fastlanes/src/lib.rs | 2 +- .../src/e2e_test/vortex_scan_test.rs | 15 + vortex-duckdb/src/exporter/mod.rs | 7 + vortex-duckdb/src/exporter/rle.rs | 292 ++++++++++++++++++ vortex-duckdb/src/exporter/run_end.rs | 27 ++ 5 files changed, 342 insertions(+), 1 deletion(-) create mode 100644 vortex-duckdb/src/exporter/rle.rs diff --git a/encodings/fastlanes/src/lib.rs b/encodings/fastlanes/src/lib.rs index 458b5d7563f..d6babfdd712 100644 --- a/encodings/fastlanes/src/lib.rs +++ b/encodings/fastlanes/src/lib.rs @@ -21,7 +21,7 @@ mod delta; mod r#for; mod rle; -pub(crate) const FL_CHUNK_SIZE: usize = 1024; +pub const FL_CHUNK_SIZE: usize = 1024; use bitpacking::compute::is_constant::BitPackedIsConstantKernel; use r#for::compute::is_constant::FoRIsConstantKernel; diff --git a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs index 0ae72820d18..6cc28483571 100644 --- a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs +++ b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs @@ -34,6 +34,7 @@ use vortex::array::validity::Validity; use vortex::buffer::buffer; use vortex::dtype::Nullability; use vortex::dtype::PType; +use vortex::encodings::fastlanes::RLEData; use vortex::file::WriteOptionsSessionExt; use vortex::io::runtime::BlockingRuntime; use vortex::scalar::PValue; @@ -956,6 +957,20 @@ fn test_vortex_encodings_roundtrip() { not(duckdb_release), ignore = "spatial extension requires a release DuckDB build" )] +#[test] +fn test_fastlanes_rle_roundtrip() { + let expected: Vec = (0i32..2048).map(|i| i / 256).collect(); + let file = RUNTIME.block_on(async { + let mut ctx = SESSION.create_execution_ctx(); + let primitive = PrimitiveArray::from_iter(expected.clone()); + let rle = RLEData::encode(primitive.as_view(), &mut ctx).unwrap(); + write_single_column_vortex_file("rle_col", rle.into_array()).await + }); + + let values: Vec = scan_vortex_file::(file, "SELECT rle_col FROM ?", 0).unwrap(); + assert_eq!(values, expected); +} + #[test] fn test_geometry() { let file = RUNTIME.block_on(async { diff --git a/vortex-duckdb/src/exporter/mod.rs b/vortex-duckdb/src/exporter/mod.rs index 9dbeceb7d92..035cce5b8fb 100644 --- a/vortex-duckdb/src/exporter/mod.rs +++ b/vortex-duckdb/src/exporter/mod.rs @@ -15,6 +15,7 @@ mod geo; mod list; mod list_view; mod primitive; +mod rle; mod run_end; mod sequence; mod struct_; @@ -34,6 +35,7 @@ use vortex::array::arrays::List; use vortex::array::arrays::StructArray; use vortex::array::arrays::struct_::StructArrayExt; use vortex::buffer::BitChunks; +use vortex::encodings::fastlanes::RLE; use vortex::encodings::runend::RunEnd; use vortex::encodings::sequence::Sequence; use vortex::error::VortexExpect; @@ -244,6 +246,11 @@ fn new_array_exporter_with_flatten( Err(array) => array, }; + let array = match array.try_downcast::() { + Ok(array) => return rle::new_exporter_with_flatten(array, cache, ctx, flatten), + Err(array) => array, + }; + let array = match array.try_downcast::() { Ok(array) => return dict::new_exporter_with_flatten(&array, cache, ctx, flatten), Err(array) => array, diff --git a/vortex-duckdb/src/exporter/rle.rs b/vortex-duckdb/src/exporter/rle.rs new file mode 100644 index 00000000000..92e491d5442 --- /dev/null +++ b/vortex-duckdb/src/exporter/rle.rs @@ -0,0 +1,292 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::marker::PhantomData; + +use num_traits::AsPrimitive; +use vortex::array::ExecutionCtx; +use vortex::array::IntoArray; +use vortex::array::arrays::PrimitiveArray; +use vortex::array::match_each_unsigned_integer_ptype; +use vortex::array::validity::Validity; +use vortex::dtype::IntegerPType; +use vortex::encodings::fastlanes::FL_CHUNK_SIZE; +use vortex::encodings::fastlanes::RLEArray; +use vortex::encodings::fastlanes::RLEArrayExt; +use vortex::error::VortexResult; + +use crate::duckdb::ReusableDict; +use crate::duckdb::SelectionVector; +use crate::duckdb::VectorRef; +use crate::exporter::ColumnExporter; +use crate::exporter::all_invalid; +use crate::exporter::cache::ConversionCache; +use crate::exporter::cached_values_dict; +use crate::exporter::canonical; + +struct RLEExporter { + values: ReusableDict, + indices: PrimitiveArray, + values_idx_offsets: PrimitiveArray, + /// Offset relative to the first chunk + offset: usize, + indices_type: PhantomData, + values_idx_offsets_type: PhantomData, +} + +pub(crate) fn new_exporter_with_flatten( + array: RLEArray, + cache: &ConversionCache, + ctx: &mut ExecutionCtx, + flatten: bool, +) -> VortexResult> { + if flatten || array.is_empty() { + return canonical::new_exporter(array.into_array(), cache, ctx); + } + // DuckDB dictionary can't carry validity on codes. + // Don't execute the validity mask, if there's a chance of NULL, + // canonicalize + match array.indices().validity()? { + Validity::AllInvalid => return Ok(all_invalid::new_exporter()), + Validity::Array(_) => return canonical::new_exporter(array.into_array(), cache, ctx), + _ => {} + } + + let indices = array.indices().clone().execute::(ctx)?; + let values = array.values().clone(); + let values_idx_offsets = array + .values_idx_offsets() + .clone() + .execute::(ctx)?; + + let values = cached_values_dict(values, cache, ctx)?; + match_each_unsigned_integer_ptype!(indices.ptype(), |I| { + match_each_unsigned_integer_ptype!(values_idx_offsets.ptype(), |O| { + Ok(Box::new(RLEExporter { + values, + indices, + values_idx_offsets, + offset: array.offset(), + indices_type: PhantomData::, + values_idx_offsets_type: PhantomData::, + })) + }) + }) +} + +impl ColumnExporter for RLEExporter +where + I: IntegerPType + AsPrimitive, + O: IntegerPType + AsPrimitive, +{ + fn export( + &self, + offset: usize, + len: usize, + vector: &mut VectorRef, + _ctx: &mut ExecutionCtx, + ) -> VortexResult<()> { + let mut selection_vec = SelectionVector::with_capacity(len); + let mut selection = unsafe { selection_vec.as_slice_mut(len) }; + + let indices = self.indices.as_slice::(); + let values_idx_offsets = self.values_idx_offsets.as_slice::(); + + let mut pos = self.offset + offset; + let end = pos + len; + + let first_idx_offset = values_idx_offsets[0]; + while pos < end { + let chunk_idx = pos / FL_CHUNK_SIZE; + let base: u32 = (values_idx_offsets[chunk_idx] - first_idx_offset).as_(); + let take = ((chunk_idx + 1) * FL_CHUNK_SIZE).min(end) - pos; + + for (dst, idx) in selection[..take].iter_mut().zip(&indices[pos..pos + take]) { + let idx: u32 = idx.as_(); + *dst = base + idx; + } + + selection = &mut selection[take..]; + pos += take; + } + + vector.reuse_dictionary(&self.values, &selection_vec); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use vortex::array::ArrayRef; + use vortex::array::IntoArray; + use vortex::array::VortexSessionExecute; + use vortex::array::arrays::PrimitiveArray; + use vortex::encodings::fastlanes::RLEArray; + use vortex::encodings::fastlanes::RLEData; + use vortex::error::VortexResult; + + use crate::SESSION; + use crate::cpp::duckdb_type::DUCKDB_TYPE_INTEGER; + use crate::duckdb::DataChunk; + use crate::duckdb::LogicalType; + use crate::exporter::ConversionCache; + use crate::exporter::new_array_exporter; + + fn encode_rle(values: Vec) -> VortexResult { + let mut ctx = SESSION.create_execution_ctx(); + let primitive = PrimitiveArray::from_iter(values); + RLEData::encode(primitive.as_view(), &mut ctx) + } + + fn export_flat(array: ArrayRef, len: usize) -> VortexResult> { + let mut ctx = SESSION.create_execution_ctx(); + let mut chunk = DataChunk::new([LogicalType::new(DUCKDB_TYPE_INTEGER)]); + new_array_exporter(array, &ConversionCache::default(), &mut ctx)?.export( + 0, + len, + chunk.get_vector_mut(0), + &mut ctx, + )?; + chunk.set_len(len); + let vector = chunk.get_vector(0); + vector.flatten(len as u64); + Ok(vector.as_slice_with_len::(len).to_vec()) + } + + #[test] + fn test_roundtrip_two_chunks() -> VortexResult<()> { + let expected: Vec = (0i32..2048).map(|i| i / 100).collect(); + let rle = encode_rle(expected.clone())?; + let exported = export_flat(rle.into_array(), 2048)?; + assert_eq!(exported, expected); + Ok(()) + } + + #[test] + fn test_roundtrip_boundary() -> VortexResult<()> { + let source: Vec = (0i32..2048).map(|i| i / 100).collect(); + let rle = encode_rle(source.clone())?; + let sliced = rle.into_array().slice(500..1700)?; + let exported = export_flat(sliced, 1200)?; + assert_eq!(exported, source[500..1700]); + Ok(()) + } + + #[test] + fn test_roundtrip_slice() -> VortexResult<()> { + let source: Vec = (0i32..3072).map(|i| i / 100).collect(); + let rle = encode_rle(source.clone())?; + let sliced = rle.into_array().slice(1200..2000)?; + let exported = export_flat(sliced, 800)?; + assert_eq!(exported, source[1200..2000]); + Ok(()) + } + + fn chunk_string(array: ArrayRef, offset: usize, len: usize) -> VortexResult { + let mut ctx = SESSION.create_execution_ctx(); + let mut chunk = DataChunk::new([LogicalType::new(DUCKDB_TYPE_INTEGER)]); + new_array_exporter(array, &ConversionCache::default(), &mut ctx)?.export( + offset, + len, + chunk.get_vector_mut(0), + &mut ctx, + )?; + chunk.set_len(len); + String::try_from(&*chunk) + } + + fn two_chunk_rle() -> VortexResult { + let mut ctx = SESSION.create_execution_ctx(); + let source: Vec = std::iter::repeat_n(10i32, 1024) + .chain(std::iter::repeat_n(20, 1024)) + .collect(); + RLEData::encode(PrimitiveArray::from_iter(source).as_view(), &mut ctx) + } + + #[test] + fn test_one_chunk() -> VortexResult<()> { + let rle = two_chunk_rle()?; + let chunk_str = chunk_string(rle.into_array(), 0, 5)?; + assert_eq!( + chunk_str, + r#"Chunk - [1 Columns] +- DICTIONARY INTEGER: 5 = [ 10, 10, 10, 10, 10] +"# + ); + Ok(()) + } + + #[test] + fn test_one_chunk_nulls() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + let source = vec![Some(0u32), Some(1), None, Some(3), None]; + let rle = RLEData::encode(PrimitiveArray::from_option_iter(source).as_view(), &mut ctx)?; + let chunk_str = chunk_string(rle.into_array(), 0, 5)?; + assert_eq!( + chunk_str, + r#"Chunk - [1 Columns] +- FLAT INTEGER: 5 = [ 0, 1, NULL, 3, NULL] +"# + ); + Ok(()) + } + + #[test] + fn test_chunk_boundary() -> VortexResult<()> { + let rle = two_chunk_rle()?; + let chunk_str = chunk_string(rle.into_array(), 1020, 10)?; + assert_eq!( + chunk_str, + r#"Chunk - [1 Columns] +- DICTIONARY INTEGER: 10 = [ 10, 10, 10, 10, 20, 20, 20, 20, 20, 20] +"# + ); + Ok(()) + } + + #[test] + fn test_chunk_slice() -> VortexResult<()> { + let rle = two_chunk_rle()?; + let sliced = rle.into_array().slice(1500..1510)?; + let chunk_str = chunk_string(sliced, 0, 10)?; + assert_eq!( + chunk_str, + r#"Chunk - [1 Columns] +- FLAT INTEGER: 10 = [ 20, 20, 20, 20, 20, 20, 20, 20, 20, 20] +"# + ); + Ok(()) + } + + #[test] + fn test_roundtrip_with_nulls() -> VortexResult<()> { + let source: Vec> = (0i32..1024) + .map(|i| if i % 7 == 0 { None } else { Some(i / 50) }) + .collect(); + let mut ctx = SESSION.create_execution_ctx(); + let primitive = PrimitiveArray::from_option_iter(source.clone()); + let rle = RLEData::encode(primitive.as_view(), &mut ctx)?; + + let mut chunk = DataChunk::new([LogicalType::new(DUCKDB_TYPE_INTEGER)]); + new_array_exporter(rle.into_array(), &ConversionCache::default(), &mut ctx)?.export( + 0, + 1024, + chunk.get_vector_mut(0), + &mut ctx, + )?; + chunk.set_len(1024); + + let vector = chunk.get_vector(0); + vector.flatten(1024); + let slice = vector.as_slice_with_len::(1024); + for (i, expected) in source.iter().enumerate() { + if let Some(v) = expected { + assert!(!vector.row_is_null(i as u64), "row {i} is null"); + assert_eq!(slice[i], *v); + } else { + assert!(vector.row_is_null(i as u64), "row {i} not null"); + } + } + Ok(()) + } +} diff --git a/vortex-duckdb/src/exporter/run_end.rs b/vortex-duckdb/src/exporter/run_end.rs index c8b3d51ba02..2c499d700a3 100644 --- a/vortex-duckdb/src/exporter/run_end.rs +++ b/vortex-duckdb/src/exporter/run_end.rs @@ -135,10 +135,37 @@ mod tests { use vortex::error::VortexResult; use crate::SESSION; + use crate::cpp::duckdb_type::DUCKDB_TYPE_INTEGER; use crate::duckdb::DataChunk; use crate::duckdb::LogicalType; use crate::exporter::ArrayExporter; use crate::exporter::ConversionCache; + use crate::exporter::new_array_exporter; + + #[test] + fn test_one_chunk_null() -> VortexResult<()> { + let mut ctx = SESSION.create_execution_ctx(); + let source = vec![Some(0u32), Some(1), None, Some(3), None]; + let array = PrimitiveArray::from_option_iter(source); + let array = RunEnd::encode(array.into_array(), &mut ctx)?; + + let mut chunk = DataChunk::new([LogicalType::new(DUCKDB_TYPE_INTEGER)]); + new_array_exporter(array.into_array(), &ConversionCache::default(), &mut ctx)?.export( + 0, + 5, + chunk.get_vector_mut(0), + &mut ctx, + )?; + chunk.set_len(5); + let chunk_str = String::try_from(&*chunk)?; + assert_eq!( + chunk_str, + r#"Chunk - [1 Columns] +- DICTIONARY INTEGER: 5 = [ 0, 1, NULL, 3, NULL] +"# + ); + Ok(()) + } #[test] fn run_end_with_chunked_values_exports_across_value_chunks() -> VortexResult<()> {