From 92cbaba9d5e79cf7b01677f11fdc009936103919 Mon Sep 17 00:00:00 2001 From: kikkon Date: Sun, 24 Mar 2024 20:36:41 +0800 Subject: [PATCH] [WIP] feat: support list_view_array --- arrow-array/src/array/list_view_array.rs | 501 ++++++++++++++++++ arrow-array/src/array/mod.rs | 31 +- .../src/builder/generic_list_view_builder.rs | 249 +++++++++ arrow-array/src/builder/mod.rs | 2 + arrow-array/src/iterator.rs | 4 +- arrow-array/src/record_batch.rs | 4 +- arrow-buffer/src/buffer/mod.rs | 3 + arrow-buffer/src/buffer/size.rs | 91 ++++ arrow-data/src/data.rs | 69 ++- arrow-data/src/equal/list_view.rs | 51 ++ arrow-data/src/equal/mod.rs | 9 +- 11 files changed, 999 insertions(+), 15 deletions(-) create mode 100644 arrow-array/src/array/list_view_array.rs create mode 100644 arrow-array/src/builder/generic_list_view_builder.rs create mode 100644 arrow-buffer/src/buffer/size.rs create mode 100644 arrow-data/src/equal/list_view.rs diff --git a/arrow-array/src/array/list_view_array.rs b/arrow-array/src/array/list_view_array.rs new file mode 100644 index 000000000000..f7de5ee86bac --- /dev/null +++ b/arrow-array/src/array/list_view_array.rs @@ -0,0 +1,501 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::array::{get_offsets, get_sizes, make_array, print_long_array}; +use crate::builder::{GenericListViewBuilder, PrimitiveBuilder}; +use crate::{new_empty_array, Array, ArrayAccessor, ArrayRef, ArrowPrimitiveType, OffsetSizeTrait}; +use arrow_buffer::{NullBuffer, OffsetBuffer, SizeBuffer}; +use arrow_data::{ArrayData, ArrayDataBuilder}; +use arrow_schema::{ArrowError, DataType, FieldRef}; +use std::any::Any; +use std::ops::Add; +use std::sync::Arc; +use crate::iterator::GenericListViewArrayIter; + +// See [`ListViewBuilder`](crate::builder::ListViewBuilder) for how to construct a [`ListViewArray`] +pub type ListViewArray = GenericListViewArray; + +/// A [`GenericListViewArray`] of variable size lists, storing offsets as `i64`. +/// +// See [`LargeListViewArray`](crate::builder::LargeListViewBuilder) for how to construct a [`LargeListViewArray`] +pub type LargeListViewArray = GenericListViewArray; + +/// +/// Different than [`crate::GenericListArray`] as it stores both an offset and length +/// meaning that take / filter operations can be implemented without copying the underlying data. +/// +/// [Variable-size List Layout: ListView Layout]: https://arrow.apache.org/docs/format/Columnar.html#listview-layout +pub struct GenericListViewArray { + data_type: DataType, + nulls: Option, + values: ArrayRef, + value_offsets: OffsetBuffer, + value_sizes: SizeBuffer, +} + + +impl Clone for GenericListViewArray { + fn clone(&self) -> Self { + Self { + data_type: self.data_type.clone(), + nulls: self.nulls.clone(), + values: self.values.clone(), + value_offsets: self.value_offsets.clone(), + value_sizes: self.value_sizes.clone(), + } + } +} + +impl GenericListViewArray { + /// The data type constructor of listview array. + /// The input is the schema of the child array and + /// the output is the [`DataType`], ListView or LargeListView. 
+ pub const DATA_TYPE_CONSTRUCTOR: fn(FieldRef) -> DataType = if OffsetSize::IS_LARGE { + DataType::LargeListView + } else { + DataType::ListView + }; + + pub fn try_new( + field: FieldRef, + offsets: OffsetBuffer, + sizes: SizeBuffer, + values: ArrayRef, + nulls: Option, + ) -> Result { + let len = offsets.len(); + if len != sizes.len() { + return Err(ArrowError::InvalidArgumentError(format!( + "Length of offsets buffer and sizes buffer must be equal for {}ListViewArray, got {} and {}", + OffsetSize::PREFIX, len, sizes.len() + ))); + } + + if let Some(n) = nulls.as_ref() { + if n.len() != len { + return Err(ArrowError::InvalidArgumentError(format!( + "Incorrect length of null buffer for {}ListViewArray, expected {len} got {}", + OffsetSize::PREFIX, + n.len(), + ))); + } + } + if !field.is_nullable() && values.is_nullable() { + return Err(ArrowError::InvalidArgumentError(format!( + "Non-nullable field of {}ListViewArray {:?} cannot contain nulls", + OffsetSize::PREFIX, + field.name() + ))); + } + + if field.data_type() != values.data_type() { + return Err(ArrowError::InvalidArgumentError(format!( + "{}ListViewArray expected data type {} got {} for {:?}", + OffsetSize::PREFIX, + field.data_type(), + values.data_type(), + field.name() + ))); + } + + + Ok(Self { + data_type: Self::DATA_TYPE_CONSTRUCTOR(field), + nulls, + values, + value_offsets: offsets, + value_sizes: sizes, + }) + } + + /// Create a new [`GenericListViewArray`] from the provided parts + /// + /// # Panics + /// + /// Panics if [`Self::try_new`] returns an error + pub fn new( + field: FieldRef, + offsets: OffsetBuffer, + sizes: SizeBuffer, + values: ArrayRef, + nulls: Option, + ) -> Self { + Self::try_new(field, offsets, sizes, values, nulls).unwrap() + } + + /// Create a new [`GenericListViewArray`] of length `len` where all values are null + pub fn new_null(field: FieldRef, len: usize) -> Self { + let values = new_empty_array(field.data_type()); + Self { + data_type: 
Self::DATA_TYPE_CONSTRUCTOR(field), + nulls: Some(NullBuffer::new_null(len)), + value_offsets: OffsetBuffer::new_zeroed(len), + value_sizes: SizeBuffer::new_zeroed(len), + values, + } + } + + /// Deconstruct this array into its constituent parts + pub fn into_parts( + self, + ) -> ( + FieldRef, + OffsetBuffer, + SizeBuffer, + ArrayRef, + Option, + ) { + let f = match self.data_type { + DataType::ListView(f) | DataType::LargeListView(f) => f, + _ => unreachable!(), + }; + (f, self.value_offsets, self.value_sizes, self.values, self.nulls) + } + + /// Returns a reference to the offsets of this list + /// + /// Unlike [`Self::value_offsets`] this returns the [`OffsetBuffer`] + /// allowing for zero-copy cloning + #[inline] + pub fn offsets(&self) -> &OffsetBuffer { + &self.value_offsets + } + + /// Returns a reference to the values of this list + #[inline] + pub fn values(&self) -> &ArrayRef { + &self.values + } + + /// Returns a reference to the sizes of this list + /// + /// Unlike [`Self::value_sizes`] this returns the [`SizeBuffer`] + /// allowing for zero-copy cloning + #[inline] + pub fn sizes(&self) -> &SizeBuffer { + &self.value_sizes + } + + + /// Returns a clone of the value type of this list. + pub fn value_type(&self) -> DataType { + self.values.data_type().clone() + } + + /// Returns ith value of this list array. + /// # Safety + /// Caller must ensure that the index is within the array bounds + pub unsafe fn value_unchecked(&self, i: usize) -> ArrayRef { + let end = self.value_offsets().get_unchecked(i + 1).as_usize(); + let start = self.value_offsets().get_unchecked(i).as_usize(); + self.values.slice(start, end - start) + } + + /// Returns ith value of this list array. 
+ pub fn value(&self, i: usize) -> ArrayRef { + let offset = self.value_offsets()[i].as_usize(); + let length = self.value_sizes()[i].as_usize(); + self.values.slice(offset, length) + } + + /// Returns the offset values in the offsets buffer + #[inline] + pub fn value_offsets(&self) -> &[OffsetSize] { + &self.value_offsets + } + + /// Returns the sizes values in the offsets buffer + #[inline] + pub fn value_sizes(&self) -> &[OffsetSize] { + &self.value_sizes + } + + /// Returns the length for value at index `i`. + #[inline] + pub fn value_length(&self, i: usize) -> OffsetSize { + let offsets = self.value_offsets(); + offsets[i + 1] - offsets[i] + } + + /// constructs a new iterator + pub fn iter<'a>(&'a self) -> GenericListViewArrayIter<'a, OffsetSize> { + GenericListViewArrayIter::<'a, OffsetSize>::new(self) + } + + #[inline] + fn get_type(data_type: &DataType) -> Option<&DataType> { + match (OffsetSize::IS_LARGE, data_type) { + (true, DataType::LargeListView(child)) | (false, DataType::ListView(child)) => { + Some(child.data_type()) + } + _ => None, + } + } + + /// Returns a zero-copy slice of this array with the indicated offset and length. 
+ pub fn slice(&self, offset: usize, length: usize) -> Self { + Self { + data_type: self.data_type.clone(), + nulls: self.nulls.as_ref().map(|n| n.slice(offset, length)), + values: self.values.clone(), + value_offsets: self.value_offsets.slice(offset, length), + value_sizes: self.value_sizes.slice(offset, length), + } + } + + pub fn from_iter_primitive(iter: I) -> Self + where + T: ArrowPrimitiveType, + P: IntoIterator::Native>>, + I: IntoIterator>, + { + let iter = iter.into_iter(); + let size_hint = iter.size_hint().0; + let mut builder = + GenericListViewBuilder::with_capacity(PrimitiveBuilder::::new(), size_hint); + + for i in iter { + match i { + Some(p) => { + //todo: remove size variable + let mut size = 0usize; + for t in p { + builder.values().append_option(t); + size = size.add(1); + } + builder.append(true, size); + } + None => builder.append(false, 0), + } + } + builder.finish() + } +} + +impl<'a, OffsetSize: OffsetSizeTrait> ArrayAccessor for &'a GenericListViewArray { + type Item = ArrayRef; + + fn value(&self, index: usize) -> Self::Item { + GenericListViewArray::value(self, index) + } + + unsafe fn value_unchecked(&self, index: usize) -> Self::Item { + GenericListViewArray::value(self, index) + } +} + + +impl Array for GenericListViewArray { + fn as_any(&self) -> &dyn Any { + self + } + + fn to_data(&self) -> ArrayData { + self.clone().into() + } + + fn into_data(self) -> ArrayData { + self.into() + } + + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn slice(&self, offset: usize, length: usize) -> ArrayRef { + Arc::new(self.slice(offset, length)) + } + + fn len(&self) -> usize { + self.value_offsets.len() - 1 + } + + fn is_empty(&self) -> bool { + self.value_offsets.len() <= 1 + } + + fn offset(&self) -> usize { + 0 + } + + fn nulls(&self) -> Option<&NullBuffer> { + self.nulls.as_ref() + } + + fn get_buffer_memory_size(&self) -> usize { + let mut size = self.values.get_buffer_memory_size(); + size += 
self.value_offsets.inner().inner().capacity(); + if let Some(n) = self.nulls.as_ref() { + size += n.buffer().capacity(); + } + size + } + + fn get_array_memory_size(&self) -> usize { + let mut size = std::mem::size_of::() + self.values.get_array_memory_size(); + size += self.value_offsets.inner().inner().capacity(); + if let Some(n) = self.nulls.as_ref() { + size += n.buffer().capacity(); + } + size + } +} + +impl std::fmt::Debug for GenericListViewArray { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let prefix = OffsetSize::PREFIX; + + write!(f, "{prefix}ListViewArray\n[\n")?; + print_long_array(self, f, |array, index, f| { + std::fmt::Debug::fmt(&array.value(index), f) + })?; + write!(f, "]") + } +} + +impl From> for ArrayData { + fn from(array: GenericListViewArray) -> Self { + let len = array.len(); + let builder = ArrayDataBuilder::new(array.data_type) + .len(len) + .nulls(array.nulls) + .buffers(vec![array.value_offsets.into_inner().into_inner(), array.value_sizes.into_inner().into_inner()]) + .child_data(vec![array.values.to_data()]); + + unsafe { builder.build_unchecked() } + } +} + +impl From for GenericListViewArray { + fn from(data: ArrayData) -> Self { + Self::try_new_from_array_data(data) + .expect("Expected infallible creation of GenericListViewArray from ArrayDataRef failed") + } +} + +impl GenericListViewArray { + fn try_new_from_array_data(data: ArrayData) -> Result { + if data.buffers().len() != 2 { + return Err(ArrowError::InvalidArgumentError(format!( + "ListViewArray data should contain two buffer (value offsets & value size), had {}", + data.buffers().len() + ))); + } + + if data.child_data().len() != 1 { + return Err(ArrowError::InvalidArgumentError(format!( + "ListViewArray should contain a single child array (values array), had {}", + data.child_data().len() + ))); + } + + let values = data.child_data()[0].clone(); + + if let Some(child_data_type) = Self::get_type(data.data_type()) { + if values.data_type() != 
child_data_type { + return Err(ArrowError::InvalidArgumentError(format!( + "[Large]ListViewArray's child datatype {:?} does not \ + correspond to the List's datatype {:?}", + values.data_type(), + child_data_type + ))); + } + } else { + return Err(ArrowError::InvalidArgumentError(format!( + "[Large]ListViewArray's datatype must be [Large]ListViewArray(). It is {:?}", + data.data_type() + ))); + } + + let values = make_array(values); + // SAFETY: + // ArrayData is valid, and verified type above + let value_offsets = unsafe { get_offsets(&data) }; + let value_sizes = unsafe { get_sizes(&data) }; + + Ok(Self { + data_type: data.data_type().clone(), + nulls: data.nulls().cloned(), + values, + value_offsets, + value_sizes, + }) + } +} + + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::Int32Type; + use crate::Int32Array; + use arrow_buffer::{Buffer, ScalarBuffer}; + use arrow_schema::Field; + + fn create_from_buffers() -> ListViewArray { + // [[0, 1, 2], [3, 4, 5], [6, 7]] + let values = Int32Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7]); + let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 3, 6])); + let field = Arc::new(Field::new("item", DataType::Int32, true)); + let sizes = SizeBuffer::new(ScalarBuffer::from(vec![3, 3, 2])); + + ListViewArray::new(field, offsets, sizes,Arc::new(values), None) + } + + + #[test] + fn test_from_iter_primitive() { + let data = vec![ + Some(vec![Some(0), Some(1), Some(2)]), + Some(vec![Some(3), Some(4), Some(5)]), + Some(vec![Some(6), Some(7)]), + ]; + let list_array = ListViewArray::from_iter_primitive::(data); + let another = create_from_buffers(); + assert_eq!(list_array, another) + } + + #[test] + fn test_empty_list_array() { + // Construct an empty value array + let value_data = ArrayData::builder(DataType::Int32) + .len(0) + .add_buffer(Buffer::from([])) + .build() + .unwrap(); + + // Construct an empty offset buffer + let value_offsets = Buffer::from([]); + // Construct an empty size buffer + let value_sizes = 
Buffer::from([]); + + // Construct a list view array from the above two + let list_data_type = DataType::ListView(Arc::new(Field::new("item", DataType::Int32, false))); + let list_data = ArrayData::builder(list_data_type) + .len(0) + .add_buffer(value_offsets) + .add_buffer(value_sizes) + .add_child_data(value_data) + .build() + .unwrap(); + + let list_array = ListViewArray::from(list_data); + assert_eq!(list_array.len(), 0) + } + + +} \ No newline at end of file diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index b115ff9c14cc..f981ba4021a3 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -20,7 +20,7 @@ mod binary_array; use crate::types::*; -use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer, SizeBuffer}; use arrow_data::ArrayData; use arrow_schema::{DataType, IntervalUnit, TimeUnit}; use std::any::Any; @@ -65,13 +65,14 @@ mod union_array; pub use union_array::*; mod run_array; - pub use run_array::*; mod byte_view_array; - pub use byte_view_array::*; +mod list_view_array; +pub use list_view_array::*; + /// An array in the [arrow columnar format](https://arrow.apache.org/docs/format/Columnar.html) pub trait Array: std::fmt::Debug + Send + Sync { /// Returns the array as [`Any`] so that it can be @@ -519,6 +520,12 @@ impl PartialEq for GenericListArray { } } +impl PartialEq for GenericListViewArray { + fn eq(&self, other: &Self) -> bool { + self.to_data().eq(&other.to_data()) + } +} + impl PartialEq for MapArray { fn eq(&self, other: &Self) -> bool { self.to_data().eq(&other.to_data()) @@ -606,7 +613,9 @@ pub fn make_array(data: ArrayData) -> ArrayRef { DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data)) as ArrayRef, DataType::Utf8View => Arc::new(StringViewArray::from(data)) as ArrayRef, DataType::List(_) => Arc::new(ListArray::from(data)) as ArrayRef, + DataType::ListView(_) => 
Arc::new(ListViewArray::from(data)) as ArrayRef, DataType::LargeList(_) => Arc::new(LargeListArray::from(data)) as ArrayRef, + DataType::LargeListView(_) => Arc::new(LargeListViewArray::from(data)) as ArrayRef, DataType::Struct(_) => Arc::new(StructArray::from(data)) as ArrayRef, DataType::Map(_, _) => Arc::new(MapArray::from(data)) as ArrayRef, DataType::Union(_, _) => Arc::new(UnionArray::from(data)) as ArrayRef, @@ -687,6 +696,22 @@ unsafe fn get_offsets(data: &ArrayData) -> OffsetBuffer { } } +/// Helper function that gets size from an [`ArrayData`] +/// +/// # Safety +unsafe fn get_sizes(data: &ArrayData) -> SizeBuffer { + match data.is_empty() && data.buffers()[1].is_empty() { + true => SizeBuffer::new_empty(), + false => { + let buffer = + ScalarBuffer::new(data.buffers()[1].clone(), data.offset(), data.len()); + // Safety: + // ArrayData is valid + SizeBuffer::new(buffer) + } + } +} + /// Helper function for printing potentially long arrays. fn print_long_array(array: &A, f: &mut std::fmt::Formatter, print_item: F) -> std::fmt::Result where diff --git a/arrow-array/src/builder/generic_list_view_builder.rs b/arrow-array/src/builder/generic_list_view_builder.rs new file mode 100644 index 000000000000..226ae3ee7393 --- /dev/null +++ b/arrow-array/src/builder/generic_list_view_builder.rs @@ -0,0 +1,249 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +use std::any::Any; +use std::sync::Arc; +use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, OffsetBuffer, SizeBuffer}; +use arrow_schema::{Field, FieldRef}; +use crate::builder::ArrayBuilder; +use crate::{ArrayRef, GenericListViewArray, OffsetSizeTrait}; + +#[derive(Debug)] +pub struct GenericListViewBuilder { + //todo use OffsetBuffer? + offsets_builder: BufferBuilder, + sizes_builder: BufferBuilder, + null_buffer_builder: NullBufferBuilder, + values_builder: T, + field: Option, +} + + + + +impl Default for GenericListViewBuilder { + fn default() -> Self { + Self::new(T::default()) + } +} + +impl GenericListViewBuilder { + /// Creates a new [`GenericListBuilder`] from a given values array builder + pub fn new(values_builder: T) -> Self { + let capacity = values_builder.len(); + Self::with_capacity(values_builder, capacity) + } + + /// Creates a new [`GenericListBuilder`] from a given values array builder + /// `capacity` is the number of items to pre-allocate space for in this builder + pub fn with_capacity(values_builder: T, capacity: usize) -> Self { + let offsets_builder = BufferBuilder::::new(capacity); + let sizes_builder = BufferBuilder::::new(capacity); + Self { + offsets_builder, + null_buffer_builder: NullBufferBuilder::new(capacity), + values_builder, + sizes_builder, + field: None, + } + } + + /// Override the field passed to [`GenericListArray::new`] + /// + /// By default a nullable field is created with the name `item` + /// + /// Note: [`Self::finish`] and [`Self::finish_cloned`] will panic if the + /// field's data 
type does not match that of `T` + pub fn with_field(self, field: impl Into) -> Self { + Self { + field: Some(field.into()), + ..self + } + } +} + +impl ArrayBuilder +for GenericListViewBuilder + where + T: 'static, +{ + /// Returns the builder as a non-mutable `Any` reference. + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns the builder as a mutable `Any` reference. + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + /// Returns the boxed builder as a box of `Any`. + fn into_box_any(self: Box) -> Box { + self + } + + /// Returns the number of array slots in the builder + fn len(&self) -> usize { + self.null_buffer_builder.len() + } + + /// Builds the array and reset this builder. + fn finish(&mut self) -> ArrayRef { + Arc::new(self.finish()) + } + + /// Builds the array without resetting the builder. + fn finish_cloned(&self) -> ArrayRef { + Arc::new(self.finish_cloned()) + } +} + +impl GenericListViewBuilder + where + T: 'static, +{ + /// Returns the child array builder as a mutable reference. + /// + /// This mutable reference can be used to append values into the child array builder, + /// but you must call [`append`](#method.append) to delimit each distinct list value. 
+ pub fn values(&mut self) -> &mut T { + &mut self.values_builder + } + + /// Returns the child array builder as an immutable reference + pub fn values_ref(&self) -> &T { + &self.values_builder + } + + /// Finish the current variable-length list array slot + /// + /// # Panics + /// + /// Panics if the length of [`Self::values`] exceeds `OffsetSize::MAX` + #[inline] + pub fn append(&mut self, is_valid: bool, size: usize) { + if is_valid { + self.offsets_builder.append(OffsetSize::from_usize(self.values_builder.len() - size).unwrap()); + let size = OffsetSize::from_usize(size).unwrap(); + self.sizes_builder.append(size); + } + self.null_buffer_builder.append(is_valid); + } + + #[inline] + pub fn append_value(&mut self, i: I) + where + T: Extend>, + I: IntoIterator>, + { + self.extend(std::iter::once(Some(i))) + } + + /// Append a null to this [`GenericListBuilder`] + /// + /// See [`Self::append_value`] for an example use. + #[inline] + pub fn append_null(&mut self) { + self.offsets_builder.append(OffsetSize::from_usize(self.values_builder.len()).unwrap()); + self.null_buffer_builder.append_null(); + } + + /// Appends an optional value into this [`GenericListBuilder`] + /// + /// If `Some` calls [`Self::append_value`] otherwise calls [`Self::append_null`] + #[inline] + pub fn append_option(&mut self, i: Option) + where + T: Extend>, + I: IntoIterator>, + { + match i { + Some(i) => self.append_value(i), + None => self.append_null(), + } + } + + /// Builds the [`GenericListViewArray`] and reset this builder. 
+ pub fn finish(&mut self) -> GenericListViewArray { + let values = self.values_builder.finish(); + let nulls = self.null_buffer_builder.finish(); + + let offsets = self.offsets_builder.finish(); + // Safety: Safe by construction + let offsets = unsafe { OffsetBuffer::new_unchecked(offsets.into()) }; + self.offsets_builder.append(OffsetSize::zero()); + + let sizes = self.sizes_builder.finish(); + // Safety: Safe by construction + let sizes = SizeBuffer::new(sizes.into()); + self.sizes_builder.append(OffsetSize::zero()); + + let field = match &self.field { + Some(f) => f.clone(), + None => Arc::new(Field::new("item", values.data_type().clone(), true)), + }; + + GenericListViewArray::new(field, offsets, sizes, values ,nulls) + } + + /// Builds the [`GenericListArray`] without resetting the builder. + pub fn finish_cloned(&self) -> GenericListViewArray { + let values = self.values_builder.finish_cloned(); + let nulls = self.null_buffer_builder.finish_cloned(); + + let offsets = Buffer::from_slice_ref(self.offsets_builder.as_slice()); + // Safety: safe by construction + let offsets = unsafe { OffsetBuffer::new_unchecked(offsets.into()) }; + + //todo sizes + let sizes = Buffer::from_slice_ref(self.sizes_builder.as_slice()); + let sizes = SizeBuffer::new(sizes.into()); + + let field = match &self.field { + Some(f) => f.clone(), + None => Arc::new(Field::new("item", values.data_type().clone(), true)), + }; + + GenericListViewArray::new(field, offsets, sizes, values, nulls) + } + + /// Returns the current offsets buffer as a slice + pub fn offsets_slice(&self) -> &[OffsetSize] { + self.offsets_builder.as_slice() + } +} + +impl Extend> for GenericListViewBuilder + where + O: OffsetSizeTrait, + B: ArrayBuilder + Extend, + V: IntoIterator, +{ + #[inline] + fn extend>>(&mut self, iter: T) { + for v in iter { + match v { + Some(elements) => { + self.values_builder.extend(elements); + todo!() + } + None => self.append(false, 0), + } + } + } +} diff --git 
a/arrow-array/src/builder/mod.rs b/arrow-array/src/builder/mod.rs index e4ab7ae4ba23..f212ad554355 100644 --- a/arrow-array/src/builder/mod.rs +++ b/arrow-array/src/builder/mod.rs @@ -181,6 +181,8 @@ pub use generic_byte_run_builder::*; mod generic_bytes_view_builder; pub use generic_bytes_view_builder::*; mod union_builder; +mod generic_list_view_builder; +pub use generic_list_view_builder::*; pub use union_builder::*; diff --git a/arrow-array/src/iterator.rs b/arrow-array/src/iterator.rs index 3f9cc0d525c1..5b36d19359eb 100644 --- a/arrow-array/src/iterator.rs +++ b/arrow-array/src/iterator.rs @@ -19,7 +19,7 @@ use crate::array::{ ArrayAccessor, BooleanArray, FixedSizeBinaryArray, GenericBinaryArray, GenericListArray, - GenericStringArray, PrimitiveArray, + GenericListViewArray, GenericStringArray, PrimitiveArray, }; use crate::{FixedSizeListArray, MapArray}; use arrow_buffer::NullBuffer; @@ -141,6 +141,8 @@ pub type FixedSizeBinaryIter<'a> = ArrayIter<&'a FixedSizeBinaryArray>; pub type FixedSizeListIter<'a> = ArrayIter<&'a FixedSizeListArray>; /// an iterator that returns Some(T) or None, that can be used on any ListArray pub type GenericListArrayIter<'a, O> = ArrayIter<&'a GenericListArray>; +/// an iterator that returns Some(T) or None, that can be used on any ListViewArray +pub type GenericListViewArrayIter<'a, O> = ArrayIter<&'a GenericListViewArray>; /// an iterator that returns Some(T) or None, that can be used on any MapArray pub type MapArrayIter<'a> = ArrayIter<&'a MapArray>; diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index c56b1fd308cf..5fcfaa0b2709 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -626,9 +626,7 @@ mod tests { use std::collections::HashMap; use super::*; - use crate::{ - BooleanArray, Int32Array, Int64Array, Int8Array, ListArray, StringArray, StringViewArray, - }; + use crate::{BooleanArray, Int32Array, Int64Array, Int8Array, ListArray, StringArray, StringViewArray}; 
use arrow_buffer::{Buffer, ToByteSlice}; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::Fields;
diff --git a/arrow-buffer/src/buffer/mod.rs b/arrow-buffer/src/buffer/mod.rs
index d33e68795e4e..c00bd326b3cb 100644
--- a/arrow-buffer/src/buffer/mod.rs
+++ b/arrow-buffer/src/buffer/mod.rs
@@ -32,4 +32,6 @@ pub use boolean::*;
 mod null;
 pub use null::*;
 mod run;
 pub use run::*;
+mod size;
+pub use size::*;
diff --git a/arrow-buffer/src/buffer/size.rs b/arrow-buffer/src/buffer/size.rs
new file mode 100644
index 000000000000..9d2e5889ca1c
--- /dev/null
+++ b/arrow-buffer/src/buffer/size.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::{ArrowNativeType, MutableBuffer, ScalarBuffer};
+use std::ops::Deref;
+
+/// A buffer of non-negative element sizes, one per slot of a list-view array
+///
+/// Unlike an offset buffer, which holds `len + 1` monotonically increasing
+/// values, a [`SizeBuffer`] holds exactly `len` values and may be empty.
+#[derive(Debug, Clone)]
+pub struct SizeBuffer<O: ArrowNativeType>(ScalarBuffer<O>);
+
+impl<O: ArrowNativeType> SizeBuffer<O> {
+    /// Create a new [`SizeBuffer`] containing `len` `0` values
+    ///
+    /// # Panics
+    ///
+    /// Panics if the byte length of the buffer overflows `usize`
+    pub fn new_zeroed(len: usize) -> Self {
+        let len_bytes = len
+            .checked_mul(std::mem::size_of::<O>())
+            .expect("overflow");
+        let buffer = MutableBuffer::from_len_zeroed(len_bytes);
+        Self(buffer.into_buffer().into())
+    }
+
+    /// Create a new [`SizeBuffer`] from the provided [`ScalarBuffer`]
+    ///
+    /// An empty buffer is valid: it describes an array without any slots.
+    ///
+    /// # Panics
+    ///
+    /// Panics if any size is negative
+    pub fn new(buffer: ScalarBuffer<O>) -> Self {
+        let zero = O::usize_as(0);
+        assert!(
+            buffer.iter().all(|size| *size >= zero),
+            "sizes must be greater than or equal to 0"
+        );
+        Self(buffer)
+    }
+
+    /// Create a new [`SizeBuffer`] containing no values
+    pub fn new_empty() -> Self {
+        Self(Vec::<O>::new().into())
+    }
+
+    /// Create a new [`SizeBuffer`] from the iterator of slice lengths
+    ///
+    /// ```
+    /// # use arrow_buffer::SizeBuffer;
+    /// let sizes = SizeBuffer::<i32>::from_lengths([1, 3, 5]);
+    /// assert_eq!(sizes.as_ref(), &[1, 3, 5]);
+    /// ```
+    ///
+    /// # Panics
+    ///
+    /// Panics if a length cannot be represented by `O`
+    pub fn from_lengths<I>(lengths: I) -> Self
+    where
+        I: IntoIterator<Item = usize>,
+    {
+        let sizes: Vec<O> = lengths
+            .into_iter()
+            .map(|size| O::from_usize(size).expect("size overflow"))
+            .collect();
+        Self(sizes.into())
+    }
+
+    /// Returns the inner [`ScalarBuffer`]
+    pub fn inner(&self) -> &ScalarBuffer<O> {
+        &self.0
+    }
+
+    /// Returns the inner [`ScalarBuffer`], consuming self
+    pub fn into_inner(self) -> ScalarBuffer<O> {
+        self.0
+    }
+
+    /// Returns a zero-copy slice of this buffer with length `len` and starting at `offset`
+    pub fn slice(&self, offset: usize, len: usize) -> Self {
+        // One size per slot, so no trailing sentinel element as offsets have
+        Self(self.0.slice(offset, len))
+    }
+
+    /// Returns true if this [`SizeBuffer`] is equal to `other`, using pointer comparisons
+    /// to determine buffer equality. This is cheaper than `PartialEq::eq` but may
+    /// return false when the arrays are logically equal
+    #[inline]
+    pub fn ptr_eq(&self, other: &Self) -> bool {
+        self.0.ptr_eq(&other.0)
+    }
+}
+
+impl<T: ArrowNativeType> Deref for SizeBuffer<T> {
+    type Target = [T];
+
+    #[inline]
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl<T: ArrowNativeType> AsRef<[T]> for SizeBuffer<T> {
+    #[inline]
+    fn as_ref(&self) -> &[T] {
+        self
+    }
+}
diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs
index e227b168eee5..cf7f8fae88a7 100644
--- a/arrow-data/src/data.rs
+++ b/arrow-data/src/data.rs
@@ -119,17 +119,21 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff
             [buffer, empty_buffer]
         }
         DataType::ListView(_) => [
+            // offset buffer
             MutableBuffer::new(capacity * mem::size_of::<i32>()),
+            // size buffer
             MutableBuffer::new(capacity * mem::size_of::<i32>()),
         ],
         DataType::LargeList(_) => {
             // offset buffer always starts with a zero
             let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::<i64>());
             buffer.push(0i64);
             [buffer, empty_buffer]
         }
         DataType::LargeListView(_) => [
+            // offset buffer
             MutableBuffer::new(capacity * mem::size_of::<i64>()),
+            // size buffer
             MutableBuffer::new(capacity * mem::size_of::<i64>()),
         ],
         DataType::FixedSizeBinary(size) => {
@@ -856,6 +860,17 @@ impl ArrayData {
         self.typed_buffer(0, self.len + 1)
     }

+    /// Returns a reference to the data in `buffers[1]` (the sizes buffer) as a typed slice
+    /// after validating. The returned slice is guaranteed to have at
+    /// least `len` entries.
+    fn typed_sizes<T: ArrowNativeType + num::Num>(&self) -> Result<&[T], ArrowError> {
+        // An empty list-like array can have 0 sizes
+        if self.len == 0 && self.buffers[1].is_empty() {
+            return Ok(&[]);
+        }
+        self.typed_buffer(1, self.len)
+    }
+
     /// Returns a reference to the data in `buffers[idx]` as a typed slice after validating
     fn typed_buffer<T: ArrowNativeType + num::Num>(
         &self,
@@ -929,6 +944,39 @@ impl ArrayData {
         Ok(())
     }

+    /// Validates the sizes in `buffers[1]` against the offsets in `buffers[0]`:
+    /// every `offsets[i] + sizes[i]` must lie within the `values_length` elements
+    /// of the child array. Views may overlap or be out of order, so neither the
+    /// order of the offsets nor the sum of the sizes is constrained.
+    fn validate_sizes<T: ArrowNativeType + num::Num + std::fmt::Display>(
+        &self,
+        values_length: usize,
+    ) -> Result<(), ArrowError> {
+        let sizes = self.typed_sizes::<T>()?;
+        if sizes.is_empty() {
+            return Ok(());
+        }
+        let offsets = self.typed_buffer::<T>(0, self.len)?;
+
+        for (i, (offset, size)) in offsets.iter().zip(sizes).enumerate() {
+            let end = offset
+                .to_usize()
+                .zip(size.to_usize())
+                .and_then(|(offset, size)| offset.checked_add(size));
+            match end {
+                Some(end) if end <= values_length => {}
+                _ => {
+                    return Err(ArrowError::InvalidArgumentError(format!(
+                        "Offset {} and size {} at position {} of {} exceed values length {}",
+                        offset, size, i, self.data_type, values_length,
+                    )));
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     /// Validates the layout of `child_data` ArrayData structures
     fn validate_child_data(&self) -> Result<(), ArrowError> {
         match &self.data_type {
@@ -937,11 +985,21 @@ impl ArrayData {
                 self.validate_offsets::<i32>(values_data.len)?;
                 Ok(())
             }
+            DataType::ListView(field) => {
+                let values_data = self.get_single_valid_child_data(field.data_type())?;
+                // covers the offsets too: list views have `len` unordered offsets
+                self.validate_sizes::<i32>(values_data.len)?;
+                Ok(())
+            }
             DataType::LargeList(field) => {
                 let values_data = self.get_single_valid_child_data(field.data_type())?;
                 self.validate_offsets::<i64>(values_data.len)?;
                 Ok(())
             }
+            DataType::LargeListView(field) => {
+                let values_data = self.get_single_valid_child_data(field.data_type())?;
+                // covers the offsets too: list views have `len` unordered offsets
+                self.validate_sizes::<i64>(values_data.len)?;
+                Ok(())
+            }
             DataType::FixedSizeList(field, list_size) => {
                 let values_data = self.get_single_valid_child_data(field.data_type())?;

@@ -1546,9 +1604,8 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout {
         DataType::BinaryView | DataType::Utf8View => DataTypeLayout::new_view(),
         DataType::FixedSizeList(_, _) => DataTypeLayout::new_empty(), // all in child data
         DataType::List(_) => DataTypeLayout::new_fixed_width::<i32>(),
-        DataType::ListView(_) | DataType::LargeListView(_) => {
-            unimplemented!("ListView/LargeListView not implemented")
-        }
+        DataType::ListView(_) => DataTypeLayout::new_list_view::<i32>(),
+        DataType::LargeListView(_) => DataTypeLayout::new_list_view::<i64>(),
         DataType::LargeList(_) => DataTypeLayout::new_fixed_width::<i64>(),
         DataType::Map(_, _) => DataTypeLayout::new_fixed_width::<i32>(),
         DataType::Struct(_) => DataTypeLayout::new_empty(), // all in child data,
@@ -1652,6 +1709,27 @@ impl DataTypeLayout {
             variadic: true,
         }
     }
+
+    /// Describes a list view type: an offsets buffer followed by a sizes
+    /// buffer, both fixed-width with elements of type `T`
+    pub fn new_list_view<T>() -> Self {
+        Self {
+            buffers: vec![
+                // offsets
+                BufferSpec::FixedWidth {
+                    byte_width: mem::size_of::<T>(),
+                    alignment: mem::align_of::<T>(),
+                },
+                // sizes
+                BufferSpec::FixedWidth {
+                    byte_width: mem::size_of::<T>(),
+                    alignment: mem::align_of::<T>(),
+                },
+            ],
+            can_contain_null_mask: true,
+            variadic: false,
+        }
+    }
 }

 /// Layout specification for a single data type buffer
diff --git a/arrow-data/src/equal/list_view.rs b/arrow-data/src/equal/list_view.rs
new file mode 100644
index 000000000000..afbd876737d2
--- /dev/null
+++ b/arrow-data/src/equal/list_view.rs
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ArrayData;
+use arrow_buffer::ArrowNativeType;
+use num::Integer;
+
+use super::equal_range;
+
+/// Compares `len` slots of two list-view arrays, starting at `lhs_start` and
+/// `rhs_start` respectively
+///
+/// Two valid slots are equal when they have the same size and the child ranges
+/// they view are equal; the offsets themselves may differ. Null slots may carry
+/// arbitrary offsets and sizes, so only their validity is compared.
+pub(super) fn list_view_equal<T: ArrowNativeType + Integer>(
+    lhs: &ArrayData,
+    rhs: &ArrayData,
+    lhs_start: usize,
+    rhs_start: usize,
+    len: usize,
+) -> bool {
+    let lhs_offsets = lhs.buffer::<T>(0);
+    let rhs_offsets = rhs.buffer::<T>(0);
+    let lhs_sizes = lhs.buffer::<T>(1);
+    let rhs_sizes = rhs.buffer::<T>(1);
+    let lhs_values = &lhs.child_data()[0];
+    let rhs_values = &rhs.child_data()[0];
+
+    (0..len).all(|i| {
+        let lhs_pos = lhs_start + i;
+        let rhs_pos = rhs_start + i;
+
+        // The validity of the slots is that of the list-view arrays themselves,
+        // not that of their children
+        let lhs_is_null = lhs.nulls().map_or(false, |nulls| nulls.is_null(lhs_pos));
+        let rhs_is_null = rhs.nulls().map_or(false, |nulls| nulls.is_null(rhs_pos));
+        if lhs_is_null || rhs_is_null {
+            return lhs_is_null == rhs_is_null;
+        }
+
+        let lhs_len = lhs_sizes[lhs_pos].to_usize().unwrap();
+        let rhs_len = rhs_sizes[rhs_pos].to_usize().unwrap();
+        if lhs_len != rhs_len {
+            return false;
+        }
+
+        // `equal_range` compares the validity as well as the values of the children
+        equal_range(
+            lhs_values,
+            rhs_values,
+            lhs_offsets[lhs_pos].to_usize().unwrap(),
+            rhs_offsets[rhs_pos].to_usize().unwrap(),
+            lhs_len,
+        )
+    })
+}
diff --git a/arrow-data/src/equal/mod.rs b/arrow-data/src/equal/mod.rs
index dba6a0186a56..0bc0cf56277c 100644
--- a/arrow-data/src/equal/mod.rs
+++ b/arrow-data/src/equal/mod.rs
@@ -37,6 +37,7 @@ mod structure;
 mod union;
 mod utils;
 mod variable_size;
+mod list_view;

 // these methods assume the same type, len and null count.
 // For this reason, they are not exposed and are instead used
@@ -52,6 +53,7 @@ use primitive::primitive_equal;
 use structure::struct_equal;
 use union::union_equal;
 use variable_size::variable_sized_equal;
+use list_view::list_view_equal;

 use self::run::run_equal;

@@ -102,10 +104,9 @@ fn equal_values(
             byte_view_equal(lhs, rhs, lhs_start, rhs_start, len)
         }
         DataType::List(_) => list_equal::<i32>(lhs, rhs, lhs_start, rhs_start, len),
-        DataType::ListView(_) | DataType::LargeListView(_) => {
-            unimplemented!("ListView/LargeListView not yet implemented")
-        }
+        DataType::ListView(_) => list_view_equal::<i32>(lhs, rhs, lhs_start, rhs_start, len),
+        DataType::LargeListView(_) => list_view_equal::<i64>(lhs, rhs, lhs_start, rhs_start, len),
         DataType::LargeList(_) => list_equal::<i64>(lhs, rhs, lhs_start, rhs_start, len),
         DataType::FixedSizeList(_, _) => fixed_list_equal(lhs, rhs, lhs_start, rhs_start, len),
         DataType::Struct(_) => struct_equal(lhs, rhs, lhs_start, rhs_start, len),
         DataType::Union(_, _) => union_equal(lhs, rhs, lhs_start, rhs_start, len),