
Add round trip tests for reading/writing parquet metadata #6463

Merged

Conversation

alamb (Contributor) commented on Sep 26, 2024:

Draft, as this PR builds on #6466.

Which issue does this PR close?

Part of #6002

Rationale for this change

@adriangb, @etseidl, and I were unclear about what happens when parquet metadata that has been serialized directly with the ParquetMetaDataWriter is read back: https://github.com/apache/arrow-rs/pull/6081/files#r1773996251

I figured the best way to find out was to write some tests for it 🤓
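
For reference, this is the round trip in question, as a minimal sketch (the `roundtrip` helper and the `parquet::file::metadata` import paths are assumptions for illustration; the writer/reader calls mirror the test code quoted below):

use bytes::{BufMut, Bytes, BytesMut};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};

// Serialize a ParquetMetaData on its own (without the rest of the file),
// read it back with the dedicated metadata reader, and compare.
fn roundtrip(metadata: &ParquetMetaData) -> ParquetMetaData {
    let mut buf = BytesMut::new().writer();
    ParquetMetaDataWriter::new(&mut buf, metadata)
        .finish()
        .unwrap();
    let data: Bytes = buf.into_inner().freeze();

    let decoded = ParquetMetaDataReader::new().parse_and_finish(&data).unwrap();

    // Without page indexes the two should be identical; with page indexes the
    // stored offsets differ, so the tests compare the invariant parts instead.
    assert_eq!(metadata, &decoded);
    decoded
}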

What changes are included in this PR?

New tests that round trip parquet metadata through the ParquetMetaDataWriter and ParquetMetaDataReader, with and without page indexes.

Are there any user-facing changes?

No, just tests

adriangb (Contributor) commented on Sep 26, 2024:

I think this PR or a subsequent one can also then delete these tests that I wrote before we had the dedicated reader:

#[test]
fn test_roundtrip_parquet_metadata_without_page_index() {
    // We currently don't have an ad-hoc ParquetMetadata loader that can load page indexes so
    // we at least test round trip without them
    let metadata = get_test_metadata(false, false);
    assert!(!has_page_index(&metadata.metadata));

    let mut buf = BytesMut::new().writer();
    {
        let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
        writer.finish().unwrap();
    }

    let data = buf.into_inner().freeze();

    let decoded_metadata = ParquetMetaDataReader::new()
        .parse_and_finish(&data)
        .unwrap();
    assert!(!has_page_index(&metadata.metadata));

    assert_eq!(metadata.metadata, decoded_metadata);
}
fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> TestMetadata {
    let mut buf = BytesMut::new().writer();

    let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
        "a",
        ArrowDataType::Int32,
        true,
    )]));

    // build row groups / pages that exercise different combinations of nulls and values
    // note that below we set the row group and page sizes to 4 and 2 respectively
    // so that these "groupings" make sense
    let a: ArrayRef = Arc::new(Int32Array::from(vec![
        // a row group that has all values
        Some(i32::MIN),
        Some(-1),
        Some(1),
        Some(i32::MAX),
        // a row group with a page of all nulls and a page of all values
        None,
        None,
        Some(2),
        Some(3),
        // a row group that has all null pages
        None,
        None,
        None,
        None,
        // a row group having 1 page with all values and 1 page with some nulls
        Some(4),
        Some(5),
        None,
        Some(6),
        // a row group having 1 page with all nulls and 1 page with some nulls
        None,
        None,
        Some(7),
        None,
        // a row group having all pages with some nulls
        None,
        Some(8),
        Some(9),
        None,
    ]));

    let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();

    let writer_props_builder = match write_page_index {
        true => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Page),
        false => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Chunk),
    };

    // tune the size of the row groups and pages to the data above
    // to make sure we exercise code paths where all items in a page are null, etc.
    let writer_props = writer_props_builder
        .set_max_row_group_size(4)
        .set_data_page_row_count_limit(2)
        .set_write_batch_size(2)
        .build();

    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    let data = buf.into_inner().freeze();

    let reader_opts = match read_page_index {
        true => ReadOptionsBuilder::new().with_page_index().build(),
        false => ReadOptionsBuilder::new().build(),
    };
    let reader = SerializedFileReader::new_with_options(data.clone(), reader_opts).unwrap();
    let metadata = reader.metadata().clone();
    TestMetadata {
        file_size: data.len(),
        metadata,
    }
}
/// Temporary function so we can test loading metadata with page indexes
/// while we haven't fully figured out how to load it cleanly
async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
    use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
    use crate::errors::Result as ParquetResult;
    use futures::future::BoxFuture;
    use futures::FutureExt;
    use std::ops::Range;

    /// Adapt a `Bytes` to a `MetadataFetch` implementation.
    struct AsyncBytes {
        data: Bytes,
    }

    impl AsyncBytes {
        fn new(data: Bytes) -> Self {
            Self { data }
        }
    }

    impl MetadataFetch for AsyncBytes {
        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
            async move { Ok(self.data.slice(range.start..range.end)) }.boxed()
        }
    }

    /// A `MetadataFetch` implementation that reads from a subset of the full data
    /// while accepting ranges that address the full data.
    struct MaskedBytes {
        inner: Box<dyn MetadataFetch + Send>,
        inner_range: Range<usize>,
    }

    impl MaskedBytes {
        fn new(inner: Box<dyn MetadataFetch + Send>, inner_range: Range<usize>) -> Self {
            Self { inner, inner_range }
        }
    }

    impl MetadataFetch for &mut MaskedBytes {
        fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
            let inner_range = self.inner_range.clone();
            println!("inner_range: {:?}", inner_range);
            println!("range: {:?}", range);
            assert!(inner_range.start <= range.start && inner_range.end >= range.end);
            let range =
                range.start - self.inner_range.start..range.end - self.inner_range.start;
            self.inner.fetch(range)
        }
    }

    let metadata_length = data.len();
    let mut reader = MaskedBytes::new(
        Box::new(AsyncBytes::new(data)),
        file_size - metadata_length..file_size,
    );
    let metadata = MetadataLoader::load(&mut reader, file_size, None)
        .await
        .unwrap();
    let loaded_metadata = metadata.finish();
    let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
    metadata.load_page_index(true, true).await.unwrap();
    metadata.finish()
}
fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
    assert_eq!(left.column_descr(), right.column_descr());
    assert_eq!(left.encodings(), right.encodings());
    assert_eq!(left.num_values(), right.num_values());
    assert_eq!(left.compressed_size(), right.compressed_size());
    assert_eq!(left.data_page_offset(), right.data_page_offset());
    assert_eq!(left.statistics(), right.statistics());
    assert_eq!(left.offset_index_length(), right.offset_index_length());
    assert_eq!(left.column_index_length(), right.column_index_length());
    assert_eq!(
        left.unencoded_byte_array_data_bytes(),
        right.unencoded_byte_array_data_bytes()
    );
}

fn check_row_groups_are_equivalent(left: &RowGroupMetaData, right: &RowGroupMetaData) {
    assert_eq!(left.num_rows(), right.num_rows());
    assert_eq!(left.file_offset(), right.file_offset());
    assert_eq!(left.total_byte_size(), right.total_byte_size());
    assert_eq!(left.schema_descr(), right.schema_descr());
    assert_eq!(left.num_columns(), right.num_columns());
    left.columns()
        .iter()
        .zip(right.columns().iter())
        .for_each(|(lc, rc)| {
            check_columns_are_equivalent(lc, rc);
        });
}
#[tokio::test]
async fn test_encode_parquet_metadata_with_page_index() {
    // Create a ParquetMetadata with page index information
    let metadata = get_test_metadata(true, true);
    assert!(has_page_index(&metadata.metadata));

    let mut buf = BytesMut::new().writer();
    {
        let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
        writer.finish().unwrap();
    }

    let data = buf.into_inner().freeze();

    let decoded_metadata = load_metadata_from_bytes(data.len(), data).await;

    // Because the page index offsets will differ, compare invariant parts of the metadata
    assert_eq!(
        metadata.metadata.file_metadata(),
        decoded_metadata.file_metadata()
    );
    assert_eq!(
        metadata.metadata.column_index(),
        decoded_metadata.column_index()
    );
    assert_eq!(
        metadata.metadata.offset_index(),
        decoded_metadata.offset_index()
    );
    assert_eq!(
        metadata.metadata.num_row_groups(),
        decoded_metadata.num_row_groups()
    );

    // check that the mins and maxes are what we expect for each page
    // also indirectly checking that the pages were written out as we expected them to be laid out
    // (if they're not, or something gets refactored in the future that breaks that assumption,
    // this test may have to drop down to a lower level and create metadata directly instead of relying on
    // writing an entire file)
    let column_indexes = metadata.metadata.column_index().unwrap();
    assert_eq!(column_indexes.len(), 6);

    // make sure each row group has 2 pages by checking the first column
    // page counts for each column for each row group, should all be the same and there should be
    // 12 pages in total across 6 row groups / 1 column
    let mut page_counts = vec![];
    for row_group in column_indexes {
        for column in row_group {
            match column {
                Index::INT32(column_index) => {
                    page_counts.push(column_index.indexes.len());
                }
                _ => panic!("unexpected column index type"),
            }
        }
    }
    assert_eq!(page_counts, vec![2; 6]);

    metadata
        .metadata
        .row_groups()
        .iter()
        .zip(decoded_metadata.row_groups().iter())
        .for_each(|(left, right)| {
            check_row_groups_are_equivalent(left, right);
        });
}

alamb (Contributor, Author) commented on Sep 26, 2024:

> I think this PR or a subsequent one can also then delete these tests that I wrote before we had the dedicated reader:

Good idea, I did it in 9045968

@@ -377,300 +377,3 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
}
}
}

#[cfg(test)]
alamb (Contributor, Author) commented on this diff:
Removed per @adriangb 's comment: #6463 (comment)

alamb force-pushed the alamb/tests_for_parquet_metadata_roundtrip branch from 81e6fff to 4f6785f on October 2, 2024 20:05
alamb marked this pull request as ready for review on October 2, 2024 20:05
Review comments on parquet/src/arrow/mod.rs (3 threads, outdated, resolved)
Co-authored-by: Matthijs Brobbel <[email protected]>
alamb (Contributor, Author) left a comment:

Thank you @mbrobbel for the review

alamb (Contributor, Author) commented on Oct 5, 2024:

Thanks again for the help and review @adriangb and @mbrobbel

alamb merged commit ac51632 into apache:master on Oct 5, 2024 (16 checks passed).
Labels: parquet (Changes to the parquet crate)