Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Example of reading and writing parquet metadata outside the file #6081

Merged
merged 2 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ sysinfo = ["dep:sysinfo"]
# Verify 32-bit CRC checksum when decoding parquet pages
crc = ["dep:crc32fast"]


[[example]]
name = "external_metadata"
required-features = ["arrow", "async"]
path = "./examples/external_metadata.rs"

[[example]]
name = "read_parquet"
required-features = ["arrow"]
Expand Down
236 changes: 236 additions & 0 deletions parquet/examples/external_metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_cast::pretty::pretty_format_batches;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::TempDir;

/// This example demonstrates advanced usage of the Parquet metadata APIs.
///
/// It is sometimes desired to copy the metadata for parquet files stored on
/// remote object storage (e.g. S3) to a local file or an in-memory cache, use a
/// query engine like DataFusion to analyze the metadata to determine which file
/// to read, and then read any matching files with a single subsequent object
/// store request.
///
/// # Example Overview
///
/// 1. Reads the metadata of a Parquet file using [`ParquetMetaDataReader`]
///
/// 2. Removes column statistics from the metadata (to make it smaller)
///
/// 3. Stores the metadata in a separate file using [`ParquetMetaDataWriter`]
///
/// 4. Reads the metadata from the separate file and uses that to read the
/// Parquet file, thus avoiding a second IO to read metadata or reparsing
/// the footer.
///
#[tokio::main(flavor = "current_thread")]
async fn main() -> parquet::errors::Result<()> {
let tempdir = TempDir::new().unwrap();
let parquet_path = create_parquet_file(&tempdir);
let metadata_path = tempdir.path().join("thrift_metadata.dat");

// In this example, we use a tokio file to mimic an async remote data source
let mut remote_parquet_file = tokio::fs::File::open(&parquet_path).await?;

let metadata = get_metadata_from_remote_parquet_file(&mut remote_parquet_file).await;
println!(
"Metadata from 'remote' Parquet file into memory: {} bytes",
metadata.memory_size()
);

// now slim down the metadata and write it to a "local" file
let metadata = prepare_metadata(metadata);
write_metadata_to_local_file(metadata, &metadata_path);

// now read the metadata from the local file and use it to read the "remote" Parquet file
let metadata = read_metadata_from_local_file(&metadata_path);
println!("Read metadata from file");

let batches = read_remote_parquet_file_with_metadata(remote_parquet_file, metadata).await;

// display the results
let batches_string = pretty_format_batches(&batches).unwrap().to_string();
let batches_lines: Vec<_> = batches_string.split('\n').collect();

assert_eq!(
batches_lines,
[
"+-----+-------------+",
"| id | description |",
"+-----+-------------+",
"| 100 | oranges |",
"| 200 | apples |",
"| 201 | grapefruit |",
"| 300 | bannanas |",
"| 102 | grapes |",
"| 33 | pears |",
"+-----+-------------+",
],
"actual output:\n\n{batches_lines:#?}"
);

Ok(())
}

/// Reads the metadata from a "remote" parquet file
///
/// Note that this function models reading from a remote file source using a
/// tokio file. In a real application, you would implement [`MetadataFetch`] for
/// your own remote source.
///
/// [`MetadataFetch`]: parquet::arrow::async_reader::MetadataFetch
async fn get_metadata_from_remote_parquet_file(
remote_file: &mut tokio::fs::File,
) -> ParquetMetaData {
// the remote source must know the total file size (e.g. from an object store LIST operation)
let file_size = remote_file.metadata().await.unwrap().len();

// tell the reader to read the page index
ParquetMetaDataReader::new()
.with_page_indexes(true)
.load_and_finish(remote_file, file_size as usize)
.await
.unwrap()
}

/// modifies the metadata to reduce its size
fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData {
let orig_size = metadata.memory_size();

let mut builder = metadata.into_builder();

// remove column statistics to reduce the size of the metadata by converting
// the various structures into their respective builders and modifying them
// as needed.
for row_group in builder.take_row_groups() {
let mut row_group_builder = row_group.into_builder();
for column in row_group_builder.take_columns() {
let column = column.into_builder().clear_statistics().build().unwrap();
row_group_builder = row_group_builder.add_column_metadata(column);
}
let row_group = row_group_builder.build().unwrap();
builder = builder.add_row_group(row_group);
}
let metadata = builder.build();

// verifiy that the size has indeed been reduced
let new_size = metadata.memory_size();
assert!(new_size < orig_size, "metadata size did not decrease");
println!("Reduced metadata size from {} to {}", orig_size, new_size);
metadata
}

/// writes the metadata to a file
///
/// The data is stored using the same thrift format as the Parquet file metadata
fn write_metadata_to_local_file(metadata: ParquetMetaData, file: impl AsRef<Path>) {
let file = File::create(file).unwrap();
ParquetMetaDataWriter::new(file, &metadata)
.finish()
.unwrap()
}

