From 09004c56e6e4b96d3767e03a81f3eb228b3529f1 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 20 Dec 2024 10:36:21 -0800 Subject: [PATCH 01/15] refactor: make ParquetSink tests a bit more readable --- .../src/datasource/file_format/parquet.rs | 153 ++++++++++-------- 1 file changed, 86 insertions(+), 67 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 383fd6575234..6d12b26402c3 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -2335,42 +2335,46 @@ mod tests { async fn parquet_sink_write() -> Result<()> { let parquet_sink = create_written_parquet_sink("file:///").await?; - // assert written - let mut written = parquet_sink.written(); - let written = written.drain(); - assert_eq!( - written.len(), - 1, - "expected a single parquet files to be written, instead found {}", - written.len() - ); + // assert written to proper path + let (path, file_metadata) = get_written(parquet_sink)?; + let path_parts = path.parts().collect::>(); + assert_eq!(path_parts.len(), 1, "should not have path prefix"); // check the file metadata - let ( - path, - FileMetaData { - num_rows, - schema, - key_value_metadata, - .. + let expected_kv_meta = vec![ + KeyValue { + key: "my-data".to_string(), + value: Some("stuff".to_string()), + }, + KeyValue { + key: "my-data-bool-key".to_string(), + value: None, }, - ) = written.take(1).next().unwrap(); + ]; + assert_file_metadata(file_metadata, expected_kv_meta); + + Ok(()) + } + + #[tokio::test] + async fn parquet_sink_parallel_write() -> Result<()> { + let opts = ParquetOptions { + allow_single_file_parallelism: true, + maximum_parallel_row_group_writers: 2, + maximum_buffered_record_batches_per_stream: 2, + ..Default::default() + }; + + let parquet_sink = + create_written_parquet_sink_using_config("file:///", opts).await?; + + // assert written to proper path + let (path, file_metadata) = get_written(parquet_sink)?; let path_parts = path.parts().collect::>(); assert_eq!(path_parts.len(), 1, "should not have path prefix"); - assert_eq!(num_rows, 2, "file metadata to have 2 rows"); - assert!( - schema.iter().any(|col_schema| col_schema.name == "a"), - "output file metadata should contain col a" - ); - assert!( - schema.iter().any(|col_schema| col_schema.name == "b"), - "output file metadata should contain col b" - ); - - let mut key_value_metadata = key_value_metadata.unwrap(); - key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key)); - let expected_metadata = vec![ + // check the file metadata + let expected_kv_meta = vec![ KeyValue { key: "my-data".to_string(), value: Some("stuff".to_string()), @@ -2380,7 +2384,7 @@ mod tests { value: None, }, ]; - assert_eq!(key_value_metadata, expected_metadata); + assert_file_metadata(file_metadata, expected_kv_meta); Ok(()) } @@ -2391,18 +2395,8 @@ mod tests { let file_path = format!("file:///path/to/{}", filename); let parquet_sink = create_written_parquet_sink(file_path.as_str()).await?; - // assert written - let mut written = parquet_sink.written(); - let written = written.drain(); - assert_eq!( - written.len(), - 1, - "expected a single parquet file to be written, instead found {}", - written.len() - ); - - let (path, ..) 
= written.take(1).next().unwrap(); - + // assert written to proper path + let (path, _) = get_written(parquet_sink)?; let path_parts = path.parts().collect::>(); assert_eq!( path_parts.len(), @@ -2420,18 +2414,8 @@ mod tests { let file_path = "file:///path/to"; let parquet_sink = create_written_parquet_sink(file_path).await?; - // assert written - let mut written = parquet_sink.written(); - let written = written.drain(); - assert_eq!( - written.len(), - 1, - "expected a single parquet file to be written, instead found {}", - written.len() - ); - - let (path, ..) = written.take(1).next().unwrap(); - + // assert written to proper path + let (path, _) = get_written(parquet_sink)?; let path_parts = path.parts().collect::>(); assert_eq!( path_parts.len(), @@ -2449,18 +2433,8 @@ mod tests { let file_path = "file:///path/to/"; let parquet_sink = create_written_parquet_sink(file_path).await?; - // assert written - let mut written = parquet_sink.written(); - let written = written.drain(); - assert_eq!( - written.len(), - 1, - "expected a single parquet file to be written, instead found {}", - written.len() - ); - - let (path, ..) = written.take(1).next().unwrap(); - + // assert written to proper path + let (path, _) = get_written(parquet_sink)?; let path_parts = path.parts().collect::>(); assert_eq!( path_parts.len(), @@ -2474,6 +2448,14 @@ mod tests { } async fn create_written_parquet_sink(table_path: &str) -> Result> { + create_written_parquet_sink_using_config(table_path, ParquetOptions::default()) + .await + } + + async fn create_written_parquet_sink_using_config( + table_path: &str, + global: ParquetOptions, + ) -> Result> { let field_a = Field::new("a", DataType::Utf8, false); let field_b = Field::new("b", DataType::Utf8, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); @@ -2495,6 +2477,7 @@ mod tests { ("my-data".to_string(), Some("stuff".to_string())), ("my-data-bool-key".to_string(), None), ]), + global, ..Default::default() }, )); @@ -2519,6 +2502,42 @@ mod tests { Ok(parquet_sink) } + fn get_written(parquet_sink: Arc) -> Result<(Path, FileMetaData)> { + let mut written = parquet_sink.written(); + let written = written.drain(); + assert_eq!( + written.len(), + 1, + "expected a single parquet files to be written, instead found {}", + written.len() + ); + + let (path, file_metadata) = written.take(1).next().unwrap(); + Ok((path, file_metadata)) + } + + fn assert_file_metadata(file_metadata: FileMetaData, expected_kv: Vec) { + let FileMetaData { + num_rows, + schema, + key_value_metadata, + .. 
+ } = file_metadata; + assert_eq!(num_rows, 2, "file metadata to have 2 rows"); + assert!( + schema.iter().any(|col_schema| col_schema.name == "a"), + "output file metadata should contain col a" + ); + assert!( + schema.iter().any(|col_schema| col_schema.name == "b"), + "output file metadata should contain col b" + ); + + let mut key_value_metadata = key_value_metadata.unwrap(); + key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key)); + assert_eq!(key_value_metadata, expected_kv); + } + #[tokio::test] async fn parquet_sink_write_partitions() -> Result<()> { let field_a = Field::new("a", DataType::Utf8, false); From b20b151f82cf2840e1472a7afb40f1632307d600 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 20 Dec 2024 11:55:44 -0800 Subject: [PATCH 02/15] chore(11770): add new ParquetOptions.skip_arrow_metadata --- datafusion/common/src/config.rs | 6 ++++++ .../common/src/file_options/parquet_writer.rs | 3 +++ .../proto-common/proto/datafusion_common.proto | 1 + datafusion/proto-common/src/from_proto/mod.rs | 1 + .../proto-common/src/generated/pbjson.rs | 18 ++++++++++++++++++ datafusion/proto-common/src/generated/prost.rs | 3 +++ datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 3 +++ .../proto/src/logical_plan/file_formats.rs | 2 ++ 9 files changed, 38 insertions(+) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 6e64700bd2e0..c7715d596922 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -436,6 +436,12 @@ config_namespace! { /// valid values are "1.0" and "2.0" pub writer_version: String, default = "1.0".to_string() + /// (writing) Skip encoding the embedded arrow metadata in the KV_meta + /// + /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. + /// Refer to + pub skip_arrow_metadata: bool, default = false + /// (writing) Sets default parquet compression codec. /// Valid values are: uncompressed, snappy, gzip(level), /// lzo, brotli(level), lz4, zstd(level), and lz4_raw. 
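A minimal usage sketch of the new option defined above (the struct-level wiring via `TableParquetOptions` is shown for illustration; the session setting name matches the `datafusion.execution.parquet.skip_arrow_metadata` key registered later in this series):

    use datafusion_common::config::TableParquetOptions;

    // Opt out of embedding the "ARROW:schema" entry in the written file's kv_metadata.
    // The default (false) keeps the existing behavior of encoding the arrow schema.
    let mut opts = TableParquetOptions::default();
    opts.global.skip_arrow_metadata = true;

    // Roughly equivalent session-level setting:
    // SET datafusion.execution.parquet.skip_arrow_metadata = true;
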
diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index dd9d67d6bb47..a5b71b921572 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -177,6 +177,7 @@ impl ParquetOptions { bloom_filter_on_read: _, // reads not used for writer props schema_force_view_types: _, binary_as_string: _, // not used for writer props + skip_arrow_metadata: _, } = self; let mut builder = WriterProperties::builder() @@ -444,6 +445,7 @@ mod tests { bloom_filter_on_read: defaults.bloom_filter_on_read, schema_force_view_types: defaults.schema_force_view_types, binary_as_string: defaults.binary_as_string, + skip_arrow_metadata: defaults.skip_arrow_metadata, } } @@ -546,6 +548,7 @@ mod tests { bloom_filter_on_read: global_options_defaults.bloom_filter_on_read, schema_force_view_types: global_options_defaults.schema_force_view_types, binary_as_string: global_options_defaults.binary_as_string, + skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, }, column_specific_options, key_value_metadata, diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 69626f97fd80..6a7dc1604b0a 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -497,6 +497,7 @@ message ParquetOptions { bool bloom_filter_on_write = 27; // default = false bool schema_force_view_types = 28; // default = false bool binary_as_string = 29; // default = false + bool skip_arrow_metadata = 30; // default = false oneof metadata_size_hint_opt { uint64 metadata_size_hint = 4; diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index eb6976aa0c06..ca8306275b11 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -962,6 +962,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize, schema_force_view_types: value.schema_force_view_types, binary_as_string: value.binary_as_string, + skip_arrow_metadata: value.skip_arrow_metadata, }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index e88c1497af08..e9f9de09d4d1 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -4940,6 +4940,9 @@ impl serde::Serialize for ParquetOptions { if self.binary_as_string { len += 1; } + if self.skip_arrow_metadata { + len += 1; + } if self.dictionary_page_size_limit != 0 { len += 1; } @@ -5033,6 +5036,9 @@ impl serde::Serialize for ParquetOptions { if self.binary_as_string { struct_ser.serialize_field("binaryAsString", &self.binary_as_string)?; } + if self.skip_arrow_metadata { + struct_ser.serialize_field("skipArrowMetadata", &self.skip_arrow_metadata)?; + } if self.dictionary_page_size_limit != 0 { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] @@ -5161,6 +5167,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "schemaForceViewTypes", "binary_as_string", "binaryAsString", + "skip_arrow_metadata", + "skipArrowMetadata", "dictionary_page_size_limit", "dictionaryPageSizeLimit", "data_page_row_count_limit", @@ -5204,6 +5212,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { 
BloomFilterOnWrite, SchemaForceViewTypes, BinaryAsString, + SkipArrowMetadata, DictionaryPageSizeLimit, DataPageRowCountLimit, MaxRowGroupSize, @@ -5253,6 +5262,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterOnWrite" | "bloom_filter_on_write" => Ok(GeneratedField::BloomFilterOnWrite), "schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes), "binaryAsString" | "binary_as_string" => Ok(GeneratedField::BinaryAsString), + "skipArrowMetadata" | "skip_arrow_metadata" => Ok(GeneratedField::SkipArrowMetadata), "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit), "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), @@ -5300,6 +5310,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_on_write__ = None; let mut schema_force_view_types__ = None; let mut binary_as_string__ = None; + let mut skip_arrow_metadata__ = None; let mut dictionary_page_size_limit__ = None; let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; @@ -5413,6 +5424,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } binary_as_string__ = Some(map_.next_value()?); } + GeneratedField::SkipArrowMetadata => { + if skip_arrow_metadata__.is_some() { + return Err(serde::de::Error::duplicate_field("skipArrowMetadata")); + } + skip_arrow_metadata__ = Some(map_.next_value()?); + } GeneratedField::DictionaryPageSizeLimit => { if dictionary_page_size_limit__.is_some() { return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit")); @@ -5515,6 +5532,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_on_write: bloom_filter_on_write__.unwrap_or_default(), schema_force_view_types: schema_force_view_types__.unwrap_or_default(), binary_as_string: binary_as_string__.unwrap_or_default(), + skip_arrow_metadata: skip_arrow_metadata__.unwrap_or_default(), dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(), data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 6b8509775847..3263c1c755af 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -763,6 +763,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "29")] pub binary_as_string: bool, + /// default = false + #[prost(bool, tag = "30")] + pub skip_arrow_metadata: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index a7cea607cb6d..79faaba864f3 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -833,6 +833,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64, schema_force_view_types: value.schema_force_view_types, binary_as_string: value.binary_as_string, + skip_arrow_metadata: value.skip_arrow_metadata, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 
6b8509775847..3263c1c755af 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -763,6 +763,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "29")] pub binary_as_string: bool, + /// default = false + #[prost(bool, tag = "30")] + pub skip_arrow_metadata: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 62405b2fef21..772e6d23426a 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -410,6 +410,7 @@ impl TableParquetOptionsProto { maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64, schema_force_view_types: global_options.global.schema_force_view_types, binary_as_string: global_options.global.binary_as_string, + skip_arrow_metadata: global_options.global.skip_arrow_metadata, }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -501,6 +502,7 @@ impl From<&ParquetOptionsProto> for ParquetOptions { maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize, schema_force_view_types: proto.schema_force_view_types, binary_as_string: proto.binary_as_string, + skip_arrow_metadata: proto.skip_arrow_metadata, } } } From ce9510c9541d6434a903fbf293bd750e58ad8af6 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 20 Dec 2024 13:48:41 -0800 Subject: [PATCH 03/15] test(11770): demonstrate that the single threaded ParquetSink is already writing the arrow schema in the kv_meta, and allow disablement --- .../src/datasource/file_format/parquet.rs | 61 ++++++++++++++++++- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 6d12b26402c3..51cd9775dfed 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -68,7 +68,7 @@ use log::debug; use object_store::buffered::BufWriter; use parquet::arrow::arrow_writer::{ compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter, - ArrowLeafColumn, + ArrowLeafColumn, ArrowWriterOptions, }; use parquet::arrow::{ arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter, @@ -759,10 +759,14 @@ impl ParquetSink { parquet_props: WriterProperties, ) -> Result> { let buf_writer = BufWriter::new(object_store, location.clone()); - let writer = AsyncArrowWriter::try_new( + let options = ArrowWriterOptions::new() + .with_properties(parquet_props) + .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata); + + let writer = AsyncArrowWriter::try_new_with_options( buf_writer, self.get_writer_schema(), - Some(parquet_props), + options, )?; Ok(writer) } @@ -2389,6 +2393,54 @@ mod tests { Ok(()) } + #[tokio::test] + async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> { + // expected kv metadata without schema + let expected_without = vec![ + KeyValue { + key: "my-data".to_string(), + value: Some("stuff".to_string()), + }, + KeyValue { + key: "my-data-bool-key".to_string(), + value: None, + }, + ]; + // expected kv metadata with schema + let expected_with = [ + vec![KeyValue { + key: "ARROW:schema".to_string(), + 
value: Some(ENCODED_ARROW_SCHEMA.to_string()), + }], + expected_without.clone(), + ] + .concat(); + + // single threaded write, skip insert + let opts = ParquetOptions { + allow_single_file_parallelism: false, + skip_arrow_metadata: true, + ..Default::default() + }; + let parquet_sink = + create_written_parquet_sink_using_config("file:///", opts).await?; + let (_, file_metadata) = get_written(parquet_sink)?; + assert_file_metadata(file_metadata, expected_without.clone()); + + // single threaded write, do not skip insert + let opts = ParquetOptions { + allow_single_file_parallelism: false, + skip_arrow_metadata: false, + ..Default::default() + }; + let parquet_sink = + create_written_parquet_sink_using_config("file:///", opts).await?; + let (_, file_metadata) = get_written(parquet_sink)?; + assert_file_metadata(file_metadata, expected_with.clone()); + + Ok(()) + } + #[tokio::test] async fn parquet_sink_write_with_extension() -> Result<()> { let filename = "test_file.custom_ext"; @@ -2452,10 +2504,13 @@ mod tests { .await } + static ENCODED_ARROW_SCHEMA: &str = "/////5QAAAAQAAAAAAAKAAwACgAJAAQACgAAABAAAAAAAQQACAAIAAAABAAIAAAABAAAAAIAAAA8AAAABAAAANz///8UAAAADAAAAAAAAAUMAAAAAAAAAMz///8BAAAAYgAAABAAFAAQAAAADwAEAAAACAAQAAAAGAAAAAwAAAAAAAAFEAAAAAAAAAAEAAQABAAAAAEAAABhAAAA"; + async fn create_written_parquet_sink_using_config( table_path: &str, global: ParquetOptions, ) -> Result> { + // schema should match the ENCODED_ARROW_SCHEMA bove let field_a = Field::new("a", DataType::Utf8, false); let field_b = Field::new("b", DataType::Utf8, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); From da88cec15249c06edd450c90371db38b899b2ad7 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 20 Dec 2024 16:18:49 -0800 Subject: [PATCH 04/15] refactor(11770): replace with new method, since the kv_metadata is inherent to TableParquetOptions and therefore we should explicitly make the API apparant that you have to include the arrow schema or not --- datafusion-cli/Cargo.lock | 2 + datafusion/common/Cargo.toml | 2 + datafusion/common/src/file_options/mod.rs | 13 +- .../common/src/file_options/parquet_writer.rs | 118 ++++++++++++------ 4 files changed, 90 insertions(+), 45 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index a435869dbece..9f75a34a7607 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1326,7 +1326,9 @@ dependencies = [ "arrow", "arrow-array", "arrow-buffer", + "arrow-ipc", "arrow-schema", + "base64 0.22.1", "half", "hashbrown 0.14.5", "indexmap", diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index a81ec724dd66..330faff1cbb2 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -52,7 +52,9 @@ apache-avro = { version = "0.17", default-features = false, features = [ arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true } +arrow-ipc = { workspace = true } arrow-schema = { workspace = true } +base64 = "0.22.1" half = { workspace = true } hashbrown = { workspace = true } indexmap = { workspace = true } diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index 77781457d0d2..91885fc5b42b 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -30,7 +30,6 @@ pub mod parquet_writer; mod tests { use std::collections::HashMap; - use super::parquet_writer::ParquetWriterOptions; use crate::{ config::{ConfigFileType, TableOptions}, 
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions}, @@ -79,8 +78,10 @@ mod tests { table_config.set_config_format(ConfigFileType::PARQUET); table_config.alter_with_string_hash_map(&option_map)?; - let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?; - let properties = parquet_options.writer_options(); + let properties = table_config + .parquet + .into_writer_properties_builder()? + .build(); // Verify the expected options propagated down to parquet crate WriterProperties struct assert_eq!(properties.max_row_group_size(), 123); @@ -184,8 +185,10 @@ mod tests { table_config.set_config_format(ConfigFileType::PARQUET); table_config.alter_with_string_hash_map(&option_map)?; - let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?; - let properties = parquet_options.writer_options(); + let properties = table_config + .parquet + .into_writer_properties_builder()? + .build(); let col1 = ColumnPath::from(vec!["col1".to_owned()]); let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]); diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index a5b71b921572..a99694d3e07f 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -17,18 +17,25 @@ //! Options related to how parquet files should be written +use base64::Engine; +use std::sync::Arc; + use crate::{ config::{ParquetOptions, TableParquetOptions}, DataFusionError, Result, }; +use arrow_schema::Schema; use parquet::{ + arrow::ARROW_SCHEMA_META_KEY, basic::{BrotliLevel, GzipLevel, ZstdLevel}, - file::properties::{ - EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion, - DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED, + file::{ + metadata::KeyValue, + properties::{ + EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion, + DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED, + }, }, - format::KeyValue, schema::types::ColumnPath, }; @@ -51,38 +58,42 @@ impl ParquetWriterOptions { } } -impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { - type Error = DataFusionError; - - fn try_from(parquet_table_options: &TableParquetOptions) -> Result { - // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns) - Ok(ParquetWriterOptions { - writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)? - .build(), - }) +impl TableParquetOptions { + /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`]. + /// + /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column. + pub fn into_writer_properties_builder(&self) -> Result { + self.into_writer_properties_builder_with_arrow_schema(None) } -} - -impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { - type Error = DataFusionError; /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`]. /// - /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column. - fn try_from(table_parquet_options: &TableParquetOptions) -> Result { + /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column, + /// as well as the arrow schema encoded into the kv_meta at [`ARROW_SCHEMA_META_KEY`]. 
+ pub fn into_writer_properties_builder_with_arrow_schema( + &self, + to_encode: Option<&Arc>, + ) -> Result { // Table options include kv_metadata and col-specific options let TableParquetOptions { global, column_specific_options, key_value_metadata, - } = table_parquet_options; + } = self; let mut builder = global.into_writer_properties_builder()?; - if !key_value_metadata.is_empty() { + // add kv_meta, if any + let mut kv_meta = key_value_metadata.to_owned(); + if let Some(schema) = to_encode { + kv_meta.insert( + ARROW_SCHEMA_META_KEY.into(), + Some(encode_arrow_schema(schema)), + ); + } + if !kv_meta.is_empty() { builder = builder.set_key_value_metadata(Some( - key_value_metadata - .to_owned() + kv_meta .drain() .map(|(key, value)| KeyValue { key, value }) .collect(), @@ -140,6 +151,32 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { } } +/// Encodes the Arrow schema into the IPC format, and base64 encodes it +/// +/// TODO: make arrow schema encoding available in a public API. +/// Refer to currently private `add_encoded_arrow_schema_to_metadata` and `encode_arrow_schema` public. +/// +fn encode_arrow_schema(schema: &Arc) -> String { + let options = arrow_ipc::writer::IpcWriteOptions::default(); + let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true); + let data_gen = arrow_ipc::writer::IpcDataGenerator::default(); + let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker( + schema, + &mut dictionary_tracker, + &options, + ); + + // manually prepending the length to the schema as arrow uses the legacy IPC format + // TODO: change after addressing ARROW-9777 + let schema_len = serialized_schema.ipc_message.len(); + let mut len_prefix_schema = Vec::with_capacity(schema_len + 8); + len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]); + len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut()); + len_prefix_schema.append(&mut serialized_schema.ipc_message); + + base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema) +} + impl ParquetOptions { /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`]. /// @@ -573,7 +610,8 @@ mod tests { key_value_metadata: [(key, value)].into(), }; - let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts) + let writer_props = table_parquet_opts + .into_writer_properties_builder() .unwrap() .build(); assert_eq!( @@ -600,10 +638,10 @@ mod tests { let default_writer_props = WriterProperties::new(); // WriterProperties::try_from(TableParquetOptions::default), a.k.a. 
using datafusion's defaults - let from_datafusion_defaults = - WriterPropertiesBuilder::try_from(&default_table_writer_opts) - .unwrap() - .build(); + let from_datafusion_defaults = default_table_writer_opts + .into_writer_properties_builder() + .unwrap() + .build(); // Expected: how the defaults should not match assert_ne!( @@ -656,10 +694,10 @@ mod tests { // the TableParquetOptions::default, with only the bloom filter turned on let mut default_table_writer_opts = TableParquetOptions::default(); default_table_writer_opts.global.bloom_filter_on_write = true; - let from_datafusion_defaults = - WriterPropertiesBuilder::try_from(&default_table_writer_opts) - .unwrap() - .build(); + let from_datafusion_defaults = default_table_writer_opts + .into_writer_properties_builder() + .unwrap() + .build(); // the WriterProperties::default, with only the bloom filter turned on let default_writer_props = WriterProperties::builder() @@ -684,10 +722,10 @@ mod tests { let mut default_table_writer_opts = TableParquetOptions::default(); default_table_writer_opts.global.bloom_filter_on_write = true; default_table_writer_opts.global.bloom_filter_fpp = Some(0.42); - let from_datafusion_defaults = - WriterPropertiesBuilder::try_from(&default_table_writer_opts) - .unwrap() - .build(); + let from_datafusion_defaults = default_table_writer_opts + .into_writer_properties_builder() + .unwrap() + .build(); // the WriterProperties::default, with only fpp set let default_writer_props = WriterProperties::builder() @@ -716,10 +754,10 @@ mod tests { let mut default_table_writer_opts = TableParquetOptions::default(); default_table_writer_opts.global.bloom_filter_on_write = true; default_table_writer_opts.global.bloom_filter_ndv = Some(42); - let from_datafusion_defaults = - WriterPropertiesBuilder::try_from(&default_table_writer_opts) - .unwrap() - .build(); + let from_datafusion_defaults = default_table_writer_opts + .into_writer_properties_builder() + .unwrap() + .build(); // the WriterProperties::default, with only ndv set let default_writer_props = WriterProperties::builder() From 013b098316e1ac49550158533543cd2164ae15be Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 20 Dec 2024 16:20:50 -0800 Subject: [PATCH 05/15] fix(11770): fix parallel ParquetSink to encode arrow schema into the file metadata, based on the ParquetOptions --- .../src/datasource/file_format/parquet.rs | 42 +++++++++++++++++-- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 51cd9775dfed..0e6ffe38c393 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -45,7 +45,6 @@ use crate::physical_plan::{ use arrow::compute::sum; use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions}; -use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; use datafusion_common::{ @@ -792,7 +791,16 @@ impl DataSink for ParquetSink { data: SendableRecordBatchStream, context: &Arc, ) -> Result { - let parquet_props = ParquetWriterOptions::try_from(&self.parquet_options)?; + let parquet_props = if !self.parquet_options.global.skip_arrow_metadata { + let schema = self.config.output_schema(); + self.parquet_options + .into_writer_properties_builder_with_arrow_schema(Some(schema))? 
+ .build() + } else { + self.parquet_options + .into_writer_properties_builder()? + .build() + }; let object_store = context .runtime_env() @@ -836,7 +844,7 @@ impl DataSink for ParquetSink { .create_async_arrow_writer( &path, Arc::clone(&object_store), - parquet_props.writer_options().clone(), + parquet_props.clone(), ) .await?; let mut reservation = @@ -871,7 +879,7 @@ impl DataSink for ParquetSink { writer, rx, schema, - props.writer_options(), + &props, parallel_options_clone, pool, ) @@ -2438,6 +2446,32 @@ mod tests { let (_, file_metadata) = get_written(parquet_sink)?; assert_file_metadata(file_metadata, expected_with.clone()); + // multithreaded write, skip insert + let opts = ParquetOptions { + allow_single_file_parallelism: true, + maximum_parallel_row_group_writers: 2, + maximum_buffered_record_batches_per_stream: 2, + skip_arrow_metadata: true, + ..Default::default() + }; + let parquet_sink = + create_written_parquet_sink_using_config("file:///", opts).await?; + let (_, file_metadata) = get_written(parquet_sink)?; + assert_file_metadata(file_metadata, expected_without); + + // multithreaded write, do not skip insert + let opts = ParquetOptions { + allow_single_file_parallelism: true, + maximum_parallel_row_group_writers: 2, + maximum_buffered_record_batches_per_stream: 2, + skip_arrow_metadata: false, + ..Default::default() + }; + let parquet_sink = + create_written_parquet_sink_using_config("file:///", opts).await?; + let (_, file_metadata) = get_written(parquet_sink)?; + assert_file_metadata(file_metadata, expected_with); + Ok(()) } From aac85715ec4fa125abb3240aee3f3e1ce31f6c28 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 20 Dec 2024 17:05:15 -0800 Subject: [PATCH 06/15] refactor(11770): provide deprecation warning for TryFrom --- datafusion/common/src/file_options/parquet_writer.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index a99694d3e07f..33825616866a 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -59,6 +59,17 @@ impl ParquetWriterOptions { } impl TableParquetOptions { + #[deprecated( + since = "44.0.0", + note = "Please use `TableParquetOptions::into_writer_properties_builder` and `TableParquetOptions::into_writer_properties_builder_with_arrow_schema`" + )] + pub fn try_from(table_opts: &TableParquetOptions) -> Result { + // ParquetWriterOptions will have defaults for the remaining fields (e.g. sorting_columns) + Ok(ParquetWriterOptions { + writer_options: table_opts.into_writer_properties_builder()?.build(), + }) + } + /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`]. /// /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column. 
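For orientation before the next patch: a sketch of how a write path can use the intermediate `TableParquetOptions` methods introduced in patches 04 through 06. A later patch in this series reworks this surface back to `WriterPropertiesBuilder::try_from`, so treat the method names as transitional; `writer_props` itself is an illustrative helper, not part of the patch.

    use std::sync::Arc;
    use arrow_schema::Schema;
    use datafusion_common::config::TableParquetOptions;
    use datafusion_common::Result;
    use parquet::file::properties::WriterProperties;

    // Mirrors the branch added to ParquetSink's write path in patch 05: embed the
    // arrow schema in the kv_metadata unless the session opted out.
    fn writer_props(
        opts: &TableParquetOptions,
        schema: &Arc<Schema>,
    ) -> Result<WriterProperties> {
        if opts.global.skip_arrow_metadata {
            Ok(opts.into_writer_properties_builder()?.build())
        } else {
            Ok(opts
                .into_writer_properties_builder_with_arrow_schema(Some(schema))?
                .build())
        }
    }
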
From 0b960d92ea0340af6f8d5c86d8497251ab226b2e Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 20 Dec 2024 17:51:48 -0800 Subject: [PATCH 07/15] test(11770): update tests with new default to include arrow schema --- datafusion/core/src/datasource/file_format/parquet.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 0e6ffe38c393..b7f4bce17bd1 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -2354,6 +2354,11 @@ mod tests { // check the file metadata let expected_kv_meta = vec![ + // default is to include arrow schema + KeyValue { + key: "ARROW:schema".to_string(), + value: Some(ENCODED_ARROW_SCHEMA.to_string()), + }, KeyValue { key: "my-data".to_string(), value: Some("stuff".to_string()), @@ -2387,6 +2392,11 @@ mod tests { // check the file metadata let expected_kv_meta = vec![ + // default is to include arrow schema + KeyValue { + key: "ARROW:schema".to_string(), + value: Some(ENCODED_ARROW_SCHEMA.to_string()), + }, KeyValue { key: "my-data".to_string(), value: Some("stuff".to_string()), From 5a64d83c186d1e3f5492f6e68fa4e5030bbc7fae Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 26 Dec 2024 12:16:20 -0800 Subject: [PATCH 08/15] refactor: including partitioning of arrow schema inserted into kv_metdata --- .../src/datasource/file_format/parquet.rs | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index b7f4bce17bd1..7e327f8387dc 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -749,6 +749,26 @@ impl ParquetSink { } } + /// Create writer properties based upon configuration settings, + /// including partitioning and the inclusion of arrow schema metadata. + fn create_writer_props(&self) -> Result { + let props = if !self.parquet_options.global.skip_arrow_metadata { + let schema = if self.parquet_options.global.allow_single_file_parallelism { + &self.get_writer_schema() + } else { + self.config.output_schema() + }; + self.parquet_options + .into_writer_properties_builder_with_arrow_schema(Some(schema))? + .build() + } else { + self.parquet_options + .into_writer_properties_builder()? + .build() + }; + Ok(props) + } + /// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized async fn create_async_arrow_writer( @@ -791,16 +811,7 @@ impl DataSink for ParquetSink { data: SendableRecordBatchStream, context: &Arc, ) -> Result { - let parquet_props = if !self.parquet_options.global.skip_arrow_metadata { - let schema = self.config.output_schema(); - self.parquet_options - .into_writer_properties_builder_with_arrow_schema(Some(schema))? - .build() - } else { - self.parquet_options - .into_writer_properties_builder()? 
- .build() - }; + let parquet_props = self.create_writer_props()?; let object_store = context .runtime_env() From 64ef4aac6af78f751e19165c1adfd0e9a985751b Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 26 Dec 2024 13:19:46 -0800 Subject: [PATCH 09/15] test: update tests for new config prop, as well as the new file partition offsets based upon larger metadata --- datafusion/sqllogictest/test_files/information_schema.slt | 2 ++ datafusion/sqllogictest/test_files/repartition_scan.slt | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 7d70cd9db53e..46618b32d77a 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -215,6 +215,7 @@ datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false datafusion.execution.parquet.schema_force_view_types true +datafusion.execution.parquet.skip_arrow_metadata false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.write_batch_size 1024 @@ -308,6 +309,7 @@ datafusion.execution.parquet.pruning true (reading) If true, the parquet reader datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query datafusion.execution.parquet.schema_force_view_types true (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. +datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. 
If NULL, uses default parquet writer setting datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt index a1db84b87850..9ba96e985fe5 100644 --- a/datafusion/sqllogictest/test_files/repartition_scan.slt +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -61,7 +61,7 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 != 42 -03)----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..88], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:88..176], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:176..264], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:264..351]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +03)----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] # disable round robin repartitioning statement ok @@ -77,7 +77,7 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 != 42 -03)----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..88], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:88..176], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:176..264], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:264..351]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +03)----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] # enable round robin repartitioning again statement ok @@ 
-102,7 +102,7 @@ physical_plan 02)--SortExec: expr=[column1@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----CoalesceBatchesExec: target_batch_size=8192 04)------FilterExec: column1@0 != 42 -05)--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..174], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:174..342, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..180], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:180..351]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +05)--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..272], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:272..538, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..278], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:278..547]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] ## Read the files as though they are ordered @@ -138,7 +138,7 @@ physical_plan 01)SortPreservingMergeExec: [column1@0 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 03)----FilterExec: column1@0 != 42 -04)------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..171], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..175], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:175..351], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:171..342]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +04)------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..269], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..273], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:273..547], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:269..538]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] # Cleanup statement ok From 30448b985dd319feb0359f57b20e340bb5bb0a52 
Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 27 Dec 2024 11:19:37 -0800 Subject: [PATCH 10/15] chore: avoid cloning in tests, and update code docs --- .../src/datasource/file_format/parquet.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 7e327f8387dc..e9839617cd46 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -754,6 +754,9 @@ impl ParquetSink { fn create_writer_props(&self) -> Result { let props = if !self.parquet_options.global.skip_arrow_metadata { let schema = if self.parquet_options.global.allow_single_file_parallelism { + // If parallelizing writes, we may be also be doing hive style partitioning + // into multiple files which impacts the schema per file. + // Refer to `self.get_writer_schema()` &self.get_writer_schema() } else { self.config.output_schema() @@ -2379,7 +2382,7 @@ mod tests { value: None, }, ]; - assert_file_metadata(file_metadata, expected_kv_meta); + assert_file_metadata(file_metadata, &expected_kv_meta); Ok(()) } @@ -2417,7 +2420,7 @@ mod tests { value: None, }, ]; - assert_file_metadata(file_metadata, expected_kv_meta); + assert_file_metadata(file_metadata, &expected_kv_meta); Ok(()) } @@ -2454,7 +2457,7 @@ mod tests { let parquet_sink = create_written_parquet_sink_using_config("file:///", opts).await?; let (_, file_metadata) = get_written(parquet_sink)?; - assert_file_metadata(file_metadata, expected_without.clone()); + assert_file_metadata(file_metadata, &expected_without); // single threaded write, do not skip insert let opts = ParquetOptions { @@ -2465,7 +2468,7 @@ mod tests { let parquet_sink = create_written_parquet_sink_using_config("file:///", opts).await?; let (_, file_metadata) = get_written(parquet_sink)?; - assert_file_metadata(file_metadata, expected_with.clone()); + assert_file_metadata(file_metadata, &expected_with); // multithreaded write, skip insert let opts = ParquetOptions { @@ -2478,7 +2481,7 @@ mod tests { let parquet_sink = create_written_parquet_sink_using_config("file:///", opts).await?; let (_, file_metadata) = get_written(parquet_sink)?; - assert_file_metadata(file_metadata, expected_without); + assert_file_metadata(file_metadata, &expected_without); // multithreaded write, do not skip insert let opts = ParquetOptions { @@ -2491,7 +2494,7 @@ mod tests { let parquet_sink = create_written_parquet_sink_using_config("file:///", opts).await?; let (_, file_metadata) = get_written(parquet_sink)?; - assert_file_metadata(file_metadata, expected_with); + assert_file_metadata(file_metadata, &expected_with); Ok(()) } @@ -2626,7 +2629,7 @@ mod tests { Ok((path, file_metadata)) } - fn assert_file_metadata(file_metadata: FileMetaData, expected_kv: Vec) { + fn assert_file_metadata(file_metadata: FileMetaData, expected_kv: &Vec) { let FileMetaData { num_rows, schema, @@ -2645,7 +2648,7 @@ mod tests { let mut key_value_metadata = key_value_metadata.unwrap(); key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key)); - assert_eq!(key_value_metadata, expected_kv); + assert_eq!(&key_value_metadata, expected_kv); } #[tokio::test] From f2f9b00abd9e783fdbc32e8b167251423f483fbc Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 27 Dec 2024 12:53:46 -0800 Subject: [PATCH 11/15] refactor: return to the WriterPropertiesBuilder::TryFrom, and separately add the arrow_schema to the kv_metadata on the TableParquetOptions --- 
datafusion/common/src/config.rs | 14 +++ datafusion/common/src/file_options/mod.rs | 14 ++- .../common/src/file_options/parquet_writer.rs | 88 +++++++++---------- .../src/datasource/file_format/parquet.rs | 33 ++++--- 4 files changed, 77 insertions(+), 72 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 7450d5461f94..4da6921ba53c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1502,6 +1502,20 @@ impl TableParquetOptions { pub fn new() -> Self { Self::default() } + + /// Set whether the encoding of the arrow metadata should occur + /// during the writing of parquet. + /// + /// Default is to encode the arrow schema in the file kv_metadata. + pub fn with_skip_arrow_metadata(self, skip: bool) -> Self { + Self { + global: ParquetOptions { + skip_arrow_metadata: skip, + ..self.global + }, + ..self + } + } } impl ConfigField for TableParquetOptions { diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index 91885fc5b42b..15136664d68c 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -39,7 +39,7 @@ mod tests { use parquet::{ basic::{Compression, Encoding, ZstdLevel}, - file::properties::{EnabledStatistics, WriterVersion}, + file::properties::{EnabledStatistics, WriterPropertiesBuilder, WriterVersion}, schema::types::ColumnPath, }; @@ -78,10 +78,8 @@ mod tests { table_config.set_config_format(ConfigFileType::PARQUET); table_config.alter_with_string_hash_map(&option_map)?; - let properties = table_config - .parquet - .into_writer_properties_builder()? - .build(); + let properties = + WriterPropertiesBuilder::try_from(&table_config.parquet)?.build(); // Verify the expected options propagated down to parquet crate WriterProperties struct assert_eq!(properties.max_row_group_size(), 123); @@ -185,10 +183,8 @@ mod tests { table_config.set_config_format(ConfigFileType::PARQUET); table_config.alter_with_string_hash_map(&option_map)?; - let properties = table_config - .parquet - .into_writer_properties_builder()? - .build(); + let properties = + WriterPropertiesBuilder::try_from(&table_config.parquet)?.build(); let col1 = ColumnPath::from(vec!["col1".to_owned()]); let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]); diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 33825616866a..6e0990202c12 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -59,52 +59,49 @@ impl ParquetWriterOptions { } impl TableParquetOptions { - #[deprecated( - since = "44.0.0", - note = "Please use `TableParquetOptions::into_writer_properties_builder` and `TableParquetOptions::into_writer_properties_builder_with_arrow_schema`" - )] - pub fn try_from(table_opts: &TableParquetOptions) -> Result { + /// Add the arrow schema to the parquet kv_metadata. + /// If already exists, then overwrites. + pub fn arrow_schema(&mut self, schema: &Arc) { + self.key_value_metadata.insert( + ARROW_SCHEMA_META_KEY.into(), + Some(encode_arrow_schema(schema)), + ); + } +} + +impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { + type Error = DataFusionError; + + fn try_from(parquet_table_options: &TableParquetOptions) -> Result { // ParquetWriterOptions will have defaults for the remaining fields (e.g. 
sorting_columns) Ok(ParquetWriterOptions { - writer_options: table_opts.into_writer_properties_builder()?.build(), + writer_options: WriterPropertiesBuilder::try_from(parquet_table_options)? + .build(), }) } +} - /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`]. - /// - /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column. - pub fn into_writer_properties_builder(&self) -> Result { - self.into_writer_properties_builder_with_arrow_schema(None) - } +impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { + type Error = DataFusionError; /// Convert the session's [`TableParquetOptions`] into a single write action's [`WriterPropertiesBuilder`]. /// - /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column, - /// as well as the arrow schema encoded into the kv_meta at [`ARROW_SCHEMA_META_KEY`]. - pub fn into_writer_properties_builder_with_arrow_schema( - &self, - to_encode: Option<&Arc>, - ) -> Result { + /// The returned [`WriterPropertiesBuilder`] includes customizations applicable per column. + fn try_from(table_parquet_options: &TableParquetOptions) -> Result { // Table options include kv_metadata and col-specific options let TableParquetOptions { global, column_specific_options, key_value_metadata, - } = self; + } = table_parquet_options; let mut builder = global.into_writer_properties_builder()?; // add kv_meta, if any - let mut kv_meta = key_value_metadata.to_owned(); - if let Some(schema) = to_encode { - kv_meta.insert( - ARROW_SCHEMA_META_KEY.into(), - Some(encode_arrow_schema(schema)), - ); - } - if !kv_meta.is_empty() { + if !key_value_metadata.is_empty() { builder = builder.set_key_value_metadata(Some( - kv_meta + key_value_metadata + .to_owned() .drain() .map(|(key, value)| KeyValue { key, value }) .collect(), @@ -621,8 +618,7 @@ mod tests { key_value_metadata: [(key, value)].into(), }; - let writer_props = table_parquet_opts - .into_writer_properties_builder() + let writer_props = WriterPropertiesBuilder::try_from(&table_parquet_opts) .unwrap() .build(); assert_eq!( @@ -649,10 +645,10 @@ mod tests { let default_writer_props = WriterProperties::new(); // WriterProperties::try_from(TableParquetOptions::default), a.k.a. 
using datafusion's defaults - let from_datafusion_defaults = default_table_writer_opts - .into_writer_properties_builder() - .unwrap() - .build(); + let from_datafusion_defaults = + WriterPropertiesBuilder::try_from(&default_table_writer_opts) + .unwrap() + .build(); // Expected: how the defaults should not match assert_ne!( @@ -705,10 +701,10 @@ mod tests { // the TableParquetOptions::default, with only the bloom filter turned on let mut default_table_writer_opts = TableParquetOptions::default(); default_table_writer_opts.global.bloom_filter_on_write = true; - let from_datafusion_defaults = default_table_writer_opts - .into_writer_properties_builder() - .unwrap() - .build(); + let from_datafusion_defaults = + WriterPropertiesBuilder::try_from(&default_table_writer_opts) + .unwrap() + .build(); // the WriterProperties::default, with only the bloom filter turned on let default_writer_props = WriterProperties::builder() @@ -733,10 +729,10 @@ mod tests { let mut default_table_writer_opts = TableParquetOptions::default(); default_table_writer_opts.global.bloom_filter_on_write = true; default_table_writer_opts.global.bloom_filter_fpp = Some(0.42); - let from_datafusion_defaults = default_table_writer_opts - .into_writer_properties_builder() - .unwrap() - .build(); + let from_datafusion_defaults = + WriterPropertiesBuilder::try_from(&default_table_writer_opts) + .unwrap() + .build(); // the WriterProperties::default, with only fpp set let default_writer_props = WriterProperties::builder() @@ -765,10 +761,10 @@ mod tests { let mut default_table_writer_opts = TableParquetOptions::default(); default_table_writer_opts.global.bloom_filter_on_write = true; default_table_writer_opts.global.bloom_filter_ndv = Some(42); - let from_datafusion_defaults = default_table_writer_opts - .into_writer_properties_builder() - .unwrap() - .build(); + let from_datafusion_defaults = + WriterPropertiesBuilder::try_from(&default_table_writer_opts) + .unwrap() + .build(); // the WriterProperties::default, with only ndv set let default_writer_props = WriterProperties::builder() diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index e9839617cd46..8f64bea39df7 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -73,7 +73,7 @@ use parquet::arrow::{ arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter, }; use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; -use parquet::file::properties::WriterProperties; +use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use parquet::file::writer::SerializedFileWriter; use parquet::format::FileMetaData; use tokio::io::{AsyncWrite, AsyncWriteExt}; @@ -752,24 +752,23 @@ impl ParquetSink { /// Create writer properties based upon configuration settings, /// including partitioning and the inclusion of arrow schema metadata. fn create_writer_props(&self) -> Result { - let props = if !self.parquet_options.global.skip_arrow_metadata { - let schema = if self.parquet_options.global.allow_single_file_parallelism { - // If parallelizing writes, we may be also be doing hive style partitioning - // into multiple files which impacts the schema per file. - // Refer to `self.get_writer_schema()` - &self.get_writer_schema() - } else { - self.config.output_schema() - }; - self.parquet_options - .into_writer_properties_builder_with_arrow_schema(Some(schema))? 
- .build() + let schema = if self.parquet_options.global.allow_single_file_parallelism { + // If parallelizing writes, we may be also be doing hive style partitioning + // into multiple files which impacts the schema per file. + // Refer to `self.get_writer_schema()` + &self.get_writer_schema() } else { - self.parquet_options - .into_writer_properties_builder()? - .build() + self.config.output_schema() }; - Ok(props) + + // TODO: avoid this clone in follow up PR, where the writer properties & schema + // are calculated once on `ParquetSink::new` + let mut parquet_opts = self.parquet_options.clone(); + if !self.parquet_options.global.skip_arrow_metadata { + parquet_opts.arrow_schema(schema); + } + + Ok(WriterPropertiesBuilder::try_from(&parquet_opts)?.build()) } /// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore From c5ad794bb9652e84e9ad7a66f15bef964fb2a4dc Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 27 Dec 2024 13:10:10 -0800 Subject: [PATCH 12/15] refactor: require the arrow_schema key to be present in the kv_metadata, if is required by the configuration --- datafusion/common/src/file_options/parquet_writer.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 6e0990202c12..6b8e5dfe26cd 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use crate::{ config::{ParquetOptions, TableParquetOptions}, - DataFusionError, Result, + DataFusionError, Result, _internal_datafusion_err, }; use arrow_schema::Schema; @@ -97,6 +97,13 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { let mut builder = global.into_writer_properties_builder()?; + // check that the arrow schema is present in the kv_metadata, if configured to do so + if !global.skip_arrow_metadata + && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY) + { + return Err(_internal_datafusion_err!("arrow schema was not added to the kv_metadata, even though it is required by configuration settings")); + } + // add kv_meta, if any if !key_value_metadata.is_empty() { builder = builder.set_key_value_metadata(Some( @@ -190,6 +197,8 @@ impl ParquetOptions { /// /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options /// applied per column; a customization which is not applicable for [`ParquetOptions`]. + /// + /// Note that this method does not include the key_value_metadata from [`TableParquetOptions`]. 
pub fn into_writer_properties_builder(&self) -> Result { let ParquetOptions { data_pagesize_limit, From 30633771e7b15264a01b6c394d48c51824d151e9 Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 27 Dec 2024 13:13:15 -0800 Subject: [PATCH 13/15] chore: update configs.md --- docs/source/user-guide/configs.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 329b9a95c8f9..1c39064c15d7 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -61,6 +61,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | +| datafusion.execution.parquet.skip_arrow_metadata | false | (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to | | datafusion.execution.parquet.compression | zstd(3) | (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. | | datafusion.execution.parquet.dictionary_enabled | true | (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting | | datafusion.execution.parquet.dictionary_page_size_limit | 1048576 | (writing) Sets best effort maximum dictionary page size, in bytes | From 80a76d02571fac5b4e2f4645eb987801320f30ba Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 27 Dec 2024 14:27:45 -0800 Subject: [PATCH 14/15] test: update tests to handle the (default) required arrow schema in the kv_metadata --- datafusion/common/src/file_options/mod.rs | 12 +++-- .../common/src/file_options/parquet_writer.rs | 47 ++++++++++++++++++- 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index 15136664d68c..02667e016571 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -78,8 +78,10 @@ mod tests { table_config.set_config_format(ConfigFileType::PARQUET); table_config.alter_with_string_hash_map(&option_map)?; - let properties = - WriterPropertiesBuilder::try_from(&table_config.parquet)?.build(); + let properties = WriterPropertiesBuilder::try_from( + &table_config.parquet.with_skip_arrow_metadata(true), + )? + .build(); // Verify the expected options propagated down to parquet crate WriterProperties struct assert_eq!(properties.max_row_group_size(), 123); @@ -183,8 +185,10 @@ mod tests { table_config.set_config_format(ConfigFileType::PARQUET); table_config.alter_with_string_hash_map(&option_map)?; - let properties = - WriterPropertiesBuilder::try_from(&table_config.parquet)?.build(); + let properties = WriterPropertiesBuilder::try_from( + &table_config.parquet.with_skip_arrow_metadata(true), + )? 
+ .build(); let col1 = ColumnPath::from(vec!["col1".to_owned()]); let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]); diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 6b8e5dfe26cd..f5be00b5f6cd 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -609,13 +609,48 @@ mod tests { } } + #[test] + fn table_parquet_opts_to_writer_props_skip_arrow_metadata() { + // TableParquetOptions, all props set to default + let mut table_parquet_opts = TableParquetOptions::default(); + assert!( + !table_parquet_opts.global.skip_arrow_metadata, + "default false, to not skip the arrow schema requirement" + ); + + // see errors without the schema added, using default settings + let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts); + assert!( + should_error.is_err(), + "should error without the required arrow schema in kv_metadata", + ); + + // succeeds if we permit skipping the arrow schema + table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true); + let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts); + assert!( + should_succeed.is_ok(), + "should work with the arrow schema skipped by config", + ); + + // Set the arrow schema back to required + table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false); + // add the arrow schema to the kv_meta + table_parquet_opts.arrow_schema(&Arc::new(Schema::empty())); + let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts); + assert!( + should_succeed.is_ok(), + "should work with the arrow schema included in TableParquetOptions", + ); + } + #[test] fn table_parquet_opts_to_writer_props() { // ParquetOptions, all props set to non-default let parquet_options = parquet_options_with_non_defaults(); // TableParquetOptions, using ParquetOptions for global settings - let key = "foo".to_string(); + let key = ARROW_SCHEMA_META_KEY.to_string(); let value = Some("bar".into()); let table_parquet_opts = TableParquetOptions { global: parquet_options.clone(), @@ -642,7 +677,7 @@ mod tests { #[test] fn test_defaults_match() { // ensure the global settings are the same - let default_table_writer_opts = TableParquetOptions::default(); + let mut default_table_writer_opts = TableParquetOptions::default(); let default_parquet_opts = ParquetOptions::default(); assert_eq!( default_table_writer_opts.global, @@ -650,6 +685,10 @@ mod tests { "should have matching defaults for TableParquetOptions.global and ParquetOptions", ); + // selectively skip the arrow_schema metadata, since the WriterProperties default has an empty kv_meta (no arrow schema) + default_table_writer_opts = + default_table_writer_opts.with_skip_arrow_metadata(true); + // WriterProperties::default, a.k.a. 
using extern parquet's defaults let default_writer_props = WriterProperties::new(); @@ -697,6 +736,7 @@ mod tests { session_config_from_writer_props(&default_writer_props); from_extern_parquet.global.created_by = same_created_by; from_extern_parquet.global.compression = Some("zstd(3)".into()); + from_extern_parquet.global.skip_arrow_metadata = true; assert_eq!( default_table_writer_opts, @@ -710,6 +750,7 @@ mod tests { // the TableParquetOptions::default, with only the bloom filter turned on let mut default_table_writer_opts = TableParquetOptions::default(); default_table_writer_opts.global.bloom_filter_on_write = true; + default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema let from_datafusion_defaults = WriterPropertiesBuilder::try_from(&default_table_writer_opts) .unwrap() @@ -738,6 +779,7 @@ mod tests { let mut default_table_writer_opts = TableParquetOptions::default(); default_table_writer_opts.global.bloom_filter_on_write = true; default_table_writer_opts.global.bloom_filter_fpp = Some(0.42); + default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema let from_datafusion_defaults = WriterPropertiesBuilder::try_from(&default_table_writer_opts) .unwrap() @@ -770,6 +812,7 @@ mod tests { let mut default_table_writer_opts = TableParquetOptions::default(); default_table_writer_opts.global.bloom_filter_on_write = true; default_table_writer_opts.global.bloom_filter_ndv = Some(42); + default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema let from_datafusion_defaults = WriterPropertiesBuilder::try_from(&default_table_writer_opts) .unwrap() From 999cab945920f5885348f2d263f77bf4c95324ed Mon Sep 17 00:00:00 2001 From: wiedld Date: Fri, 27 Dec 2024 16:07:48 -0800 Subject: [PATCH 15/15] chore: add reference to arrow-rs upstream PR --- datafusion/common/src/file_options/parquet_writer.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index f5be00b5f6cd..46bce06470f3 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -168,9 +168,8 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { /// Encodes the Arrow schema into the IPC format, and base64 encodes it /// -/// TODO: make arrow schema encoding available in a public API. -/// Refer to currently private `add_encoded_arrow_schema_to_metadata` and `encode_arrow_schema` public. -/// +/// TODO: use extern parquet's private method, once publicly available. +/// Refer to fn encode_arrow_schema(schema: &Arc) -> String { let options = arrow_ipc::writer::IpcWriteOptions::default(); let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true);
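Taken together, these patches replace the `TableParquetOptions::into_writer_properties_builder*` methods with a `TryFrom<&TableParquetOptions>` impl for `WriterPropertiesBuilder`, and require the arrow schema to be present in the kv_metadata unless `skip_arrow_metadata` is set. A minimal sketch of how a caller might exercise the new surface; only `arrow_schema`, `with_skip_arrow_metadata`, and the `TryFrom` impl come from the diffs above, while the crate import paths, helper name, and toy schema are illustrative assumptions:

    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use datafusion_common::config::TableParquetOptions;
    use parquet::file::properties::WriterPropertiesBuilder;

    fn writer_props_example() -> datafusion_common::Result<()> {
        // Illustrative schema; any Arrow schema would do here.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, false)]));

        // Default options expect the arrow schema to be embedded in the kv_metadata...
        let mut opts = TableParquetOptions::default();
        opts.arrow_schema(&schema);
        let props = WriterPropertiesBuilder::try_from(&opts)?.build();
        assert!(props.key_value_metadata().is_some());

        // ...unless the caller explicitly opts out of the embedded arrow metadata.
        let opts = TableParquetOptions::default().with_skip_arrow_metadata(true);
        let props = WriterPropertiesBuilder::try_from(&opts)?.build();
        assert!(props.key_value_metadata().is_none());

        Ok(())
    }

Without either the embedded schema or the explicit opt-out, `WriterPropertiesBuilder::try_from` returns the internal error introduced in PATCH 12, which is what the `table_parquet_opts_to_writer_props_skip_arrow_metadata` test above asserts.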