Skip to content

Commit

Permalink
Blockwise IO in IPC FileReader (#5153) (#5179)
Browse files Browse the repository at this point in the history
* Blockwise IO in IPC FileReader (#5153)

* Docs

* Clippy

* Update arrow-ipc/src/reader.rs

Co-authored-by: Andrew Lamb <[email protected]>

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
tustvold and alamb authored Dec 8, 2023
1 parent c5a9953 commit 9630aaf
Showing 1 changed file with 48 additions and 73 deletions.
121 changes: 48 additions & 73 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;

use arrow_array::*;
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::ArrayData;
use arrow_schema::*;

use crate::compression::CompressionCodec;
use crate::{FieldNode, MetadataVersion, CONTINUATION_MARKER};
use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER};
use DataType::*;

/// Read a buffer based on offset and length
Expand Down Expand Up @@ -498,10 +498,34 @@ pub fn read_dictionary(
Ok(())
}

/// Read the data for a given block
fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
reader.seek(SeekFrom::Start(block.offset() as u64))?;
let body_len = block.bodyLength().to_usize().unwrap();
let metadata_len = block.metaDataLength().to_usize().unwrap();
let total_len = body_len.checked_add(metadata_len).unwrap();

let mut buf = MutableBuffer::from_len_zeroed(total_len);
reader.read_exact(&mut buf)?;
Ok(buf.into())
}

/// Parse an encapsulated message
///
/// <https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format>
fn parse_message(buf: &[u8]) -> Result<Message, ArrowError> {
let buf = match buf[..4] == CONTINUATION_MARKER {
true => &buf[8..],
false => &buf[4..],
};
crate::root_as_message(buf)
.map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
}

/// Arrow File reader
pub struct FileReader<R: Read + Seek> {
/// Buffered file reader that supports reading and seeking
reader: BufReader<R>,
reader: R,

/// The schema that is read from the file header
schema: SchemaRef,
Expand Down Expand Up @@ -535,45 +559,35 @@ pub struct FileReader<R: Read + Seek> {
impl<R: Read + Seek> fmt::Debug for FileReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
f.debug_struct("FileReader<R>")
.field("reader", &"BufReader<..>")
.field("schema", &self.schema)
.field("blocks", &self.blocks)
.field("current_block", &self.current_block)
.field("total_blocks", &self.total_blocks)
.field("dictionaries_by_id", &self.dictionaries_by_id)
.field("metadata_version", &self.metadata_version)
.field("projection", &self.projection)
.finish()
.finish_non_exhaustive()
}
}

impl<R: Read + Seek> FileReader<R> {
/// Try to create a new file reader
///
/// Returns errors if the file does not meet the Arrow Format header and footer
/// requirements
pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
let mut reader = BufReader::new(reader);
// check if header and footer contain correct magic bytes
let mut magic_buffer: [u8; 6] = [0; 6];
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != super::ARROW_MAGIC {
return Err(ArrowError::ParseError(
"Arrow file does not contain correct header".to_string(),
));
}
reader.seek(SeekFrom::End(-6))?;
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != super::ARROW_MAGIC {
/// Returns errors if the file does not meet the Arrow Format footer requirements
pub fn try_new(mut reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
// Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
let mut buffer = [0; 10];
reader.seek(SeekFrom::End(-10))?;
reader.read_exact(&mut buffer)?;

if buffer[4..] != super::ARROW_MAGIC {
return Err(ArrowError::ParseError(
"Arrow file does not contain correct footer".to_string(),
));
}

// read footer length
let mut footer_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::End(-10))?;
reader.read_exact(&mut footer_size)?;
let footer_len = i32::from_le_bytes(footer_size);
let footer_len = i32::from_le_bytes(buffer[..4].try_into().unwrap());

// read footer
let mut footer_data = vec![0; footer_len as usize];
Expand Down Expand Up @@ -607,35 +621,14 @@ impl<R: Read + Seek> FileReader<R> {
let mut dictionaries_by_id = HashMap::new();
if let Some(dictionaries) = footer.dictionaries() {
for block in dictionaries {
// read length from end of offset
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(block.offset() as u64))?;
reader.read_exact(&mut message_size)?;
if message_size == CONTINUATION_MARKER {
reader.read_exact(&mut message_size)?;
}
let footer_len = i32::from_le_bytes(message_size);
let mut block_data = vec![0; footer_len as usize];

reader.read_exact(&mut block_data)?;

let message = crate::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
})?;
let buf = read_block(&mut reader, block)?;
let message = parse_message(&buf)?;

match message.header_type() {
crate::MessageHeader::DictionaryBatch => {
let batch = message.header_as_dictionary_batch().unwrap();

// read the block that makes up the dictionary batch into a buffer
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
reader.seek(SeekFrom::Start(
block.offset() as u64 + block.metaDataLength() as u64,
))?;
reader.read_exact(&mut buf)?;

read_dictionary(
&buf.into(),
&buf.slice(block.metaDataLength() as _),
batch,
&schema,
&mut dictionaries_by_id,
Expand Down Expand Up @@ -702,27 +695,15 @@ impl<R: Read + Seek> FileReader<R> {
}

fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
let block = self.blocks[self.current_block];
let block = &self.blocks[self.current_block];
self.current_block += 1;

// read length
self.reader.seek(SeekFrom::Start(block.offset() as u64))?;
let mut meta_buf = [0; 4];
self.reader.read_exact(&mut meta_buf)?;
if meta_buf == CONTINUATION_MARKER {
// continuation marker encountered, read message next
self.reader.read_exact(&mut meta_buf)?;
}
let meta_len = i32::from_le_bytes(meta_buf);

let mut block_data = vec![0; meta_len as usize];
self.reader.read_exact(&mut block_data)?;
let message = crate::root_as_message(&block_data[..]).map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
})?;
let buffer = read_block(&mut self.reader, block)?;
let message = parse_message(&buffer)?;

// some old test data's footer metadata is not set, so we account for that
if self.metadata_version != crate::MetadataVersion::V1
if self.metadata_version != MetadataVersion::V1
&& message.version() != self.metadata_version
{
return Err(ArrowError::IpcError(
Expand All @@ -739,14 +720,8 @@ impl<R: Read + Seek> FileReader<R> {
ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
})?;
// read the block that makes up the record batch into a buffer
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
self.reader.seek(SeekFrom::Start(
block.offset() as u64 + block.metaDataLength() as u64,
))?;
self.reader.read_exact(&mut buf)?;

read_record_batch(
&buf.into(),
&buffer.slice(block.metaDataLength() as _),
batch,
self.schema(),
&self.dictionaries_by_id,
Expand All @@ -766,14 +741,14 @@ impl<R: Read + Seek> FileReader<R> {
///
/// It is inadvisable to directly read from the underlying reader.
pub fn get_ref(&self) -> &R {
self.reader.get_ref()
&self.reader
}

/// Gets a mutable reference to the underlying reader.
///
/// It is inadvisable to directly read from the underlying reader.
pub fn get_mut(&mut self) -> &mut R {
self.reader.get_mut()
&mut self.reader
}
}

Expand Down

0 comments on commit 9630aaf

Please sign in to comment.