From 8be6078093314199ed2ad000a421460a544b1ad1 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 29 Aug 2024 21:04:24 +0800 Subject: [PATCH 01/13] add feature gate Signed-off-by: Ruihang Xia --- arrow-buffer/Cargo.toml | 3 + arrow-buffer/src/buffer/mutable.rs | 151 +++++++++++++++++++++++++---- arrow-buffer/src/lib.rs | 1 + 3 files changed, 138 insertions(+), 17 deletions(-) diff --git a/arrow-buffer/Cargo.toml b/arrow-buffer/Cargo.toml index 8bc33b1874e4..89726267a13c 100644 --- a/arrow-buffer/Cargo.toml +++ b/arrow-buffer/Cargo.toml @@ -33,6 +33,9 @@ name = "arrow_buffer" path = "src/lib.rs" bench = false +[features] +allocator_api = [] + [dependencies] bytes = { version = "1.4" } num = { version = "0.4", default-features = false, features = ["std"] } diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index 7fcbd89dd262..9aaac764b59c 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +#[cfg(feature = "allocator_api")] +use std::alloc::{Allocator, Global}; + use std::alloc::{handle_alloc_error, Layout}; use std::mem; use std::ptr::NonNull; @@ -28,6 +31,23 @@ use crate::{ use super::Buffer; +#[cfg(not(feature = "allocator_api"))] +pub trait Allocator: private::Sealed {} + +#[cfg(not(feature = "allocator_api"))] +impl Allocator for Global {} + +#[cfg(not(feature = "allocator_api"))] +#[derive(Debug)] +pub struct Global; + +#[cfg(not(feature = "allocator_api"))] +mod private { + pub trait Sealed {} + + impl Sealed for super::Global {} +} + /// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items. /// /// [`Buffer`]s created from [`MutableBuffer`] (via `into`) are guaranteed to have its pointer aligned @@ -51,12 +71,20 @@ use super::Buffer; /// assert_eq!(buffer.as_slice(), &[0u8, 1, 0, 0, 1, 0, 0, 0]) /// ``` #[derive(Debug)] -pub struct MutableBuffer { +pub struct MutableBuffer< + #[cfg(feature = "allocator_api")] A: Allocator = Global, + #[cfg(not(feature = "allocator_api"))] A: Allocator = Global, +> { // dangling iff capacity = 0 data: NonNull, // invariant: len <= capacity len: usize, layout: Layout, + #[cfg(feature = "allocator_api")] + allocator: A, + #[cfg(not(feature = "allocator_api"))] + #[doc = "Placeholder for allocator API"] + allocator: A, } impl MutableBuffer { @@ -83,14 +111,16 @@ impl MutableBuffer { 0 => dangling_ptr(), _ => { // Safety: Verified size != 0 - let raw_ptr = unsafe { std::alloc::alloc(layout) }; - NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout)) + // let raw_ptr = unsafe { Self::alloc(Global, layout) }; + // NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout)) + unsafe { Self::alloc(&Global, layout) } } }; Self { data, len: 0, layout, + allocator: Global, } } @@ -115,7 +145,12 @@ impl MutableBuffer { NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout)) } }; - Self { data, len, layout } + Self { + data, + len, + layout, + allocator: Global, + } } /// Create a [`MutableBuffer`] from the provided [`Vec`] without copying @@ -136,7 +171,12 @@ impl MutableBuffer { let data = bytes.ptr(); mem::forget(bytes); - Ok(Self { data, len, layout }) + Ok(Self { + data, + len, + layout, + allocator: Global, + }) } /// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits. @@ -207,23 +247,62 @@ impl MutableBuffer { #[cold] fn reallocate(&mut self, capacity: usize) { let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap(); + + // shrink to zero if new_layout.size() == 0 { if self.layout.size() != 0 { // Safety: data was allocated with layout - unsafe { std::alloc::dealloc(self.as_mut_ptr(), self.layout) }; + unsafe { Self::dealloc(&self.allocator, self.data, self.layout) }; self.layout = new_layout } return; } - let data = match self.layout.size() { - // Safety: new_layout is not empty - 0 => unsafe { std::alloc::alloc(new_layout) }, - // Safety: verified new layout is valid and not empty - _ => unsafe { std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity) }, - }; - self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout)); - self.layout = new_layout; + #[cfg(feature = "allocator_api")] + match new_layout.size().cmp(&self.layout.size()) { + std::cmp::Ordering::Equal => { + // no action needed + return; + } + std::cmp::Ordering::Less => { + // shrink to new capacity + let new_data = unsafe { + self.allocator + .shrink(self.data, self.layout, new_layout) + .unwrap_or_else(|_| handle_alloc_error(new_layout)) + .cast() + }; + self.layout = new_layout; + self.data = new_data; + return; + } + std::cmp::Ordering::Greater => { + // grow to new capacity + let new_data = unsafe { + self.allocator + .grow(self.data, self.layout, new_layout) + .unwrap_or_else(|_| handle_alloc_error(new_layout)) + .cast() + }; + self.layout = new_layout; + self.data = new_data; + } + } + + #[cfg(not(feature = "allocator_api"))] + { + self.data = match self.layout.size() { + // Safety: new_layout is not empty + 0 => unsafe { Self::alloc(&self.allocator, new_layout) }, + // Safety: verified new layout is valid and not empty + _ => unsafe { + let new_data = std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity); + NonNull::new(new_data).unwrap_or_else(|| handle_alloc_error(new_layout)) + }, + }; + // self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout)); + self.layout = new_layout; + } } /// Truncates this buffer to `len` bytes @@ -483,6 +562,39 @@ impl MutableBuffer { } } +/// `allocator_api` related internal methods +impl MutableBuffer { + #[inline] + unsafe fn alloc(_alloc: &A, layout: Layout) -> NonNull { + #[cfg(feature = "allocator_api")] + { + _alloc + .allocate(layout) + .unwrap_or_else(|_| handle_alloc_error(layout)) + .cast() + } + + #[cfg(not(feature = "allocator_api"))] + { + let data = std::alloc::alloc(layout); + NonNull::new(data).unwrap_or_else(|| handle_alloc_error(layout)) + } + } + + #[inline] + unsafe fn dealloc(_alloc: &A, ptr: NonNull, layout: Layout) { + #[cfg(feature = "allocator_api")] + { + _alloc.deallocate(ptr, layout) + } + + #[cfg(not(feature = "allocator_api"))] + { + std::alloc::dealloc(ptr.as_ptr(), layout) + } + } +} + #[inline] fn dangling_ptr() -> NonNull { // SAFETY: ALIGNMENT is a non-zero usize which is then casted @@ -518,7 +630,12 @@ impl From> for MutableBuffer { // This is based on `RawVec::current_memory` let layout = unsafe { Layout::array::(value.capacity()).unwrap_unchecked() }; mem::forget(value); - Self { data, len, layout } + Self { + data, + len, + layout, + allocator: Global, + } } } @@ -688,11 +805,11 @@ impl std::ops::DerefMut for MutableBuffer { } } -impl Drop for MutableBuffer { +impl Drop for MutableBuffer { fn drop(&mut self) { if self.layout.size() != 0 { // Safety: data was allocated with standard allocator with given layout - unsafe { std::alloc::dealloc(self.data.as_ptr() as _, self.layout) }; + unsafe { Self::dealloc(&self.allocator, self.data, self.layout) }; } } } diff --git a/arrow-buffer/src/lib.rs b/arrow-buffer/src/lib.rs index a7bf93ed0c16..e854f4165626 100644 --- a/arrow-buffer/src/lib.rs +++ b/arrow-buffer/src/lib.rs @@ -19,6 +19,7 @@ // used by [`buffer::mutable::dangling_ptr`] #![cfg_attr(miri, feature(strict_provenance))] +#![cfg_attr(feature = "allocator_api", feature(allocator_api))] pub mod alloc; pub mod buffer; From c12e5f891b2d85c6b7aa2b5ada3d2004f2a3a4ed Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 29 Aug 2024 21:05:13 +0800 Subject: [PATCH 02/13] move reallocate Signed-off-by: Ruihang Xia --- arrow-buffer/src/buffer/mutable.rs | 122 ++++++++++++++--------------- 1 file changed, 61 insertions(+), 61 deletions(-) diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index 9aaac764b59c..ddf674975596 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -244,67 +244,6 @@ impl MutableBuffer { } } - #[cold] - fn reallocate(&mut self, capacity: usize) { - let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap(); - - // shrink to zero - if new_layout.size() == 0 { - if self.layout.size() != 0 { - // Safety: data was allocated with layout - unsafe { Self::dealloc(&self.allocator, self.data, self.layout) }; - self.layout = new_layout - } - return; - } - - #[cfg(feature = "allocator_api")] - match new_layout.size().cmp(&self.layout.size()) { - std::cmp::Ordering::Equal => { - // no action needed - return; - } - std::cmp::Ordering::Less => { - // shrink to new capacity - let new_data = unsafe { - self.allocator - .shrink(self.data, self.layout, new_layout) - .unwrap_or_else(|_| handle_alloc_error(new_layout)) - .cast() - }; - self.layout = new_layout; - self.data = new_data; - return; - } - std::cmp::Ordering::Greater => { - // grow to new capacity - let new_data = unsafe { - self.allocator - .grow(self.data, self.layout, new_layout) - .unwrap_or_else(|_| handle_alloc_error(new_layout)) - .cast() - }; - self.layout = new_layout; - self.data = new_data; - } - } - - #[cfg(not(feature = "allocator_api"))] - { - self.data = match self.layout.size() { - // Safety: new_layout is not empty - 0 => unsafe { Self::alloc(&self.allocator, new_layout) }, - // Safety: verified new layout is valid and not empty - _ => unsafe { - let new_data = std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity); - NonNull::new(new_data).unwrap_or_else(|| handle_alloc_error(new_layout)) - }, - }; - // self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout)); - self.layout = new_layout; - } - } - /// Truncates this buffer to `len` bytes /// /// If `len` is greater than the buffer's current length, this has no effect @@ -593,6 +532,67 @@ impl MutableBuffer { std::alloc::dealloc(ptr.as_ptr(), layout) } } + + #[cold] + fn reallocate(&mut self, capacity: usize) { + let new_layout = Layout::from_size_align(capacity, self.layout.align()).unwrap(); + + // shrink to zero + if new_layout.size() == 0 { + if self.layout.size() != 0 { + // Safety: data was allocated with layout + unsafe { Self::dealloc(&self.allocator, self.data, self.layout) }; + self.layout = new_layout + } + return; + } + + #[cfg(feature = "allocator_api")] + match new_layout.size().cmp(&self.layout.size()) { + std::cmp::Ordering::Equal => { + // no action needed + return; + } + std::cmp::Ordering::Less => { + // shrink to new capacity + let new_data = unsafe { + self.allocator + .shrink(self.data, self.layout, new_layout) + .unwrap_or_else(|_| handle_alloc_error(new_layout)) + .cast() + }; + self.layout = new_layout; + self.data = new_data; + return; + } + std::cmp::Ordering::Greater => { + // grow to new capacity + let new_data = unsafe { + self.allocator + .grow(self.data, self.layout, new_layout) + .unwrap_or_else(|_| handle_alloc_error(new_layout)) + .cast() + }; + self.layout = new_layout; + self.data = new_data; + } + } + + #[cfg(not(feature = "allocator_api"))] + { + self.data = match self.layout.size() { + // Safety: new_layout is not empty + 0 => unsafe { Self::alloc(&self.allocator, new_layout) }, + // Safety: verified new layout is valid and not empty + _ => unsafe { + let new_data = std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity); + NonNull::new(new_data).unwrap_or_else(|| handle_alloc_error(new_layout)) + }, + }; + // self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout)); + self.layout = new_layout; + } + } } #[inline] From 609d93ce907c5d94e5dc8562a8e44ff64067939a Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 29 Aug 2024 21:18:38 +0800 Subject: [PATCH 03/13] adapt from vec Signed-off-by: Ruihang Xia --- arrow-buffer/src/buffer/mutable.rs | 54 ++++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 6 deletions(-) diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index ddf674975596..92f1ac327fa5 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -111,8 +111,6 @@ impl MutableBuffer { 0 => dangling_ptr(), _ => { // Safety: Verified size != 0 - // let raw_ptr = unsafe { Self::alloc(Global, layout) }; - // NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout)) unsafe { Self::alloc(&Global, layout) } } }; @@ -501,6 +499,45 @@ impl MutableBuffer { } } +#[cfg(feature = "allocator_api")] +impl MutableBuffer { + /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity` + /// in the given allocator. + /// + /// See [`MutableBuffer::with_capacity_in`]. + #[inline] + pub fn new_in(allocator: A, capacity: usize) -> Self { + Self::with_capacity_in(allocator, capacity) + } + + /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`. + /// in the given allocator + /// + /// # Panics + /// + /// If `capacity`, when rounded up to the nearest multiple of [`ALIGNMENT`], is greater + /// then `isize::MAX`, then this function will panic. + #[inline] + pub fn with_capacity_in(allocator: A, capacity: usize) -> Self { + let capacity = bit_util::round_upto_multiple_of_64(capacity); + let layout = Layout::from_size_align(capacity, ALIGNMENT) + .expect("failed to create layout for MutableBuffer"); + let data = match layout.size() { + 0 => dangling_ptr(), + _ => { + // Safety: Verified size != 0 + unsafe { Self::alloc(&allocator, layout) } + } + }; + Self { + data, + len: 0, + layout, + allocator, + } + } +} + /// `allocator_api` related internal methods impl MutableBuffer { #[inline] @@ -585,7 +622,7 @@ impl MutableBuffer { 0 => unsafe { Self::alloc(&self.allocator, new_layout) }, // Safety: verified new layout is valid and not empty _ => unsafe { - let new_data = std::alloc::realloc(self.as_mut_ptr(), self.layout, capacity); + let new_data = std::alloc::realloc(self.data.as_ptr(), self.layout, capacity); NonNull::new(new_data).unwrap_or_else(|| handle_alloc_error(new_layout)) }, }; @@ -629,13 +666,18 @@ impl From> for MutableBuffer { // Vec guaranteed to have a valid layout matching that of `Layout::array` // This is based on `RawVec::current_memory` let layout = unsafe { Layout::array::(value.capacity()).unwrap_unchecked() }; - mem::forget(value); - Self { + let zelf = Self { data, len, layout, + #[cfg(not(feature = "allocator_api"))] allocator: Global, - } + #[cfg(feature = "allocator_api")] + allocator: *value.allocator(), + }; + + mem::forget(value); + zelf } } From 227e6674afd99552aba3b30b1786636c322f6eeb Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 30 Aug 2024 12:20:16 +0800 Subject: [PATCH 04/13] midify workflow and miri shell Signed-off-by: Ruihang Xia --- .github/workflows/arrow.yml | 4 ++-- .github/workflows/miri.sh | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml index d3b2526740fa..d069b9a4b7fe 100644 --- a/.github/workflows/arrow.yml +++ b/.github/workflows/arrow.yml @@ -61,8 +61,8 @@ jobs: submodules: true - name: Setup Rust toolchain uses: ./.github/actions/setup-builder - - name: Test arrow-buffer with all features - run: cargo test -p arrow-buffer --all-features + - name: Test arrow-buffer + run: cargo test -p arrow-buffer - name: Test arrow-data with all features run: cargo test -p arrow-data --all-features - name: Test arrow-schema with all features diff --git a/.github/workflows/miri.sh b/.github/workflows/miri.sh index 86be2100ee67..e26a878989c1 100755 --- a/.github/workflows/miri.sh +++ b/.github/workflows/miri.sh @@ -12,7 +12,7 @@ cargo miri setup cargo clean echo "Starting Arrow MIRI run..." -cargo miri test -p arrow-buffer +cargo miri test -p arrow-buffer --features allocator_api cargo miri test -p arrow-data --features ffi cargo miri test -p arrow-schema --features ffi cargo miri test -p arrow-ord From 05fd1cba719b2019916c471c4623f063f72e1941 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 30 Aug 2024 12:36:53 +0800 Subject: [PATCH 05/13] also modify clippy command Signed-off-by: Ruihang Xia --- .github/workflows/arrow.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml index d069b9a4b7fe..a02161c3d25e 100644 --- a/.github/workflows/arrow.yml +++ b/.github/workflows/arrow.yml @@ -163,8 +163,8 @@ jobs: uses: ./.github/actions/setup-builder - name: Setup Clippy run: rustup component add clippy - - name: Clippy arrow-buffer with all features - run: cargo clippy -p arrow-buffer --all-targets --all-features -- -D warnings + - name: Clippy arrow-buffer + run: cargo clippy -p arrow-buffer --all-targets -- -D warnings - name: Clippy arrow-data with all features run: cargo clippy -p arrow-data --all-targets --all-features -- -D warnings - name: Clippy arrow-schema with all features From cbcbc51dd69501c0ebe10e8fe41a4d1fe6892e07 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 30 Aug 2024 21:59:15 +0800 Subject: [PATCH 06/13] fix style and clippy Signed-off-by: Ruihang Xia --- arrow-buffer/src/buffer/mutable.rs | 15 ++++++++------- arrow-buffer/src/builder/null.rs | 1 + arrow-buffer/src/util/bit_mask.rs | 8 +++++--- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index 92f1ac327fa5..8ef97519c357 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -32,6 +32,8 @@ use crate::{ use super::Buffer; #[cfg(not(feature = "allocator_api"))] +#[doc = "Placeholder trait for std::alloc::Allocator. To appoint a real allocator,"] +#[doc = "please enable `allocator_api` feature with nightly toolchain"] pub trait Allocator: private::Sealed {} #[cfg(not(feature = "allocator_api"))] @@ -541,10 +543,10 @@ impl MutableBuffer { /// `allocator_api` related internal methods impl MutableBuffer { #[inline] - unsafe fn alloc(_alloc: &A, layout: Layout) -> NonNull { + unsafe fn alloc(alloc: &A, layout: Layout) -> NonNull { #[cfg(feature = "allocator_api")] { - _alloc + alloc .allocate(layout) .unwrap_or_else(|_| handle_alloc_error(layout)) .cast() @@ -552,20 +554,22 @@ impl MutableBuffer { #[cfg(not(feature = "allocator_api"))] { + let _ = alloc; let data = std::alloc::alloc(layout); NonNull::new(data).unwrap_or_else(|| handle_alloc_error(layout)) } } #[inline] - unsafe fn dealloc(_alloc: &A, ptr: NonNull, layout: Layout) { + unsafe fn dealloc(alloc: &A, ptr: NonNull, layout: Layout) { #[cfg(feature = "allocator_api")] { - _alloc.deallocate(ptr, layout) + alloc.deallocate(ptr, layout) } #[cfg(not(feature = "allocator_api"))] { + let _ = alloc; std::alloc::dealloc(ptr.as_ptr(), layout) } } @@ -588,7 +592,6 @@ impl MutableBuffer { match new_layout.size().cmp(&self.layout.size()) { std::cmp::Ordering::Equal => { // no action needed - return; } std::cmp::Ordering::Less => { // shrink to new capacity @@ -600,7 +603,6 @@ impl MutableBuffer { }; self.layout = new_layout; self.data = new_data; - return; } std::cmp::Ordering::Greater => { // grow to new capacity @@ -626,7 +628,6 @@ impl MutableBuffer { NonNull::new(new_data).unwrap_or_else(|| handle_alloc_error(new_layout)) }, }; - // self.data = NonNull::new(data).unwrap_or_else(|| handle_alloc_error(new_layout)); self.layout = new_layout; } } diff --git a/arrow-buffer/src/builder/null.rs b/arrow-buffer/src/builder/null.rs index a1cea6ef2cca..ce5e1dc34aa0 100644 --- a/arrow-buffer/src/builder/null.rs +++ b/arrow-buffer/src/builder/null.rs @@ -18,6 +18,7 @@ use crate::{BooleanBufferBuilder, MutableBuffer, NullBuffer}; /// Builder for creating the null bit buffer. +/// /// This builder only materializes the buffer when we append `false`. /// If you only append `true`s to the builder, what you get will be /// `None` when calling [`finish`](#method.finish). diff --git a/arrow-buffer/src/util/bit_mask.rs b/arrow-buffer/src/util/bit_mask.rs index 8f81cb7d0469..0a43faf304ac 100644 --- a/arrow-buffer/src/util/bit_mask.rs +++ b/arrow-buffer/src/util/bit_mask.rs @@ -20,9 +20,11 @@ use crate::bit_chunk_iterator::BitChunks; use crate::bit_util::{ceil, get_bit, set_bit}; -/// Sets all bits on `write_data` in the range `[offset_write..offset_write+len]` to be equal to the -/// bits in `data` in the range `[offset_read..offset_read+len]` -/// returns the number of `0` bits `data[offset_read..offset_read+len]` +/// Sets bits by range. +/// +/// Sets all bits on `write_data` in the range `[offset_write..offset_write+len]` +/// to be equal to the bits in `data` in the range `[offset_read..offset_read+len]`. +/// Returns the number of `0` bits `data[offset_read..offset_read+len]`. pub fn set_bits( write_data: &mut [u8], data: &[u8], From b8d2924589afe16756a6ccd79dd995b6eb4a6121 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 2 Sep 2024 14:47:56 +0800 Subject: [PATCH 07/13] add document and example Signed-off-by: Ruihang Xia --- arrow-buffer/src/buffer/mutable.rs | 7 +++++ arrow/Cargo.toml | 5 ++++ arrow/README.md | 1 + arrow/examples/allocator_api.rs | 43 ++++++++++++++++++++++++++++++ 4 files changed, 56 insertions(+) create mode 100644 arrow/examples/allocator_api.rs diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index 8ef97519c357..fdb6053a4303 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -72,6 +72,13 @@ mod private { /// let buffer: Buffer = buffer.into(); /// assert_eq!(buffer.as_slice(), &[0u8, 1, 0, 0, 1, 0, 0, 0]) /// ``` +/// +/// # Customize [`Allocator`] +/// +/// To customize the allocator for the buffer, enable the `allocator_api` feature and use either +/// methods like [`MutableBuffer::new_in`] or [`MutableBuffer::with_capacity_in`], or inherit the +/// allocator from a type like [`Vec`] using [`MutableBuffer::from`]. A example can be found in +/// the [allocator_api example](https://github.com/apache/arrow-rs/tree/master/arrow/examples). #[derive(Debug)] pub struct MutableBuffer< #[cfg(feature = "allocator_api")] A: Allocator = Global, diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index a0fd96415a1d..e18f2cae8ebb 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -81,6 +81,7 @@ force_validate = ["arrow-array/force_validate", "arrow-data/force_validate"] # Enable ffi support ffi = ["arrow-schema/ffi", "arrow-data/ffi", "arrow-array/ffi"] chrono-tz = ["arrow-array/chrono-tz"] +allocator_api = ["arrow-buffer/allocator_api"] [dev-dependencies] chrono = { workspace = true } @@ -107,6 +108,10 @@ name = "read_csv_infer_schema" required-features = ["prettyprint", "csv"] path = "./examples/read_csv_infer_schema.rs" +[[example]] +name = "allocator_api" +path = "./examples/allocator_api.rs" + [[bench]] name = "aggregate_kernels" harness = false diff --git a/arrow/README.md b/arrow/README.md index 557a0b474e4b..7daf2c408295 100644 --- a/arrow/README.md +++ b/arrow/README.md @@ -61,6 +61,7 @@ The `arrow` crate provides the following features which may be enabled in your ` - `chrono-tz` - support of parsing timezone using [chrono-tz](https://docs.rs/chrono-tz/0.6.0/chrono_tz/) - `ffi` - bindings for the Arrow C [C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) - `pyarrow` - bindings for pyo3 to call arrow-rs from python +- `allocator_api` - support for customizing memory [`Allocator`](https://doc.rust-lang.org/std/alloc/trait.Allocator.html) for underlying Array [`MutableBuffer`](https://docs.rs/arrow/latest/arrow/buffer/struct.MutableBuffer.html). This feature requires a nightly rust toolchain. ## Arrow Feature Status diff --git a/arrow/examples/allocator_api.rs b/arrow/examples/allocator_api.rs new file mode 100644 index 000000000000..3ca18d8dda57 --- /dev/null +++ b/arrow/examples/allocator_api.rs @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Customizing [`Allocator`] for Arrow Array's underlying [`MutableBuffer`]. +//! +//! This module requires the `allocator_api` feature and a nightly channel Rust toolchain. + +fn main() { + demo(); +} + +#[cfg(not(feature = "allocator_api"))] +fn demo() { + println!("This example requires the `allocator_api` feature to be enabled."); +} + +#[cfg(feature = "allocator_api")] +fn demo() { + use arrow::buffer::MutableBuffer; + use std::alloc::Global; + + // Creates a mutable buffer with customized allocator + let mut buffer = MutableBuffer::::with_capacity_in(10, Global); + + // Inherits allocator from Vec + let vector = Vec::::with_capacity_in(100, Global); + let mut buffer = MutableBuffer::from(vector); + buffer.reserve(100); +} From 4934043f26b1db87cfb330507457ed2e6384ec9f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 18 Sep 2024 17:30:03 +0800 Subject: [PATCH 08/13] customize allocator in example Signed-off-by: Ruihang Xia --- arrow-buffer/src/buffer/mutable.rs | 125 ++++++++++++++++++----------- arrow/examples/allocator_api.rs | 76 +++++++++++++++++- 2 files changed, 150 insertions(+), 51 deletions(-) diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index fdb6053a4303..51a7b783e0b3 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -80,10 +80,7 @@ mod private { /// allocator from a type like [`Vec`] using [`MutableBuffer::from`]. A example can be found in /// the [allocator_api example](https://github.com/apache/arrow-rs/tree/master/arrow/examples). #[derive(Debug)] -pub struct MutableBuffer< - #[cfg(feature = "allocator_api")] A: Allocator = Global, - #[cfg(not(feature = "allocator_api"))] A: Allocator = Global, -> { +pub struct MutableBuffer { // dangling iff capacity = 0 data: NonNull, // invariant: len <= capacity @@ -96,7 +93,8 @@ pub struct MutableBuffer< allocator: A, } -impl MutableBuffer { +/// Constructors under default allocator +impl MutableBuffer { /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`. /// /// See [`MutableBuffer::with_capacity`]. @@ -193,6 +191,45 @@ impl MutableBuffer { MutableBuffer::from_len_zeroed(num_bytes) } + /// Invokes `f` with values `0..len` collecting the boolean results into a new `MutableBuffer` + /// + /// This is similar to `from_trusted_len_iter_bool`, however, can be significantly faster + /// as it eliminates the conditional `Iterator::next` + #[inline] + pub fn collect_bool bool>(len: usize, mut f: F) -> Self { + let mut buffer = Self::new(bit_util::ceil(len, 64) * 8); + + let chunks = len / 64; + let remainder = len % 64; + for chunk in 0..chunks { + let mut packed = 0; + for bit_idx in 0..64 { + let i = bit_idx + chunk * 64; + packed |= (f(i) as u64) << bit_idx; + } + + // SAFETY: Already allocated sufficient capacity + unsafe { buffer.push_unchecked(packed) } + } + + if remainder != 0 { + let mut packed = 0; + for bit_idx in 0..remainder { + let i = bit_idx + chunks * 64; + packed |= (f(i) as u64) << bit_idx; + } + + // SAFETY: Already allocated sufficient capacity + unsafe { buffer.push_unchecked(packed) } + } + + buffer.truncate(bit_util::ceil(len, 8)); + buffer + } +} + +/// General methods +impl MutableBuffer { /// Set the bits in the range of `[0, end)` to 0 (if `val` is false), or 1 (if `val` /// is true). Also extend the length of this buffer to be `end`. /// @@ -334,12 +371,12 @@ impl MutableBuffer { /// Returns the data stored in this buffer as a slice. pub fn as_slice(&self) -> &[u8] { - self + unsafe { std::slice::from_raw_parts(self.data.as_ptr(), self.len) } } /// Returns the data stored in this buffer as a mutable slice. pub fn as_slice_mut(&mut self) -> &mut [u8] { - self + unsafe { std::slice::from_raw_parts_mut(self.data.as_ptr(), self.len) } } /// Returns a raw pointer to this buffer's internal memory @@ -470,53 +507,18 @@ impl MutableBuffer { assert!(len <= self.capacity()); self.len = len; } - - /// Invokes `f` with values `0..len` collecting the boolean results into a new `MutableBuffer` - /// - /// This is similar to `from_trusted_len_iter_bool`, however, can be significantly faster - /// as it eliminates the conditional `Iterator::next` - #[inline] - pub fn collect_bool bool>(len: usize, mut f: F) -> Self { - let mut buffer = Self::new(bit_util::ceil(len, 64) * 8); - - let chunks = len / 64; - let remainder = len % 64; - for chunk in 0..chunks { - let mut packed = 0; - for bit_idx in 0..64 { - let i = bit_idx + chunk * 64; - packed |= (f(i) as u64) << bit_idx; - } - - // SAFETY: Already allocated sufficient capacity - unsafe { buffer.push_unchecked(packed) } - } - - if remainder != 0 { - let mut packed = 0; - for bit_idx in 0..remainder { - let i = bit_idx + chunks * 64; - packed |= (f(i) as u64) << bit_idx; - } - - // SAFETY: Already allocated sufficient capacity - unsafe { buffer.push_unchecked(packed) } - } - - buffer.truncate(bit_util::ceil(len, 8)); - buffer - } } #[cfg(feature = "allocator_api")] +/// Constructors for custom allocator impl MutableBuffer { /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity` /// in the given allocator. /// /// See [`MutableBuffer::with_capacity_in`]. #[inline] - pub fn new_in(allocator: A, capacity: usize) -> Self { - Self::with_capacity_in(allocator, capacity) + pub fn new_in(capacity: usize, allocator: A) -> Self { + Self::with_capacity_in(capacity, allocator) } /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`. @@ -527,7 +529,7 @@ impl MutableBuffer { /// If `capacity`, when rounded up to the nearest multiple of [`ALIGNMENT`], is greater /// then `isize::MAX`, then this function will panic. #[inline] - pub fn with_capacity_in(allocator: A, capacity: usize) -> Self { + pub fn with_capacity_in(capacity: usize, allocator: A) -> Self { let capacity = bit_util::round_upto_multiple_of_64(capacity); let layout = Layout::from_size_align(capacity, ALIGNMENT) .expect("failed to create layout for MutableBuffer"); @@ -664,6 +666,7 @@ impl Extend for MutableBuffer { } } +#[cfg(not(feature = "allocator_api"))] impl From> for MutableBuffer { fn from(value: Vec) -> Self { // Safety @@ -678,10 +681,34 @@ impl From> for MutableBuffer { data, len, layout, - #[cfg(not(feature = "allocator_api"))] allocator: Global, - #[cfg(feature = "allocator_api")] - allocator: *value.allocator(), + }; + + mem::forget(value); + zelf + } +} + +#[cfg(feature = "allocator_api")] +impl From> for MutableBuffer +where + T: ArrowNativeType, + A: Allocator + Clone, +{ + fn from(value: Vec) -> Self { + // Safety + // Vec::as_ptr guaranteed to not be null and ArrowNativeType are trivially transmutable + let data = unsafe { NonNull::new_unchecked(value.as_ptr() as _) }; + let len = value.len() * mem::size_of::(); + // Safety + // Vec guaranteed to have a valid layout matching that of `Layout::array` + // This is based on `RawVec::current_memory` + let layout = unsafe { Layout::array::(value.capacity()).unwrap_unchecked() }; + let zelf = Self { + data, + len, + layout, + allocator: value.allocator().clone(), }; mem::forget(value); diff --git a/arrow/examples/allocator_api.rs b/arrow/examples/allocator_api.rs index 3ca18d8dda57..de6a67decf69 100644 --- a/arrow/examples/allocator_api.rs +++ b/arrow/examples/allocator_api.rs @@ -19,6 +19,8 @@ //! //! This module requires the `allocator_api` feature and a nightly channel Rust toolchain. +#![cfg_attr(feature = "allocator_api", feature(allocator_api))] + fn main() { demo(); } @@ -28,16 +30,86 @@ fn demo() { println!("This example requires the `allocator_api` feature to be enabled."); } +#[cfg(feature = "allocator_api")] +mod allocator { + /// A simple allocator tracker that records and reports memory usage. + #[derive(Clone)] + pub struct AllocatorTracker + where + A: std::alloc::Allocator + Clone, + { + usage: std::sync::Arc, + alloc: A, + } + + impl AllocatorTracker + where + A: std::alloc::Allocator + Clone, + { + pub fn new(alloc: A) -> Self { + Self { + usage: std::sync::Arc::new(std::sync::atomic::AtomicIsize::new(0)), + alloc, + } + } + + pub fn report_usage(&self) -> isize { + self.usage.load(std::sync::atomic::Ordering::Relaxed) + } + } + + unsafe impl std::alloc::Allocator for AllocatorTracker + where + A: std::alloc::Allocator + Clone, + { + fn allocate( + &self, + layout: std::alloc::Layout, + ) -> Result, std::alloc::AllocError> { + let size = layout.size(); + self.usage + .fetch_add(size as isize, std::sync::atomic::Ordering::Relaxed); + + self.alloc.allocate(layout) + } + + unsafe fn deallocate(&self, ptr: std::ptr::NonNull, layout: std::alloc::Layout) { + let size = layout.size(); + self.usage + .fetch_sub(size as isize, std::sync::atomic::Ordering::Relaxed); + + self.alloc.deallocate(ptr, layout) + } + } +} + #[cfg(feature = "allocator_api")] fn demo() { use arrow::buffer::MutableBuffer; use std::alloc::Global; + let allocator_tracker = allocator::AllocatorTracker::new(Global); + // Creates a mutable buffer with customized allocator - let mut buffer = MutableBuffer::::with_capacity_in(10, Global); + let mut buffer = + MutableBuffer::>::with_capacity_in( + 10, + allocator_tracker.clone(), + ); + println!( + "Current memory usage: {} bytes", + allocator_tracker.report_usage() + ); // Inherits allocator from Vec - let vector = Vec::::with_capacity_in(100, Global); + let vector = Vec::>::with_capacity_in( + 100, + allocator_tracker.clone(), + ); let mut buffer = MutableBuffer::from(vector); buffer.reserve(100); + println!( + "Current memory usage: {} bytes", + allocator_tracker.report_usage() + ); } From 36eccf6c6ddf64d32dc5928f914b0eb6f4b95b6c Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 18 Sep 2024 19:19:32 +0800 Subject: [PATCH 09/13] modify ci for nightly feature Signed-off-by: Ruihang Xia --- .github/workflows/arrow.yml | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml index a02161c3d25e..74b2ae3a0fbd 100644 --- a/.github/workflows/arrow.yml +++ b/.github/workflows/arrow.yml @@ -192,8 +192,27 @@ jobs: - name: Clippy arrow-row with all features run: cargo clippy -p arrow-row --all-targets --all-features -- -D warnings - name: Clippy arrow with all features - run: cargo clippy -p arrow --all-features --all-targets -- -D warnings + # `allocator_api` is ignored as it requires nightly toolchain + run: cargo clippy -p arrow -F csv -F json -F ipc -F ipc_compression -F prettyprint -F chrono-tz -F ffi -F pyarrow --all-targets -- -D warnings - name: Clippy arrow-integration-test with all features run: cargo clippy -p arrow-integration-test --all-targets --all-features -- -D warnings - name: Clippy arrow-integration-testing with all features run: cargo clippy -p arrow-integration-testing --all-targets --all-features -- -D warnings + + clippy-nightly: + name: Clippy + runs-on: ubuntu-latest + container: + image: amd64/rust + steps: + - uses: actions/checkout@v4 + - name: Setup Rust toolchain + uses: ./.github/actions/setup-builder + with: + rust-version: nightly + - name: Setup Clippy + run: rustup component add clippy + - name: Clippy arrow-buffer with all features + run: cargo clippy -p arrow-buffer --all-features --all-targets -- -D warnings + - name: Clippy arrow-buffer with all features + run: cargo clippy -p arrow --all-features --all-targets -- -D warnings From fa587a0a1017f45389e64e10ca7774c98843f2c6 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 18 Sep 2024 19:28:36 +0800 Subject: [PATCH 10/13] remove one job Signed-off-by: Ruihang Xia --- .github/workflows/arrow.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/arrow.yml b/.github/workflows/arrow.yml index 74b2ae3a0fbd..1e47335a096e 100644 --- a/.github/workflows/arrow.yml +++ b/.github/workflows/arrow.yml @@ -214,5 +214,3 @@ jobs: run: rustup component add clippy - name: Clippy arrow-buffer with all features run: cargo clippy -p arrow-buffer --all-features --all-targets -- -D warnings - - name: Clippy arrow-buffer with all features - run: cargo clippy -p arrow --all-features --all-targets -- -D warnings From 6b28b10e17f79bcca12904ba51991125f38aecdf Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 19 Sep 2024 11:25:11 +0800 Subject: [PATCH 11/13] limit freeze methods to only available for the default allocator Signed-off-by: Ruihang Xia --- arrow-buffer/src/buffer/mutable.rs | 43 ++++++++++++++++++------------ 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index 51a7b783e0b3..e99f755485de 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -93,7 +93,7 @@ pub struct MutableBuffer { allocator: A, } -/// Constructors under default allocator +/// Constructors when using the default allocator [`Global`](std::alloc::Global) impl MutableBuffer { /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`. /// @@ -226,6 +226,31 @@ impl MutableBuffer { buffer.truncate(bit_util::ceil(len, 8)); buffer } + + #[deprecated( + since = "2.0.0", + note = "This method is deprecated in favour of `into` from the trait `Into`." + )] + /// Freezes this buffer and return an immutable version of it. + /// + /// This method is only available under the default [`Global`](std::alloc::Global) + /// for now. Support for custom allocators will be added in a future release. + /// Related ticket: https://github.com/apache/arrow-rs/issues/3960 + pub fn freeze(self) -> Buffer { + self.into_buffer() + } + + /// Freezes this buffer and return an immutable version of it. + /// + /// This method is only available under the default [`Global`](std::alloc::Global) + /// for now. Support for custom allocators will be added in a future release. + /// Related ticket: https://github.com/apache/arrow-rs/issues/3960 + #[inline] + pub(super) fn into_buffer(self) -> Buffer { + let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) }; + std::mem::forget(self); + Buffer::from_bytes(bytes) + } } /// General methods @@ -393,22 +418,6 @@ impl MutableBuffer { self.data.as_ptr() } - #[deprecated( - since = "2.0.0", - note = "This method is deprecated in favour of `into` from the trait `Into`." - )] - /// Freezes this buffer and return an immutable version of it. - pub fn freeze(self) -> Buffer { - self.into_buffer() - } - - #[inline] - pub(super) fn into_buffer(self) -> Buffer { - let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) }; - std::mem::forget(self); - Buffer::from_bytes(bytes) - } - /// View this buffer as a mutable slice of a specific type. /// /// # Panics From ceef77277e8bfc89ebfd0b8af3a292a19b38f229 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 19 Sep 2024 11:39:17 +0800 Subject: [PATCH 12/13] fix doc links Signed-off-by: Ruihang Xia --- arrow-buffer/src/buffer/mutable.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index e99f755485de..aa0fe12ac90f 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -41,6 +41,8 @@ impl Allocator for Global {} #[cfg(not(feature = "allocator_api"))] #[derive(Debug)] +/// A placeholder for [`Global`](std::alloc::Global) as it's not available +/// without `allocator_api` feature enabled. pub struct Global; #[cfg(not(feature = "allocator_api"))] @@ -93,7 +95,7 @@ pub struct MutableBuffer { allocator: A, } -/// Constructors when using the default allocator [`Global`](std::alloc::Global) +/// Constructors when using the default allocator [`Global`] impl MutableBuffer { /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`. /// @@ -233,19 +235,19 @@ impl MutableBuffer { )] /// Freezes this buffer and return an immutable version of it. /// - /// This method is only available under the default [`Global`](std::alloc::Global) + /// This method is only available under the default [`Global`] /// for now. Support for custom allocators will be added in a future release. - /// Related ticket: https://github.com/apache/arrow-rs/issues/3960 + /// Related ticket: pub fn freeze(self) -> Buffer { self.into_buffer() } + #[inline] /// Freezes this buffer and return an immutable version of it. /// - /// This method is only available under the default [`Global`](std::alloc::Global) + /// This method is only available under the default [`Global`] /// for now. Support for custom allocators will be added in a future release. - /// Related ticket: https://github.com/apache/arrow-rs/issues/3960 - #[inline] + /// Related ticket: pub(super) fn into_buffer(self) -> Buffer { let bytes = unsafe { Bytes::new(self.data, self.len, Deallocation::Standard(self.layout)) }; std::mem::forget(self); From 02d73ad4dc6c8b5b38ee13bbb65f26379bff386d Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 19 Sep 2024 11:47:24 +0800 Subject: [PATCH 13/13] add unit test for custom allocator Signed-off-by: Ruihang Xia --- arrow-buffer/src/buffer/mutable.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index aa0fe12ac90f..0df165e61400 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -1230,4 +1230,32 @@ mod tests { let max_capacity = isize::MAX as usize - (isize::MAX as usize % ALIGNMENT); let _ = MutableBuffer::with_capacity(max_capacity + 1); } + + #[test] + #[cfg(feature = "allocator_api")] + fn mutable_buffer_with_custom_allocator() { + struct MyAllocator; + + unsafe impl Allocator for MyAllocator { + fn allocate( + &self, + layout: std::alloc::Layout, + ) -> Result, std::alloc::AllocError> { + Global.allocate(layout) + } + + unsafe fn deallocate(&self, ptr: std::ptr::NonNull, layout: std::alloc::Layout) { + Global.deallocate(ptr, layout) + } + } + + let mut buffer = MutableBuffer::new_in(100, MyAllocator); + buffer.extend_from_slice(b"hello"); + assert_eq!(5, buffer.len()); + assert_eq!(b"hello", buffer.as_slice()); + + buffer.reserve(200); + buffer.shrink_to_fit(); + buffer.clear(); + } }