
Commit

Merge branch 'main' of https://github.com/barak1412/polars into expose_regex_escape
barak1412 committed Oct 21, 2024
2 parents f2a95af + eb596c9 commit 2dde33f
Showing 2 changed files with 117 additions and 23 deletions.
137 changes: 116 additions & 21 deletions crates/polars-arrow/src/storage.rs
@@ -4,17 +4,48 @@ use std::ops::Deref;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU64, Ordering};

use bytemuck::Pod;

// Allows us to transmute between types while also keeping the original
// stats and drop method of the Vec around.
struct VecVTable {
size: usize,
align: usize,
drop_buffer: unsafe fn(*mut (), usize),
}

impl VecVTable {
const fn new<T>() -> Self {
unsafe fn drop_buffer<T>(ptr: *mut (), cap: usize) {
unsafe { drop(Vec::from_raw_parts(ptr.cast::<T>(), 0, cap)) }
}

Self {
size: size_of::<T>(),
align: align_of::<T>(),
drop_buffer: drop_buffer::<T>,
}
}

fn new_static<T>() -> &'static Self {
const { &Self::new::<T>() }
}
}

use crate::ffi::InternalArrowArray;

enum BackingStorage {
Vec { capacity: usize },
Vec {
original_capacity: usize, // Elements, not bytes.
vtable: &'static VecVTable,
},
InternalArrowArray(InternalArrowArray),
}

struct SharedStorageInner<T> {
ref_count: AtomicU64,
ptr: *mut T,
length: usize,
length_in_bytes: usize,
backing: Option<BackingStorage>,
// https://github.com/rust-lang/rfcs/blob/master/text/0769-sound-generic-drop.md#phantom-data
phantom: PhantomData<T>,
@@ -24,8 +55,20 @@ impl<T> Drop for SharedStorageInner<T> {
fn drop(&mut self) {
match self.backing.take() {
Some(BackingStorage::InternalArrowArray(a)) => drop(a),
Some(BackingStorage::Vec { capacity }) => unsafe {
drop(Vec::from_raw_parts(self.ptr, self.length, capacity))
Some(BackingStorage::Vec {
original_capacity,
vtable,
}) => unsafe {
// Drop the elements in our slice.
if std::mem::needs_drop::<T>() {
core::ptr::drop_in_place(core::ptr::slice_from_raw_parts_mut(
self.ptr,
self.length_in_bytes / size_of::<T>(),
));
}

// Free the buffer.
(vtable.drop_buffer)(self.ptr.cast(), original_capacity);
},
None => {},
}
@@ -42,12 +85,13 @@ unsafe impl<T: Sync + Send> Sync for SharedStorage<T> {}

impl<T> SharedStorage<T> {
pub fn from_static(slice: &'static [T]) -> Self {
let length = slice.len();
#[expect(clippy::manual_slice_size_calculation)]
let length_in_bytes = slice.len() * size_of::<T>();
let ptr = slice.as_ptr().cast_mut();
let inner = SharedStorageInner {
ref_count: AtomicU64::new(2), // Never used, but 2 so it won't pass exclusivity tests.
ptr,
length,
length_in_bytes,
backing: None,
phantom: PhantomData,
};
@@ -58,15 +102,18 @@ impl<T> SharedStorage<T> {
}

pub fn from_vec(mut v: Vec<T>) -> Self {
let length = v.len();
let capacity = v.capacity();
let length_in_bytes = v.len() * size_of::<T>();
let original_capacity = v.capacity();
let ptr = v.as_mut_ptr();
core::mem::forget(v);
let inner = SharedStorageInner {
ref_count: AtomicU64::new(1),
ptr,
length,
backing: Some(BackingStorage::Vec { capacity }),
length_in_bytes,
backing: Some(BackingStorage::Vec {
original_capacity,
vtable: VecVTable::new_static::<T>(),
}),
phantom: PhantomData,
};
Self {
@@ -79,7 +126,7 @@ impl<T> SharedStorage<T> {
let inner = SharedStorageInner {
ref_count: AtomicU64::new(1),
ptr: ptr.cast_mut(),
length: len,
length_in_bytes: len * size_of::<T>(),
backing: Some(BackingStorage::InternalArrowArray(arr)),
phantom: PhantomData,
};
@@ -93,7 +140,7 @@ impl<T> SharedStorage<T> {
impl<T> SharedStorage<T> {
#[inline(always)]
pub fn len(&self) -> usize {
self.inner().length
self.inner().length_in_bytes / size_of::<T>()
}

#[inline(always)]
@@ -121,21 +168,35 @@ impl<T> SharedStorage<T> {
pub fn try_as_mut_slice(&mut self) -> Option<&mut [T]> {
self.is_exclusive().then(|| {
let inner = self.inner();
unsafe { core::slice::from_raw_parts_mut(inner.ptr, inner.length) }
let len = inner.length_in_bytes / size_of::<T>();
unsafe { core::slice::from_raw_parts_mut(inner.ptr, len) }
})
}

pub fn try_into_vec(mut self) -> Result<Vec<T>, Self> {
let Some(BackingStorage::Vec { capacity }) = self.inner().backing else {
// We may only go back to a Vec if we originally came from a Vec
// where the desired size/align matches the original.
let Some(BackingStorage::Vec {
original_capacity,
vtable,
}) = self.inner().backing
else {
return Err(self);
};
if self.is_exclusive() {
let slf = ManuallyDrop::new(self);
let inner = slf.inner();
Ok(unsafe { Vec::from_raw_parts(inner.ptr, inner.length, capacity) })
} else {
Err(self)

if vtable.size != size_of::<T>() || vtable.align != align_of::<T>() {
return Err(self);
}

// If there are other references we can't go back to an owned Vec.
if !self.is_exclusive() {
return Err(self);
}

let slf = ManuallyDrop::new(self);
let inner = slf.inner();
let len = inner.length_in_bytes / size_of::<T>();
Ok(unsafe { Vec::from_raw_parts(inner.ptr, len, original_capacity) })
}

#[inline(always)]
@@ -151,14 +212,48 @@ impl<T> SharedStorage<T> {
}
}

impl<T: Pod> SharedStorage<T> {
fn try_transmute<U: Pod>(self) -> Result<SharedStorage<U>, Self> {
let inner = self.inner();

// The length of the array in bytes must be a multiple of the target size.
// We can skip this check if the size of U divides the size of T.
if size_of::<T>() % size_of::<U>() != 0 && inner.length_in_bytes % size_of::<U>() != 0 {
return Err(self);
}

// The pointer must be properly aligned for U.
// We can skip this check if the alignment of U divides the alignment of T.
if align_of::<T>() % align_of::<U>() != 0 && !inner.ptr.cast::<U>().is_aligned() {
return Err(self);
}

Ok(SharedStorage {
inner: self.inner.cast(),
phantom: PhantomData,
})
}
}

impl SharedStorage<u8> {
/// Create a [`SharedStorage<u8>`][SharedStorage] from a [`Vec`] of [`Pod`].
pub fn bytes_from_pod_vec<T: Pod>(v: Vec<T>) -> Self {
// This can't fail, bytes is compatible with everything.
SharedStorage::from_vec(v)
.try_transmute::<u8>()
.unwrap_or_else(|_| unreachable!())
}
}

impl<T> Deref for SharedStorage<T> {
type Target = [T];

#[inline]
fn deref(&self) -> &Self::Target {
unsafe {
let inner = self.inner();
core::slice::from_raw_parts(inner.ptr, inner.length)
let len = inner.length_in_bytes / size_of::<T>();
core::slice::from_raw_parts(inner.ptr, len)
}
}
}
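For context, a minimal usage sketch of the reworked storage API touched by this diff. It is not part of the commit and assumes the crate-internal SharedStorage type is reachable from the calling code (the import path below is a guess; the type lives in crates/polars-arrow/src/storage.rs). It exercises the byte-based length bookkeeping, the new bytes_from_pod_vec conversion, and the size/align guard added to try_into_vec.

use polars_arrow::storage::SharedStorage; // Assumed path; adjust to wherever the type is exposed.

fn main() {
    // from_vec records the length in bytes plus a vtable holding the original
    // element size/align and capacity, so an exclusive storage can be turned
    // back into a Vec of the same element type.
    let s = SharedStorage::from_vec(vec![10u32, 20, 30]);
    assert_eq!(s.len(), 3); // len() is derived as length_in_bytes / size_of::<u32>().
    let v: Vec<u32> = s.try_into_vec().unwrap();
    assert_eq!(v, vec![10u32, 20, 30]);

    // bytes_from_pod_vec type-erases a Vec<u32> into byte storage via try_transmute.
    let bytes = SharedStorage::<u8>::bytes_from_pod_vec(vec![1u32, 2, 3]);
    assert_eq!(bytes.len(), 12); // 3 elements * 4 bytes each.

    // try_into_vec now refuses this: the stored vtable remembers size 4 / align 4
    // from the original Vec<u32>, which no longer matches u8.
    assert!(bytes.try_into_vec().is_err());
    // When the storage is finally dropped, drop_buffer reconstructs the original
    // allocation as a zero-length Vec<u32> with the original capacity, freeing the
    // buffer without touching elements of the current (possibly transmuted) type.
}

The vtable is what lets a transmuted SharedStorage<u8> still free its buffer with the layout of the Vec<u32> it came from; element destructors are handled separately by the drop_in_place call in SharedStorageInner::drop, which the zero-length Vec::from_raw_parts in drop_buffer deliberately leaves out.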
3 changes: 1 addition & 2 deletions docs/source/user-guide/migration/pandas.md
@@ -50,8 +50,7 @@ eager evaluation. The lazy evaluation mode is powerful because Polars carries out
automatic query optimization when it examines the query plan and looks for ways to
accelerate the query or reduce memory usage.

`Dask` also supports lazy evaluation when it generates a query plan. However, `Dask`
does not carry out query optimization on the query plan.
`Dask` also supports lazy evaluation when it generates a query plan.

## Key syntax differences

