Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Reduce memcopy in parquet #19350

Merged
merged 8 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-io/src/csv/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl<'a> CoreReader<'a> {
if let Some(b) =
decompress(&reader_bytes, total_n_rows, separator, quote_char, eol_char)
{
reader_bytes = ReaderBytes::Owned(b);
reader_bytes = ReaderBytes::Owned(b.into());
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/csv/read/schema_inference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ fn infer_file_schema_inner(
buf.push(eol_char);

return infer_file_schema_inner(
&ReaderBytes::Owned(buf),
&ReaderBytes::Owned(buf.into()),
separator,
max_read_rows,
has_header,
Expand Down Expand Up @@ -481,7 +481,7 @@ fn infer_file_schema_inner(
rb.extend_from_slice(reader_bytes);
rb.push(eol_char);
return infer_file_schema_inner(
&ReaderBytes::Owned(rb),
&ReaderBytes::Owned(rb.into()),
separator,
max_read_rows,
has_header,
Expand Down
26 changes: 12 additions & 14 deletions crates/polars-io/src/mmap.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::fs::File;
use std::io::{BufReader, Cursor, Read, Seek};
use std::sync::Arc;

use polars_core::config::verbose;
use polars_utils::mmap::{MMapSemaphore, MemSlice};
use polars_utils::mmap::MemSlice;

/// Trait used to get a hold to file handler or to the underlying bytes
/// without performing a Read.
Expand Down Expand Up @@ -67,8 +66,7 @@ impl<T: MmapBytesReader> MmapBytesReader for &mut T {
// Handle various forms of input bytes
pub enum ReaderBytes<'a> {
Borrowed(&'a [u8]),
Owned(Vec<u8>),
Mapped(MMapSemaphore, &'a File),
Owned(MemSlice),
}

impl std::ops::Deref for ReaderBytes<'_> {
Expand All @@ -77,19 +75,21 @@ impl std::ops::Deref for ReaderBytes<'_> {
match self {
Self::Borrowed(ref_bytes) => ref_bytes,
Self::Owned(vec) => vec,
Self::Mapped(mmap, _) => mmap.as_ref(),
}
}
}

/// Require 'static to force the caller to do any transmute as it's usually much
/// clearer to see there whether it's sound.
/// There are some places that perform manual lifetime management after transmuting `ReaderBytes`
/// to have a `'static` inner lifetime. The advantage to doing this is that it lets you construct a
/// `MemSlice` from the `ReaderBytes` in a zero-copy manner regardless of the underlying enum
/// variant.
impl ReaderBytes<'static> {
pub fn into_mem_slice(self) -> MemSlice {
/// Construct a `MemSlice` in a zero-copy manner from the underlying bytes, with the assumption
/// that the underlying bytes have a `'static` lifetime.
pub fn to_memslice(&self) -> MemSlice {
match self {
ReaderBytes::Borrowed(v) => MemSlice::from_static(v),
ReaderBytes::Owned(v) => MemSlice::from_vec(v),
ReaderBytes::Mapped(v, _) => MemSlice::from_mmap(Arc::new(v)),
ReaderBytes::Owned(v) => v.clone(),
}
}
}
Expand All @@ -104,16 +104,14 @@ impl<'a, T: 'a + MmapBytesReader> From<&'a mut T> for ReaderBytes<'a> {
},
None => {
if let Some(f) = m.to_file() {
let f = unsafe { std::mem::transmute::<&File, &'a File>(f) };
let mmap = MMapSemaphore::new_from_file(f).unwrap();
ReaderBytes::Mapped(mmap, f)
ReaderBytes::Owned(MemSlice::from_file(f).unwrap())
} else {
if verbose() {
eprintln!("could not memory map file; read to buffer.")
}
let mut buf = vec![];
m.read_to_end(&mut buf).expect("could not read");
ReaderBytes::Owned(buf)
ReaderBytes::Owned(MemSlice::from_vec(buf))
}
},
}
Expand Down
12 changes: 4 additions & 8 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use polars_parquet::parquet::statistics::Statistics;
use polars_parquet::read::{
self, ColumnChunkMetadata, FileMetadata, Filter, PhysicalType, RowGroupMetadata,
};
use polars_utils::mmap::MemSlice;
use rayon::prelude::*;

#[cfg(feature = "cloud")]
Expand Down Expand Up @@ -908,10 +907,9 @@ pub fn read_parquet<R: MmapBytesReader>(
}

let reader = ReaderBytes::from(&mut reader);
let store = mmap::ColumnStore::Local(
unsafe { std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader) }
.into_mem_slice(),
);
let store = mmap::ColumnStore::Local(unsafe {
std::mem::transmute::<ReaderBytes<'_>, ReaderBytes<'static>>(reader).to_memslice()
});

let dfs = rg_to_dfs(
&store,
Expand Down Expand Up @@ -959,9 +957,7 @@ impl FetchRowGroupsFromMmapReader {

fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
// @TODO: we can something smarter here with mmap
Ok(mmap::ColumnStore::Local(MemSlice::from_vec(
self.0.deref().to_vec(),
)))
Ok(mmap::ColumnStore::Local(self.0.to_memslice()))
}
}

Expand Down
18 changes: 6 additions & 12 deletions crates/polars-io/src/utils/other.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use once_cell::sync::Lazy;
use polars_core::prelude::*;
#[cfg(any(feature = "ipc_streaming", feature = "parquet"))]
use polars_core::utils::{accumulate_dataframes_vertical_unchecked, split_df_as_ref};
use polars_utils::mmap::MMapSemaphore;
use polars_utils::mmap::{MMapSemaphore, MemSlice};
use regex::{Regex, RegexBuilder};

use crate::mmap::{MmapBytesReader, ReaderBytes};

pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
reader: &'a mut R,
) -> PolarsResult<ReaderBytes<'a>> {
pub fn get_reader_bytes<R: Read + MmapBytesReader + ?Sized>(
reader: &mut R,
) -> PolarsResult<ReaderBytes<'_>> {
// we have a file so we can mmap
// only seekable files are mmap-able
if let Some((file, offset)) = reader
Expand All @@ -23,14 +23,8 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
{
let mut options = memmap::MmapOptions::new();
options.offset(offset);

// somehow bck thinks borrows alias
// this is sound as file was already bound to 'a
use std::fs::File;

let file = unsafe { std::mem::transmute::<&File, &'a File>(file) };
let mmap = MMapSemaphore::new_from_file_with_options(file, options)?;
Ok(ReaderBytes::Mapped(mmap, file))
Ok(ReaderBytes::Owned(MemSlice::from_mmap(Arc::new(mmap))))
} else {
// we can get the bytes for free
if reader.to_bytes().is_some() {
Expand All @@ -40,7 +34,7 @@ pub fn get_reader_bytes<'a, R: Read + MmapBytesReader + ?Sized>(
// we have to read to an owned buffer to get the bytes.
let mut bytes = Vec::with_capacity(1024 * 128);
reader.read_to_end(&mut bytes)?;
Ok(ReaderBytes::Owned(bytes))
Ok(ReaderBytes::Owned(bytes.into()))
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions crates/polars-utils/src/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ mod private {
}
}

impl From<Vec<u8>> for MemSlice {
fn from(value: Vec<u8>) -> Self {
Self::from_vec(value)
}
}

impl MemSlice {
pub const EMPTY: Self = Self::from_static(&[]);

Expand Down
Loading