Skip to content

Commit

Permalink
fix parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 19, 2024
1 parent 71ce6d4 commit 301729c
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 16 deletions.
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
}
let mut mutable = MutableBinaryViewArray::with_capacity(self.len());
let buffers = self.raw_buffers.as_ref();
dbg!(self.buffers.as_ref());

for view in self.views.as_ref() {
unsafe { mutable.push_view(*view, buffers) }
Expand Down Expand Up @@ -454,13 +453,14 @@ impl<T: ViewType + ?Sized> Array for BinaryViewArrayGeneric<T> {
}

/// Slices this array in place to `[offset, offset + length)` without
/// bounds checking.
///
/// # Safety
/// The caller must ensure `offset + length <= self.len()`.
unsafe fn slice_unchecked(&mut self, offset: usize, length: usize) {
    debug_assert!(offset + length <= self.len());
    // Slice the validity bitmap, and drop it entirely when the sliced
    // range contains no nulls, keeping the all-valid representation `None`.
    self.validity = self
        .validity
        .take()
        .map(|bitmap| bitmap.sliced_unchecked(offset, length))
        .filter(|bitmap| bitmap.unset_bits() > 0);
    self.views.slice_unchecked(offset, length);
    // Slicing invalidates the cached total byte length; recompute it from
    // the lengths of the remaining views.
    self.total_bytes_len = self.len_iter().map(|v| v as usize).sum::<usize>();
}

fn with_validity(&self, validity: Option<Bitmap>) -> Box<dyn Array> {
Expand Down
2 changes: 0 additions & 2 deletions crates/polars-arrow/src/array/binview/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,8 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
debug_assert!(self.views.capacity() > self.views.len());
self.views.push_unchecked(v)
} else {
dbg!(View::from(v));
self.total_buffer_len += len as usize;
let buffer_idx = (v >> 64) as u32;
dbg!(buffer_idx);
let offset = (v >> 96) as u32;
let (data_ptr, data_len) = *buffers.get_unchecked_release(buffer_idx as usize);
let data = std::slice::from_raw_parts(data_ptr, data_len);
Expand Down
16 changes: 9 additions & 7 deletions crates/polars-parquet/src/arrow/read/schema/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ pub fn read_schema_from_metadata(metadata: &mut Metadata) -> PolarsResult<Option
/// Converts a deserialized Arrow field to the data types Polars supports
/// when reading; nested types are converted recursively via
/// `convert_data_type`, starting outside any dictionary (`in_dict = false`).
fn convert_field(field: Field) -> Field {
    Field {
        name: field.name,
        data_type: convert_data_type(field.data_type, false),
        is_nullable: field.is_nullable,
        metadata: field.metadata,
    }
}

fn convert_data_type(data_type: ArrowDataType) -> ArrowDataType {
fn convert_data_type(data_type: ArrowDataType, in_dict: bool) -> ArrowDataType {
use ArrowDataType::*;
match data_type {
List(field) => LargeList(Box::new(convert_field(*field))),
Expand All @@ -37,14 +37,16 @@ fn convert_data_type(data_type: ArrowDataType) -> ArrowDataType {
}
Struct(fields)
},
Binary => LargeBinary,
Utf8 => LargeUtf8,
Binary | LargeBinary if in_dict => LargeBinary,
Binary | LargeBinary => BinaryView,
Utf8 | LargeUtf8 if in_dict => LargeUtf8,
Utf8 | LargeUtf8 => Utf8View,
Dictionary(it, data_type, sorted) => {
let dtype = convert_data_type(*data_type);
let dtype = convert_data_type(*data_type, true);
Dictionary(it, Box::new(dtype), sorted)
},
Extension(name, data_type, metadata) => {
let data_type = convert_data_type(*data_type);
let data_type = convert_data_type(*data_type, false);
Extension(name, Box::new(data_type), metadata)
},
Map(field, _ordered) => {
Expand All @@ -70,7 +72,7 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> PolarsResult<ArrowSchem
let mut schema = deserialize_schema(slice).map(|x| x.0)?;
// Convert the data types to the data types we support.
for field in schema.fields.iter_mut() {
field.data_type = convert_data_type(std::mem::take(&mut field.data_type))
field.data_type = convert_data_type(std::mem::take(&mut field.data_type), false)
}
Ok(schema)
},
Expand Down
29 changes: 29 additions & 0 deletions crates/polars-parquet/src/arrow/read/statistics/binview.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use arrow::array::{MutableArray, MutableBinaryViewArray, ViewType};
use polars_error::PolarsResult;

use crate::parquet::statistics::{BinaryStatistics, Statistics as ParquetStatistics};

/// Pushes the parquet binary `min`/`max` statistics in `from` onto the
/// given mutable binary-view arrays; a missing statistic pushes a null.
///
/// # Panics
/// Panics if `min`/`max` are not `MutableBinaryViewArray<T>` or if `from`
/// is not `BinaryStatistics`.
pub(super) fn push<T: ViewType + ?Sized>(
    from: Option<&dyn ParquetStatistics>,
    min: &mut dyn MutableArray,
    max: &mut dyn MutableArray,
) -> PolarsResult<()> {
    let min = min
        .as_mut_any()
        .downcast_mut::<MutableBinaryViewArray<T>>()
        .unwrap();
    let max = max
        .as_mut_any()
        .downcast_mut::<MutableBinaryViewArray<T>>()
        .unwrap();
    let from = from.map(|s| s.as_any().downcast_ref::<BinaryStatistics>().unwrap());
    // Keep the unsafe scope minimal: only the unchecked byte reinterpretation
    // needs it.
    // SAFETY: for `T = str` this assumes the parquet statistics bytes are
    // valid UTF-8 — NOTE(review): confirm the writer guarantees this for
    // UTF8 columns.
    min.push(from.and_then(|s| {
        s.min_value
            .as_deref()
            .map(|b| unsafe { T::from_bytes_unchecked(b) })
    }));
    max.push(from.and_then(|s| {
        s.max_value
            .as_deref()
            .map(|b| unsafe { T::from_bytes_unchecked(b) })
    }));
    Ok(())
}
10 changes: 10 additions & 0 deletions crates/polars-parquet/src/arrow/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::parquet::statistics::{
use crate::parquet::types::int96_to_i64_ns;

mod binary;
mod binview;
mod boolean;
mod dictionary;
mod fixlen;
Expand Down Expand Up @@ -196,6 +197,12 @@ fn make_mutable(data_type: &ArrowDataType, capacity: usize) -> PolarsResult<Box<
PhysicalType::Null => {
Box::new(MutableNullArray::new(ArrowDataType::Null, 0)) as Box<dyn MutableArray>
},
PhysicalType::BinaryView => {
Box::new(MutableBinaryViewArray::<[u8]>::with_capacity(capacity))
as Box<dyn MutableArray>
},
PhysicalType::Utf8View => Box::new(MutableBinaryViewArray::<str>::with_capacity(capacity))
as Box<dyn MutableArray>,
other => {
polars_bail!(
nyi = "deserializing parquet stats from {other:?} is still not implemented"
Expand Down Expand Up @@ -535,7 +542,10 @@ fn push(
LargeBinary => binary::push::<i64>(from, min, max),
Utf8 => utf8::push::<i32>(from, min, max),
LargeUtf8 => utf8::push::<i64>(from, min, max),
BinaryView => binview::push::<[u8]>(from, min, max),
Utf8View => binview::push::<str>(from, min, max),
FixedSizeBinary(_) => fixlen::push(from, min, max),

Null => null::push(min, max),
other => todo!("{:?}", other),
}
Expand Down
46 changes: 45 additions & 1 deletion crates/polars-parquet/src/arrow/write/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,52 @@ use crate::parquet::schema::types::{
};
use crate::parquet::schema::Repetition;

/// Returns `field` with its data type (recursively) mapped to the
/// non-view representation via `convert_data_type`; all other field
/// properties are carried over unchanged.
fn convert_field(field: Field) -> Field {
    let Field {
        name,
        data_type,
        is_nullable,
        metadata,
    } = field;
    Field {
        name,
        data_type: convert_data_type(data_type),
        is_nullable,
        metadata,
    }
}

/// Maps view data types to their "large" counterparts (`BinaryView` ->
/// `LargeBinary`, `Utf8View` -> `LargeUtf8`), recursing into nested
/// types; everything else passes through unchanged.
fn convert_data_type(data_type: ArrowDataType) -> ArrowDataType {
    use ArrowDataType::*;
    match data_type {
        BinaryView => LargeBinary,
        Utf8View => LargeUtf8,
        LargeList(inner) => LargeList(Box::new(convert_field(*inner))),
        Struct(mut fields) => {
            fields
                .iter_mut()
                .for_each(|f| *f = convert_field(std::mem::take(f)));
            Struct(fields)
        },
        Dictionary(key_type, values, is_sorted) => {
            let converted = convert_data_type(*values);
            Dictionary(key_type, Box::new(converted), is_sorted)
        },
        Extension(name, inner, metadata) => {
            let converted = convert_data_type(*inner);
            Extension(name, Box::new(converted), metadata)
        },
        other => other,
    }
}

pub fn schema_to_metadata_key(schema: &ArrowSchema) -> KeyValue {
let serialized_schema = schema_to_bytes(schema, &default_ipc_fields(&schema.fields));
// Convert schema until more arrow readers are aware of binview
let serialized_schema = if schema.fields.iter().any(|field| field.data_type.is_view()) {
let fields = schema
.fields
.iter()
.map(|field| convert_field(field.clone()))
.collect::<Vec<_>>();
let schema = ArrowSchema::from(fields);
schema_to_bytes(&schema, &default_ipc_fields(&schema.fields))
} else {
schema_to_bytes(schema, &default_ipc_fields(&schema.fields))
};

// manually prepending the length to the schema as arrow uses the legacy IPC format
// TODO: change after addressing ARROW-9777
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-pipe/src/executors/sinks/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl IOThread {
path.push(format!("{count}.ipc"));

let file = File::create(path).unwrap();
let writer = IpcWriter::new(file).with_pl_flavor(true);
let writer = IpcWriter::new(file).with_pl_flavor(false);
let mut writer = writer.batched(&schema).unwrap();
writer.write_batch(&df).unwrap();
writer.finish().unwrap();
Expand All @@ -163,7 +163,7 @@ impl IOThread {
path.push(format!("{count}.ipc"));

let file = File::create(path).unwrap();
let writer = IpcWriter::new(file).with_pl_flavor(true);
let writer = IpcWriter::new(file).with_pl_flavor(false);
let mut writer = writer.batched(&schema).unwrap();

for df in iter {
Expand Down Expand Up @@ -199,7 +199,7 @@ impl IOThread {
path.push(format!("_{count}.ipc"));

let file = File::create(path).unwrap();
let mut writer = IpcWriter::new(file).with_pl_flavor(true);
let mut writer = IpcWriter::new(file).with_pl_flavor(false);
writer.finish(&mut df).unwrap();
} else {
let iter = Box::new(std::iter::once(df));
Expand Down Expand Up @@ -227,7 +227,7 @@ impl IOThread {
// duplicates
path.push(format!("_{count}.ipc"));
let file = File::create(path).unwrap();
let writer = IpcWriter::new(file).with_pl_flavor(true);
let writer = IpcWriter::new(file).with_pl_flavor(false);
let mut writer = writer.batched(&self.schema).unwrap();
writer.write_batch(&df).unwrap();
writer.finish().unwrap();
Expand Down

0 comments on commit 301729c

Please sign in to comment.