Skip to content

Commit

Permalink
Fix encoding/decoding REE Dicts when using streaming IPC (#6399)
Browse files Browse the repository at this point in the history
* arrow-ipc: Add test for streaming IPC with REE dicts

* arrow-schema: Include child fields of REE fields
  • Loading branch information
brancz authored Sep 18, 2024
1 parent f5a6382 commit e7598a4
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 2 deletions.
63 changes: 61 additions & 2 deletions arrow-ipc/src/reader/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,10 @@ impl StreamDecoder {
#[cfg(test)]
mod tests {
use super::*;
use crate::writer::StreamWriter;
use arrow_array::{Int32Array, Int64Array, RecordBatch};
use crate::writer::{IpcWriteOptions, StreamWriter};
use arrow_array::{
types::Int32Type, DictionaryArray, Int32Array, Int64Array, RecordBatch, RunArray,
};
use arrow_schema::{DataType, Field, Schema};

// Further tests in arrow-integration-testing/tests/ipc_reader.rs
Expand Down Expand Up @@ -315,4 +317,61 @@ mod tests {
let err = decoder.finish().unwrap_err().to_string();
assert_eq!(err, "Ipc error: Unexpected End of Stream");
}

#[test]
fn test_read_ree_dict_record_batches_from_buffer() {
let schema = Schema::new(vec![Field::new(
"test1",
DataType::RunEndEncoded(
Arc::new(Field::new("run_ends".to_string(), DataType::Int32, false)),
Arc::new(Field::new_dict(
"values".to_string(),
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
true,
0,
false,
)),
),
true,
)]);
let batch = RecordBatch::try_new(
schema.clone().into(),
vec![Arc::new(
RunArray::try_new(
&Int32Array::from(vec![1, 2, 3]),
&vec![Some("a"), None, Some("a")]
.into_iter()
.collect::<DictionaryArray<Int32Type>>(),
)
.expect("Failed to create RunArray"),
)],
)
.expect("Failed to create RecordBatch");

let mut buffer = vec![];
{
let mut writer = StreamWriter::try_new_with_options(
&mut buffer,
&schema,
IpcWriteOptions::default().with_preserve_dict_id(false),
)
.expect("Failed to create StreamWriter");
writer.write(&batch).expect("Failed to write RecordBatch");
writer.finish().expect("Failed to finish StreamWriter");
}

let mut decoder = StreamDecoder::new();
let buf = &mut Buffer::from(buffer.as_slice());
while let Some(batch) = decoder
.decode(buf)
.map_err(|e| {
ArrowError::ExternalError(format!("Failed to decode record batch: {}", e).into())
})
.expect("Failed to decode record batch")
{
assert_eq!(batch, batch);
}

decoder.finish().expect("Failed to finish decoder");
}
}
1 change: 1 addition & 0 deletions arrow-schema/src/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ impl Field {
| DataType::FixedSizeList(field, _)
| DataType::Map(field, _) => field.fields(),
DataType::Dictionary(_, value_field) => Field::_fields(value_field.as_ref()),
DataType::RunEndEncoded(_, field) => field.fields(),
_ => vec![],
}
}
Expand Down

0 comments on commit e7598a4

Please sign in to comment.