Skip to content

Commit

Permalink
Skip writing down null buffers for non-nullable primitive arrays (#6524)
Browse files Browse the repository at this point in the history
* Add a new roundtrip test and improve validations

The new validations should help improve coverage from the existing
tests, but none of them currently cover the null-masking bug.

* Don't return nulls for non-nullable cols in reader

* Coverage for more non-null nested leaf types

* Update parquet/src/arrow/arrow_writer/mod.rs

---------

Co-authored-by: Raphael Taylor-Davies <[email protected]>
  • Loading branch information
bkirwi and tustvold authored Oct 21, 2024
1 parent 4d6edea commit 936e40e
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 5 deletions.
85 changes: 84 additions & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,85 @@ mod tests {
roundtrip(batch, Some(SMALL_SIZE / 2));
}

#[test]
fn arrow_writer_2_level_struct_mixed_null_2() {
// tests writing <struct<struct<primitive>>, where the primitive columns are non-null.
let field_c = Field::new("c", DataType::Int32, false);
let field_d = Field::new("d", DataType::FixedSizeBinary(4), false);
let field_e = Field::new(
"e",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
false,
);

let field_b = Field::new(
"b",
DataType::Struct(vec![field_c, field_d, field_e].into()),
false,
);
let type_a = DataType::Struct(vec![field_b.clone()].into());
let field_a = Field::new("a", type_a, true);
let schema = Schema::new(vec![field_a.clone()]);

// create data
let c = Int32Array::from_iter_values(0..6);
let d = FixedSizeBinaryArray::try_from_iter(
["aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"].into_iter(),
)
.expect("four byte values");
let e = Int32DictionaryArray::from_iter(["one", "two", "three", "four", "five", "one"]);
let b_data = ArrayDataBuilder::new(field_b.data_type().clone())
.len(6)
.add_child_data(c.into_data())
.add_child_data(d.into_data())
.add_child_data(e.into_data())
.build()
.unwrap();
let b = StructArray::from(b_data);
let a_data = ArrayDataBuilder::new(field_a.data_type().clone())
.len(6)
.null_bit_buffer(Some(Buffer::from([0b00100101])))
.add_child_data(b.into_data())
.build()
.unwrap();
let a = StructArray::from(a_data);

assert_eq!(a.null_count(), 3);
assert_eq!(a.column(0).null_count(), 0);

// build a record batch
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

roundtrip(batch, Some(SMALL_SIZE / 2));
}

#[test]
fn test_empty_dict() {
let struct_fields = Fields::from(vec![Field::new(
"dict",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
false,
)]);

let schema = Schema::new(vec![Field::new_struct(
"struct",
struct_fields.clone(),
true,
)]);
let dictionary = Arc::new(DictionaryArray::new(
Int32Array::new_null(5),
Arc::new(StringArray::new_null(0)),
));

let s = StructArray::new(
struct_fields,
vec![dictionary],
Some(NullBuffer::new_null(5)),
);

let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(s)]).unwrap();
roundtrip(batch, None);
}
#[test]
fn arrow_writer_page_size() {
let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
Expand Down Expand Up @@ -1735,7 +1814,11 @@ mod tests {
}

fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
roundtrip_opts_with_array_validation(expected_batch, props, |a, b| assert_eq!(a, b))
roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
a.validate_full().expect("valid expected data");
b.validate_full().expect("valid actual data");
assert_eq!(a, b)
})
}

struct RoundTripOptions {
Expand Down
21 changes: 17 additions & 4 deletions parquet/src/arrow/record_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ where
std::mem::take(&mut self.values)
}

/// Returns currently stored null bitmap data.
/// Returns currently stored null bitmap data for nullable columns.
/// For non-nullable columns, the bitmap is discarded.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_bitmap_buffer(&mut self) -> Option<Buffer> {
self.consume_bitmap()
Expand All @@ -186,11 +187,23 @@ where
self.num_records = 0;
}

/// Returns bitmap data.
/// Returns bitmap data for nullable columns.
/// For non-nullable columns, the bitmap is discarded.
pub fn consume_bitmap(&mut self) -> Option<Buffer> {
self.def_levels
let mask = self
.def_levels
.as_mut()
.map(|levels| levels.consume_bitmask())
.map(|levels| levels.consume_bitmask());

// While we always consume the bitmask here, we only want to return
// the bitmask for nullable arrays. (Marking nulls on a non-nullable
// array may fail validations, even if those nulls are masked off at
// a higher level.)
if self.column_desc.self_type().is_optional() {
mask
} else {
None
}
}

/// Try to read one batch of data returning the number of records read
Expand Down

0 comments on commit 936e40e

Please sign in to comment.