/// Reads the metadata from a file
///
/// This function reads the format written by `write_metadata_to_file`
fn read_metadata_from_local_file(file: impl AsRef<Path>) -> ParquetMetaData {
let file = File::open(file).unwrap();
ParquetMetaDataReader::new()
alamb marked this conversation as resolved.
Show resolved Hide resolved
.with_page_indexes(true)
.parse_and_finish(&file)
.unwrap()
}

/// Reads the "remote" Parquet file using the metadata
///
/// This shows how to read the Parquet file using previously read metadata
/// instead of the metadata in the Parquet file itself. This avoids an IO /
/// having to fetch and decode the metadata from the Parquet file before
/// beginning to read it.
///
/// Note that this function models reading from a remote file source using a
/// tokio file. In a real application, you would implement [`AsyncFileReader`]
/// for your own remote source.
///
/// In this example, we simply buffer the results in memory as Arrow record
/// batches but a real application would likely process the batches as they are
/// read.
///
/// [`AsyncFileReader`]: parquet::arrow::async_reader::AsyncFileReader
async fn read_remote_parquet_file_with_metadata(
remote_file: tokio::fs::File,
metadata: ParquetMetaData,
) -> Vec<RecordBatch> {
let options = ArrowReaderOptions::new()
// tell the reader to read the page index
.with_page_index(true);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is also kind of akward -- it would be great if the actual reading of the parquet metadata could do this...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean loading the page index?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, what I was trying to get at was that since the ColumnIndex and OffsetIndex (aka the "Page index structures") are not store inline, decode_metadata doesn't read them -- the logic to do so is baked into this reader

https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to describe this more on #6002 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we now get rid of with_page_index? Didn't ParquetMetaDataReader already load it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we can...it appears that options.page_index is only used in ArrowReaderMetadata::load, so it should have no effect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is now pretty clear -- we still need to explicitly call for loading the page index, but it makes sense I think

We could potentially change the default to be "read the page index unless told not to"

// create a reader with pre-existing metadata
let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
let reader =
ParquetRecordBatchStreamBuilder::new_with_metadata(remote_file, arrow_reader_metadata)
.build()
.unwrap();

reader.try_collect::<Vec<_>>().await.unwrap()
}

/// Make a new parquet file in the temporary directory, and returns the path
fn create_parquet_file(tmpdir: &TempDir) -> PathBuf {
let path = tmpdir.path().join("example.parquet");
let new_file = File::create(&path).unwrap();

let batch = RecordBatch::try_from_iter(vec![
(
"id",
Arc::new(Int32Array::from(vec![100, 200, 201, 300, 102, 33])) as ArrayRef,
),
(
"description",
Arc::new(StringArray::from(vec![
"oranges",
"apples",
"grapefruit",
"bannanas",
"grapes",
"pears",
])),
),
])
.unwrap();

let props = WriterProperties::builder()
// ensure we write the page index level statistics
.set_statistics_enabled(EnabledStatistics::Page)
.build();

let mut writer = ArrowWriter::try_new(new_file, batch.schema(), Some(props)).unwrap();

writer.write(&batch).unwrap();
writer.finish().unwrap();

path
}
25 changes: 11 additions & 14 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,27 @@
//!
//! # APIs for working with Parquet Metadata
//!
//! The Parquet readers and writers in this crate read and write
//! The Parquet readers and writers in this crate handle reading and writing
//! metadata into parquet files. To work with metadata directly,
//! the following APIs are available.
//! the following APIs are available:
//!
//! Reading:
//! * Read from bytes to `ParquetMetaData`: [`decode_footer`]
//! and [`decode_metadata`]
//! * Read from an `async` source to `ParquetMetaData`: [`MetadataLoader`]
//! * Read from bytes or from an async source to `ParquetMetaData`: [`ParquetMetaDataReader`]
//! * [`ParquetMetaDataReader`] for reading
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this also updates the docs to point at the new APIs added in the previous releases :bowtie:

I can pull these changes out into their own PR if we prefer

//! * [`ParquetMetaDataWriter`] for writing.
//!
//! [`MetadataLoader`]: https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.MetadataLoader.html
//! [`decode_footer`]: crate::file::footer::decode_footer
//! [`decode_metadata`]: crate::file::footer::decode_metadata
//! [`ParquetMetaDataReader`]: https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html
//! [`ParquetMetaDataWriter`]: https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataWriter.html
//!
//! Writing:
//! * Write `ParquetMetaData` to bytes in memory: [`ParquetMetaDataWriter`]
//! * Writes `ParquetMetaData` to an async target: Not yet supported
//! # Examples
//!
//! Please see [`external_metadata.rs`]
//!
//! [`external_metadata.rs`]: https://github.com/apache/arrow-rs/tree/master/parquet/examples/external_metadata.rs
//!
//! # Metadata Encodings and Structures
//!
//! There are three different encodings of Parquet Metadata in this crate:
//!
//! 1. `bytes`:encoded with the Thrift TCompactProtocol as defined in
//! 1. `bytes`:encoded with the Thrift `TCompactProtocol` as defined in
//! [parquet.thrift]
//!
//! 2. [`format`]: Rust structures automatically generated by the thrift compiler
Expand Down
Loading