diff --git a/Cargo.toml b/Cargo.toml index 2db0379b0657..7029c2869d9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ [workspace] exclude = ["datafusion-cli"] -members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/proto/gen", "datafusion/row", "datafusion/sql", "datafusion/substrait", "datafusion-examples", "test-utils", "benchmarks", +members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/proto/gen", "datafusion/sql", "datafusion/substrait", "datafusion-examples", "test-utils", "benchmarks", ] resolver = "2" diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 862d41a889f3..a95a8266052f 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -67,7 +67,6 @@ datafusion-execution = { path = "../execution", version = "27.0.0" } datafusion-expr = { path = "../expr", version = "27.0.0" } datafusion-optimizer = { path = "../optimizer", version = "27.0.0", default-features = false } datafusion-physical-expr = { path = "../physical-expr", version = "27.0.0", default-features = false } -datafusion-row = { path = "../row", version = "27.0.0" } datafusion-sql = { path = "../sql", version = "27.0.0" } flate2 = { version = "1.0.24", optional = true } futures = "0.3" diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index ad2435f9c146..9542ffe8ffbb 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -463,11 +463,6 @@ pub mod physical_expr { pub use datafusion_physical_expr::*; } -/// re-export of [`datafusion_row`] crate -pub mod row { - pub use datafusion_row::*; -} - /// re-export of [`datafusion_sql`] crate pub mod sql { pub use datafusion_sql::*; diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs deleted file mode 100644 index c68b422a4f06..000000000000 --- a/datafusion/core/tests/row.rs +++ /dev/null @@ -1,97 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use datafusion::datasource::file_format::parquet::ParquetFormat; -use datafusion::datasource::file_format::FileFormat; -use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::datasource::physical_plan::FileScanConfig; -use datafusion::error::Result; -use datafusion::execution::context::SessionState; -use datafusion::physical_plan::{collect, ExecutionPlan}; -use datafusion::prelude::SessionContext; -use datafusion_row::reader::read_as_batch; -use datafusion_row::writer::write_batch_unchecked; -use object_store::{local::LocalFileSystem, path::Path, ObjectStore}; -use std::sync::Arc; - -#[tokio::test] -async fn test_with_parquet() -> Result<()> { - let ctx = SessionContext::new(); - let state = ctx.state(); - let task_ctx = state.task_ctx(); - let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]); - let exec = - get_exec(&state, "alltypes_plain.parquet", projection.as_ref(), None).await?; - let schema = exec.schema().clone(); - - let batches = collect(exec, task_ctx).await?; - assert_eq!(1, batches.len()); - let batch = &batches[0]; - - let mut vector = vec![0; 20480]; - let row_offsets = { write_batch_unchecked(&mut vector, 0, batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; - assert_eq!(*batch, output_batch); - - Ok(()) -} - -async fn get_exec( - state: &SessionState, - file_name: &str, - projection: Option<&Vec>, - limit: Option, -) -> Result> { - let testdata = datafusion::test_util::parquet_test_data(); - let filename = format!("{testdata}/{file_name}"); - - let path = Path::from_filesystem_path(filename).unwrap(); - - let format = ParquetFormat::default(); - let object_store = Arc::new(LocalFileSystem::new()) as Arc; - let object_store_url = ObjectStoreUrl::local_filesystem(); - - let meta = object_store.head(&path).await.unwrap(); - - let file_schema = format - .infer_schema(state, &object_store, &[meta.clone()]) - .await - .expect("Schema inference"); - let statistics = format - .infer_stats(state, &object_store, file_schema.clone(), &meta) - .await - .expect("Stats inference"); - let file_groups = vec![vec![meta.into()]]; - let exec = format - .create_physical_plan( - state, - FileScanConfig { - object_store_url, - file_schema, - file_groups, - statistics, - projection: projection.cloned(), - limit, - table_partition_cols: vec![], - output_ordering: vec![], - infinite_source: false, - }, - None, - ) - .await?; - Ok(exec) -} diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index b7ffa1810cce..02958a543c37 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -54,7 +54,6 @@ blake3 = { version = "1.0", optional = true } chrono = { version = "0.4.23", default-features = false } datafusion-common = { path = "../common", version = "27.0.0" } datafusion-expr = { path = "../expr", version = "27.0.0" } -datafusion-row = { path = "../row", version = "27.0.0" } half = { version = "2.1", default-features = false } hashbrown = { version = "0.14", features = ["raw"] } hex = { version = "0.4", optional = true } diff --git a/datafusion/row/Cargo.toml b/datafusion/row/Cargo.toml deleted file mode 100644 index 4d34a5da2fd1..000000000000 --- a/datafusion/row/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[package] -name = "datafusion-row" -description = "Row backed by raw bytes for DataFusion query engine" -keywords = [ "arrow", "query", "sql" ] -version = { workspace = true } -edition = { workspace = true } -readme = { workspace = true } -homepage = { workspace = true } -repository = { workspace = true } -license = { workspace = true } -authors = { workspace = true } -rust-version = { workspace = true } - -[lib] -name = "datafusion_row" -path = "src/lib.rs" - -[dependencies] -arrow = { workspace = true } -datafusion-common = { path = "../common", version = "27.0.0" } -paste = "^1.0" -rand = "0.8" diff --git a/datafusion/row/README.md b/datafusion/row/README.md deleted file mode 100644 index eef4dfd554e3..000000000000 --- a/datafusion/row/README.md +++ /dev/null @@ -1,29 +0,0 @@ - - -# DataFusion Row - -[DataFusion](df) is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format. - -This crate is a submodule of DataFusion that provides an optimized row based format for row-based operations. - -See the documentation in [`lib.rs`] for more details. - -[df]: https://crates.io/crates/datafusion -[`lib.rs`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion/row/src/lib.rs diff --git a/datafusion/row/src/accessor.rs b/datafusion/row/src/accessor.rs deleted file mode 100644 index a0b5a70df993..000000000000 --- a/datafusion/row/src/accessor.rs +++ /dev/null @@ -1,384 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! [`RowAccessor`] provides a Read/Write/Modify access for row with all fixed-sized fields: - -use crate::layout::RowLayout; -use crate::validity::NullBitsFormatter; -use crate::{fn_get_idx, fn_get_idx_opt, fn_set_idx}; -use arrow::datatypes::{DataType, Schema}; -use arrow::util::bit_util::{get_bit_raw, set_bit_raw}; -use datafusion_common::ScalarValue; -use std::ops::{BitAnd, BitOr, BitXor}; -use std::sync::Arc; - -//TODO: DRY with reader and writer - -/// Provides read/write/modify access to a tuple stored in Row format -/// at `data[base_offset..]` -/// -/// ```text -/// Set / Update data -/// in [u8] -/// ─ ─ ─ ─ ─ ─ ─ ┐ Read data out as native -/// │ types or ScalarValues -/// │ -/// │ ┌───────────────────────┐ -/// │ │ -/// └ ▶│ [u8] │─ ─ ─ ─ ─ ─ ─ ─▶ -/// │ │ -/// └───────────────────────┘ -/// ``` -pub struct RowAccessor<'a> { - /// Layout on how to read each field - layout: Arc, - /// Raw bytes slice where the tuple stores - data: &'a mut [u8], - /// Start position for the current tuple in the raw bytes slice. - base_offset: usize, -} - -impl<'a> std::fmt::Debug for RowAccessor<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if self.null_free() { - write!(f, "null_free") - } else { - let null_bits = self.null_bits(); - write!( - f, - "{:?}", - NullBitsFormatter::new(null_bits, self.layout.field_count) - ) - } - } -} - -#[macro_export] -macro_rules! fn_add_idx { - ($NATIVE: ident) => { - paste::item! { - /// add field at `idx` with `value` - #[inline(always)] - pub fn [](&mut self, idx: usize, value: $NATIVE) { - if self.is_valid_at(idx) { - self.[](idx, value + self.[](idx)); - } else { - self.set_non_null_at(idx); - self.[](idx, value); - } - } - } - }; -} - -macro_rules! fn_max_min_idx { - ($NATIVE: ident, $OP: ident) => { - paste::item! { - /// check max then update - #[inline(always)] - pub fn [<$OP _ $NATIVE>](&mut self, idx: usize, value: $NATIVE) { - if self.is_valid_at(idx) { - let v = value.$OP(self.[](idx)); - self.[](idx, v); - } else { - self.set_non_null_at(idx); - self.[](idx, value); - } - } - } - }; -} - -macro_rules! fn_bit_and_or_xor_idx { - ($NATIVE: ident, $OP: ident) => { - paste::item! { - /// check bit_and then update - #[inline(always)] - pub fn [<$OP _ $NATIVE>](&mut self, idx: usize, value: $NATIVE) { - if self.is_valid_at(idx) { - let v = value.$OP(self.[](idx)); - self.[](idx, v); - } else { - self.set_non_null_at(idx); - self.[](idx, value); - } - } - } - }; -} - -macro_rules! fn_get_idx_scalar { - ($NATIVE: ident, $SCALAR:ident) => { - paste::item! { - #[inline(always)] - pub fn [](&self, idx: usize) -> ScalarValue { - if self.is_valid_at(idx) { - ScalarValue::$SCALAR(Some(self.[](idx))) - } else { - ScalarValue::$SCALAR(None) - } - } - } - }; -} - -impl<'a> RowAccessor<'a> { - /// new - pub fn new(schema: &Schema) -> Self { - Self { - layout: Arc::new(RowLayout::new(schema)), - data: &mut [], - base_offset: 0, - } - } - - pub fn new_from_layout(layout: Arc) -> Self { - Self { - layout, - data: &mut [], - base_offset: 0, - } - } - - /// Update this row to point to position `offset` in `base` - pub fn point_to(&mut self, offset: usize, data: &'a mut [u8]) { - self.base_offset = offset; - self.data = data; - } - - #[inline] - fn assert_index_valid(&self, idx: usize) { - assert!(idx < self.layout.field_count); - } - - #[inline(always)] - fn field_offsets(&self) -> &[usize] { - &self.layout.field_offsets - } - - #[inline(always)] - fn null_free(&self) -> bool { - self.layout.null_free - } - - #[inline(always)] - fn null_bits(&self) -> &[u8] { - if self.null_free() { - &[] - } else { - let start = self.base_offset; - &self.data[start..start + self.layout.null_width] - } - } - - fn is_valid_at(&self, idx: usize) -> bool { - unsafe { get_bit_raw(self.null_bits().as_ptr(), idx) } - } - - // ------------------------------ - // ----- Fixed Sized getters ---- - // ------------------------------ - - fn get_bool(&self, idx: usize) -> bool { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - let value = &self.data[self.base_offset + offset..]; - value[0] != 0 - } - - fn get_u8(&self, idx: usize) -> u8 { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[self.base_offset + offset] - } - - fn_get_idx!(u16, 2); - fn_get_idx!(u32, 4); - fn_get_idx!(u64, 8); - fn_get_idx!(i8, 1); - fn_get_idx!(i16, 2); - fn_get_idx!(i32, 4); - fn_get_idx!(i64, 8); - fn_get_idx!(f32, 4); - fn_get_idx!(f64, 8); - fn_get_idx!(i128, 16); - - fn_get_idx_opt!(bool); - fn_get_idx_opt!(u8); - fn_get_idx_opt!(u16); - fn_get_idx_opt!(u32); - fn_get_idx_opt!(u64); - fn_get_idx_opt!(i8); - fn_get_idx_opt!(i16); - fn_get_idx_opt!(i32); - fn_get_idx_opt!(i64); - fn_get_idx_opt!(f32); - fn_get_idx_opt!(f64); - fn_get_idx_opt!(i128); - - fn_get_idx_scalar!(bool, Boolean); - fn_get_idx_scalar!(u8, UInt8); - fn_get_idx_scalar!(u16, UInt16); - fn_get_idx_scalar!(u32, UInt32); - fn_get_idx_scalar!(u64, UInt64); - fn_get_idx_scalar!(i8, Int8); - fn_get_idx_scalar!(i16, Int16); - fn_get_idx_scalar!(i32, Int32); - fn_get_idx_scalar!(i64, Int64); - fn_get_idx_scalar!(f32, Float32); - fn_get_idx_scalar!(f64, Float64); - - fn get_decimal128_scalar(&self, idx: usize, p: u8, s: i8) -> ScalarValue { - if self.is_valid_at(idx) { - ScalarValue::Decimal128(Some(self.get_i128(idx)), p, s) - } else { - ScalarValue::Decimal128(None, p, s) - } - } - - pub fn get_as_scalar(&self, dt: &DataType, index: usize) -> ScalarValue { - match dt { - DataType::Boolean => self.get_bool_scalar(index), - DataType::Int8 => self.get_i8_scalar(index), - DataType::Int16 => self.get_i16_scalar(index), - DataType::Int32 => self.get_i32_scalar(index), - DataType::Int64 => self.get_i64_scalar(index), - DataType::UInt8 => self.get_u8_scalar(index), - DataType::UInt16 => self.get_u16_scalar(index), - DataType::UInt32 => self.get_u32_scalar(index), - DataType::UInt64 => self.get_u64_scalar(index), - DataType::Float32 => self.get_f32_scalar(index), - DataType::Float64 => self.get_f64_scalar(index), - DataType::Decimal128(p, s) => self.get_decimal128_scalar(index, *p, *s), - _ => unreachable!(), - } - } - - // ------------------------------ - // ----- Fixed Sized setters ---- - // ------------------------------ - - pub(crate) fn set_non_null_at(&mut self, idx: usize) { - assert!( - !self.null_free(), - "Unexpected call to set_non_null_at on null-free row writer" - ); - let null_bits = &mut self.data[0..self.layout.null_width]; - unsafe { - set_bit_raw(null_bits.as_mut_ptr(), idx); - } - } - - fn set_bool(&mut self, idx: usize, value: bool) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset] = u8::from(value); - } - - fn set_u8(&mut self, idx: usize, value: u8) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset] = value; - } - - fn_set_idx!(u16, 2); - fn_set_idx!(u32, 4); - fn_set_idx!(u64, 8); - fn_set_idx!(i16, 2); - fn_set_idx!(i32, 4); - fn_set_idx!(i64, 8); - fn_set_idx!(f32, 4); - fn_set_idx!(f64, 8); - fn_set_idx!(i128, 16); - - fn set_i8(&mut self, idx: usize, value: i8) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset] = value.to_le_bytes()[0]; - } - - // ------------------------------ - // ---- Fixed sized updaters ---- - // ------------------------------ - - fn_add_idx!(u8); - fn_add_idx!(u16); - fn_add_idx!(u32); - fn_add_idx!(u64); - fn_add_idx!(i8); - fn_add_idx!(i16); - fn_add_idx!(i32); - fn_add_idx!(i64); - fn_add_idx!(f32); - fn_add_idx!(f64); - fn_add_idx!(i128); - - fn_max_min_idx!(bool, max); - fn_max_min_idx!(u8, max); - fn_max_min_idx!(u16, max); - fn_max_min_idx!(u32, max); - fn_max_min_idx!(u64, max); - fn_max_min_idx!(i8, max); - fn_max_min_idx!(i16, max); - fn_max_min_idx!(i32, max); - fn_max_min_idx!(i64, max); - fn_max_min_idx!(f32, max); - fn_max_min_idx!(f64, max); - fn_max_min_idx!(i128, max); - - fn_max_min_idx!(bool, min); - fn_max_min_idx!(u8, min); - fn_max_min_idx!(u16, min); - fn_max_min_idx!(u32, min); - fn_max_min_idx!(u64, min); - fn_max_min_idx!(i8, min); - fn_max_min_idx!(i16, min); - fn_max_min_idx!(i32, min); - fn_max_min_idx!(i64, min); - fn_max_min_idx!(f32, min); - fn_max_min_idx!(f64, min); - fn_max_min_idx!(i128, min); - - fn_bit_and_or_xor_idx!(bool, bitand); - fn_bit_and_or_xor_idx!(u8, bitand); - fn_bit_and_or_xor_idx!(u16, bitand); - fn_bit_and_or_xor_idx!(u32, bitand); - fn_bit_and_or_xor_idx!(u64, bitand); - fn_bit_and_or_xor_idx!(i8, bitand); - fn_bit_and_or_xor_idx!(i16, bitand); - fn_bit_and_or_xor_idx!(i32, bitand); - fn_bit_and_or_xor_idx!(i64, bitand); - - fn_bit_and_or_xor_idx!(bool, bitor); - fn_bit_and_or_xor_idx!(u8, bitor); - fn_bit_and_or_xor_idx!(u16, bitor); - fn_bit_and_or_xor_idx!(u32, bitor); - fn_bit_and_or_xor_idx!(u64, bitor); - fn_bit_and_or_xor_idx!(i8, bitor); - fn_bit_and_or_xor_idx!(i16, bitor); - fn_bit_and_or_xor_idx!(i32, bitor); - fn_bit_and_or_xor_idx!(i64, bitor); - - fn_bit_and_or_xor_idx!(u8, bitxor); - fn_bit_and_or_xor_idx!(u16, bitxor); - fn_bit_and_or_xor_idx!(u32, bitxor); - fn_bit_and_or_xor_idx!(u64, bitxor); - fn_bit_and_or_xor_idx!(i8, bitxor); - fn_bit_and_or_xor_idx!(i16, bitxor); - fn_bit_and_or_xor_idx!(i32, bitxor); - fn_bit_and_or_xor_idx!(i64, bitxor); -} diff --git a/datafusion/row/src/layout.rs b/datafusion/row/src/layout.rs deleted file mode 100644 index 71471327536a..000000000000 --- a/datafusion/row/src/layout.rs +++ /dev/null @@ -1,157 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Various row layouts for different use case - -use crate::schema_null_free; -use arrow::datatypes::{DataType, Schema}; -use arrow::util::bit_util::{ceil, round_upto_power_of_2}; - -/// Row layout stores one or multiple 8-byte word(s) per field for CPU-friendly -/// and efficient processing. -/// -/// It is mainly used to represent the rows with frequently updated content, -/// for example, grouping state for hash aggregation. -/// -/// Each tuple consists of two parts: "`null bit set`" and "`values`". -/// -/// For null-free tuples, the null bit set can be omitted. -/// -/// The null bit set, when present, is aligned to 8 bytes. It stores one bit per field. -/// -/// In the region of the values, we store the fields in the order they are defined in the schema. -/// Each field is stored in one or multiple 8-byte words. -/// -/// ```plaintext -/// ┌─────────────────┬─────────────────────┐ -/// │Validity Bitmask │ Fields │ -/// │ (8-byte aligned)│ (8-byte words) │ -/// └─────────────────┴─────────────────────┘ -/// ``` -/// -/// For example, given the schema (Int8, Float32, Int64) with a null-free tuple -/// -/// Encoding the tuple (1, 3.14, 42) -/// -/// Requires 24 bytes (3 fields * 8 bytes each): -/// -/// ```plaintext -/// ┌──────────────────────┬──────────────────────┬──────────────────────┐ -/// │ 0x01 │ 0x4048F5C3 │ 0x0000002A │ -/// └──────────────────────┴──────────────────────┴──────────────────────┘ -/// 0 8 16 24 -/// ``` -/// -/// If the schema allows null values and the tuple is (1, NULL, 42) -/// -/// Encoding the tuple requires 32 bytes (1 * 8 bytes for the null bit set + 3 fields * 8 bytes each): -/// -/// ```plaintext -/// ┌──────────────────────────┬──────────────────────┬──────────────────────┬──────────────────────┐ -/// │ 0b00000101 │ 0x01 │ 0x00000000 │ 0x0000002A │ -/// │ (7 bytes padding after) │ │ │ │ -/// └──────────────────────────┴──────────────────────┴──────────────────────┴──────────────────────┘ -/// 0 8 16 24 32 -/// ``` -#[derive(Debug, Clone)] -pub struct RowLayout { - /// If a row is null free according to its schema - pub(crate) null_free: bool, - /// The number of bytes used to store null bits for each field. - pub(crate) null_width: usize, - /// Length in bytes for `values` part of the current tuple. - pub(crate) values_width: usize, - /// Total number of fields for each tuple. - pub(crate) field_count: usize, - /// Starting offset for each fields in the raw bytes. - pub(crate) field_offsets: Vec, -} - -impl RowLayout { - /// new - pub fn new(schema: &Schema) -> Self { - assert!( - row_supported(schema), - "Row with {schema:?} not supported yet.", - ); - let null_free = schema_null_free(schema); - let field_count = schema.fields().len(); - let null_width = if null_free { - 0 - } else { - round_upto_power_of_2(ceil(field_count, 8), 8) - }; - let (field_offsets, values_width) = word_aligned_offsets(null_width, schema); - Self { - null_free, - null_width, - values_width, - field_count, - field_offsets, - } - } - - /// Get fixed part width for this layout - #[inline(always)] - pub fn fixed_part_width(&self) -> usize { - self.null_width + self.values_width - } -} - -fn word_aligned_offsets(null_width: usize, schema: &Schema) -> (Vec, usize) { - let mut offsets = vec![]; - let mut offset = null_width; - for f in schema.fields() { - offsets.push(offset); - assert!(!matches!(f.data_type(), DataType::Decimal256(_, _))); - // All of the current support types can fit into one single 8-bytes word except for Decimal128. - // For Decimal128, its width is of two 8-bytes words. - match f.data_type() { - DataType::Decimal128(_, _) => offset += 16, - _ => offset += 8, - } - } - (offsets, offset - null_width) -} - -/// Return true of data in `schema` can be converted to raw-bytes -/// based rows. -/// -/// Note all schemas can be supported in the row format -pub fn row_supported(schema: &Schema) -> bool { - schema.fields().iter().all(|f| { - let dt = f.data_type(); - use DataType::*; - matches!( - dt, - Boolean - | UInt8 - | UInt16 - | UInt32 - | UInt64 - | Int8 - | Int16 - | Int32 - | Int64 - | Float32 - | Float64 - | Date32 - | Date64 - | Decimal128(_, _) - ) - }) -} diff --git a/datafusion/row/src/lib.rs b/datafusion/row/src/lib.rs deleted file mode 100644 index 902fa881b19b..000000000000 --- a/datafusion/row/src/lib.rs +++ /dev/null @@ -1,303 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! This module contains code to translate arrays back and forth to a -//! row based format. The row based format is backed by raw bytes -//! ([`[u8]`]) and used to optimize certain operations. -//! -//! In general, DataFusion is a so called "vectorized" execution -//! model, specifically it uses the optimized calculation kernels in -//! [`arrow`] to amortize dispatch overhead. -//! -//! However, as mentioned in [this paper], there are some "row -//! oriented" operations in a database that are not typically amenable -//! to vectorization. The "classics" are: hash table updates in joins -//! and hash aggregates, as well as comparing tuples in sort / -//! merging. -//! -//! [this paper]: https://db.in.tum.de/~kersten/vectorization_vs_compilation.pdf - -use arrow::array::{make_builder, ArrayBuilder, ArrayRef}; -use arrow::datatypes::Schema; -use arrow::error::Result as ArrowResult; -use arrow::record_batch::RecordBatch; -pub use layout::row_supported; -use std::sync::Arc; - -pub mod accessor; -pub mod layout; -pub mod reader; -mod validity; -pub mod writer; - -/// Tell if schema contains no nullable field -pub(crate) fn schema_null_free(schema: &Schema) -> bool { - schema.fields().iter().all(|f| !f.is_nullable()) -} - -/// Columnar Batch buffer that assists creating `RecordBatches` -pub struct MutableRecordBatch { - arrays: Vec>, - schema: Arc, -} - -impl MutableRecordBatch { - /// new - pub fn new(target_batch_size: usize, schema: Arc) -> Self { - let arrays = new_arrays(&schema, target_batch_size); - Self { arrays, schema } - } - - /// Finalize the batch, output and reset this buffer - pub fn output(&mut self) -> ArrowResult { - let result = make_batch(self.schema.clone(), self.arrays.drain(..).collect()); - result - } - - pub fn output_as_columns(&mut self) -> Vec { - get_columns(self.arrays.drain(..).collect()) - } -} - -fn new_arrays(schema: &Schema, batch_size: usize) -> Vec> { - schema - .fields() - .iter() - .map(|field| { - let dt = field.data_type(); - make_builder(dt, batch_size) - }) - .collect::>() -} - -fn make_batch( - schema: Arc, - mut arrays: Vec>, -) -> ArrowResult { - let columns = arrays.iter_mut().map(|array| array.finish()).collect(); - RecordBatch::try_new(schema, columns) -} - -fn get_columns(mut arrays: Vec>) -> Vec { - arrays.iter_mut().map(|array| array.finish()).collect() -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::layout::RowLayout; - use crate::reader::read_as_batch; - use crate::writer::write_batch_unchecked; - use arrow::record_batch::RecordBatch; - use arrow::{array::*, datatypes::*}; - use datafusion_common::Result; - use DataType::*; - - macro_rules! fn_test_single_type { - ($ARRAY: ident, $TYPE: expr, $VEC: expr) => { - paste::item! { - #[test] - #[allow(non_snake_case)] - fn []() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, true)])); - let record_width = RowLayout::new(schema.as_ref()).fixed_part_width(); - let a = $ARRAY::from($VEC); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; record_width * batch.num_rows()]; - let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - - #[test] - #[allow(non_snake_case)] - fn []() -> Result<()> { - let schema = Arc::new(Schema::new(vec![Field::new("a", $TYPE, false)])); - let record_width = RowLayout::new(schema.as_ref()).fixed_part_width(); - let v = $VEC.into_iter().filter(|o| o.is_some()).collect::>(); - let a = $ARRAY::from(v); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; record_width * batch.num_rows()]; - let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - } - }; - } - - fn_test_single_type!( - BooleanArray, - Boolean, - vec![Some(true), Some(false), None, Some(true), None] - ); - - fn_test_single_type!( - Int8Array, - Int8, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Int16Array, - Int16, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Int32Array, - Int32, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Int64Array, - Int64, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - UInt8Array, - UInt8, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - UInt16Array, - UInt16, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - UInt32Array, - UInt32, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - UInt64Array, - UInt64, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Float32Array, - Float32, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] - ); - - fn_test_single_type!( - Float64Array, - Float64, - vec![Some(5.0), Some(7.0), None, Some(0.0), Some(111.0)] - ); - - fn_test_single_type!( - Date32Array, - Date32, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - fn_test_single_type!( - Date64Array, - Date64, - vec![Some(5), Some(7), None, Some(0), Some(111)] - ); - - #[test] - fn test_single_decimal128() -> Result<()> { - let v = vec![ - Some(0), - Some(1), - None, - Some(-1), - Some(i128::MIN), - Some(i128::MAX), - ]; - let schema = - Arc::new(Schema::new(vec![Field::new("a", Decimal128(38, 10), true)])); - let record_width = RowLayout::new(schema.as_ref()).fixed_part_width(); - let a = Decimal128Array::from(v); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; record_width * batch.num_rows()]; - let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - - #[test] - fn test_single_decimal128_null_free() -> Result<()> { - let v = vec![ - Some(0), - Some(1), - None, - Some(-1), - Some(i128::MIN), - Some(i128::MAX), - ]; - let schema = Arc::new(Schema::new(vec![Field::new( - "a", - Decimal128(38, 10), - false, - )])); - let record_width = RowLayout::new(schema.as_ref()).fixed_part_width(); - let v = v.into_iter().filter(|o| o.is_some()).collect::>(); - let a = Decimal128Array::from(v); - let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a)])?; - let mut vector = vec![0; record_width * batch.num_rows()]; - let row_offsets = - { write_batch_unchecked(&mut vector, 0, &batch, 0, schema.clone()) }; - let output_batch = { read_as_batch(&vector, schema, &row_offsets)? }; - assert_eq!(batch, output_batch); - Ok(()) - } - - #[test] - #[should_panic(expected = "not supported yet")] - fn test_unsupported_type() { - let a: ArrayRef = Arc::new(StringArray::from(vec!["hello", "world"])); - let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); - let schema = batch.schema(); - let mut vector = vec![0; 1024]; - write_batch_unchecked(&mut vector, 0, &batch, 0, schema); - } - - #[test] - #[should_panic(expected = "not supported yet")] - fn test_unsupported_type_write() { - let a: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![8, 7, 6, 5, 8])); - let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap(); - let schema = batch.schema(); - let mut vector = vec![0; 1024]; - write_batch_unchecked(&mut vector, 0, &batch, 0, schema); - } - - #[test] - #[should_panic(expected = "not supported yet")] - fn test_unsupported_type_read() { - let schema = Arc::new(Schema::new(vec![Field::new("a", Utf8, false)])); - let vector = vec![0; 1024]; - let row_offsets = vec![0]; - read_as_batch(&vector, schema, &row_offsets).unwrap(); - } -} diff --git a/datafusion/row/src/reader.rs b/datafusion/row/src/reader.rs deleted file mode 100644 index 10c9896df70a..000000000000 --- a/datafusion/row/src/reader.rs +++ /dev/null @@ -1,366 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! [`read_as_batch`] converts raw bytes to [`RecordBatch`] - -use crate::layout::RowLayout; -use crate::validity::{all_valid, NullBitsFormatter}; -use crate::MutableRecordBatch; -use arrow::array::*; -use arrow::datatypes::{DataType, Schema}; -use arrow::record_batch::RecordBatch; -use arrow::util::bit_util::get_bit_raw; -use datafusion_common::{DataFusionError, Result}; -use std::sync::Arc; - -/// Read raw-bytes from `data` rows starting at `offsets` out to a [`RecordBatch`] -/// -/// -/// ```text -/// Read data to RecordBatch ┌──────────────────┐ -/// │ │ -/// │ │ -/// ┌───────────────────────┐ │ │ -/// │ │ │ RecordBatch │ -/// │ [u8] │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ▶│ │ -/// │ │ │ (... N Rows ...) │ -/// └───────────────────────┘ │ │ -/// │ │ -/// │ │ -/// └──────────────────┘ -/// ``` -pub fn read_as_batch( - data: &[u8], - schema: Arc, - offsets: &[usize], -) -> Result { - let row_num = offsets.len(); - let mut output = MutableRecordBatch::new(row_num, schema.clone()); - let mut row = RowReader::new(&schema); - - for offset in offsets.iter().take(row_num) { - row.point_to(*offset, data); - read_row(&row, &mut output, &schema); - } - - output.output().map_err(DataFusionError::ArrowError) -} - -#[macro_export] -macro_rules! get_idx { - ($NATIVE: ident, $SELF: ident, $IDX: ident, $WIDTH: literal) => {{ - $SELF.assert_index_valid($IDX); - let offset = $SELF.field_offsets()[$IDX]; - let start = $SELF.base_offset + offset; - let end = start + $WIDTH; - $NATIVE::from_le_bytes($SELF.data[start..end].try_into().unwrap()) - }}; -} - -#[macro_export] -macro_rules! fn_get_idx { - ($NATIVE: ident, $WIDTH: literal) => { - paste::item! { - fn [](&self, idx: usize) -> $NATIVE { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - let start = self.base_offset + offset; - let end = start + $WIDTH; - $NATIVE::from_le_bytes(self.data[start..end].try_into().unwrap()) - } - } - }; -} - -#[macro_export] -macro_rules! fn_get_idx_opt { - ($NATIVE: ident) => { - paste::item! { - pub fn [](&self, idx: usize) -> Option<$NATIVE> { - if self.is_valid_at(idx) { - Some(self.[](idx)) - } else { - None - } - } - } - }; -} - -/// Read the tuple `data[base_offset..]` we are currently pointing to -pub struct RowReader<'a> { - /// Layout on how to read each field - layout: RowLayout, - /// Raw bytes slice where the tuple stores - data: &'a [u8], - /// Start position for the current tuple in the raw bytes slice. - base_offset: usize, -} - -impl<'a> std::fmt::Debug for RowReader<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - if self.null_free() { - write!(f, "null_free") - } else { - let null_bits = self.null_bits(); - write!( - f, - "{:?}", - NullBitsFormatter::new(null_bits, self.layout.field_count) - ) - } - } -} - -impl<'a> RowReader<'a> { - /// new - pub fn new(schema: &Schema) -> Self { - Self { - layout: RowLayout::new(schema), - data: &[], - base_offset: 0, - } - } - - /// Update this row to point to position `offset` in `base` - pub fn point_to(&mut self, offset: usize, data: &'a [u8]) { - self.base_offset = offset; - self.data = data; - } - - #[inline] - fn assert_index_valid(&self, idx: usize) { - assert!(idx < self.layout.field_count); - } - - #[inline(always)] - fn field_offsets(&self) -> &[usize] { - &self.layout.field_offsets - } - - #[inline(always)] - fn null_free(&self) -> bool { - self.layout.null_free - } - - #[inline(always)] - fn null_bits(&self) -> &[u8] { - if self.null_free() { - &[] - } else { - let start = self.base_offset; - &self.data[start..start + self.layout.null_width] - } - } - - #[inline(always)] - fn all_valid(&self) -> bool { - if self.null_free() { - true - } else { - let null_bits = self.null_bits(); - all_valid(null_bits, self.layout.field_count) - } - } - - fn is_valid_at(&self, idx: usize) -> bool { - unsafe { get_bit_raw(self.null_bits().as_ptr(), idx) } - } - - fn get_bool(&self, idx: usize) -> bool { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - let value = &self.data[self.base_offset + offset..]; - value[0] != 0 - } - - fn get_u8(&self, idx: usize) -> u8 { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[self.base_offset + offset] - } - - fn_get_idx!(u16, 2); - fn_get_idx!(u32, 4); - fn_get_idx!(u64, 8); - fn_get_idx!(i8, 1); - fn_get_idx!(i16, 2); - fn_get_idx!(i32, 4); - fn_get_idx!(i64, 8); - fn_get_idx!(f32, 4); - fn_get_idx!(f64, 8); - - fn get_date32(&self, idx: usize) -> i32 { - get_idx!(i32, self, idx, 4) - } - - fn get_date64(&self, idx: usize) -> i64 { - get_idx!(i64, self, idx, 8) - } - - fn get_decimal128(&self, idx: usize) -> i128 { - get_idx!(i128, self, idx, 16) - } - - fn_get_idx_opt!(bool); - fn_get_idx_opt!(u8); - fn_get_idx_opt!(u16); - fn_get_idx_opt!(u32); - fn_get_idx_opt!(u64); - fn_get_idx_opt!(i8); - fn_get_idx_opt!(i16); - fn_get_idx_opt!(i32); - fn_get_idx_opt!(i64); - fn_get_idx_opt!(f32); - fn_get_idx_opt!(f64); - - fn get_date32_opt(&self, idx: usize) -> Option { - if self.is_valid_at(idx) { - Some(self.get_date32(idx)) - } else { - None - } - } - - fn get_date64_opt(&self, idx: usize) -> Option { - if self.is_valid_at(idx) { - Some(self.get_date64(idx)) - } else { - None - } - } - - fn get_decimal128_opt(&self, idx: usize) -> Option { - if self.is_valid_at(idx) { - Some(self.get_decimal128(idx)) - } else { - None - } - } -} - -/// Read the row currently pointed by RowWriter to the output columnar batch buffer -pub fn read_row(row: &RowReader, batch: &mut MutableRecordBatch, schema: &Schema) { - if row.all_valid() { - for ((col_idx, to), field) in batch - .arrays - .iter_mut() - .enumerate() - .zip(schema.fields().iter()) - { - read_field_null_free(to, field.data_type(), col_idx, row) - } - } else { - for ((col_idx, to), field) in batch - .arrays - .iter_mut() - .enumerate() - .zip(schema.fields().iter()) - { - read_field(to, field.data_type(), col_idx, row) - } - } -} - -macro_rules! fn_read_field { - ($NATIVE: ident, $ARRAY: ident) => { - paste::item! { - pub(crate) fn [](to: &mut Box, col_idx: usize, row: &RowReader) { - let to = to - .as_any_mut() - .downcast_mut::<$ARRAY>() - .unwrap(); - to.append_option(row.[](col_idx)); - } - - pub(crate) fn [](to: &mut Box, col_idx: usize, row: &RowReader) { - let to = to - .as_any_mut() - .downcast_mut::<$ARRAY>() - .unwrap(); - to.append_value(row.[](col_idx)); - } - } - }; -} - -fn_read_field!(bool, BooleanBuilder); -fn_read_field!(u8, UInt8Builder); -fn_read_field!(u16, UInt16Builder); -fn_read_field!(u32, UInt32Builder); -fn_read_field!(u64, UInt64Builder); -fn_read_field!(i8, Int8Builder); -fn_read_field!(i16, Int16Builder); -fn_read_field!(i32, Int32Builder); -fn_read_field!(i64, Int64Builder); -fn_read_field!(f32, Float32Builder); -fn_read_field!(f64, Float64Builder); -fn_read_field!(date32, Date32Builder); -fn_read_field!(date64, Date64Builder); -fn_read_field!(decimal128, Decimal128Builder); - -fn read_field( - to: &mut Box, - dt: &DataType, - col_idx: usize, - row: &RowReader, -) { - use DataType::*; - match dt { - Boolean => read_field_bool(to, col_idx, row), - UInt8 => read_field_u8(to, col_idx, row), - UInt16 => read_field_u16(to, col_idx, row), - UInt32 => read_field_u32(to, col_idx, row), - UInt64 => read_field_u64(to, col_idx, row), - Int8 => read_field_i8(to, col_idx, row), - Int16 => read_field_i16(to, col_idx, row), - Int32 => read_field_i32(to, col_idx, row), - Int64 => read_field_i64(to, col_idx, row), - Float32 => read_field_f32(to, col_idx, row), - Float64 => read_field_f64(to, col_idx, row), - Date32 => read_field_date32(to, col_idx, row), - Date64 => read_field_date64(to, col_idx, row), - Decimal128(_, _) => read_field_decimal128(to, col_idx, row), - _ => unimplemented!(), - } -} - -fn read_field_null_free( - to: &mut Box, - dt: &DataType, - col_idx: usize, - row: &RowReader, -) { - use DataType::*; - match dt { - Boolean => read_field_bool_null_free(to, col_idx, row), - UInt8 => read_field_u8_null_free(to, col_idx, row), - UInt16 => read_field_u16_null_free(to, col_idx, row), - UInt32 => read_field_u32_null_free(to, col_idx, row), - UInt64 => read_field_u64_null_free(to, col_idx, row), - Int8 => read_field_i8_null_free(to, col_idx, row), - Int16 => read_field_i16_null_free(to, col_idx, row), - Int32 => read_field_i32_null_free(to, col_idx, row), - Int64 => read_field_i64_null_free(to, col_idx, row), - Float32 => read_field_f32_null_free(to, col_idx, row), - Float64 => read_field_f64_null_free(to, col_idx, row), - Date32 => read_field_date32_null_free(to, col_idx, row), - Date64 => read_field_date64_null_free(to, col_idx, row), - Decimal128(_, _) => read_field_decimal128_null_free(to, col_idx, row), - _ => unimplemented!(), - } -} diff --git a/datafusion/row/src/validity.rs b/datafusion/row/src/validity.rs deleted file mode 100644 index 45f5e19f1894..000000000000 --- a/datafusion/row/src/validity.rs +++ /dev/null @@ -1,161 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Row format validity utilities - -use arrow::util::bit_util::get_bit_raw; -use std::fmt::Write; - -const ALL_VALID_MASK: [u8; 8] = [1, 3, 7, 15, 31, 63, 127, 255]; - -/// Returns if all fields are valid -pub fn all_valid(data: &[u8], n: usize) -> bool { - for item in data.iter().take(n / 8) { - if *item != ALL_VALID_MASK[7] { - return false; - } - } - if n % 8 == 0 { - true - } else { - data[n / 8] == ALL_VALID_MASK[n % 8 - 1] - } -} - -/// Show null bit for each field in a tuple, 1 for valid and 0 for null. -/// For a tuple with nine total fields, valid at field 0, 6, 7, 8 shows as `[10000011, 1]`. -pub struct NullBitsFormatter<'a> { - null_bits: &'a [u8], - field_count: usize, -} - -impl<'a> NullBitsFormatter<'a> { - /// new - pub fn new(null_bits: &'a [u8], field_count: usize) -> Self { - Self { - null_bits, - field_count, - } - } -} - -impl<'a> std::fmt::Debug for NullBitsFormatter<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut is_first = true; - let data = self.null_bits; - for i in 0..self.field_count { - if is_first { - f.write_char('[')?; - is_first = false; - } else if i % 8 == 0 { - f.write_str(", ")?; - } - if unsafe { get_bit_raw(data.as_ptr(), i) } { - f.write_char('1')?; - } else { - f.write_char('0')?; - } - } - f.write_char(']')?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use arrow::util::bit_util::{ceil, set_bit_raw, unset_bit_raw}; - use rand::Rng; - - fn test_validity(bs: &[bool]) { - let n = bs.len(); - let mut data = vec![0; ceil(n, 8)]; - for (i, b) in bs.iter().enumerate() { - if *b { - let data_argument = &mut data; - unsafe { - set_bit_raw(data_argument.as_mut_ptr(), i); - }; - } else { - let data_argument = &mut data; - unsafe { - unset_bit_raw(data_argument.as_mut_ptr(), i); - }; - } - } - let expected = bs.iter().all(|f| *f); - assert_eq!(all_valid(&data, bs.len()), expected); - } - - #[test] - fn test_all_valid() { - let sizes = [4, 8, 12, 16, 19, 23, 32, 44]; - for i in sizes { - { - // contains false - let input = { - let mut rng = rand::thread_rng(); - let mut input: Vec = vec![false; i]; - rng.fill(&mut input[..]); - input - }; - test_validity(&input); - } - - { - // all true - let input = vec![true; i]; - test_validity(&input); - } - } - } - - #[test] - fn test_formatter() -> std::fmt::Result { - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[0b11000001], 8)), - "[10000011]" - ); - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1], 9)), - "[10000011, 1]" - ); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 2)), "[10]"); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 3)), "[100]"); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 4)), "[1000]"); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 5)), "[10000]"); - assert_eq!(format!("{:?}", NullBitsFormatter::new(&[1], 6)), "[100000]"); - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[1], 7)), - "[1000000]" - ); - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[1], 8)), - "[10000000]" - ); - // extra bytes are ignored - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1, 1, 1], 9)), - "[10000011, 1]" - ); - assert_eq!( - format!("{:?}", NullBitsFormatter::new(&[0b11000001, 1, 1], 16)), - "[10000011, 10000000]" - ); - Ok(()) - } -} diff --git a/datafusion/row/src/writer.rs b/datafusion/row/src/writer.rs deleted file mode 100644 index 14ce6afe6832..000000000000 --- a/datafusion/row/src/writer.rs +++ /dev/null @@ -1,333 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! [`RowWriter`] writes [`RecordBatch`]es to `Vec` to stitch attributes together - -use crate::layout::RowLayout; -use arrow::array::*; -use arrow::datatypes::{DataType, Schema}; -use arrow::record_batch::RecordBatch; -use arrow::util::bit_util::{set_bit_raw, unset_bit_raw}; -use datafusion_common::cast::{as_date32_array, as_date64_array, as_decimal128_array}; -use datafusion_common::Result; -use std::sync::Arc; - -/// Append batch from `row_idx` to `output` buffer start from `offset` -/// # Panics -/// -/// This function will panic if the output buffer doesn't have enough space to hold all the rows -pub fn write_batch_unchecked( - output: &mut [u8], - offset: usize, - batch: &RecordBatch, - row_idx: usize, - schema: Arc, -) -> Vec { - let mut writer = RowWriter::new(&schema); - let mut current_offset = offset; - let mut offsets = vec![]; - let columns = batch.columns(); - for cur_row in row_idx..batch.num_rows() { - offsets.push(current_offset); - let row_width = write_row(&mut writer, cur_row, &schema, columns); - output[current_offset..current_offset + row_width] - .copy_from_slice(writer.get_row()); - current_offset += row_width; - writer.reset() - } - offsets -} - -/// Bench interpreted version write -#[inline(never)] -pub fn bench_write_batch( - batches: &[Vec], - schema: Arc, -) -> Result> { - let mut writer = RowWriter::new(&schema); - let mut lengths = vec![]; - - for batch in batches.iter().flatten() { - let columns = batch.columns(); - for cur_row in 0..batch.num_rows() { - let row_width = write_row(&mut writer, cur_row, &schema, columns); - lengths.push(row_width); - writer.reset() - } - } - - Ok(lengths) -} - -#[macro_export] -macro_rules! set_idx { - ($WIDTH: literal, $SELF: ident, $IDX: ident, $VALUE: ident) => {{ - $SELF.assert_index_valid($IDX); - let offset = $SELF.field_offsets()[$IDX]; - $SELF.data[offset..offset + $WIDTH].copy_from_slice(&$VALUE.to_le_bytes()); - }}; -} - -#[macro_export] -macro_rules! fn_set_idx { - ($NATIVE: ident, $WIDTH: literal) => { - paste::item! { - fn [](&mut self, idx: usize, value: $NATIVE) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset..offset + $WIDTH].copy_from_slice(&value.to_le_bytes()); - } - } - }; -} - -/// Reusable row writer backed by `Vec` -/// -/// ```text -/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ -/// RowWriter │ -/// ┌───────────────────────┐ │ [RowFormat] -/// │ │ │ -/// │ │ │(copy from Array -/// │ │ to [u8]) │ ┌───────────────────────┐ -/// │ RecordBatch │ └ ─ ─ ─ ─ ─ ─ ─ ─ │ RowFormat │ -/// │ │──────────────────────────────▶│ Vec │ -/// │ (... N Rows ...) │ │ │ -/// │ │ └───────────────────────┘ -/// │ │ -/// │ │ -/// └───────────────────────┘ -/// ``` -pub struct RowWriter { - /// Layout on how to write each field - layout: RowLayout, - /// Buffer for the current tuple being written. - data: Vec, - /// Length in bytes for the current tuple, 8-bytes word aligned. - pub(crate) row_width: usize, -} - -impl RowWriter { - /// New - pub fn new(schema: &Schema) -> Self { - let layout = RowLayout::new(schema); - let init_capacity = layout.fixed_part_width(); - Self { - layout, - data: vec![0; init_capacity], - row_width: init_capacity, - } - } - - /// Reset the row writer state for new tuple - pub fn reset(&mut self) { - self.data.fill(0); - self.row_width = self.layout.fixed_part_width(); - } - - #[inline] - fn assert_index_valid(&self, idx: usize) { - assert!(idx < self.layout.field_count); - } - - #[inline(always)] - fn field_offsets(&self) -> &[usize] { - &self.layout.field_offsets - } - - #[inline(always)] - fn null_free(&self) -> bool { - self.layout.null_free - } - - pub(crate) fn set_null_at(&mut self, idx: usize) { - assert!( - !self.null_free(), - "Unexpected call to set_null_at on null-free row writer" - ); - let null_bits = &mut self.data[0..self.layout.null_width]; - unsafe { - unset_bit_raw(null_bits.as_mut_ptr(), idx); - } - } - - pub(crate) fn set_non_null_at(&mut self, idx: usize) { - assert!( - !self.null_free(), - "Unexpected call to set_non_null_at on null-free row writer" - ); - let null_bits = &mut self.data[0..self.layout.null_width]; - unsafe { - set_bit_raw(null_bits.as_mut_ptr(), idx); - } - } - - fn set_bool(&mut self, idx: usize, value: bool) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset] = u8::from(value); - } - - fn set_u8(&mut self, idx: usize, value: u8) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset] = value; - } - - fn_set_idx!(u16, 2); - fn_set_idx!(u32, 4); - fn_set_idx!(u64, 8); - fn_set_idx!(i16, 2); - fn_set_idx!(i32, 4); - fn_set_idx!(i64, 8); - fn_set_idx!(f32, 4); - fn_set_idx!(f64, 8); - - fn set_i8(&mut self, idx: usize, value: i8) { - self.assert_index_valid(idx); - let offset = self.field_offsets()[idx]; - self.data[offset] = value.to_le_bytes()[0]; - } - - fn set_date32(&mut self, idx: usize, value: i32) { - set_idx!(4, self, idx, value) - } - - fn set_date64(&mut self, idx: usize, value: i64) { - set_idx!(8, self, idx, value) - } - - fn set_decimal128(&mut self, idx: usize, value: i128) { - set_idx!(16, self, idx, value) - } - - /// Get raw bytes - pub fn get_row(&self) -> &[u8] { - &self.data[0..self.row_width] - } -} - -/// Stitch attributes of tuple in `batch` at `row_idx` and returns the tuple width -pub fn write_row( - row_writer: &mut RowWriter, - row_idx: usize, - schema: &Schema, - columns: &[ArrayRef], -) -> usize { - // Get the row from the batch denoted by row_idx - if row_writer.null_free() { - for ((i, f), col) in schema.fields().iter().enumerate().zip(columns.iter()) { - write_field(i, row_idx, col, f.data_type(), row_writer); - } - } else { - for ((i, f), col) in schema.fields().iter().enumerate().zip(columns.iter()) { - if !col.is_null(row_idx) { - row_writer.set_non_null_at(i); - write_field(i, row_idx, col, f.data_type(), row_writer); - } else { - row_writer.set_null_at(i); - } - } - } - - row_writer.row_width -} - -macro_rules! fn_write_field { - ($NATIVE: ident, $ARRAY: ident) => { - paste::item! { - pub(crate) fn [](to: &mut RowWriter, from: &Arc, col_idx: usize, row_idx: usize) { - let from = from - .as_any() - .downcast_ref::<$ARRAY>() - .unwrap(); - to.[](col_idx, from.value(row_idx)); - } - } - }; -} - -fn_write_field!(bool, BooleanArray); -fn_write_field!(u8, UInt8Array); -fn_write_field!(u16, UInt16Array); -fn_write_field!(u32, UInt32Array); -fn_write_field!(u64, UInt64Array); -fn_write_field!(i8, Int8Array); -fn_write_field!(i16, Int16Array); -fn_write_field!(i32, Int32Array); -fn_write_field!(i64, Int64Array); -fn_write_field!(f32, Float32Array); -fn_write_field!(f64, Float64Array); - -pub(crate) fn write_field_date32( - to: &mut RowWriter, - from: &Arc, - col_idx: usize, - row_idx: usize, -) { - match as_date32_array(from) { - Ok(from) => to.set_date32(col_idx, from.value(row_idx)), - Err(e) => panic!("{e}"), - }; -} - -pub(crate) fn write_field_date64( - to: &mut RowWriter, - from: &Arc, - col_idx: usize, - row_idx: usize, -) { - let from = as_date64_array(from).unwrap(); - to.set_date64(col_idx, from.value(row_idx)); -} - -pub(crate) fn write_field_decimal128( - to: &mut RowWriter, - from: &Arc, - col_idx: usize, - row_idx: usize, -) { - let from = as_decimal128_array(from).unwrap(); - to.set_decimal128(col_idx, from.value(row_idx)); -} - -fn write_field( - col_idx: usize, - row_idx: usize, - col: &Arc, - dt: &DataType, - row: &mut RowWriter, -) { - use DataType::*; - match dt { - Boolean => write_field_bool(row, col, col_idx, row_idx), - UInt8 => write_field_u8(row, col, col_idx, row_idx), - UInt16 => write_field_u16(row, col, col_idx, row_idx), - UInt32 => write_field_u32(row, col, col_idx, row_idx), - UInt64 => write_field_u64(row, col, col_idx, row_idx), - Int8 => write_field_i8(row, col, col_idx, row_idx), - Int16 => write_field_i16(row, col, col_idx, row_idx), - Int32 => write_field_i32(row, col, col_idx, row_idx), - Int64 => write_field_i64(row, col, col_idx, row_idx), - Float32 => write_field_f32(row, col, col_idx, row_idx), - Float64 => write_field_f64(row, col, col_idx, row_idx), - Date32 => write_field_date32(row, col, col_idx, row_idx), - Date64 => write_field_date64(row, col, col_idx, row_idx), - Decimal128(_, _) => write_field_decimal128(row, col, col_idx, row_idx), - _ => unimplemented!(), - } -}