Skip to content

Commit

Permalink
Example of reading and writing parquet metadata outside the file
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Sep 26, 2024
1 parent 2cc0c16 commit b6454ab
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 0 deletions.
6 changes: 6 additions & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ zstd = ["dep:zstd", "zstd-sys"]
# Display memory in example/write_parquet.rs
sysinfo = ["dep:sysinfo"]


[[example]]
name = "external_metadata"
required-features = ["arrow", "async"]
path = "./examples/external_metadata.rs"

[[example]]
name = "read_parquet"
required-features = ["arrow"]
Expand Down
217 changes: 217 additions & 0 deletions parquet/examples/external_metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_cast::pretty::pretty_format_batches;
use parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tempfile::TempDir;

/// This example demonstrates advanced usage of the Parquet metadata APIs.
///
/// # Use case
///
/// 1. Read Parquet metadata from an existing Parquet file.
///
/// 2. Store that metadata somewhere other than the rest of the parquet data
/// (e.g. a cache) that is not necessarily in RAM
///
/// 3. Use the metadata to determine if the file should be read, and if so,
/// read the data stored in that Parquet file, without re-reading / reparsing
/// the metadata.
///
/// Note that Parquet metadata is not necessarily contiguous in the files: part
/// is stored in the footer (the last bytes of the file), but other portions
/// (such as the PageIndex) can be stored elsewhere.
///
/// You can use these APIs to store a copy of the metadata for parquet files
/// stored on remote object storage (e.g. S3) in a local file or an in-memory
/// cache, use a query engine like DataFusion to analyze the metadata to
/// determine which file to read, and then read those files with a single
/// object store request.
///
/// # Specifically, this example:
/// 1. Reads the metadata of a Parquet file
/// 2. Removes some column statistics from the metadata (to make them smaller)
/// 3. Stores the metadata in a separate file
/// 4. Reads the metadata from the separate file and uses that to read the Parquet file
///
/// Without this API, to implement the functionality you would need to implement
/// a conversion of the `ParquetMetaData` and related structures to/from some
/// other structs that can be serialized/deserialized.

#[tokio::main(flavor = "current_thread")]
async fn main() -> parquet::errors::Result<()> {
    // Temporary directory holding both the Parquet data file and the external
    // metadata file. It is removed automatically when `tempdir` is dropped at
    // the end of `main`. (A leftover `std::mem::forget(tempdir)` previously
    // leaked this directory on every run.)
    let tempdir = TempDir::new().unwrap();
    println!("data in {tempdir:?}");
    let parquet_path = create_parquet_file(&tempdir);
    let metadata_path = tempdir.path().join("thrift_metadata.dat");

    // Step 1: read the metadata out of the Parquet file itself.
    let metadata = get_metadata_from_parquet_file(&parquet_path).await;
    println!(
        "Read metadata from Parquet file into memory: {} bytes",
        metadata.memory_size()
    );
    // Step 2: (optionally) shrink it, then persist it to a separate file.
    let metadata = prepare_metadata(metadata);
    write_metadata_to_file(metadata, &metadata_path);

    // Step 3: read the metadata back from the separate file and use it to
    // read the Parquet file without re-parsing the footer.
    let metadata = read_metadata_from_file(&metadata_path);
    println!("Read metadata from file: {metadata:#?}");

    let batches = read_parquet_file_with_metadata(&parquet_path, metadata);

    // display the results
    let batches_string = pretty_format_batches(&batches).unwrap().to_string();
    let batches_lines: Vec<_> = batches_string.split('\n').collect();

    assert_eq!(
        batches_lines,
        [
            "+-----+-------------+",
            "| id  | description |",
            "+-----+-------------+",
            "| 100 | oranges     |",
            "| 200 | apples      |",
            "| 201 | grapefruit  |",
            "| 300 | bannanas    |",
            "| 102 | grames      |",
            "| 33  | pears       |",
            "+-----+-------------+",
        ],
        "actual output:\n\n{batches_lines:#?}"
    );

    Ok(())
}

