From 91603fd2ddf803557aa3a958ecf4e241e080cab4 Mon Sep 17 00:00:00 2001 From: ritchie Date: Sat, 20 Jan 2024 13:04:18 +0100 Subject: [PATCH] feat: fix parquet for binview --- crates/polars-arrow/src/array/binview/mod.rs | 3 +- .../polars-arrow/src/array/binview/mutable.rs | 4 +- crates/polars-arrow/src/array/binview/view.rs | 22 +++-- crates/polars-core/src/fmt.rs | 24 ++++-- crates/polars-io/src/parquet/read_impl.rs | 80 ++++++++++--------- .../src/arrow/read/deserialize/mod.rs | 2 + 6 files changed, 83 insertions(+), 52 deletions(-) diff --git a/crates/polars-arrow/src/array/binview/mod.rs b/crates/polars-arrow/src/array/binview/mod.rs index 769db31d736f..44b0de62217f 100644 --- a/crates/polars-arrow/src/array/binview/mod.rs +++ b/crates/polars-arrow/src/array/binview/mod.rs @@ -401,7 +401,8 @@ impl BinaryViewArrayGeneric { impl BinaryViewArray { /// Validate the underlying bytes on UTF-8. pub fn validate_utf8(&self) -> PolarsResult<()> { - validate_utf8_only(&self.views, &self.buffers) + // SAFETY: views are correct + unsafe { validate_utf8_only(&self.views, &self.buffers) } } /// Convert [`BinaryViewArray`] to [`Utf8ViewArray`]. 
diff --git a/crates/polars-arrow/src/array/binview/mutable.rs b/crates/polars-arrow/src/array/binview/mutable.rs index e2ded543a5b0..73446ff10783 100644 --- a/crates/polars-arrow/src/array/binview/mutable.rs +++ b/crates/polars-arrow/src/array/binview/mutable.rs @@ -316,7 +316,9 @@ impl MutableBinaryViewArray { impl MutableBinaryViewArray<[u8]> { pub fn validate_utf8(&mut self) -> PolarsResult<()> { - validate_utf8_only(&self.views, &self.completed_buffers) + self.finish_in_progress(); + // SAFETY: views are correct + unsafe { validate_utf8_only(&self.views, &self.completed_buffers) } } } diff --git a/crates/polars-arrow/src/array/binview/view.rs b/crates/polars-arrow/src/array/binview/view.rs index 4152ec01fa35..059c312522c7 100644 --- a/crates/polars-arrow/src/array/binview/view.rs +++ b/crates/polars-arrow/src/array/binview/view.rs @@ -1,4 +1,5 @@ use polars_error::*; +use polars_utils::slice::GetSaferUnchecked; use crate::buffer::Buffer; @@ -85,18 +86,27 @@ pub(super) fn validate_utf8_view(views: &[u128], buffers: &[Buffer]) -> Pola validate_view(views, buffers, validate_utf8) } -pub(super) fn validate_utf8_only(views: &[u128], buffers: &[Buffer]) -> PolarsResult<()> { +/// # Safety +/// The views and buffers must uphold the invariants of BinaryView; otherwise this function reads out of bounds. 
+pub(super) unsafe fn validate_utf8_only( + views: &[u128], + buffers: &[Buffer], +) -> PolarsResult<()> { for view in views { let len = *view as u32; if len <= 12 { - validate_utf8(&view.to_le_bytes()[4..4 + len as usize])?; + validate_utf8( + view.to_le_bytes() + .get_unchecked_release(4..4 + len as usize), + )?; } else { - let view = View::from(*view); - let data = &buffers[view.buffer_idx as usize]; + let buffer_idx = (*view >> 64) as u32; + let offset = (*view >> 96) as u32; + let data = buffers.get_unchecked_release(buffer_idx as usize); - let start = view.offset as usize; + let start = offset as usize; let end = start + len as usize; - let b = &data.as_slice()[start..end]; + let b = &data.as_slice().get_unchecked_release(start..end); validate_utf8(b)?; }; } diff --git a/crates/polars-core/src/fmt.rs b/crates/polars-core/src/fmt.rs index 2bd70c3659fd..3dc423982996 100644 --- a/crates/polars-core/src/fmt.rs +++ b/crates/polars-core/src/fmt.rs @@ -497,6 +497,14 @@ fn fmt_df_shape((shape0, shape1): &(usize, usize)) -> String { ) } +fn get_str_width() -> usize { + std::env::var(FMT_STR_LEN) + .as_deref() + .unwrap_or("") + .parse() + .unwrap_or(32) +} + impl Display for DataFrame { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { #[cfg(any(feature = "fmt", feature = "fmt_no_tty"))] @@ -506,11 +514,7 @@ impl Display for DataFrame { self.columns.iter().all(|s| s.len() == height), "The column lengths in the DataFrame are not equal." 
); - let str_truncate = std::env::var(FMT_STR_LEN) - .as_deref() - .unwrap_or("") - .parse() - .unwrap_or(32); + let str_truncate = get_str_width(); let max_n_cols = std::env::var(FMT_MAX_COLS) .as_deref() @@ -984,8 +988,14 @@ impl Display for AnyValue<'_> { AnyValue::String(v) => write!(f, "{}", format_args!("\"{v}\"")), AnyValue::StringOwned(v) => write!(f, "{}", format_args!("\"{v}\"")), AnyValue::Binary(d) => { - let s = String::from_utf8_lossy(d); - write!(f, "{}", format_args!("b\"{s}\"")) + let max_width = get_str_width() * 2; + if d.len() > max_width { + let s = String::from_utf8_lossy(&d[..max_width]); + write!(f, "{}", format_args!("b\"{s}...\"")) + } else { + let s = String::from_utf8_lossy(d); + write!(f, "{}", format_args!("b\"{s}\"")) + } }, AnyValue::BinaryOwned(d) => { let s = String::from_utf8_lossy(d); diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs index 312d30dc6e94..91089703712f 100644 --- a/crates/polars-io/src/parquet/read_impl.rs +++ b/crates/polars-io/src/parquet/read_impl.rs @@ -297,48 +297,54 @@ fn rg_to_dfs_par_over_rg( }) .collect::>(); - let dfs = row_groups - .into_par_iter() - .map(|(rg_idx, md, projection_height, row_count_start)| { - if projection_height == 0 - || use_statistics - && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)? - { - return Ok(None); - } - // test we don't read the parquet file if this env var is set - #[cfg(debug_assertions)] - { - assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err()) - } + let dfs = POOL.install(|| { + row_groups + .into_par_iter() + .map(|(rg_idx, md, projection_height, row_count_start)| { + if projection_height == 0 + || use_statistics + && !read_this_row_group( + predicate, + &file_metadata.row_groups[rg_idx], + schema, + )? 
+ { + return Ok(None); + } + // test we don't read the parquet file if this env var is set + #[cfg(debug_assertions)] + { + assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err()) + } - let chunk_size = md.num_rows(); - let columns = projection - .iter() - .map(|column_i| { - column_idx_to_series( - *column_i, - md, - projection_height, - schema, - store, - chunk_size, - ) - }) - .collect::>>()?; + let chunk_size = md.num_rows(); + let columns = projection + .iter() + .map(|column_i| { + column_idx_to_series( + *column_i, + md, + projection_height, + schema, + store, + chunk_size, + ) + }) + .collect::>>()?; - let mut df = DataFrame::new_no_checks(columns); + let mut df = DataFrame::new_no_checks(columns); - if let Some(rc) = &row_index { - df.with_row_index_mut(&rc.name, Some(row_count_start as IdxSize + rc.offset)); - } + if let Some(rc) = &row_index { + df.with_row_index_mut(&rc.name, Some(row_count_start as IdxSize + rc.offset)); + } - materialize_hive_partitions(&mut df, hive_partition_columns, projection_height); - apply_predicate(&mut df, predicate, false)?; + materialize_hive_partitions(&mut df, hive_partition_columns, projection_height); + apply_predicate(&mut df, predicate, false)?; - Ok(Some(df)) - }) - .collect::>>()?; + Ok(Some(df)) + }) + .collect::>>() + })?; Ok(dfs.into_iter().flatten().collect()) } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/mod.rs index 0d55700cfade..1ea087c06171 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/mod.rs @@ -119,6 +119,8 @@ fn is_primitive(data_type: &ArrowDataType) -> bool { | arrow::datatypes::PhysicalType::Utf8 | arrow::datatypes::PhysicalType::LargeUtf8 | arrow::datatypes::PhysicalType::Binary + | arrow::datatypes::PhysicalType::BinaryView + | arrow::datatypes::PhysicalType::Utf8View | arrow::datatypes::PhysicalType::LargeBinary | 
arrow::datatypes::PhysicalType::FixedSizeBinary | arrow::datatypes::PhysicalType::Dictionary(_)