From 9630aaf55bda98e2028c4f44e6a7264ec41e04d5 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Fri, 8 Dec 2023 21:51:48 +0000 Subject: [PATCH] Blockwise IO in IPC FileReader (#5153) (#5179) * Blockwise IO in IPC FileReader (#5153) * Docs * Clippy * Update arrow-ipc/src/reader.rs Co-authored-by: Andrew Lamb --------- Co-authored-by: Andrew Lamb --- arrow-ipc/src/reader.rs | 121 ++++++++++++++++------------------------ 1 file changed, 48 insertions(+), 73 deletions(-) diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 6f2cb30a1629..06e53505fc22 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -27,12 +27,12 @@ use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; use arrow_array::*; -use arrow_buffer::{Buffer, MutableBuffer}; +use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer}; use arrow_data::ArrayData; use arrow_schema::*; use crate::compression::CompressionCodec; -use crate::{FieldNode, MetadataVersion, CONTINUATION_MARKER}; +use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER}; use DataType::*; /// Read a buffer based on offset and length @@ -498,10 +498,34 @@ pub fn read_dictionary( Ok(()) } +/// Read the data for a given block +fn read_block(mut reader: R, block: &Block) -> Result { + reader.seek(SeekFrom::Start(block.offset() as u64))?; + let body_len = block.bodyLength().to_usize().unwrap(); + let metadata_len = block.metaDataLength().to_usize().unwrap(); + let total_len = body_len.checked_add(metadata_len).unwrap(); + + let mut buf = MutableBuffer::from_len_zeroed(total_len); + reader.read_exact(&mut buf)?; + Ok(buf.into()) +} + +/// Parse an encapsulated message +/// +/// +fn parse_message(buf: &[u8]) -> Result { + let buf = match buf[..4] == CONTINUATION_MARKER { + true => &buf[8..], + false => &buf[4..], + }; + crate::root_as_message(buf) + .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))) +} + /// Arrow File reader pub struct FileReader { /// Buffered file reader that supports reading and seeking - reader: BufReader, + reader: R, /// The schema that is read from the file header schema: SchemaRef, @@ -535,7 +559,6 @@ pub struct FileReader { impl fmt::Debug for FileReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> { f.debug_struct("FileReader") - .field("reader", &"BufReader<..>") .field("schema", &self.schema) .field("blocks", &self.blocks) .field("current_block", &self.current_block) @@ -543,37 +566,28 @@ impl fmt::Debug for FileReader { .field("dictionaries_by_id", &self.dictionaries_by_id) .field("metadata_version", &self.metadata_version) .field("projection", &self.projection) - .finish() + .finish_non_exhaustive() } } impl FileReader { /// Try to create a new file reader /// - /// Returns errors if the file does not meet the Arrow Format header and footer - /// requirements - pub fn try_new(reader: R, projection: Option>) -> Result { - let mut reader = BufReader::new(reader); - // check if header and footer contain correct magic bytes - let mut magic_buffer: [u8; 6] = [0; 6]; - reader.read_exact(&mut magic_buffer)?; - if magic_buffer != super::ARROW_MAGIC { - return Err(ArrowError::ParseError( - "Arrow file does not contain correct header".to_string(), - )); - } - reader.seek(SeekFrom::End(-6))?; - reader.read_exact(&mut magic_buffer)?; - if magic_buffer != super::ARROW_MAGIC { + /// Returns errors if the file does not meet the Arrow Format footer requirements + pub fn try_new(mut reader: R, projection: Option>) -> Result { + // Space for ARROW_MAGIC (6 bytes) and length (4 bytes) + let mut buffer = [0; 10]; + reader.seek(SeekFrom::End(-10))?; + reader.read_exact(&mut buffer)?; + + if buffer[4..] != super::ARROW_MAGIC { return Err(ArrowError::ParseError( "Arrow file does not contain correct footer".to_string(), )); } + // read footer length - let mut footer_size: [u8; 4] = [0; 4]; - reader.seek(SeekFrom::End(-10))?; - reader.read_exact(&mut footer_size)?; - let footer_len = i32::from_le_bytes(footer_size); + let footer_len = i32::from_le_bytes(buffer[..4].try_into().unwrap()); // read footer let mut footer_data = vec![0; footer_len as usize]; @@ -607,35 +621,14 @@ impl FileReader { let mut dictionaries_by_id = HashMap::new(); if let Some(dictionaries) = footer.dictionaries() { for block in dictionaries { - // read length from end of offset - let mut message_size: [u8; 4] = [0; 4]; - reader.seek(SeekFrom::Start(block.offset() as u64))?; - reader.read_exact(&mut message_size)?; - if message_size == CONTINUATION_MARKER { - reader.read_exact(&mut message_size)?; - } - let footer_len = i32::from_le_bytes(message_size); - let mut block_data = vec![0; footer_len as usize]; - - reader.read_exact(&mut block_data)?; - - let message = crate::root_as_message(&block_data[..]).map_err(|err| { - ArrowError::ParseError(format!("Unable to get root as message: {err:?}")) - })?; + let buf = read_block(&mut reader, block)?; + let message = parse_message(&buf)?; match message.header_type() { crate::MessageHeader::DictionaryBatch => { let batch = message.header_as_dictionary_batch().unwrap(); - - // read the block that makes up the dictionary batch into a buffer - let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize); - reader.seek(SeekFrom::Start( - block.offset() as u64 + block.metaDataLength() as u64, - ))?; - reader.read_exact(&mut buf)?; - read_dictionary( - &buf.into(), + &buf.slice(block.metaDataLength() as _), batch, &schema, &mut dictionaries_by_id, @@ -702,27 +695,15 @@ impl FileReader { } fn maybe_next(&mut self) -> Result, ArrowError> { - let block = self.blocks[self.current_block]; + let block = &self.blocks[self.current_block]; self.current_block += 1; // read length - self.reader.seek(SeekFrom::Start(block.offset() as u64))?; - let mut meta_buf = [0; 4]; - self.reader.read_exact(&mut meta_buf)?; - if meta_buf == CONTINUATION_MARKER { - // continuation marker encountered, read message next - self.reader.read_exact(&mut meta_buf)?; - } - let meta_len = i32::from_le_bytes(meta_buf); - - let mut block_data = vec![0; meta_len as usize]; - self.reader.read_exact(&mut block_data)?; - let message = crate::root_as_message(&block_data[..]).map_err(|err| { - ArrowError::ParseError(format!("Unable to get root as footer: {err:?}")) - })?; + let buffer = read_block(&mut self.reader, block)?; + let message = parse_message(&buffer)?; // some old test data's footer metadata is not set, so we account for that - if self.metadata_version != crate::MetadataVersion::V1 + if self.metadata_version != MetadataVersion::V1 && message.version() != self.metadata_version { return Err(ArrowError::IpcError( @@ -739,14 +720,8 @@ impl FileReader { ArrowError::IpcError("Unable to read IPC message as record batch".to_string()) })?; // read the block that makes up the record batch into a buffer - let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize); - self.reader.seek(SeekFrom::Start( - block.offset() as u64 + block.metaDataLength() as u64, - ))?; - self.reader.read_exact(&mut buf)?; - read_record_batch( - &buf.into(), + &buffer.slice(block.metaDataLength() as _), batch, self.schema(), &self.dictionaries_by_id, @@ -766,14 +741,14 @@ impl FileReader { /// /// It is inadvisable to directly read from the underlying reader. pub fn get_ref(&self) -> &R { - self.reader.get_ref() + &self.reader } /// Gets a mutable reference to the underlying reader. /// /// It is inadvisable to directly read from the underlying reader. pub fn get_mut(&mut self) -> &mut R { - self.reader.get_mut() + &mut self.reader } }