/// Reads the [`ParquetMetaData`] (including the page index) from a parquet
/// file on disk.
async fn get_metadata_from_parquet_file(file: impl AsRef<Path>) -> ParquetMetaData {
    // Simulate fetching the file from remote object storage by opening it
    // through tokio's async file wrapper.
    let std_file = std::fs::File::open(file).unwrap();
    let async_file = tokio::fs::File::from_std(std_file);

    // Ask the reader to decode the page index along with the footer metadata.
    let options = ArrowReaderOptions::new().with_page_index(true);

    let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_file, options)
        .await
        .unwrap();

    // The builder exposes the metadata behind an `Arc`. Clone the `Arc`, drop
    // the builder (the only other holder), then unwrap to obtain sole
    // ownership of the metadata.
    let shared = Arc::clone(builder.metadata());
    drop(builder);
    Arc::try_unwrap(shared).unwrap()
}

/// Prepares the metadata for external storage.
///
/// Currently a no-op passthrough; a real application could drop per-column
/// statistics or other information here to reduce the serialized size.
fn prepare_metadata(metadata: ParquetMetaData) -> ParquetMetaData {
    metadata
}

/// Persists the metadata to the given path.
///
/// The bytes on disk use the same thrift encoding as the footer metadata of a
/// Parquet file, so they can be parsed by [`ParquetMetaDataReader`].
fn write_metadata_to_file(metadata: ParquetMetaData, file: impl AsRef<Path>) {
    let out = File::create(file).unwrap();
    ParquetMetaDataWriter::new(out, &metadata).finish().unwrap()
}

/// Loads [`ParquetMetaData`] back from the given path.
///
/// Parses the format produced by [`write_metadata_to_file`], including the
/// column and offset indexes.
fn read_metadata_from_file(file: impl AsRef<Path>) -> ParquetMetaData {
    let input = File::open(file).unwrap();
    let reader = ParquetMetaDataReader::new()
        .with_column_indexes(true)
        .with_offset_indexes(true);
    reader.parse_and_finish(&input).unwrap()
}

/// Reads the Parquet file using the metadata
///
/// This shows how to read the Parquet file using previously read metadata
/// instead of the metadata in the Parquet file itself. This avoids an IO /
/// having to fetch and decode the metadata from the Parquet file before
/// beginning to read it.
///
/// In this example, we read the results as Arrow record batches
fn read_parquet_file_with_metadata(
file: impl AsRef<Path>,
metadata: ParquetMetaData,
) -> Vec<RecordBatch> {
let file = std::fs::File::open(file).unwrap();
let options = ArrowReaderOptions::new()
// tell the reader to read the page index
.with_page_index(true);
// create a reader with pre-existing metadata
let arrow_reader_metadata = ArrowReaderMetadata::try_new(metadata.into(), options).unwrap();
let reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, arrow_reader_metadata)
.build()
.unwrap();

reader.collect::<arrow::error::Result<Vec<_>>>().unwrap()
}

/// Writes `example.parquet` into the temporary directory and returns its path.
///
/// The file contains two columns (`id`, `description`) and is written with
/// page-level statistics enabled so the page index is present.
fn create_parquet_file(tmpdir: &TempDir) -> PathBuf {
    let path = tmpdir.path().join("example.parquet");
    let output = File::create(&path).unwrap();

    let ids: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 201, 300, 102, 33]));
    let descriptions: ArrayRef = Arc::new(StringArray::from(vec![
        "oranges",
        "apples",
        "grapefruit",
        "bannanas",
        "grames",
        "pears",
    ]));
    let batch =
        RecordBatch::try_from_iter(vec![("id", ids), ("description", descriptions)]).unwrap();

    // Page-level statistics are required for the page index to be written.
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .build();

    let mut writer = ArrowWriter::try_new(output, batch.schema(), Some(props)).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();

    path
}

0 comments on commit b6454ab

Please sign in to comment.