Skip to content

Commit

Permalink
refactor: Add AlignedBytes types
Browse files Browse the repository at this point in the history
This adds types that arrow `NativeType`s can cast to, which can help to reduce
monomorphizations. This also adds support to `SharedStorage` for using a
type with a stricter size and alignment constraint as the `BackingStorage` without
memcopying.

fyi @orlp
  • Loading branch information
coastalwhite committed Oct 18, 2024
1 parent 01a4e06 commit e2271e4
Show file tree
Hide file tree
Showing 6 changed files with 160 additions and 20 deletions.
2 changes: 1 addition & 1 deletion crates/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ description = "Minimal implementation of the Arrow specification forked from arr

[dependencies]
atoi = { workspace = true, optional = true }
bytemuck = { workspace = true }
bytemuck = { workspace = true, features = [ "must_cast" ] }
chrono = { workspace = true }
# for timezone support
chrono-tz = { workspace = true, optional = true }
Expand Down
4 changes: 3 additions & 1 deletion crates/polars-arrow/src/array/binview/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use polars_utils::total_ord::{TotalEq, TotalOrd};

use crate::buffer::Buffer;
use crate::datatypes::PrimitiveType;
use crate::types::NativeType;
use crate::types::{Bytes16Alignment4, NativeType};

// We use this instead of u128 because we want alignment of <= 8 bytes.
/// A reference to a set of bytes.
Expand Down Expand Up @@ -346,7 +346,9 @@ impl MinMax for View {

impl NativeType for View {
const PRIMITIVE: PrimitiveType = PrimitiveType::UInt128;

type Bytes = [u8; 16];
type AlignedBytes = Bytes16Alignment4;

#[inline]
fn to_le_bytes(&self) -> Self::Bytes {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl<T> Buffer<T> {
}

/// Auxiliary method to create a new Buffer
pub(crate) fn from_storage(storage: SharedStorage<T>) -> Self {
pub fn from_storage(storage: SharedStorage<T>) -> Self {
let ptr = storage.as_ptr();
let length = storage.len();
Buffer {
Expand Down
125 changes: 121 additions & 4 deletions crates/polars-arrow/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,27 @@ use std::ops::Deref;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU64, Ordering};

use bytemuck::Pod;

use crate::ffi::InternalArrowArray;
use crate::types::{
AlignedBytes, Bytes12Alignment4, Bytes16Alignment16, Bytes16Alignment4, Bytes16Alignment8,
Bytes1Alignment1, Bytes2Alignment2, Bytes32Alignment16, Bytes4Alignment4, Bytes8Alignment4,
Bytes8Alignment8, NativeSizeAlignment,
};

enum BackingStorage {
Vec {
capacity: usize,

/// Size and alignment of the original vector type.
///
/// We have the following invariants:
/// - if this is Some(...) then all alignments involved are a power of 2
/// - align_of(Original) >= align_of(Current)
/// - size_of(Original) >= size_of(Current)
/// - size_of(Original) % size_of(Current) == 0
original_element_size_alignment: Option<NativeSizeAlignment>,
},
InternalArrowArray(InternalArrowArray),
#[cfg(feature = "arrow_rs")]
Expand All @@ -30,8 +46,53 @@ impl<T> Drop for SharedStorageInner<T> {
Some(BackingStorage::InternalArrowArray(a)) => drop(a),
#[cfg(feature = "arrow_rs")]
Some(BackingStorage::ArrowBuffer(b)) => drop(b),
Some(BackingStorage::Vec { capacity }) => unsafe {
drop(Vec::from_raw_parts(self.ptr, self.length, capacity))
Some(BackingStorage::Vec {
capacity,
original_element_size_alignment,
}) => {
#[inline]
unsafe fn drop_vec<T, O>(ptr: *mut T, length: usize, capacity: usize) {
let ptr = ptr.cast::<O>();
debug_assert!(ptr.is_aligned());

debug_assert!(size_of::<O>() >= size_of::<T>());
debug_assert_eq!(size_of::<O>() % size_of::<T>(), 0);

let scale_factor = size_of::<O>() / size_of::<T>();

// If the original element had a different size_of we need to rescale the
// length and capacity here.
let length = length / scale_factor;
let capacity = capacity / scale_factor;

// SAFETY:
// - The BackingStorage holds an invariants that make this safe
drop(unsafe { Vec::from_raw_parts(ptr, length, capacity) });
}

let ptr = self.ptr;
let length = self.length;

let Some(size_alignment) = original_element_size_alignment else {
unsafe { drop_vec::<T, T>(ptr, length, capacity) };
return;
};

use NativeSizeAlignment as SA;
unsafe {
match size_alignment {
SA::S1A1 => drop_vec::<T, Bytes1Alignment1>(ptr, length, capacity),
SA::S2A2 => drop_vec::<T, Bytes2Alignment2>(ptr, length, capacity),
SA::S4A4 => drop_vec::<T, Bytes4Alignment4>(ptr, length, capacity),
SA::S8A4 => drop_vec::<T, Bytes8Alignment4>(ptr, length, capacity),
SA::S8A8 => drop_vec::<T, Bytes8Alignment8>(ptr, length, capacity),
SA::S12A4 => drop_vec::<T, Bytes12Alignment4>(ptr, length, capacity),
SA::S16A4 => drop_vec::<T, Bytes16Alignment4>(ptr, length, capacity),
SA::S16A8 => drop_vec::<T, Bytes16Alignment8>(ptr, length, capacity),
SA::S16A16 => drop_vec::<T, Bytes16Alignment16>(ptr, length, capacity),
SA::S32A16 => drop_vec::<T, Bytes32Alignment16>(ptr, length, capacity),
}
}
},
None => {},
}
Expand Down Expand Up @@ -72,7 +133,10 @@ impl<T> SharedStorage<T> {
ref_count: AtomicU64::new(1),
ptr,
length,
backing: Some(BackingStorage::Vec { capacity }),
backing: Some(BackingStorage::Vec {
capacity,
original_element_size_alignment: None,
}),
phantom: PhantomData,
};
Self {
Expand Down Expand Up @@ -161,7 +225,11 @@ impl<T> SharedStorage<T> {
}

pub fn try_into_vec(mut self) -> Result<Vec<T>, Self> {
let Some(BackingStorage::Vec { capacity }) = self.inner().backing else {
let Some(BackingStorage::Vec {
capacity,
original_element_size_alignment: None,
}) = self.inner().backing
else {
return Err(self);
};
if self.is_exclusive() {
Expand All @@ -186,6 +254,55 @@ impl<T> SharedStorage<T> {
}
}

impl<T: Pod> SharedStorage<T> {
    /// Construct a [`SharedStorage`] that reinterprets a [`Vec`] of [`AlignedBytes`] as a
    /// sequence of `T`s, without copying the underlying allocation.
    ///
    /// Returns `None` when `B` cannot soundly back `T`: `B` must be at least as strictly
    /// aligned as `T`, and `size_of::<B>()` must be a multiple of `size_of::<T>()` that is
    /// at least as large.
    pub fn from_aligned_bytes_vec<B: AlignedBytes>(mut v: Vec<B>) -> Option<Self> {
        // `T` may not demand stricter alignment than the buffer's element type provides.
        let alignment_ok = align_of::<B>() >= align_of::<T>();
        // @NOTE: Requiring `size_of::<B>()` to be an exact multiple of `size_of::<T>()` is
        // not a fundamental limitation, but something we impose for now. It keeps the
        // length/capacity arithmetic below simple.
        let size_ok = size_of::<B>() >= size_of::<T>() && size_of::<B>() % size_of::<T>() == 0;
        if !(alignment_ok && size_ok) {
            return None;
        }

        // Every `B` element accounts for this many `T` elements.
        let elems_per_b = size_of::<B>() / size_of::<T>();

        let ptr = v.as_mut_ptr().cast::<T>();
        let length = v.len() * elems_per_b;
        let capacity = v.capacity() * elems_per_b;
        // Ownership of the allocation moves into the `BackingStorage`; on drop, the
        // original `Vec<B>` is reconstituted from the recorded size/alignment pair.
        core::mem::forget(v);

        let inner = SharedStorageInner {
            ref_count: AtomicU64::new(1),
            ptr,
            length,
            backing: Some(BackingStorage::Vec {
                capacity,
                original_element_size_alignment: Some(B::SIZE_ALIGNMENT_PAIR),
            }),
            phantom: PhantomData,
        };

        Some(Self {
            inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(),
            phantom: PhantomData,
        })
    }
}

impl SharedStorage<u8> {
    /// Reinterpret a [`Vec`] of [`AlignedBytes`] as shared byte storage.
    ///
    /// Unlike [`Self::from_aligned_bytes_vec`], this cannot fail: `u8` has unit size and
    /// alignment, which every `B` trivially satisfies.
    pub fn bytes_from_aligned_bytes_vec<B: AlignedBytes>(v: Vec<B>) -> Self {
        let storage = Self::from_aligned_bytes_vec(v);
        storage.unwrap()
    }
}

impl<T> Deref for SharedStorage<T> {
type Target = [T];

Expand Down
2 changes: 2 additions & 0 deletions crates/polars-arrow/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
//! Finally, this module contains traits used to compile code based on [`NativeType`] optimized
//! for SIMD, at [`mod@simd`].

mod aligned_bytes;
pub use aligned_bytes::*;
mod bit_chunk;
pub use bit_chunk::{BitChunk, BitChunkIter, BitChunkOnes};
mod index;
Expand Down
45 changes: 32 additions & 13 deletions crates/polars-arrow/src/types/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use polars_utils::min_max::MinMax;
use polars_utils::nulls::IsNull;
use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash, TotalOrd, TotalOrdWrap};

use super::aligned_bytes::*;
use super::PrimitiveType;

/// Sealed trait implemented by all physical types that can be allocated,
Expand All @@ -27,6 +28,7 @@ pub trait NativeType:
+ TotalOrd
+ IsNull
+ MinMax
+ AlignedBytesCast<Self::AlignedBytes>
{
/// The corresponding variant of [`PrimitiveType`].
const PRIMITIVE: PrimitiveType;
Expand All @@ -42,6 +44,11 @@ pub trait NativeType:
+ Default
+ IntoIterator<Item = u8>;

/// Type denoting its representation as aligned bytes.
///
/// This is `[u8; N]` where `N = size_of::<Self>` and has alignment `align_of::<Self>`.
type AlignedBytes: AlignedBytes<Unaligned = Self::Bytes> + From<Self> + Into<Self>;

/// To bytes in little endian
fn to_le_bytes(&self) -> Self::Bytes;

Expand All @@ -56,11 +63,13 @@ pub trait NativeType:
}

macro_rules! native_type {
($type:ty, $primitive_type:expr) => {
($type:ty, $aligned:ty, $primitive_type:expr) => {
impl NativeType for $type {
const PRIMITIVE: PrimitiveType = $primitive_type;

type Bytes = [u8; std::mem::size_of::<Self>()];
type AlignedBytes = $aligned;

#[inline]
fn to_le_bytes(&self) -> Self::Bytes {
Self::to_le_bytes(*self)
Expand All @@ -84,18 +93,18 @@ macro_rules! native_type {
};
}

native_type!(u8, PrimitiveType::UInt8);
native_type!(u16, PrimitiveType::UInt16);
native_type!(u32, PrimitiveType::UInt32);
native_type!(u64, PrimitiveType::UInt64);
native_type!(i8, PrimitiveType::Int8);
native_type!(i16, PrimitiveType::Int16);
native_type!(i32, PrimitiveType::Int32);
native_type!(i64, PrimitiveType::Int64);
native_type!(f32, PrimitiveType::Float32);
native_type!(f64, PrimitiveType::Float64);
native_type!(i128, PrimitiveType::Int128);
native_type!(u128, PrimitiveType::UInt128);
native_type!(u8, Bytes1Alignment1, PrimitiveType::UInt8);
native_type!(u16, Bytes2Alignment2, PrimitiveType::UInt16);
native_type!(u32, Bytes4Alignment4, PrimitiveType::UInt32);
native_type!(u64, Bytes8Alignment8, PrimitiveType::UInt64);
native_type!(i8, Bytes1Alignment1, PrimitiveType::Int8);
native_type!(i16, Bytes2Alignment2, PrimitiveType::Int16);
native_type!(i32, Bytes4Alignment4, PrimitiveType::Int32);
native_type!(i64, Bytes8Alignment8, PrimitiveType::Int64);
native_type!(f32, Bytes4Alignment4, PrimitiveType::Float32);
native_type!(f64, Bytes8Alignment8, PrimitiveType::Float64);
native_type!(i128, Bytes16Alignment16, PrimitiveType::Int128);
native_type!(u128, Bytes16Alignment16, PrimitiveType::UInt128);

/// The in-memory representation of the DayMillisecond variant of arrow's "Interval" logical type.
#[derive(Debug, Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Zeroable, Pod)]
Expand Down Expand Up @@ -151,7 +160,10 @@ impl MinMax for days_ms {

impl NativeType for days_ms {
const PRIMITIVE: PrimitiveType = PrimitiveType::DaysMs;

type Bytes = [u8; 8];
type AlignedBytes = Bytes8Alignment4;

#[inline]
fn to_le_bytes(&self) -> Self::Bytes {
let days = self.0.to_le_bytes();
Expand Down Expand Up @@ -289,7 +301,10 @@ impl MinMax for months_days_ns {

impl NativeType for months_days_ns {
const PRIMITIVE: PrimitiveType = PrimitiveType::MonthDayNano;

type Bytes = [u8; 16];
type AlignedBytes = Bytes16Alignment8;

#[inline]
fn to_le_bytes(&self) -> Self::Bytes {
let months = self.months().to_le_bytes();
Expand Down Expand Up @@ -658,7 +673,10 @@ impl MinMax for f16 {

impl NativeType for f16 {
const PRIMITIVE: PrimitiveType = PrimitiveType::Float16;

type Bytes = [u8; 2];
type AlignedBytes = Bytes2Alignment2;

#[inline]
fn to_le_bytes(&self) -> Self::Bytes {
self.0.to_le_bytes()
Expand Down Expand Up @@ -758,6 +776,7 @@ impl NativeType for i256 {
const PRIMITIVE: PrimitiveType = PrimitiveType::Int256;

type Bytes = [u8; 32];
type AlignedBytes = Bytes32Alignment16;

#[inline]
fn to_le_bytes(&self) -> Self::Bytes {
Expand Down

0 comments on commit e2271e4

Please sign in to comment.