Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet: Verify 32-bit CRC checksum when decoding pages #6290

Merged
merged 16 commits into from
Sep 28, 2024
3 changes: 3 additions & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] }
crc32fast = { version = "1.4.2", optional = true, default-features = false }

[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
Expand Down Expand Up @@ -117,6 +118,8 @@ object_store = ["dep:object_store", "async"]
zstd = ["dep:zstd", "zstd-sys"]
# Display memory in example/write_parquet.rs
sysinfo = ["dep:sysinfo"]
# Verify 32-bit CRC checksum when decoding parquet pages
crc = ["dep:crc32fast"]

[[example]]
name = "read_parquet"
Expand Down
3 changes: 2 additions & 1 deletion parquet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ The `parquet` crate provides the following features which may be enabled in your
- `zstd` (default) - support for parquet using `zstd` compression
- `snap` (default) - support for parquet using `snappy` compression
- `cli` - parquet [CLI tools](https://github.com/apache/arrow-rs/tree/master/parquet/src/bin)
- `crc` - enables functionality to automatically verify checksums of each page (if present) when decoding
- `experimental` - Experimental APIs which may change, even between minor releases

## Parquet Feature Status
Expand All @@ -82,4 +83,4 @@ The `parquet` crate provides the following features which may be enabled in your

## License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0.
Licensed under the Apache License, Version 2.0: <http://www.apache.org/licenses/LICENSE-2.0>.
xmakro marked this conversation as resolved.
Show resolved Hide resolved
9 changes: 9 additions & 0 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,15 @@ pub(crate) fn decode_page(
physical_type: Type,
decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
// Verify the 32-bit CRC checksum of the page
#[cfg(feature = "crc")]
if let Some(expected_crc) = page_header.crc {
let crc = crc32fast::hash(&buffer);
if crc != expected_crc as u32 {
return Err(general_err!("Page CRC checksum mismatch"));
}
}
xmakro marked this conversation as resolved.
Show resolved Hide resolved

// When processing data page v2, depending on enabled compression for the
// page, we should account for uncompressed data ('offset') of
// repetition and definition levels.
Expand Down
73 changes: 73 additions & 0 deletions parquet/tests/arrow_reader/checksum.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This file contains an end to end test for verifying checksums when reading parquet files.

use std::path::PathBuf;
xmakro marked this conversation as resolved.
Show resolved Hide resolved

use arrow::util::test_util::parquet_test_data;
use parquet::arrow::arrow_reader::ArrowReaderBuilder;

#[test]
fn test_datapage_v1_corrupt_checksum() {
let errors = read_file_batch_errors("datapage_v1-corrupt-checksum.parquet");
assert_eq!(errors, [
Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
Ok(()),
Ok(()),
Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()),
Err("Parquet argument error: Parquet error: Not all children array length are the same!".to_string())
]);
}

#[test]
fn test_datapage_v1_uncompressed_checksum() {
let errors = read_file_batch_errors("datapage_v1-uncompressed-checksum.parquet");
assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]);
}

#[test]
fn test_datapage_v1_snappy_compressed_checksum() {
let errors = read_file_batch_errors("datapage_v1-snappy-compressed-checksum.parquet");
assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]);
}

#[test]
fn test_plain_dict_uncompressed_checksum() {
let errors = read_file_batch_errors("plain-dict-uncompressed-checksum.parquet");
assert_eq!(errors, [Ok(())]);
}
#[test]
fn test_rle_dict_snappy_checksum() {
let errors = read_file_batch_errors("rle-dict-snappy-checksum.parquet");
assert_eq!(errors, [Ok(())]);
}

/// Reads a file and returns a vector with one element per record batch.
/// The record batch data is replaced with () and errors are stringified.
fn read_file_batch_errors(name: &str) -> Vec<Result<(), String>> {
let path = PathBuf::from(parquet_test_data()).join(name);
println!("Reading file: {:?}", path);
let file = std::fs::File::open(&path).unwrap();
let reader = ArrowReaderBuilder::try_new(file).unwrap().build().unwrap();
reader
.map(|x| match x {
Ok(_) => Ok(()),
Err(e) => Err(e.to_string()),
})
.collect()
}
xmakro marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions parquet/tests/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use std::sync::Arc;
use tempfile::NamedTempFile;

mod bad_data;
#[cfg(feature = "crc")]
mod checksum;
mod statistics;

// returns a struct array with columns "int32_col", "float32_col" and "float64_col" with the specified values
Expand Down
Loading