diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 863bb5181f45..6ee85ad5aa56 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1335,7 +1335,9 @@ dependencies = [ "arrow", "arrow-array", "arrow-buffer", + "arrow-ipc", "arrow-schema", + "base64 0.22.1", "half", "hashbrown 0.14.5", "indexmap", diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index b331a55a98d0..feba589082b0 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -53,7 +53,9 @@ apache-avro = { version = "0.17", default-features = false, features = [ arrow = { workspace = true } arrow-array = { workspace = true } arrow-buffer = { workspace = true } +arrow-ipc = { workspace = true } arrow-schema = { workspace = true } +base64 = "0.22.1" half = { workspace = true } hashbrown = { workspace = true } indexmap = { workspace = true } diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 942aa308e200..4da6921ba53c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -436,6 +436,12 @@ config_namespace! { /// valid values are "1.0" and "2.0" pub writer_version: String, default = "1.0".to_string() + /// (writing) Skip encoding the embedded arrow metadata in the KV_meta + /// + /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. + /// Refer to + pub skip_arrow_metadata: bool, default = false + /// (writing) Sets default parquet compression codec. /// Valid values are: uncompressed, snappy, gzip(level), /// lzo, brotli(level), lz4, zstd(level), and lz4_raw. @@ -1496,6 +1502,20 @@ impl TableParquetOptions { pub fn new() -> Self { Self::default() } + + /// Set whether the encoding of the arrow metadata should occur + /// during the writing of parquet. + /// + /// Default is to encode the arrow schema in the file kv_metadata. + pub fn with_skip_arrow_metadata(self, skip: bool) -> Self { + Self { + global: ParquetOptions { + skip_arrow_metadata: skip, + ..self.global + }, + ..self + } + } } impl ConfigField for TableParquetOptions { diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index 77781457d0d2..02667e016571 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -30,7 +30,6 @@ pub mod parquet_writer; mod tests { use std::collections::HashMap; - use super::parquet_writer::ParquetWriterOptions; use crate::{ config::{ConfigFileType, TableOptions}, file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions}, @@ -40,7 +39,7 @@ mod tests { use parquet::{ basic::{Compression, Encoding, ZstdLevel}, - file::properties::{EnabledStatistics, WriterVersion}, + file::properties::{EnabledStatistics, WriterPropertiesBuilder, WriterVersion}, schema::types::ColumnPath, }; @@ -79,8 +78,10 @@ mod tests { table_config.set_config_format(ConfigFileType::PARQUET); table_config.alter_with_string_hash_map(&option_map)?; - let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?; - let properties = parquet_options.writer_options(); + let properties = WriterPropertiesBuilder::try_from( + &table_config.parquet.with_skip_arrow_metadata(true), + )? 
+ .build(); // Verify the expected options propagated down to parquet crate WriterProperties struct assert_eq!(properties.max_row_group_size(), 123); @@ -184,8 +185,10 @@ table_config.set_config_format(ConfigFileType::PARQUET); table_config.alter_with_string_hash_map(&option_map)?; - let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?; - let properties = parquet_options.writer_options(); + let properties = WriterPropertiesBuilder::try_from( + &table_config.parquet.with_skip_arrow_metadata(true), + )? + .build(); let col1 = ColumnPath::from(vec!["col1".to_owned()]); let col2_nested = ColumnPath::from(vec!["col2".to_owned(), "nested".to_owned()]); diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index dd9d67d6bb47..46bce06470f3 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -17,18 +17,25 @@ //! Options related to how parquet files should be written +use base64::Engine; +use std::sync::Arc; + use crate::{ config::{ParquetOptions, TableParquetOptions}, - DataFusionError, Result, + DataFusionError, Result, _internal_datafusion_err, }; +use arrow_schema::Schema; use parquet::{ + arrow::ARROW_SCHEMA_META_KEY, basic::{BrotliLevel, GzipLevel, ZstdLevel}, - file::properties::{ - EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion, - DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED, + file::{ + metadata::KeyValue, + properties::{ + EnabledStatistics, WriterProperties, WriterPropertiesBuilder, WriterVersion, + DEFAULT_MAX_STATISTICS_SIZE, DEFAULT_STATISTICS_ENABLED, + }, }, - format::KeyValue, schema::types::ColumnPath, }; @@ -51,6 +58,17 @@ impl ParquetWriterOptions { } } +impl TableParquetOptions { + /// Add the arrow schema to the parquet kv_metadata. + /// If already exists, then overwrites. + pub fn arrow_schema(&mut self, schema: &Arc<Schema>) { + self.key_value_metadata.insert( + ARROW_SCHEMA_META_KEY.into(), + Some(encode_arrow_schema(schema)), + ); + } +} + impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { type Error = DataFusionError; @@ -79,6 +97,14 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { let mut builder = global.into_writer_properties_builder()?; + // check that the arrow schema is present in the kv_metadata, if configured to do so + if !global.skip_arrow_metadata + && !key_value_metadata.contains_key(ARROW_SCHEMA_META_KEY) + { + return Err(_internal_datafusion_err!("arrow schema was not added to the kv_metadata, even though it is required by configuration settings")); + } + + // add kv_meta, if any if !key_value_metadata.is_empty() { builder = builder.set_key_value_metadata(Some( key_value_metadata @@ -140,11 +166,38 @@ impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder { } } +/// Encodes the Arrow schema into the IPC format, and base64 encodes it +/// +/// TODO: use extern parquet's private method, once publicly available.
+/// Refer to +fn encode_arrow_schema(schema: &Arc<Schema>) -> String { + let options = arrow_ipc::writer::IpcWriteOptions::default(); + let mut dictionary_tracker = arrow_ipc::writer::DictionaryTracker::new(true); + let data_gen = arrow_ipc::writer::IpcDataGenerator::default(); + let mut serialized_schema = data_gen.schema_to_bytes_with_dictionary_tracker( + schema, + &mut dictionary_tracker, + &options, + ); + + // manually prepending the length to the schema as arrow uses the legacy IPC format + // TODO: change after addressing ARROW-9777 + let schema_len = serialized_schema.ipc_message.len(); + let mut len_prefix_schema = Vec::with_capacity(schema_len + 8); + len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]); + len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut()); + len_prefix_schema.append(&mut serialized_schema.ipc_message); + + base64::prelude::BASE64_STANDARD.encode(&len_prefix_schema) +} + impl ParquetOptions { /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`]. /// /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options /// applied per column; a customization which is not applicable for [`ParquetOptions`]. + /// + /// Note that this method does not include the key_value_metadata from [`TableParquetOptions`]. pub fn into_writer_properties_builder(&self) -> Result<WriterPropertiesBuilder> { let ParquetOptions { data_pagesize_limit, @@ -177,6 +230,7 @@ impl ParquetOptions { bloom_filter_on_read: _, // reads not used for writer props schema_force_view_types: _, binary_as_string: _, // not used for writer props + skip_arrow_metadata: _, } = self; let mut builder = WriterProperties::builder() @@ -444,6 +498,7 @@ mod tests { bloom_filter_on_read: defaults.bloom_filter_on_read, schema_force_view_types: defaults.schema_force_view_types, binary_as_string: defaults.binary_as_string, + skip_arrow_metadata: defaults.skip_arrow_metadata, } } @@ -546,19 +601,55 @@ mod tests { bloom_filter_on_read: global_options_defaults.bloom_filter_on_read, schema_force_view_types: global_options_defaults.schema_force_view_types, binary_as_string: global_options_defaults.binary_as_string, + skip_arrow_metadata: global_options_defaults.skip_arrow_metadata, }, column_specific_options, key_value_metadata, } } + #[test] + fn table_parquet_opts_to_writer_props_skip_arrow_metadata() { + // TableParquetOptions, all props set to default + let mut table_parquet_opts = TableParquetOptions::default(); + assert!( + !table_parquet_opts.global.skip_arrow_metadata, + "default false, to not skip the arrow schema requirement" + ); + + // see errors without the schema added, using default settings + let should_error = WriterPropertiesBuilder::try_from(&table_parquet_opts); + assert!( + should_error.is_err(), + "should error without the required arrow schema in kv_metadata", + ); + + // succeeds if we permit skipping the arrow schema + table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(true); + let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts); + assert!( + should_succeed.is_ok(), + "should work with the arrow schema skipped by config", + ); + + // Set the arrow schema back to required + table_parquet_opts = table_parquet_opts.with_skip_arrow_metadata(false); + // add the arrow schema to the kv_meta + table_parquet_opts.arrow_schema(&Arc::new(Schema::empty())); + let should_succeed = WriterPropertiesBuilder::try_from(&table_parquet_opts); + assert!( + should_succeed.is_ok(),
"should work with the arrow schema included in TableParquetOptions", + ); + } + #[test] fn table_parquet_opts_to_writer_props() { // ParquetOptions, all props set to non-default let parquet_options = parquet_options_with_non_defaults(); // TableParquetOptions, using ParquetOptions for global settings - let key = "foo".to_string(); + let key = ARROW_SCHEMA_META_KEY.to_string(); let value = Some("bar".into()); let table_parquet_opts = TableParquetOptions { global: parquet_options.clone(), @@ -585,7 +676,7 @@ mod tests { #[test] fn test_defaults_match() { // ensure the global settings are the same - let default_table_writer_opts = TableParquetOptions::default(); + let mut default_table_writer_opts = TableParquetOptions::default(); let default_parquet_opts = ParquetOptions::default(); assert_eq!( default_table_writer_opts.global, @@ -593,6 +684,10 @@ mod tests { "should have matching defaults for TableParquetOptions.global and ParquetOptions", ); + // selectively skip the arrow_schema metadata, since the WriterProperties default has an empty kv_meta (no arrow schema) + default_table_writer_opts = + default_table_writer_opts.with_skip_arrow_metadata(true); + // WriterProperties::default, a.k.a. using extern parquet's defaults let default_writer_props = WriterProperties::new(); @@ -640,6 +735,7 @@ mod tests { session_config_from_writer_props(&default_writer_props); from_extern_parquet.global.created_by = same_created_by; from_extern_parquet.global.compression = Some("zstd(3)".into()); + from_extern_parquet.global.skip_arrow_metadata = true; assert_eq!( default_table_writer_opts, @@ -653,6 +749,7 @@ mod tests { // the TableParquetOptions::default, with only the bloom filter turned on let mut default_table_writer_opts = TableParquetOptions::default(); default_table_writer_opts.global.bloom_filter_on_write = true; + default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema let from_datafusion_defaults = WriterPropertiesBuilder::try_from(&default_table_writer_opts) .unwrap() @@ -681,6 +778,7 @@ mod tests { let mut default_table_writer_opts = TableParquetOptions::default(); default_table_writer_opts.global.bloom_filter_on_write = true; default_table_writer_opts.global.bloom_filter_fpp = Some(0.42); + default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema let from_datafusion_defaults = WriterPropertiesBuilder::try_from(&default_table_writer_opts) .unwrap() @@ -713,6 +811,7 @@ mod tests { let mut default_table_writer_opts = TableParquetOptions::default(); default_table_writer_opts.global.bloom_filter_on_write = true; default_table_writer_opts.global.bloom_filter_ndv = Some(42); + default_table_writer_opts.arrow_schema(&Arc::new(Schema::empty())); // add the required arrow schema let from_datafusion_defaults = WriterPropertiesBuilder::try_from(&default_table_writer_opts) .unwrap() diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 383fd6575234..8f64bea39df7 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -45,7 +45,6 @@ use crate::physical_plan::{ use arrow::compute::sum; use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions}; -use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::stats::Precision; use datafusion_common::{ @@ -68,13 
+67,13 @@ use log::debug; use object_store::buffered::BufWriter; use parquet::arrow::arrow_writer::{ compute_leaves, get_column_writers, ArrowColumnChunk, ArrowColumnWriter, - ArrowLeafColumn, + ArrowLeafColumn, ArrowWriterOptions, }; use parquet::arrow::{ arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter, }; use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData}; -use parquet::file::properties::WriterProperties; +use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use parquet::file::writer::SerializedFileWriter; use parquet::format::FileMetaData; use tokio::io::{AsyncWrite, AsyncWriteExt}; @@ -750,6 +749,28 @@ impl ParquetSink { } } + /// Create writer properties based upon configuration settings, + /// including partitioning and the inclusion of arrow schema metadata. + fn create_writer_props(&self) -> Result<WriterProperties> { + let schema = if self.parquet_options.global.allow_single_file_parallelism { + // If parallelizing writes, we may also be doing hive style partitioning + // into multiple files which impacts the schema per file. + // Refer to `self.get_writer_schema()` + &self.get_writer_schema() + } else { + self.config.output_schema() + }; + + // TODO: avoid this clone in follow up PR, where the writer properties & schema + // are calculated once on `ParquetSink::new` + let mut parquet_opts = self.parquet_options.clone(); + if !self.parquet_options.global.skip_arrow_metadata { + parquet_opts.arrow_schema(schema); + } + + Ok(WriterPropertiesBuilder::try_from(&parquet_opts)?.build()) + } + /// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized async fn create_async_arrow_writer( @@ -759,10 +780,14 @@ impl ParquetSink { parquet_props: WriterProperties, ) -> Result<AsyncArrowWriter<BufWriter>> { let buf_writer = BufWriter::new(object_store, location.clone()); - let writer = AsyncArrowWriter::try_new( + let options = ArrowWriterOptions::new() + .with_properties(parquet_props) + .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata); + + let writer = AsyncArrowWriter::try_new_with_options( buf_writer, self.get_writer_schema(), - Some(parquet_props), + options, )?; Ok(writer) } @@ -788,7 +813,7 @@ impl DataSink for ParquetSink { data: SendableRecordBatchStream, context: &Arc<TaskContext>, ) -> Result<u64> { - let parquet_props = ParquetWriterOptions::try_from(&self.parquet_options)?; + let parquet_props = self.create_writer_props()?; let object_store = context .runtime_env() @@ -832,7 +857,7 @@ impl DataSink for ParquetSink { .create_async_arrow_writer( &path, Arc::clone(&object_store), - parquet_props.writer_options().clone(), + parquet_props.clone(), ) .await?; let mut reservation = @@ -867,7 +892,7 @@ impl DataSink for ParquetSink { writer, rx, schema, - props.writer_options(), + &props, parallel_options_clone, pool, ) @@ -2335,42 +2360,74 @@ mod tests { async fn parquet_sink_write() -> Result<()> { let parquet_sink = create_written_parquet_sink("file:///").await?; - // assert written - let mut written = parquet_sink.written(); - let written = written.drain(); - assert_eq!( - written.len(), - 1, - "expected a single parquet files to be written, instead found {}", - written.len() - ); + // assert written to proper path + let (path, file_metadata) = get_written(parquet_sink)?; + let path_parts = path.parts().collect::<Vec<_>>(); + assert_eq!(path_parts.len(), 1, "should not have path prefix"); // check the file metadata - let ( - path, -
FileMetaData { - num_rows, - schema, - key_value_metadata, - .. + let expected_kv_meta = vec![ + // default is to include arrow schema + KeyValue { + key: "ARROW:schema".to_string(), + value: Some(ENCODED_ARROW_SCHEMA.to_string()), + }, + KeyValue { + key: "my-data".to_string(), + value: Some("stuff".to_string()), + }, + KeyValue { + key: "my-data-bool-key".to_string(), + value: None, }, - ) = written.take(1).next().unwrap(); + ]; + assert_file_metadata(file_metadata, &expected_kv_meta); + + Ok(()) + } + + #[tokio::test] + async fn parquet_sink_parallel_write() -> Result<()> { + let opts = ParquetOptions { + allow_single_file_parallelism: true, + maximum_parallel_row_group_writers: 2, + maximum_buffered_record_batches_per_stream: 2, + ..Default::default() + }; + + let parquet_sink = + create_written_parquet_sink_using_config("file:///", opts).await?; + + // assert written to proper path + let (path, file_metadata) = get_written(parquet_sink)?; let path_parts = path.parts().collect::>(); assert_eq!(path_parts.len(), 1, "should not have path prefix"); - assert_eq!(num_rows, 2, "file metadata to have 2 rows"); - assert!( - schema.iter().any(|col_schema| col_schema.name == "a"), - "output file metadata should contain col a" - ); - assert!( - schema.iter().any(|col_schema| col_schema.name == "b"), - "output file metadata should contain col b" - ); + // check the file metadata + let expected_kv_meta = vec![ + // default is to include arrow schema + KeyValue { + key: "ARROW:schema".to_string(), + value: Some(ENCODED_ARROW_SCHEMA.to_string()), + }, + KeyValue { + key: "my-data".to_string(), + value: Some("stuff".to_string()), + }, + KeyValue { + key: "my-data-bool-key".to_string(), + value: None, + }, + ]; + assert_file_metadata(file_metadata, &expected_kv_meta); - let mut key_value_metadata = key_value_metadata.unwrap(); - key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key)); - let expected_metadata = vec![ + Ok(()) + } + + #[tokio::test] + async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> { + // expected kv metadata without schema + let expected_without = vec![ KeyValue { key: "my-data".to_string(), value: Some("stuff".to_string()), @@ -2380,7 +2437,63 @@ mod tests { value: None, }, ]; - assert_eq!(key_value_metadata, expected_metadata); + // expected kv metadata with schema + let expected_with = [ + vec![KeyValue { + key: "ARROW:schema".to_string(), + value: Some(ENCODED_ARROW_SCHEMA.to_string()), + }], + expected_without.clone(), + ] + .concat(); + + // single threaded write, skip insert + let opts = ParquetOptions { + allow_single_file_parallelism: false, + skip_arrow_metadata: true, + ..Default::default() + }; + let parquet_sink = + create_written_parquet_sink_using_config("file:///", opts).await?; + let (_, file_metadata) = get_written(parquet_sink)?; + assert_file_metadata(file_metadata, &expected_without); + + // single threaded write, do not skip insert + let opts = ParquetOptions { + allow_single_file_parallelism: false, + skip_arrow_metadata: false, + ..Default::default() + }; + let parquet_sink = + create_written_parquet_sink_using_config("file:///", opts).await?; + let (_, file_metadata) = get_written(parquet_sink)?; + assert_file_metadata(file_metadata, &expected_with); + + // multithreaded write, skip insert + let opts = ParquetOptions { + allow_single_file_parallelism: true, + maximum_parallel_row_group_writers: 2, + maximum_buffered_record_batches_per_stream: 2, + skip_arrow_metadata: true, + ..Default::default() + }; + let parquet_sink = + 
create_written_parquet_sink_using_config("file:///", opts).await?; + let (_, file_metadata) = get_written(parquet_sink)?; + assert_file_metadata(file_metadata, &expected_without); + + // multithreaded write, do not skip insert + let opts = ParquetOptions { + allow_single_file_parallelism: true, + maximum_parallel_row_group_writers: 2, + maximum_buffered_record_batches_per_stream: 2, + skip_arrow_metadata: false, + ..Default::default() + }; + let parquet_sink = + create_written_parquet_sink_using_config("file:///", opts).await?; + let (_, file_metadata) = get_written(parquet_sink)?; + assert_file_metadata(file_metadata, &expected_with); Ok(()) } @@ -2391,18 +2504,8 @@ mod tests { let file_path = format!("file:///path/to/{}", filename); let parquet_sink = create_written_parquet_sink(file_path.as_str()).await?; - // assert written - let mut written = parquet_sink.written(); - let written = written.drain(); - assert_eq!( - written.len(), - 1, - "expected a single parquet file to be written, instead found {}", - written.len() - ); - - let (path, ..) = written.take(1).next().unwrap(); - + // assert written to proper path + let (path, _) = get_written(parquet_sink)?; let path_parts = path.parts().collect::>(); assert_eq!( path_parts.len(), @@ -2420,18 +2523,8 @@ mod tests { let file_path = "file:///path/to"; let parquet_sink = create_written_parquet_sink(file_path).await?; - // assert written - let mut written = parquet_sink.written(); - let written = written.drain(); - assert_eq!( - written.len(), - 1, - "expected a single parquet file to be written, instead found {}", - written.len() - ); - - let (path, ..) = written.take(1).next().unwrap(); - + // assert written to proper path + let (path, _) = get_written(parquet_sink)?; let path_parts = path.parts().collect::>(); assert_eq!( path_parts.len(), @@ -2449,18 +2542,8 @@ mod tests { let file_path = "file:///path/to/"; let parquet_sink = create_written_parquet_sink(file_path).await?; - // assert written - let mut written = parquet_sink.written(); - let written = written.drain(); - assert_eq!( - written.len(), - 1, - "expected a single parquet file to be written, instead found {}", - written.len() - ); - - let (path, ..) 
= written.take(1).next().unwrap(); - + // assert written to proper path + let (path, _) = get_written(parquet_sink)?; let path_parts = path.parts().collect::<Vec<_>>(); assert_eq!( path_parts.len(), @@ -2474,6 +2557,17 @@ } } async fn create_written_parquet_sink(table_path: &str) -> Result<Arc<ParquetSink>> { + create_written_parquet_sink_using_config(table_path, ParquetOptions::default()) + .await + } + + static ENCODED_ARROW_SCHEMA: &str = "/////5QAAAAQAAAAAAAKAAwACgAJAAQACgAAABAAAAAAAQQACAAIAAAABAAIAAAABAAAAAIAAAA8AAAABAAAANz///8UAAAADAAAAAAAAAUMAAAAAAAAAMz///8BAAAAYgAAABAAFAAQAAAADwAEAAAACAAQAAAAGAAAAAwAAAAAAAAFEAAAAAAAAAAEAAQABAAAAAEAAABhAAAA"; + + async fn create_written_parquet_sink_using_config( + table_path: &str, + global: ParquetOptions, + ) -> Result<Arc<ParquetSink>> { + // schema should match the ENCODED_ARROW_SCHEMA above let field_a = Field::new("a", DataType::Utf8, false); let field_b = Field::new("b", DataType::Utf8, false); let schema = Arc::new(Schema::new(vec![field_a, field_b])); @@ -2495,6 +2589,7 @@ ("my-data".to_string(), Some("stuff".to_string())), ("my-data-bool-key".to_string(), None), ]), + global, ..Default::default() }, )); @@ -2519,6 +2614,42 @@ Ok(parquet_sink) } + fn get_written(parquet_sink: Arc<ParquetSink>) -> Result<(Path, FileMetaData)> { + let mut written = parquet_sink.written(); + let written = written.drain(); + assert_eq!( + written.len(), + 1, + "expected a single parquet file to be written, instead found {}", + written.len() + ); + + let (path, file_metadata) = written.take(1).next().unwrap(); + Ok((path, file_metadata)) + } + + fn assert_file_metadata(file_metadata: FileMetaData, expected_kv: &Vec<KeyValue>) { + let FileMetaData { + num_rows, + schema, + key_value_metadata, + .. + } = file_metadata; + assert_eq!(num_rows, 2, "file metadata to have 2 rows"); + assert!( + schema.iter().any(|col_schema| col_schema.name == "a"), + "output file metadata should contain col a" + ); + assert!( + schema.iter().any(|col_schema| col_schema.name == "b"), + "output file metadata should contain col b" + ); + + let mut key_value_metadata = key_value_metadata.unwrap(); + key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key)); + assert_eq!(&key_value_metadata, expected_kv); + } + #[tokio::test] async fn parquet_sink_write_partitions() -> Result<()> { let field_a = Field::new("a", DataType::Utf8, false); diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 69626f97fd80..6a7dc1604b0a 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -497,6 +497,7 @@ message ParquetOptions { bool bloom_filter_on_write = 27; // default = false bool schema_force_view_types = 28; // default = false bool binary_as_string = 29; // default = false + bool skip_arrow_metadata = 30; // default = false oneof metadata_size_hint_opt { uint64 metadata_size_hint = 4; diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index eb6976aa0c06..ca8306275b11 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -962,6 +962,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as usize, schema_force_view_types: value.schema_force_view_types, binary_as_string: value.binary_as_string, + skip_arrow_metadata: value.skip_arrow_metadata, }) } } diff --git
a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index e88c1497af08..e9f9de09d4d1 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -4940,6 +4940,9 @@ impl serde::Serialize for ParquetOptions { if self.binary_as_string { len += 1; } + if self.skip_arrow_metadata { + len += 1; + } if self.dictionary_page_size_limit != 0 { len += 1; } @@ -5033,6 +5036,9 @@ impl serde::Serialize for ParquetOptions { if self.binary_as_string { struct_ser.serialize_field("binaryAsString", &self.binary_as_string)?; } + if self.skip_arrow_metadata { + struct_ser.serialize_field("skipArrowMetadata", &self.skip_arrow_metadata)?; + } if self.dictionary_page_size_limit != 0 { #[allow(clippy::needless_borrow)] #[allow(clippy::needless_borrows_for_generic_args)] @@ -5161,6 +5167,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "schemaForceViewTypes", "binary_as_string", "binaryAsString", + "skip_arrow_metadata", + "skipArrowMetadata", "dictionary_page_size_limit", "dictionaryPageSizeLimit", "data_page_row_count_limit", @@ -5204,6 +5212,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { BloomFilterOnWrite, SchemaForceViewTypes, BinaryAsString, + SkipArrowMetadata, DictionaryPageSizeLimit, DataPageRowCountLimit, MaxRowGroupSize, @@ -5253,6 +5262,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterOnWrite" | "bloom_filter_on_write" => Ok(GeneratedField::BloomFilterOnWrite), "schemaForceViewTypes" | "schema_force_view_types" => Ok(GeneratedField::SchemaForceViewTypes), "binaryAsString" | "binary_as_string" => Ok(GeneratedField::BinaryAsString), + "skipArrowMetadata" | "skip_arrow_metadata" => Ok(GeneratedField::SkipArrowMetadata), "dictionaryPageSizeLimit" | "dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit), "dataPageRowCountLimit" | "data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit), "maxRowGroupSize" | "max_row_group_size" => Ok(GeneratedField::MaxRowGroupSize), @@ -5300,6 +5310,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_on_write__ = None; let mut schema_force_view_types__ = None; let mut binary_as_string__ = None; + let mut skip_arrow_metadata__ = None; let mut dictionary_page_size_limit__ = None; let mut data_page_row_count_limit__ = None; let mut max_row_group_size__ = None; @@ -5413,6 +5424,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } binary_as_string__ = Some(map_.next_value()?); } + GeneratedField::SkipArrowMetadata => { + if skip_arrow_metadata__.is_some() { + return Err(serde::de::Error::duplicate_field("skipArrowMetadata")); + } + skip_arrow_metadata__ = Some(map_.next_value()?); + } GeneratedField::DictionaryPageSizeLimit => { if dictionary_page_size_limit__.is_some() { return Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit")); @@ -5515,6 +5532,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_on_write: bloom_filter_on_write__.unwrap_or_default(), schema_force_view_types: schema_force_view_types__.unwrap_or_default(), binary_as_string: binary_as_string__.unwrap_or_default(), + skip_arrow_metadata: skip_arrow_metadata__.unwrap_or_default(), dictionary_page_size_limit: dictionary_page_size_limit__.unwrap_or_default(), data_page_row_count_limit: data_page_row_count_limit__.unwrap_or_default(), max_row_group_size: max_row_group_size__.unwrap_or_default(), diff --git a/datafusion/proto-common/src/generated/prost.rs 
b/datafusion/proto-common/src/generated/prost.rs index 6b8509775847..3263c1c755af 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -763,6 +763,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "29")] pub binary_as_string: bool, + /// default = false + #[prost(bool, tag = "30")] + pub skip_arrow_metadata: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index a7cea607cb6d..79faaba864f3 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -833,6 +833,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { maximum_buffered_record_batches_per_stream: value.maximum_buffered_record_batches_per_stream as u64, schema_force_view_types: value.schema_force_view_types, binary_as_string: value.binary_as_string, + skip_arrow_metadata: value.skip_arrow_metadata, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 6b8509775847..3263c1c755af 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -763,6 +763,9 @@ pub struct ParquetOptions { /// default = false #[prost(bool, tag = "29")] pub binary_as_string: bool, + /// default = false + #[prost(bool, tag = "30")] + pub skip_arrow_metadata: bool, #[prost(uint64, tag = "12")] pub dictionary_page_size_limit: u64, #[prost(uint64, tag = "18")] diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 62405b2fef21..772e6d23426a 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -410,6 +410,7 @@ impl TableParquetOptionsProto { maximum_buffered_record_batches_per_stream: global_options.global.maximum_buffered_record_batches_per_stream as u64, schema_force_view_types: global_options.global.schema_force_view_types, binary_as_string: global_options.global.binary_as_string, + skip_arrow_metadata: global_options.global.skip_arrow_metadata, }), column_specific_options: column_specific_options.into_iter().map(|(column_name, options)| { ParquetColumnSpecificOptions { @@ -501,6 +502,7 @@ impl From<&ParquetOptionsProto> for ParquetOptions { maximum_buffered_record_batches_per_stream: proto.maximum_buffered_record_batches_per_stream as usize, schema_force_view_types: proto.schema_force_view_types, binary_as_string: proto.binary_as_string, + skip_arrow_metadata: proto.skip_arrow_metadata, } } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 7d70cd9db53e..46618b32d77a 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -215,6 +215,7 @@ datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false datafusion.execution.parquet.schema_force_view_types true +datafusion.execution.parquet.skip_arrow_metadata false datafusion.execution.parquet.skip_metadata true datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.write_batch_size 1024 @@ -308,6 +309,7 @@ datafusion.execution.parquet.pruning 
true (reading) If true, the parquet reader datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization". datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query datafusion.execution.parquet.schema_force_view_types true (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`. +datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt index a1db84b87850..9ba96e985fe5 100644 --- a/datafusion/sqllogictest/test_files/repartition_scan.slt +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -61,7 +61,7 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: column1@0 != 42 -03)----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..88], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:88..176], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:176..264], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:264..351]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +03)----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] # disable round robin repartitioning statement ok @@ -77,7 +77,7 @@ logical_plan physical_plan 01)CoalesceBatchesExec: target_batch_size=8192 02)--FilterExec: 
column1@0 != 42 -03)----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..88], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:88..176], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:176..264], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:264..351]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +03)----ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..137], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:137..274], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:274..411], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:411..547]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] # enable round robin repartitioning again statement ok @@ -102,7 +102,7 @@ physical_plan 02)--SortExec: expr=[column1@0 ASC NULLS LAST], preserve_partitioning=[true] 03)----CoalesceBatchesExec: target_batch_size=8192 04)------FilterExec: column1@0 != 42 -05)--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..174], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:174..342, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..180], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:180..351]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +05)--------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..272], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:272..538, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..6], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:6..278], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:278..547]]}, projection=[column1], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] ## Read the files as though they are ordered @@ -138,7 +138,7 @@ physical_plan 01)SortPreservingMergeExec: [column1@0 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 03)----FilterExec: column1@0 != 42 -04)------ParquetExec: file_groups={4 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..171], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..175], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:175..351], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:171..342]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] +04)------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:0..269], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:0..273], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/2.parquet:273..547], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/repartition_scan/parquet_table/1.parquet:269..538]]}, projection=[column1], output_ordering=[column1@0 ASC NULLS LAST], predicate=column1@0 != 42, pruning_predicate=column1_null_count@2 != column1_row_count@3 AND (column1_min@0 != 42 OR 42 != column1_max@1), required_guarantees=[column1 not in (42)] # Cleanup statement ok diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 329b9a95c8f9..1c39064c15d7 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -61,6 +61,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.data_pagesize_limit | 1048576 | (writing) Sets best effort maximum size of data page in bytes | | datafusion.execution.parquet.write_batch_size | 1024 | (writing) Sets write_batch_size in bytes | | datafusion.execution.parquet.writer_version | 1.0 | (writing) Sets parquet writer version valid values are "1.0" and "2.0" | +| datafusion.execution.parquet.skip_arrow_metadata | false | (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to | | datafusion.execution.parquet.compression | zstd(3) | (writing) Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. If NULL, uses default parquet writer setting Note that this default setting is not the same as the default parquet writer setting. | | datafusion.execution.parquet.dictionary_enabled | true | (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting | | datafusion.execution.parquet.dictionary_page_size_limit | 1048576 | (writing) Sets best effort maximum dictionary page size, in bytes |
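For reference, a minimal sketch of how the option added in this diff can be exercised directly. It only uses APIs introduced above (`TableParquetOptions::with_skip_arrow_metadata`, `TableParquetOptions::arrow_schema`, and the `WriterPropertiesBuilder` conversion); the function name and the empty schema are purely illustrative, and the SQL form mirrors the new `datafusion.execution.parquet.skip_arrow_metadata` session setting.

```rust
use std::sync::Arc;

use arrow_schema::Schema;
use datafusion_common::config::TableParquetOptions;
use parquet::file::properties::WriterPropertiesBuilder;

fn writer_props_example() -> datafusion_common::Result<()> {
    // Default behaviour: the arrow schema must be present in the kv_metadata,
    // so it has to be added before converting to writer properties.
    let mut with_schema = TableParquetOptions::default();
    with_schema.arrow_schema(&Arc::new(Schema::empty()));
    let _props = WriterPropertiesBuilder::try_from(&with_schema)?.build();

    // Opting out: skip embedding the `ARROW:schema` entry entirely,
    // equivalent to `SET datafusion.execution.parquet.skip_arrow_metadata = true;`.
    let without_schema = TableParquetOptions::default().with_skip_arrow_metadata(true);
    let _props = WriterPropertiesBuilder::try_from(&without_schema)?.build();

    Ok(())
}
```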