feat: Implement Parquet footer-mode encryption
Uses PARC magic.  Haven't tested whether we perfectly follow the Parquet spec.

Includes some design choices around key metadata (such as using SHA3 key
metadata to select keys) which are not sufficiently abstracted for a
general-purpose library.  (That is, some changes would be needed if this
were to be upstreamed.)
srh committed Sep 5, 2024
1 parent ba5455c commit 9d6173c
Showing 11 changed files with 774 additions and 84 deletions.
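
The SHA3-based key selection mentioned in the commit message is not shown in the excerpt below. As a rough, hypothetical sketch of the idea (the `KeyStore` type, its fields, and the 128-bit key size are assumptions, not this commit's code), key metadata could be the SHA3-256 digest of the key, and a reader could pick the matching key by digest:

```rust
use sha3::{Digest, Sha3_256};

/// Hypothetical key store: keys are looked up by the SHA3-256 digest that was
/// written to the footer as key metadata. Names and key size are assumptions.
struct KeyStore {
    keys: Vec<[u8; 16]>,
}

impl KeyStore {
    /// Key metadata to store in the file: the SHA3-256 digest of the key itself.
    fn key_metadata(key: &[u8; 16]) -> Vec<u8> {
        Sha3_256::digest(key).to_vec()
    }

    /// Pick the key whose digest matches the key metadata read from the footer.
    fn select_key(&self, key_metadata: &[u8]) -> Option<&[u8; 16]> {
        self.keys
            .iter()
            .find(|k| Sha3_256::digest(k.as_slice()).as_slice() == key_metadata)
    }
}
```
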
8 changes: 6 additions & 2 deletions parquet/Cargo.toml
@@ -30,8 +30,10 @@ edition = "2018"

[dependencies]
# update note: pin `parquet-format` to specific version until it does not break at minor
# version, see ARROW-11187.
parquet-format = "~2.6.1"
# version, see ARROW-11187. update: since this comment, it is now pinned at ~4.0.0 and
# upstream arrow-rs parquet vendors it
parquet-format = "~4.0.0"
aes-gcm = "0.10.3"
byteorder = "1"
thrift = "0.13"
snap = { version = "1.0", optional = true }
@@ -45,7 +47,9 @@ arrow = { path = "../arrow", version = "5.0.0", optional = true }
base64 = { version = "0.13", optional = true }
clap = { version = "2.33.3", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
serde = { version = "1.0.115", features = ["derive"] }
rand = "0.8"
sha3 = "0.10.8"

[dev-dependencies]
criterion = "0.3"
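
The new `aes-gcm`, `sha3`, and `rand` dependencies drive the encryption work. Below is a minimal sketch of encrypting one module with the `aes-gcm` crate while binding it to an AAD; the key size, helper name, and output framing are assumptions rather than this commit's actual code:

```rust
use aes_gcm::{
    aead::{Aead, AeadCore, KeyInit, OsRng, Payload},
    Aes128Gcm,
};

/// Sketch: encrypt one module (e.g. a page) with AES-GCM, authenticated
/// against the supplied AAD. Framing here (nonce || ciphertext+tag) is one
/// plausible choice, not necessarily what this commit writes.
fn encrypt_module(key_bytes: &[u8; 16], plaintext: &[u8], aad: &[u8]) -> Vec<u8> {
    let cipher = Aes128Gcm::new_from_slice(key_bytes).expect("valid key length");
    let nonce = Aes128Gcm::generate_nonce(&mut OsRng); // 96-bit random nonce
    let ciphertext = cipher
        .encrypt(&nonce, Payload { msg: plaintext, aad })
        .expect("encryption failure");
    let mut out = Vec::with_capacity(nonce.len() + ciphertext.len());
    out.extend_from_slice(&nonce);
    out.extend_from_slice(&ciphertext);
    out
}
```
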
11 changes: 11 additions & 0 deletions parquet/src/basic.rs
@@ -250,6 +250,15 @@ pub enum Encoding {
///
/// The ids are encoded using the RLE encoding.
RLE_DICTIONARY,

/// Encoding for floating-point data.
///
/// K byte-streams are created where K is the size in bytes of the data type.
/// The individual bytes of an FP value are scattered to the corresponding stream and
/// the streams are concatenated.
/// This itself does not reduce the size of the data but can lead to better compression
/// afterwards.
BYTE_STREAM_SPLIT,
}

// ----------------------------------------------------------------------
@@ -701,6 +710,7 @@ impl convert::From<parquet::Encoding> for Encoding {
parquet::Encoding::DeltaLengthByteArray => Encoding::DELTA_LENGTH_BYTE_ARRAY,
parquet::Encoding::DeltaByteArray => Encoding::DELTA_BYTE_ARRAY,
parquet::Encoding::RleDictionary => Encoding::RLE_DICTIONARY,
parquet::Encoding::ByteStreamSplit => Encoding::BYTE_STREAM_SPLIT,
}
}
}
@@ -716,6 +726,7 @@ impl convert::From<Encoding> for parquet::Encoding {
Encoding::DELTA_LENGTH_BYTE_ARRAY => parquet::Encoding::DeltaLengthByteArray,
Encoding::DELTA_BYTE_ARRAY => parquet::Encoding::DeltaByteArray,
Encoding::RLE_DICTIONARY => parquet::Encoding::RleDictionary,
Encoding::BYTE_STREAM_SPLIT => parquet::Encoding::ByteStreamSplit,
}
}
}
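
For intuition about the BYTE_STREAM_SPLIT doc comment above, here is a small sketch (not part of this diff) of the encode/decode round trip for f32, where K = 4:

```rust
/// Byte i of every f32 goes into stream i; the four streams are concatenated.
/// No size reduction by itself, but the streams often compress better afterwards.
fn byte_stream_split_f32(values: &[f32]) -> Vec<u8> {
    let mut streams: Vec<Vec<u8>> = vec![Vec::with_capacity(values.len()); 4];
    for v in values {
        for (i, byte) in v.to_le_bytes().iter().enumerate() {
            streams[i].push(*byte);
        }
    }
    streams.concat()
}

/// Inverse: gather byte j from each of the four streams to rebuild value j.
fn byte_stream_split_decode_f32(encoded: &[u8]) -> Vec<f32> {
    let n = encoded.len() / 4;
    (0..n)
        .map(|j| {
            let bytes = [encoded[j], encoded[n + j], encoded[2 * n + j], encoded[3 * n + j]];
            f32::from_le_bytes(bytes)
        })
        .collect()
}
```
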
15 changes: 9 additions & 6 deletions parquet/src/column/page.rs
@@ -139,11 +139,14 @@ impl CompressedPage {
self.uncompressed_size
}

/// Returns compressed size in bytes.
/// Returns compressed size (but not encrypted size) in bytes.
///
/// Note that it is assumed that buffer is compressed, but it may not be. In this
/// case compressed size will be equal to uncompressed size.
pub fn compressed_size(&self) -> usize {
/// Note that it is assumed that buffer is compressed, but it may not be. In this case
/// compressed size will be equal to uncompressed size.
///
/// Other so-called "(total_)?compressed_size" fields include encryption overhead, when
/// applicable, which this does not.
pub fn compressed_unencrypted_size(&self) -> usize {
self.compressed_page.buffer().len()
}

@@ -206,7 +209,7 @@ pub trait PageWriter {
///
/// This method is called for every compressed page we write into underlying buffer,
/// either data page or dictionary page.
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
fn write_page(&mut self, page: CompressedPage, aad_page_ordinal: Option<u16>) -> Result<PageWriteSpec>;

/// Writes column chunk metadata into the output stream/sink.
///
@@ -299,7 +302,7 @@ mod tests {

assert_eq!(cpage.page_type(), PageType::DATA_PAGE);
assert_eq!(cpage.uncompressed_size(), 5);
assert_eq!(cpage.compressed_size(), 3);
assert_eq!(cpage.compressed_unencrypted_size(), 3);
assert_eq!(cpage.num_values(), 10);
assert_eq!(cpage.encoding(), Encoding::PLAIN);
assert_eq!(cpage.data(), &[0, 1, 2]);
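
The `aad_page_ordinal` parameter added to `write_page` above supplies the per-page component of the AES-GCM additional authenticated data (AAD). A hedged sketch of how a module AAD suffix might be assembled, loosely following the Parquet modular-encryption layout; the module-type codes and exact byte layout are assumptions, and the commit's real construction lives in an encryption module not shown in this excerpt:

```rust
/// Module type codes as in the Parquet encryption spec (assumed values).
const MODULE_DATA_PAGE: u8 = 2;
const MODULE_DICTIONARY_PAGE: u8 = 3;

/// Sketch: append module type and little-endian ordinals to the per-file AAD.
/// Dictionary pages pass `None`, matching the `write_page(..., None)` call above.
fn page_aad(
    file_aad: &[u8],
    module_type: u8,
    row_group_ordinal: i16,
    column_ordinal: u16,
    page_ordinal: Option<u16>,
) -> Vec<u8> {
    let mut aad = file_aad.to_vec();
    aad.push(module_type);
    aad.extend_from_slice(&row_group_ordinal.to_le_bytes());
    aad.extend_from_slice(&column_ordinal.to_le_bytes());
    if let Some(p) = page_ordinal {
        aad.extend_from_slice(&p.to_le_bytes());
    }
    aad
}
```
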
67 changes: 54 additions & 13 deletions parquet/src/column/writer.rs
@@ -168,6 +168,7 @@ pub struct ColumnWriterImpl<T: DataType> {
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
page_writer: Box<dyn PageWriter>,
page_ordinal: usize,
has_dictionary: bool,
dict_encoder: Option<DictEncoder<T>>,
encoder: Box<dyn Encoder<T>>,
@@ -185,6 +186,8 @@ pub struct ColumnWriterImpl<T: DataType> {
total_bytes_written: u64,
total_rows_written: u64,
total_uncompressed_size: u64,
// Includes encryption overhead -- the thrift definition field includes encryption overhead, and
// we keep its name here.
total_compressed_size: u64,
total_num_values: u64,
dictionary_page_offset: Option<u64>,
@@ -231,10 +234,14 @@ impl<T: DataType> ColumnWriterImpl<T> {
)
.unwrap();

// We start counting pages from zero.
let page_ordinal: usize = 0;

Self {
descr,
props,
page_writer,
page_ordinal,
has_dictionary,
dict_encoder,
encoder: fallback_encoder,
@@ -824,7 +831,10 @@ impl<T: DataType> ColumnWriterImpl<T> {
/// Writes compressed data page into underlying sink and updates global metrics.
#[inline]
fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
let page_spec = self.page_writer.write_page(page)?;
let page_ordinal = self.page_ordinal;
let aad_page_ordinal: Option<u16> = Some(page_ordinal as u16);
self.page_ordinal += 1;
let page_spec = self.page_writer.write_page(page, aad_page_ordinal)?;
self.update_metrics_for_page(page_spec);
Ok(())
}
@@ -858,7 +868,7 @@ impl<T: DataType> ColumnWriterImpl<T> {
CompressedPage::new(dict_page, uncompressed_size)
};

let page_spec = self.page_writer.write_page(compressed_page)?;
let page_spec = self.page_writer.write_page(compressed_page, None)?;
self.update_metrics_for_page(page_spec);
Ok(())
}
@@ -1026,10 +1036,10 @@ fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
mod tests {
use rand::distributions::uniform::SampleUniform;

use crate::column::{
use crate::{column::{
page::PageReader,
reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
};
}, file::encryption::USUAL_ENCRYPTION_OVERHEAD};
use crate::file::{
properties::WriterProperties, reader::SerializedPageReader,
writer::SerializedPageWriter,
@@ -1642,21 +1652,24 @@ mod tests {
);
}

const TEST_ROW_GROUP_ORDINAL: i16 = 1234;
const TEST_COLUMN_ORDINAL: u16 = 135;

#[test]
fn test_column_writer_add_data_pages_with_dict() {
// ARROW-5129: Test verifies that we add data page in case of dictionary encoding
// and no fallback occurred so far.
let file = get_temp_file("test_column_writer_add_data_pages_with_dict", &[]);
let sink = FileSink::new(&file);
let page_writer = Box::new(SerializedPageWriter::new(sink));
let page_writer = Box::new(SerializedPageWriter::new(sink, None, TEST_ROW_GROUP_ORDINAL, TEST_COLUMN_ORDINAL));
let props = Arc::new(
WriterProperties::builder()
.set_data_pagesize_limit(15) // actually each page will have size 15-18 bytes
.set_write_batch_size(3) // write 3 values at a time
.build(),
);
let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props.clone());
writer.write_batch(data, None, None).unwrap();
let (bytes_written, _, _) = writer.close().unwrap();

@@ -1665,8 +1678,12 @@ mod tests {
let mut page_reader = Box::new(
SerializedPageReader::new(
source,
None,
TEST_ROW_GROUP_ORDINAL,
TEST_COLUMN_ORDINAL,
data.len() as i64,
Compression::UNCOMPRESSED,
props.dictionary_enabled(&ColumnPath::from("col")),
Int32Type::get_physical_type(),
)
.unwrap(),
@@ -1803,7 +1820,7 @@ mod tests {
) {
let file = get_temp_file(file_name, &[]);
let sink = FileSink::new(&file);
let page_writer = Box::new(SerializedPageWriter::new(sink));
let page_writer = Box::new(SerializedPageWriter::new(sink, None, TEST_ROW_GROUP_ORDINAL, TEST_COLUMN_ORDINAL));

let max_def_level = match def_levels {
Some(buf) => *buf.iter().max().unwrap_or(&0i16),
@@ -1823,11 +1840,12 @@ mod tests {
max_batch_size = cmp::max(max_batch_size, levels.len());
}

let props = Arc::new(props);
let mut writer = get_test_column_writer::<T>(
page_writer,
max_def_level,
max_rep_level,
Arc::new(props),
props.clone(),
);

let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
@@ -1838,8 +1856,12 @@ mod tests {
let page_reader = Box::new(
SerializedPageReader::new(
source,
None,
TEST_ROW_GROUP_ORDINAL,
TEST_COLUMN_ORDINAL,
column_metadata.num_values(),
column_metadata.compression(),
props.dictionary_enabled(&ColumnPath::from("col")),
T::get_physical_type(),
)
.unwrap(),
@@ -1977,20 +1999,39 @@ mod tests {

/// Returns page writer that collects pages without serializing them.
fn get_test_page_writer() -> Box<dyn PageWriter> {
Box::new(TestPageWriter {})
Box::new(TestPageWriter {simulate_encrypted: false, last_page_ordinal: None})
}

struct TestPageWriter {}
struct TestPageWriter {
/// Always false, currently -- enabling would just affect return values that get fed into
/// test assertions.
simulate_encrypted: bool,
last_page_ordinal: Option<u16>,
}

impl PageWriter for TestPageWriter {
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
fn write_page(&mut self, page: CompressedPage, aad_page_ordinal: Option<u16>) -> Result<PageWriteSpec> {
// We're a bit loose in this assertion -- the caller could write or not write a dictionary page.
match aad_page_ordinal {
Some(n) if n != 0 => {
assert_eq!(self.last_page_ordinal, Some(n - 1));
}
_ => {
assert_eq!(None, self.last_page_ordinal);
}
}
self.last_page_ordinal = aad_page_ordinal;

// Note, the normal PageWriteSpec result would include PageMetaData overhead, and these
// values are thus not perfectly faked, but the only thing that looks at them are test
// assertions.
let mut res = PageWriteSpec::new();
res.page_type = page.page_type();
res.uncompressed_size = page.uncompressed_size();
res.compressed_size = page.compressed_size();
res.compressed_size = self.simulate_encrypted as usize * USUAL_ENCRYPTION_OVERHEAD + page.compressed_unencrypted_size();
res.num_values = page.num_values();
res.offset = 0;
res.bytes_written = page.data().len() as u64;
res.bytes_written = (self.simulate_encrypted as usize * USUAL_ENCRYPTION_OVERHEAD + page.data().len()) as u64;
Ok(res)
}

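
The tests above import `USUAL_ENCRYPTION_OVERHEAD` from `file::encryption`, whose definition is not shown in this excerpt. A plausible (assumed) definition is the fixed per-module ciphertext overhead of AES-GCM framing:

```rust
/// Sketch with assumed sizes: the actual constant lives in the commit's
/// encryption module (not shown here) and may be defined differently.
const LENGTH_PREFIX_SIZE: usize = 4; // 4-byte little-endian ciphertext length
const NONCE_SIZE: usize = 12;        // 96-bit AES-GCM nonce
const TAG_SIZE: usize = 16;          // 128-bit GCM authentication tag

/// Fixed number of bytes an encrypted module adds on top of its plaintext.
const USUAL_ENCRYPTION_OVERHEAD: usize = LENGTH_PREFIX_SIZE + NONCE_SIZE + TAG_SIZE;
```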