Skip to content

Commit

Permalink
Remove internal buffering from AsyncArrowWriter (#5484) (#5485)
Browse files Browse the repository at this point in the history
* Remove confusing buffer_size from AsyncArrowWriter (#5484)

* Review feedback
  • Loading branch information
tustvold authored Mar 13, 2024
1 parent c252a18 commit 19a3bb0
Showing 1 changed file with 59 additions and 133 deletions.
192 changes: 59 additions & 133 deletions parquet/src/arrow/async_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,16 @@
//! # #[tokio::main(flavor="current_thread")]
//! # async fn main() {
//! #
//! use std::sync::Arc;
//! use arrow_array::{ArrayRef, Int64Array, RecordBatch, RecordBatchReader};
//! use bytes::Bytes;
//! use parquet::arrow::{AsyncArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder};
//!
//! # use std::sync::Arc;
//! # use arrow_array::{ArrayRef, Int64Array, RecordBatch, RecordBatchReader};
//! # use bytes::Bytes;
//! # use parquet::arrow::{AsyncArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder};
//! #
//! let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
//! let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
//!
//! let mut buffer = Vec::new();
//! let mut writer =
//! AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), 0, None).unwrap();
//! let mut writer = AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
//! writer.write(&to_write).await.unwrap();
//! writer.close().await.unwrap();
//!
Expand All @@ -62,19 +61,19 @@ use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use tokio::io::{AsyncWrite, AsyncWriteExt};

/// Async arrow writer.
/// Encodes [`RecordBatch`] to parquet, outputting to an [`AsyncWrite`]
///
/// It is implemented based on the sync writer [`ArrowWriter`] with an inner buffer.
/// The buffered data will be flushed to the writer provided by caller when the
/// buffer's threshold is exceeded.
/// ## Memory Usage
///
/// ## Memory Limiting
/// This writer eagerly writes data as soon as possible to the underlying [`AsyncWrite`],
/// permitting fine-grained control over buffering and I/O scheduling. However, the columnar
/// nature of parquet forces data for an entire row group to be buffered in memory, before
/// it can be flushed. Depending on the data and the configured row group size, this buffering
/// may be substantial.
///
/// The nature of parquet forces buffering of an entire row group before it can be flushed
/// to the underlying writer. This buffering may exceed the configured buffer size
/// of [`AsyncArrowWriter`]. Memory usage can be limited by prematurely flushing the row group,
/// although this will have implications for file size and query performance. See [ArrowWriter]
/// for more information.
/// Memory usage can be limited by calling [`Self::flush`] to flush the in progress row group,
/// although this will likely increase overall file size and reduce query performance.
/// See [ArrowWriter] for more information.
///
/// ```no_run
/// # use tokio::fs::File;
Expand All @@ -96,50 +95,30 @@ pub struct AsyncArrowWriter<W> {

/// Async writer provided by caller
async_writer: W,

/// Trigger forced flushing once buffer size reaches this value
buffer_size: usize,
}

impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
/// Try to create a new Async Arrow Writer.
///
/// `buffer_size` determines the minimum number of bytes to buffer before flushing
/// to the underlying [`AsyncWrite`]. However, the nature of writing parquet may
/// force buffering of data in excess of this within the underlying [`ArrowWriter`].
/// See the documentation on [`ArrowWriter`] for more details
/// Try to create a new Async Arrow Writer
pub fn try_new(
writer: W,
arrow_schema: SchemaRef,
buffer_size: usize,
props: Option<WriterProperties>,
) -> Result<Self> {
let options = ArrowWriterOptions::new().with_properties(props.unwrap_or_default());
Self::try_new_with_options(writer, arrow_schema, buffer_size, options)
Self::try_new_with_options(writer, arrow_schema, options)
}

/// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`].
///
/// `buffer_size` determines the minimum number of bytes to buffer before flushing
/// to the underlying [`AsyncWrite`]. However, the nature of writing parquet may
/// force buffering of data in excess of this within the underlying [`ArrowWriter`].
/// See the documentation on [`ArrowWriter`] for more details
/// Try to create a new Async Arrow Writer with [`ArrowWriterOptions`]
pub fn try_new_with_options(
writer: W,
arrow_schema: SchemaRef,
buffer_size: usize,
options: ArrowWriterOptions,
) -> Result<Self> {
let sync_writer = ArrowWriter::try_new_with_options(
Vec::with_capacity(buffer_size),
arrow_schema,
options,
)?;
let sync_writer = ArrowWriter::try_new_with_options(Vec::new(), arrow_schema, options)?;

Ok(Self {
sync_writer,
async_writer: writer,
buffer_size,
})
}

Expand Down Expand Up @@ -168,14 +147,18 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
/// After every sync write by the inner [ArrowWriter], the inner buffer will be
/// checked and flush if at least half full
pub async fn write(&mut self, batch: &RecordBatch) -> Result<()> {
let before = self.sync_writer.flushed_row_groups().len();
self.sync_writer.write(batch)?;
self.try_flush(false).await
if before != self.sync_writer.flushed_row_groups().len() {
self.do_write().await?;
}
Ok(())
}

/// Flushes all buffered rows into a new row group
pub async fn flush(&mut self) -> Result<()> {
self.sync_writer.flush()?;
self.try_flush(false).await?;
self.do_write().await?;

Ok(())
}
Expand All @@ -194,19 +177,15 @@ impl<W: AsyncWrite + Unpin + Send> AsyncArrowWriter<W> {
let metadata = self.sync_writer.finish()?;

// Force to flush the remaining data.
self.try_flush(true).await?;
self.do_write().await?;
self.async_writer.shutdown().await?;

Ok(metadata)
}

/// Flush the buffered data into the `async_writer`
async fn try_flush(&mut self, force: bool) -> Result<()> {
/// Flush the data written by `sync_writer` into the `async_writer`
async fn do_write(&mut self) -> Result<()> {
let buffer = self.sync_writer.inner_mut();
if !force && (buffer.is_empty() || buffer.len() < self.buffer_size) {
// no need to flush
return Ok(());
}

self.async_writer
.write_all(buffer.as_slice())
Expand Down Expand Up @@ -254,8 +233,7 @@ mod tests {
let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();

let mut buffer = Vec::new();
let mut writer =
AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), 0, None).unwrap();
let mut writer = AsyncArrowWriter::try_new(&mut buffer, to_write.schema(), None).unwrap();
writer.write(&to_write).await.unwrap();
writer.close().await.unwrap();

Expand Down Expand Up @@ -283,7 +261,6 @@ mod tests {
let mut async_writer = AsyncArrowWriter::try_new(
&mut async_buffer,
reader.schema(),
1024,
Some(write_props.clone()),
)
.unwrap();
Expand Down Expand Up @@ -345,54 +322,6 @@ mod tests {
}
}

#[tokio::test]
async fn test_async_writer_with_buffer_flush_threshold() {
let write_props = WriterProperties::builder()
.set_max_row_group_size(2048)
.build();
let expect_encode_size = {
let reader = get_test_reader();
let mut buffer = Vec::new();
let mut async_writer = AsyncArrowWriter::try_new(
&mut buffer,
reader.schema(),
0,
Some(write_props.clone()),
)
.unwrap();
for record_batch in reader {
let record_batch = record_batch.unwrap();
async_writer.write(&record_batch).await.unwrap();
}
async_writer.close().await.unwrap();
buffer.len()
};

let test_buffer_flush_thresholds = vec![0, 1024, 40 * 1024, 50 * 1024, 100 * 1024];

for buffer_flush_threshold in test_buffer_flush_thresholds {
let reader = get_test_reader();
let mut test_async_sink = TestAsyncSink {
sink: Vec::new(),
min_accept_bytes: buffer_flush_threshold,
expect_total_bytes: expect_encode_size,
};
let mut async_writer = AsyncArrowWriter::try_new(
&mut test_async_sink,
reader.schema(),
buffer_flush_threshold * 2,
Some(write_props.clone()),
)
.unwrap();

for record_batch in reader {
let record_batch = record_batch.unwrap();
async_writer.write(&record_batch).await.unwrap();
}
async_writer.close().await.unwrap();
}
}

#[tokio::test]
async fn test_async_writer_file() {
let col = Arc::new(Int64Array::from_iter_values([1, 2, 3])) as ArrayRef;
Expand All @@ -406,7 +335,7 @@ mod tests {
let temp = tempfile::tempfile().unwrap();

let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
let mut writer = AsyncArrowWriter::try_new(file, to_write.schema(), 0, None).unwrap();
let mut writer = AsyncArrowWriter::try_new(file, to_write.schema(), None).unwrap();
writer.write(&to_write).await.unwrap();
writer.close().await.unwrap();

Expand All @@ -430,36 +359,33 @@ mod tests {
// build a record batch
let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap();

for buffer_size in [0, 8, 1024] {
let temp = tempfile::tempfile().unwrap();
let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
let mut writer =
AsyncArrowWriter::try_new(file, batch.schema(), buffer_size, None).unwrap();

// starts empty
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert_eq!(writer.bytes_written(), 4); // Initial Parquet header
writer.write(&batch).await.unwrap();

// updated on write
let initial_size = writer.in_progress_size();
assert!(initial_size > 0);
assert_eq!(writer.in_progress_rows(), batch.num_rows());

// updated on second write
writer.write(&batch).await.unwrap();
assert!(writer.in_progress_size() > initial_size);
assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);

// in progress tracking is cleared, but the overall data written is updated
let pre_flush_bytes_written = writer.bytes_written();
writer.flush().await.unwrap();
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert!(writer.bytes_written() > pre_flush_bytes_written);

writer.close().await.unwrap();
}
let temp = tempfile::tempfile().unwrap();
let file = tokio::fs::File::from_std(temp.try_clone().unwrap());
let mut writer = AsyncArrowWriter::try_new(file, batch.schema(), None).unwrap();

// starts empty
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert_eq!(writer.bytes_written(), 4); // Initial Parquet header
writer.write(&batch).await.unwrap();

// updated on write
let initial_size = writer.in_progress_size();
assert!(initial_size > 0);
assert_eq!(writer.in_progress_rows(), batch.num_rows());

// updated on second write
writer.write(&batch).await.unwrap();
assert!(writer.in_progress_size() > initial_size);
assert_eq!(writer.in_progress_rows(), batch.num_rows() * 2);

// in progress tracking is cleared, but the overall data written is updated
let pre_flush_bytes_written = writer.bytes_written();
writer.flush().await.unwrap();
assert_eq!(writer.in_progress_size(), 0);
assert_eq!(writer.in_progress_rows(), 0);
assert!(writer.bytes_written() > pre_flush_bytes_written);

writer.close().await.unwrap();
}
}

0 comments on commit 19a3bb0

Please sign in to comment.