From 9877966c8856f165bfce8d76afc16ebae43b1f3b Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 28 Aug 2024 12:51:26 -0500 Subject: [PATCH] Ensure IPC stream messages are contiguous --- arrow-flight/src/lib.rs | 3 +- arrow-ipc/src/writer.rs | 125 +++++++++++++++++++++++++++------------- 2 files changed, 86 insertions(+), 42 deletions(-) diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs index 64e3ba01c5bd..51002fb8ceb9 100644 --- a/arrow-flight/src/lib.rs +++ b/arrow-flight/src/lib.rs @@ -399,7 +399,8 @@ fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult { let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1); let mut schema = vec![]; - writer::write_message(&mut schema, encoded_data, pair.1)?; + let already_written_len = 0; + writer::write_message(&mut schema, already_written_len, encoded_data, pair.1)?; Ok(IpcMessage(schema.into())) } diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index b5cf20ef337f..f268f1133d44 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -43,7 +43,7 @@ use crate::CONTINUATION_MARKER; /// IPC write options used to control the behaviour of the [`IpcDataGenerator`] #[derive(Debug, Clone)] pub struct IpcWriteOptions { - /// Write padding after memory buffers to this multiple of bytes. + /// Write padding to ensure that each data buffer is aligned to this multiple of bytes. /// Must be 8, 16, 32, or 64 - defaults to 64. alignment: u8, /// The legacy format is for releases before 0.15.0, and uses metadata V4 @@ -867,12 +867,12 @@ impl DictionaryTracker { pub struct FileWriter { /// The object to write to writer: W, + /// The number of bytes written + written_len: usize, /// IPC write options write_options: IpcWriteOptions, /// A reference to the schema, used in validating record batches schema: SchemaRef, - /// The number of bytes between each block of bytes, as an offset for random access - block_offsets: usize, /// Dictionary blocks that will be written as part of the IPC footer dictionary_blocks: Vec, /// Record blocks that will be written as part of the IPC footer @@ -922,12 +922,17 @@ impl FileWriter { write_options: IpcWriteOptions, ) -> Result { let data_gen = IpcDataGenerator::default(); - // write magic to header aligned on alignment boundary - let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len()); - let header_size = super::ARROW_MAGIC.len() + pad_len; + + let mut written_len = 0; + + // write magic and padding writer.write_all(&super::ARROW_MAGIC)?; + written_len += super::ARROW_MAGIC.len(); + let pad_len = pad_to_alignment(8, written_len); writer.write_all(&PADDING[..pad_len])?; - // write the schema, set the written bytes to the schema + header + written_len += pad_len; + + // write the schema let preserve_dict_id = write_options.preserve_dict_id; let mut dictionary_tracker = DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id); @@ -936,12 +941,20 @@ impl FileWriter { &mut dictionary_tracker, &write_options, ); - let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?; + + let (meta, data) = write_message(&mut writer, written_len, encoded_message, &write_options)?; + + // The schema message has no body + debug_assert_eq!(data, 0); + + // written bytes = padded_magic + schema + written_len += meta; + Ok(Self { writer, + written_len, write_options, schema: Arc::new(schema.clone()), - block_offsets: meta + data + header_size, dictionary_blocks: vec![], record_blocks: vec![], finished: false, @@ -970,23 +983,32 @@ impl FileWriter { )?; for encoded_dictionary in encoded_dictionaries { - let (meta, data) = - write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; + let (meta, data) = write_message( + &mut self.writer, + self.written_len, + encoded_dictionary, + &self.write_options, + )?; - let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64); + let block = crate::Block::new(self.written_len as i64, meta as i32, data as i64); self.dictionary_blocks.push(block); - self.block_offsets += meta + data; + self.written_len += meta + data; } - let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?; + let (meta, data) = write_message( + &mut self.writer, + self.written_len, + encoded_message, + &self.write_options, + )?; // add a record block for the footer let block = crate::Block::new( - self.block_offsets as i64, + self.written_len as i64, meta as i32, // TODO: is this still applicable? data as i64, ); self.record_blocks.push(block); - self.block_offsets += meta + data; + self.written_len += meta + data; Ok(()) } @@ -999,7 +1021,7 @@ impl FileWriter { } // write EOS - write_continuation(&mut self.writer, &self.write_options, 0)?; + self.written_len += write_continuation(&mut self.writer, &self.write_options, 0)?; let mut fbb = FlatBufferBuilder::new(); let dictionaries = fbb.create_vector(&self.dictionary_blocks); @@ -1089,6 +1111,8 @@ impl RecordBatchWriter for FileWriter { pub struct StreamWriter { /// The object to write to writer: W, + /// The number of bytes written + written_len: usize, /// IPC write options write_options: IpcWriteOptions, /// Whether the writer footer has been written, and the writer is finished @@ -1142,9 +1166,14 @@ impl StreamWriter { &mut dictionary_tracker, &write_options, ); - write_message(&mut writer, encoded_message, &write_options)?; + let (meta, data) = write_message(&mut writer, 0, encoded_message, &write_options)?; + + // The schema message has no body + debug_assert_eq!(data, 0); + Ok(Self { writer, + written_len: meta, write_options, finished: false, dictionary_tracker, @@ -1166,10 +1195,22 @@ impl StreamWriter { .expect("StreamWriter is configured to not error on dictionary replacement"); for encoded_dictionary in encoded_dictionaries { - write_message(&mut self.writer, encoded_dictionary, &self.write_options)?; + let (meta, data) = write_message( + &mut self.writer, + self.written_len, + encoded_dictionary, + &self.write_options, + )?; + self.written_len += meta + data; } - write_message(&mut self.writer, encoded_message, &self.write_options)?; + let (meta, data) = write_message( + &mut self.writer, + self.written_len, + encoded_message, + &self.write_options, + )?; + self.written_len += meta + data; Ok(()) } @@ -1181,7 +1222,7 @@ impl StreamWriter { )); } - write_continuation(&mut self.writer, &self.write_options, 0)?; + self.written_len += write_continuation(&mut self.writer, &self.write_options, 0)?; self.finished = true; @@ -1274,48 +1315,50 @@ pub struct EncodedData { /// Write a message's IPC data and buffers, returning metadata and buffer data lengths written pub fn write_message( mut writer: W, + already_written_len: usize, encoded: EncodedData, write_options: &IpcWriteOptions, ) -> Result<(usize, usize), ArrowError> { - let arrow_data_len = encoded.arrow_data.len(); - if arrow_data_len % usize::from(write_options.alignment) != 0 { + if already_written_len % 8 != 0 { return Err(ArrowError::MemoryError( - "Arrow data not aligned".to_string(), + "Writing an IPC Message unaligned to 8 bytes".to_string(), )); } - let a = usize::from(write_options.alignment - 1); - let buffer = encoded.ipc_message; - let flatbuf_size = buffer.len(); - let prefix_size = if write_options.write_legacy_ipc_format { + let continuation_size = if write_options.write_legacy_ipc_format { 4 } else { 8 }; - let aligned_size = (flatbuf_size + prefix_size + a) & !a; - let padding_bytes = aligned_size - flatbuf_size - prefix_size; + let flatbuf_size = encoded.ipc_message.len(); + assert_ne!(flatbuf_size, 0); + + let padding_size = pad_to_alignment( + write_options.alignment, + already_written_len + continuation_size + flatbuf_size, + ); + let padded_size = continuation_size + flatbuf_size + padding_size; + assert_eq!( + (already_written_len + padded_size) % write_options.alignment as usize, + 0 + ); + // write continuation, flatbuf, and padding write_continuation( &mut writer, write_options, - (aligned_size - prefix_size) as i32, + (padded_size - continuation_size) as i32, )?; - - // write the flatbuf - if flatbuf_size > 0 { - writer.write_all(&buffer)?; - } - // write padding - writer.write_all(&PADDING[..padding_bytes])?; + writer.write_all(&encoded.ipc_message)?; + writer.write_all(&PADDING[..padding_size])?; // write arrow data - let body_len = if arrow_data_len > 0 { + let body_len = if !encoded.arrow_data.is_empty() { write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)? } else { 0 }; - - Ok((aligned_size, body_len)) + Ok((padded_size, body_len)) } fn write_body_buffers(