From 45686edf6b07930b91619a07a1f3b4609a70b8e2 Mon Sep 17 00:00:00 2001
From: Ben <55319792+benrutter@users.noreply.github.com>
Date: Mon, 21 Oct 2024 16:22:35 +0100
Subject: [PATCH 1/2] docs: Tiny correction post dask-expr (#19354)

---
 docs/source/user-guide/migration/pandas.md | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/docs/source/user-guide/migration/pandas.md b/docs/source/user-guide/migration/pandas.md
index 5fa435278949..3d1f0996bdad 100644
--- a/docs/source/user-guide/migration/pandas.md
+++ b/docs/source/user-guide/migration/pandas.md
@@ -50,8 +50,7 @@ eager evaluation. The lazy evaluation mode is powerful because Polars carries ou
 automatic query optimization when it examines the query plan and looks for ways to
 accelerate the query or reduce memory usage.
 
-`Dask` also supports lazy evaluation when it generates a query plan. However, `Dask`
-does not carry out query optimization on the query plan.
+`Dask` also supports lazy evaluation when it generates a query plan.
 
 ## Key syntax differences
 

From eb596c96e13fc878612347cd6ce7f0138f3cdb0d Mon Sep 17 00:00:00 2001
From: Orson Peters
Date: Mon, 21 Oct 2024 17:23:28 +0200
Subject: [PATCH 2/2] refactor(rust): Add capability to transmute to SharedStorage (#19353)

---
 crates/polars-arrow/src/storage.rs | 137 ++++++++++++++++++++++++-----
 1 file changed, 116 insertions(+), 21 deletions(-)

diff --git a/crates/polars-arrow/src/storage.rs b/crates/polars-arrow/src/storage.rs
index e7656f9d880b..6c71bfd6c385 100644
--- a/crates/polars-arrow/src/storage.rs
+++ b/crates/polars-arrow/src/storage.rs
@@ -4,17 +4,48 @@ use std::ops::Deref;
 use std::ptr::NonNull;
 use std::sync::atomic::{AtomicU64, Ordering};
 
+use bytemuck::Pod;
+
+// Allows us to transmute between types while also keeping the original
+// stats and drop method of the Vec around.
+struct VecVTable {
+    size: usize,
+    align: usize,
+    drop_buffer: unsafe fn(*mut (), usize),
+}
+
+impl VecVTable {
+    const fn new<T>() -> Self {
+        unsafe fn drop_buffer<T>(ptr: *mut (), cap: usize) {
+            unsafe { drop(Vec::from_raw_parts(ptr.cast::<T>(), 0, cap)) }
+        }
+
+        Self {
+            size: size_of::<T>(),
+            align: align_of::<T>(),
+            drop_buffer: drop_buffer::<T>,
+        }
+    }
+
+    fn new_static<T>() -> &'static Self {
+        const { &Self::new::<T>() }
+    }
+}
+
 use crate::ffi::InternalArrowArray;
 
 enum BackingStorage {
-    Vec { capacity: usize },
+    Vec {
+        original_capacity: usize, // Elements, not bytes.
+        vtable: &'static VecVTable,
+    },
     InternalArrowArray(InternalArrowArray),
 }
 
 struct SharedStorageInner<T> {
     ref_count: AtomicU64,
     ptr: *mut T,
-    length: usize,
+    length_in_bytes: usize,
     backing: Option<BackingStorage>,
     // https://github.com/rust-lang/rfcs/blob/master/text/0769-sound-generic-drop.md#phantom-data
     phantom: PhantomData<T>,
@@ -24,8 +55,20 @@ impl<T> Drop for SharedStorageInner<T> {
     fn drop(&mut self) {
         match self.backing.take() {
             Some(BackingStorage::InternalArrowArray(a)) => drop(a),
-            Some(BackingStorage::Vec { capacity }) => unsafe {
-                drop(Vec::from_raw_parts(self.ptr, self.length, capacity))
+            Some(BackingStorage::Vec {
+                original_capacity,
+                vtable,
+            }) => unsafe {
+                // Drop the elements in our slice.
+                if std::mem::needs_drop::<T>() {
+                    core::ptr::drop_in_place(core::ptr::slice_from_raw_parts_mut(
+                        self.ptr,
+                        self.length_in_bytes / size_of::<T>(),
+                    ));
+                }
+
+                // Free the buffer.
+                (vtable.drop_buffer)(self.ptr.cast(), original_capacity);
             },
             None => {},
         }
@@ -42,12 +85,13 @@ unsafe impl Sync for SharedStorage {}
 
 impl<T> SharedStorage<T> {
     pub fn from_static(slice: &'static [T]) -> Self {
-        let length = slice.len();
+        #[expect(clippy::manual_slice_size_calculation)]
+        let length_in_bytes = slice.len() * size_of::<T>();
         let ptr = slice.as_ptr().cast_mut();
         let inner = SharedStorageInner {
             ref_count: AtomicU64::new(2), // Never used, but 2 so it won't pass exclusivity tests.
             ptr,
-            length,
+            length_in_bytes,
             backing: None,
             phantom: PhantomData,
         };
@@ -58,15 +102,18 @@ impl<T> SharedStorage<T> {
     }
 
     pub fn from_vec(mut v: Vec<T>) -> Self {
-        let length = v.len();
-        let capacity = v.capacity();
+        let length_in_bytes = v.len() * size_of::<T>();
+        let original_capacity = v.capacity();
         let ptr = v.as_mut_ptr();
         core::mem::forget(v);
         let inner = SharedStorageInner {
             ref_count: AtomicU64::new(1),
             ptr,
-            length,
-            backing: Some(BackingStorage::Vec { capacity }),
+            length_in_bytes,
+            backing: Some(BackingStorage::Vec {
+                original_capacity,
+                vtable: VecVTable::new_static::<T>(),
+            }),
             phantom: PhantomData,
         };
         Self {
@@ -79,7 +126,7 @@ impl<T> SharedStorage<T> {
         let inner = SharedStorageInner {
             ref_count: AtomicU64::new(1),
             ptr: ptr.cast_mut(),
-            length: len,
+            length_in_bytes: len * size_of::<T>(),
             backing: Some(BackingStorage::InternalArrowArray(arr)),
             phantom: PhantomData,
         };
@@ -93,7 +140,7 @@ impl<T> SharedStorage<T> {
 impl<T> SharedStorage<T> {
     #[inline(always)]
     pub fn len(&self) -> usize {
-        self.inner().length
+        self.inner().length_in_bytes / size_of::<T>()
     }
 
     #[inline(always)]
@@ -121,21 +168,35 @@ impl<T> SharedStorage<T> {
     pub fn try_as_mut_slice(&mut self) -> Option<&mut [T]> {
         self.is_exclusive().then(|| {
             let inner = self.inner();
-            unsafe { core::slice::from_raw_parts_mut(inner.ptr, inner.length) }
+            let len = inner.length_in_bytes / size_of::<T>();
+            unsafe { core::slice::from_raw_parts_mut(inner.ptr, len) }
         })
     }
 
     pub fn try_into_vec(mut self) -> Result<Vec<T>, Self> {
-        let Some(BackingStorage::Vec { capacity }) = self.inner().backing else {
+        // We may only go back to a Vec if we originally came from a Vec
+        // where the desired size/align matches the original.
+        let Some(BackingStorage::Vec {
+            original_capacity,
+            vtable,
+        }) = self.inner().backing
+        else {
             return Err(self);
         };
-        if self.is_exclusive() {
-            let slf = ManuallyDrop::new(self);
-            let inner = slf.inner();
-            Ok(unsafe { Vec::from_raw_parts(inner.ptr, inner.length, capacity) })
-        } else {
-            Err(self)
+
+        if vtable.size != size_of::<T>() || vtable.align != align_of::<T>() {
+            return Err(self);
+        }
+
+        // If there are other references we can't go back to an owned Vec.
+        if !self.is_exclusive() {
+            return Err(self);
         }
+
+        let slf = ManuallyDrop::new(self);
+        let inner = slf.inner();
+        let len = inner.length_in_bytes / size_of::<T>();
+        Ok(unsafe { Vec::from_raw_parts(inner.ptr, len, original_capacity) })
     }
 
     #[inline(always)]
@@ -151,6 +212,39 @@ impl<T> SharedStorage<T> {
     }
 }
 
+impl<T> SharedStorage<T> {
+    fn try_transmute<U>(self) -> Result<SharedStorage<U>, Self> {
+        let inner = self.inner();
+
+        // The length of the array in bytes must be a multiple of the target size.
+        // We can skip this check if the size of U divides the size of T.
+        if size_of::<T>() % size_of::<U>() != 0 && inner.length_in_bytes % size_of::<U>() != 0 {
+            return Err(self);
+        }
+
+        // The pointer must be properly aligned for U.
+        // We can skip this check if the alignment of U divides the alignment of T.
+        if align_of::<T>() % align_of::<U>() != 0 && !inner.ptr.cast::<U>().is_aligned() {
+            return Err(self);
+        }
+
+        Ok(SharedStorage {
+            inner: self.inner.cast(),
+            phantom: PhantomData,
+        })
+    }
+}
+
+impl SharedStorage<u8> {
+    /// Create a [`SharedStorage<u8>`][SharedStorage] from a [`Vec`] of [`Pod`].
+    pub fn bytes_from_pod_vec<T: Pod>(v: Vec<T>) -> Self {
+        // This can't fail, bytes is compatible with everything.
+        SharedStorage::from_vec(v)
+            .try_transmute::<u8>()
+            .unwrap_or_else(|_| unreachable!())
+    }
+}
+
 impl<T> Deref for SharedStorage<T> {
     type Target = [T];
 
@@ -158,7 +252,8 @@ impl<T> Deref for SharedStorage<T> {
     fn deref(&self) -> &Self::Target {
         unsafe {
             let inner = self.inner();
-            core::slice::from_raw_parts(inner.ptr, inner.length)
+            let len = inner.length_in_bytes / size_of::<T>();
+            core::slice::from_raw_parts(inner.ptr, len)
        }
     }
 }
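
Illustration (not part of the patches above): a minimal sketch of how the new `bytes_from_pod_vec` helper from PATCH 2/2 might be exercised inside `polars-arrow`. The `crate::storage` path, the test module name, and the sample values are assumptions made for the sketch; only `SharedStorage::from_vec`, `try_transmute`, `bytes_from_pod_vec`, and the `Deref` impl come from the diff.

// Illustration only: a sketch exercising the new transmute path, assuming it
// is compiled inside polars-arrow where `crate::storage` is visible.
#[cfg(test)]
mod bytes_from_pod_vec_sketch {
    use crate::storage::SharedStorage;

    #[test]
    fn u32_vec_as_byte_storage() {
        // A Vec<u32> owning four elements, i.e. 16 bytes of payload.
        let v: Vec<u32> = vec![1, 2, 3, 4];

        // Reinterpret the same allocation as SharedStorage<u8>. The VecVTable
        // captured by `from_vec` remembers the original u32 size/align and
        // capacity, so on drop the buffer is still freed as the Vec<u32> it
        // came from.
        let bytes = SharedStorage::bytes_from_pod_vec(v);

        // Lengths are tracked in bytes internally, so the u8 view reports 16.
        let view: &[u8] = &bytes;
        assert_eq!(view.len(), 16);
        assert_eq!(&view[..4], 1u32.to_ne_bytes().as_slice());
    }
}

The byte-based bookkeeping introduced by the patch (`length_in_bytes` instead of an element count) is what lets the same inner allocation be viewed at a different element size without any per-view state.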