diff --git a/Cargo.lock b/Cargo.lock index c3109a177997..7d308d3988ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2853,7 +2853,6 @@ dependencies = [ "arrow-buffer", "arrow-data", "arrow-schema", - "async-stream", "atoi", "atoi_simd", "avro-schema", diff --git a/crates/polars-arrow/Cargo.toml b/crates/polars-arrow/Cargo.toml index 5e7e1eebff0a..379a9b090d18 100644 --- a/crates/polars-arrow/Cargo.toml +++ b/crates/polars-arrow/Cargo.toml @@ -55,9 +55,6 @@ zstd = { workspace = true, optional = true } # to write to parquet as a stream futures = { workspace = true, optional = true } -# to read IPC as a stream -async-stream = { version = "0.3.2", optional = true } - # avro support avro-schema = { workspace = true, optional = true } @@ -75,6 +72,7 @@ arrow-array = { workspace = true, optional = true } arrow-buffer = { workspace = true, optional = true } arrow-data = { workspace = true, optional = true } arrow-schema = { workspace = true, optional = true } +tokio = { workspace = true, optional = true } [dev-dependencies] criterion = "0.5" @@ -105,8 +103,6 @@ full = [ "arrow_rs", "io_ipc", "io_flight", - "io_ipc_write_async", - "io_ipc_read_async", "io_ipc_compression", "io_avro", "io_avro_compression", @@ -119,8 +115,6 @@ full = [ ] arrow_rs = ["arrow-buffer", "arrow-schema", "arrow-data", "arrow-array"] io_ipc = ["arrow-format", "polars-error/arrow-format"] -io_ipc_write_async = ["io_ipc", "futures"] -io_ipc_read_async = ["io_ipc", "futures", "async-stream"] io_ipc_compression = ["lz4", "zstd", "io_ipc"] io_flight = ["io_ipc", "arrow-format/flight-data"] diff --git a/crates/polars-arrow/src/io/ipc/read/file_async.rs b/crates/polars-arrow/src/io/ipc/read/file_async.rs deleted file mode 100644 index 567a58c1a1fb..000000000000 --- a/crates/polars-arrow/src/io/ipc/read/file_async.rs +++ /dev/null @@ -1,350 +0,0 @@ -//! Async reader for Arrow IPC files -use std::io::SeekFrom; - -use arrow_format::ipc::planus::ReadAsRoot; -use arrow_format::ipc::{Block, MessageHeaderRef}; -use futures::stream::BoxStream; -use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream, StreamExt}; -use polars_error::{polars_bail, polars_err, PolarsResult}; -use polars_utils::aliases::PlHashMap; - -use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch}; -use super::file::{deserialize_footer, get_record_batch}; -use super::{Dictionaries, FileMetadata, OutOfSpecKind}; -use crate::array::*; -use crate::datatypes::ArrowSchema; -use crate::io::ipc::{IpcSchema, ARROW_MAGIC_V2, CONTINUATION_MARKER}; -use crate::record_batch::RecordBatchT; - -/// Async reader for Arrow IPC files -pub struct FileStream<'a> { - stream: BoxStream<'a, PolarsResult>>>, - schema: Option, - metadata: FileMetadata, -} - -impl<'a> FileStream<'a> { - /// Create a new IPC file reader. - /// - /// # Examples - /// See [`FileSink`](crate::io::ipc::write::file_async::FileSink). - pub fn new( - reader: R, - metadata: FileMetadata, - projection: Option>, - limit: Option, - ) -> Self - where - R: AsyncRead + AsyncSeek + Unpin + Send + 'a, - { - let (projection, schema) = if let Some(projection) = projection { - let (p, h, schema) = prepare_projection(&metadata.schema, projection); - (Some((p, h)), Some(schema)) - } else { - (None, None) - }; - - let stream = Self::stream(reader, None, metadata.clone(), projection, limit); - Self { - stream, - metadata, - schema, - } - } - - /// Get the metadata from the IPC file. - pub fn metadata(&self) -> &FileMetadata { - &self.metadata - } - - /// Get the projected schema from the IPC file. - pub fn schema(&self) -> &ArrowSchema { - self.schema.as_ref().unwrap_or(&self.metadata.schema) - } - - fn stream( - mut reader: R, - mut dictionaries: Option, - metadata: FileMetadata, - projection: Option<(Vec, PlHashMap)>, - limit: Option, - ) -> BoxStream<'a, PolarsResult>>> - where - R: AsyncRead + AsyncSeek + Unpin + Send + 'a, - { - async_stream::try_stream! { - // read dictionaries - cached_read_dictionaries(&mut reader, &metadata, &mut dictionaries).await?; - - let mut meta_buffer = Default::default(); - let mut block_buffer = Default::default(); - let mut scratch = Default::default(); - let mut remaining = limit.unwrap_or(usize::MAX); - for block in 0..metadata.blocks.len() { - let chunk = read_batch( - &mut reader, - dictionaries.as_mut().unwrap(), - &metadata, - projection.as_ref().map(|x| x.0.as_ref()), - Some(remaining), - block, - &mut meta_buffer, - &mut block_buffer, - &mut scratch - ).await?; - remaining -= chunk.len(); - - let chunk = if let Some((_, map)) = &projection { - // re-order according to projection - apply_projection(chunk, map) - } else { - chunk - }; - - yield chunk; - } - } - .boxed() - } -} - -impl<'a> Stream for FileStream<'a> { - type Item = PolarsResult>>; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.get_mut().stream.poll_next_unpin(cx) - } -} - -/// Reads the footer's length and magic number in footer -async fn read_footer_len(reader: &mut R) -> PolarsResult { - // read footer length and magic number in footer - reader.seek(SeekFrom::End(-10)).await?; - let mut footer: [u8; 10] = [0; 10]; - - reader.read_exact(&mut footer).await?; - let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap()); - - if footer[4..] != ARROW_MAGIC_V2 { - polars_bail!(oos = OutOfSpecKind::InvalidFooter) - } - footer_len - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength)) -} - -/// Read the metadata from an IPC file. -pub async fn read_file_metadata_async(reader: &mut R) -> PolarsResult -where - R: AsyncRead + AsyncSeek + Unpin, -{ - let footer_size = read_footer_len(reader).await?; - // Read footer - reader.seek(SeekFrom::End(-10 - footer_size as i64)).await?; - - let mut footer = vec![]; - footer.try_reserve(footer_size)?; - reader - .take(footer_size as u64) - .read_to_end(&mut footer) - .await?; - - deserialize_footer(&footer, u64::MAX) -} - -#[allow(clippy::too_many_arguments)] -async fn read_batch( - mut reader: R, - dictionaries: &mut Dictionaries, - metadata: &FileMetadata, - projection: Option<&[usize]>, - limit: Option, - block: usize, - meta_buffer: &mut Vec, - block_buffer: &mut Vec, - scratch: &mut Vec, -) -> PolarsResult>> -where - R: AsyncRead + AsyncSeek + Unpin, -{ - let block = metadata.blocks[block]; - - let offset: u64 = block - .offset - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; - - reader.seek(SeekFrom::Start(offset)).await?; - let mut meta_buf = [0; 4]; - reader.read_exact(&mut meta_buf).await?; - if meta_buf == CONTINUATION_MARKER { - reader.read_exact(&mut meta_buf).await?; - } - - let meta_len = i32::from_le_bytes(meta_buf) - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?; - - meta_buffer.clear(); - meta_buffer.try_reserve(meta_len)?; - (&mut reader) - .take(meta_len as u64) - .read_to_end(meta_buffer) - .await?; - - let message = arrow_format::ipc::MessageRef::read_as_root(meta_buffer) - .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))?; - - let batch = get_record_batch(message)?; - - let block_length: usize = message - .body_length() - .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferBodyLength(err)))? - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?; - - block_buffer.clear(); - block_buffer.try_reserve(block_length)?; - reader - .take(block_length as u64) - .read_to_end(block_buffer) - .await?; - - let mut cursor = std::io::Cursor::new(&block_buffer); - - read_record_batch( - batch, - &metadata.schema, - &metadata.ipc_schema, - projection, - limit, - dictionaries, - message - .version() - .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferVersion(err)))?, - &mut cursor, - 0, - metadata.size, - scratch, - ) -} - -async fn read_dictionaries( - mut reader: R, - fields: &ArrowSchema, - ipc_schema: &IpcSchema, - blocks: &[Block], - scratch: &mut Vec, -) -> PolarsResult -where - R: AsyncRead + AsyncSeek + Unpin, -{ - let mut dictionaries = Default::default(); - let mut data: Vec = vec![]; - let mut buffer: Vec = vec![]; - - for block in blocks { - let offset: u64 = block - .offset - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; - - let length: usize = block - .body_length - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; - - read_dictionary_message(&mut reader, offset, &mut data).await?; - - let message = arrow_format::ipc::MessageRef::read_as_root(data.as_ref()) - .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))?; - - let header = message - .header() - .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferHeader(err)))? - .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?; - - match header { - MessageHeaderRef::DictionaryBatch(batch) => { - buffer.clear(); - buffer.try_reserve(length)?; - (&mut reader) - .take(length as u64) - .read_to_end(&mut buffer) - .await?; - let mut cursor = std::io::Cursor::new(&buffer); - read_dictionary( - batch, - fields, - ipc_schema, - &mut dictionaries, - &mut cursor, - 0, - u64::MAX, - scratch, - )?; - }, - _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType), - } - } - Ok(dictionaries) -} - -async fn read_dictionary_message( - mut reader: R, - offset: u64, - data: &mut Vec, -) -> PolarsResult<()> -where - R: AsyncRead + AsyncSeek + Unpin, -{ - let mut message_size = [0; 4]; - reader.seek(SeekFrom::Start(offset)).await?; - reader.read_exact(&mut message_size).await?; - if message_size == CONTINUATION_MARKER { - reader.read_exact(&mut message_size).await?; - } - let footer_size = i32::from_le_bytes(message_size); - - let footer_size: usize = footer_size - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; - - data.clear(); - data.try_reserve(footer_size)?; - (&mut reader) - .take(footer_size as u64) - .read_to_end(data) - .await?; - - Ok(()) -} - -async fn cached_read_dictionaries( - reader: &mut R, - metadata: &FileMetadata, - dictionaries: &mut Option, -) -> PolarsResult<()> { - match (&dictionaries, metadata.dictionaries.as_deref()) { - (None, Some(blocks)) => { - let new_dictionaries: hashbrown::HashMap, ahash::RandomState> = - read_dictionaries( - reader, - &metadata.schema, - &metadata.ipc_schema, - blocks, - &mut Default::default(), - ) - .await?; - *dictionaries = Some(new_dictionaries); - }, - (None, None) => { - *dictionaries = Some(Default::default()); - }, - _ => {}, - }; - Ok(()) -} diff --git a/crates/polars-arrow/src/io/ipc/read/mod.rs b/crates/polars-arrow/src/io/ipc/read/mod.rs index 74d9a93a9309..68e806ca8946 100644 --- a/crates/polars-arrow/src/io/ipc/read/mod.rs +++ b/crates/polars-arrow/src/io/ipc/read/mod.rs @@ -16,22 +16,13 @@ mod reader; mod schema; mod stream; -pub use error::OutOfSpecKind; -pub use file::get_row_count; - -#[cfg(feature = "io_ipc_read_async")] -#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))] -pub mod stream_async; - -#[cfg(feature = "io_ipc_read_async")] -#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_read_async")))] -pub mod file_async; - pub(crate) use common::first_dict_field; #[cfg(feature = "io_flight")] pub(crate) use common::{read_dictionary, read_record_batch}; +pub use error::OutOfSpecKind; pub use file::{ - deserialize_footer, read_batch, read_file_dictionaries, read_file_metadata, FileMetadata, + deserialize_footer, get_row_count, read_batch, read_file_dictionaries, read_file_metadata, + FileMetadata, }; use polars_utils::aliases::PlHashMap; pub use reader::FileReader; diff --git a/crates/polars-arrow/src/io/ipc/read/stream_async.rs b/crates/polars-arrow/src/io/ipc/read/stream_async.rs deleted file mode 100644 index ab29550d8a14..000000000000 --- a/crates/polars-arrow/src/io/ipc/read/stream_async.rs +++ /dev/null @@ -1,238 +0,0 @@ -//! APIs to read Arrow streams asynchronously - -use arrow_format::ipc::planus::ReadAsRoot; -use futures::future::BoxFuture; -use futures::{AsyncRead, AsyncReadExt, FutureExt, Stream}; -use polars_error::*; - -use super::super::CONTINUATION_MARKER; -use super::common::{read_dictionary, read_record_batch}; -use super::schema::deserialize_stream_metadata; -use super::{Dictionaries, OutOfSpecKind, StreamMetadata}; -use crate::array::*; -use crate::record_batch::RecordBatchT; - -/// A (private) state of stream messages -struct ReadState { - pub reader: R, - pub metadata: StreamMetadata, - pub dictionaries: Dictionaries, - /// The internal buffer to read data inside the messages (records and dictionaries) to - pub data_buffer: Vec, - /// The internal buffer to read messages to - pub message_buffer: Vec, -} - -/// The state of an Arrow stream -enum StreamState { - /// The stream does not contain new chunks (and it has not been closed) - Waiting(ReadState), - /// The stream contain a new chunk - Some((ReadState, RecordBatchT>)), -} - -/// Reads the [`StreamMetadata`] of the Arrow stream asynchronously -pub async fn read_stream_metadata_async( - reader: &mut R, -) -> PolarsResult { - // determine metadata length - let mut meta_size: [u8; 4] = [0; 4]; - reader.read_exact(&mut meta_size).await?; - let meta_len = { - // If a continuation marker is encountered, skip over it and read - // the size from the next four bytes. - if meta_size == CONTINUATION_MARKER { - reader.read_exact(&mut meta_size).await?; - } - i32::from_le_bytes(meta_size) - }; - - let meta_len: usize = meta_len.try_into().map_err( - |_| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength), - )?; - - let mut meta_buffer = vec![]; - meta_buffer.try_reserve(meta_len)?; - reader - .take(meta_len as u64) - .read_to_end(&mut meta_buffer) - .await?; - - deserialize_stream_metadata(&meta_buffer) -} - -/// Reads the next item, yielding `None` if the stream has been closed, -/// or a [`StreamState`] otherwise. -async fn maybe_next( - mut state: ReadState, -) -> PolarsResult>> { - let mut scratch = Default::default(); - // determine metadata length - let mut meta_length: [u8; 4] = [0; 4]; - - match state.reader.read_exact(&mut meta_length).await { - Ok(()) => (), - Err(e) => { - return if e.kind() == std::io::ErrorKind::UnexpectedEof { - // Handle EOF without the "0xFFFFFFFF 0x00000000" - // valid according to: - // https://arrow.apache.org/docs/format/Columnar.html#ipc-streaming-format - Ok(Some(StreamState::Waiting(state))) - } else { - Err(PolarsError::from(e)) - }; - }, - } - - let meta_length = { - // If a continuation marker is encountered, skip over it and read - // the size from the next four bytes. - if meta_length == CONTINUATION_MARKER { - state.reader.read_exact(&mut meta_length).await?; - } - i32::from_le_bytes(meta_length) - }; - - let meta_length: usize = meta_length.try_into().map_err( - |_| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::NegativeFooterLength), - )?; - - if meta_length == 0 { - // the stream has ended, mark the reader as finished - return Ok(None); - } - - state.message_buffer.clear(); - state.message_buffer.try_reserve(meta_length)?; - (&mut state.reader) - .take(meta_length as u64) - .read_to_end(&mut state.message_buffer) - .await?; - - let message = arrow_format::ipc::MessageRef::read_as_root(state.message_buffer.as_ref()) - .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferMessage(err)))?; - - let header = message - .header() - .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferHeader(err)))? - .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageHeader))?; - - let block_length: usize = message - .body_length() - .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferBodyLength(err)))? - .try_into() - .map_err(|_err| polars_err!(oos = OutOfSpecKind::UnexpectedNegativeInteger))?; - - match header { - arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => { - state.data_buffer.clear(); - state.data_buffer.try_reserve(block_length)?; - (&mut state.reader) - .take(block_length as u64) - .read_to_end(&mut state.data_buffer) - .await?; - - let chunk = read_record_batch( - batch, - &state.metadata.schema, - &state.metadata.ipc_schema, - None, - None, - &state.dictionaries, - state.metadata.version, - &mut std::io::Cursor::new(&state.data_buffer), - 0, - state.data_buffer.len() as u64, - &mut scratch, - )?; - - Ok(Some(StreamState::Some((state, chunk)))) - }, - arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => { - state.data_buffer.clear(); - state.data_buffer.try_reserve(block_length)?; - (&mut state.reader) - .take(block_length as u64) - .read_to_end(&mut state.data_buffer) - .await?; - - let file_size = state.data_buffer.len() as u64; - - let mut dict_reader = std::io::Cursor::new(&state.data_buffer); - - read_dictionary( - batch, - &state.metadata.schema, - &state.metadata.ipc_schema, - &mut state.dictionaries, - &mut dict_reader, - 0, - file_size, - &mut scratch, - )?; - - // read the next message until we encounter a Chunk> message - Ok(Some(StreamState::Waiting(state))) - }, - _ => polars_bail!(oos = OutOfSpecKind::UnexpectedMessageType), - } -} - -/// A [`Stream`] over an Arrow IPC stream that asynchronously yields [`RecordBatchT`]s. -pub struct AsyncStreamReader<'a, R: AsyncRead + Unpin + Send + 'a> { - metadata: StreamMetadata, - future: Option>>>>, -} - -impl<'a, R: AsyncRead + Unpin + Send + 'a> AsyncStreamReader<'a, R> { - /// Creates a new [`AsyncStreamReader`] - pub fn new(reader: R, metadata: StreamMetadata) -> Self { - let state = ReadState { - reader, - metadata: metadata.clone(), - dictionaries: Default::default(), - data_buffer: Default::default(), - message_buffer: Default::default(), - }; - let future = Some(maybe_next(state).boxed()); - Self { metadata, future } - } - - /// Return the schema of the stream - pub fn metadata(&self) -> &StreamMetadata { - &self.metadata - } -} - -impl<'a, R: AsyncRead + Unpin + Send> Stream for AsyncStreamReader<'a, R> { - type Item = PolarsResult>>; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - use std::pin::Pin; - use std::task::Poll; - let me = Pin::into_inner(self); - - match &mut me.future { - Some(fut) => match fut.as_mut().poll(cx) { - Poll::Ready(Ok(None)) => { - me.future = None; - Poll::Ready(None) - }, - Poll::Ready(Ok(Some(StreamState::Some((state, batch))))) => { - me.future = Some(Box::pin(maybe_next(state))); - Poll::Ready(Some(Ok(batch))) - }, - Poll::Ready(Ok(Some(StreamState::Waiting(_)))) => Poll::Pending, - Poll::Ready(Err(err)) => { - me.future = None; - Poll::Ready(Some(Err(err))) - }, - Poll::Pending => Poll::Pending, - }, - None => Poll::Ready(None), - } - } -} diff --git a/crates/polars-arrow/src/io/ipc/write/common_async.rs b/crates/polars-arrow/src/io/ipc/write/common_async.rs deleted file mode 100644 index daadfcb5e25e..000000000000 --- a/crates/polars-arrow/src/io/ipc/write/common_async.rs +++ /dev/null @@ -1,66 +0,0 @@ -use futures::{AsyncWrite, AsyncWriteExt}; -use polars_error::PolarsResult; - -use super::super::CONTINUATION_MARKER; -use super::common::{pad_to_64, EncodedData}; - -/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written -pub async fn write_message( - mut writer: W, - encoded: EncodedData, -) -> PolarsResult<(usize, usize)> { - let arrow_data_len = encoded.arrow_data.len(); - - let a = 64 - 1; - let buffer = encoded.ipc_message; - let flatbuf_size = buffer.len(); - let prefix_size = 8; // the message length - let aligned_size = (flatbuf_size + prefix_size + a) & !a; - let padding_bytes = aligned_size - flatbuf_size - prefix_size; - - write_continuation(&mut writer, (aligned_size - prefix_size) as i32).await?; - - // write the flatbuf - if flatbuf_size > 0 { - writer.write_all(&buffer).await?; - } - // write padding - writer.write_all(&vec![0; padding_bytes]).await?; - - // write arrow data - let body_len = if arrow_data_len > 0 { - write_body_buffers(writer, &encoded.arrow_data).await? - } else { - 0 - }; - - Ok((aligned_size, body_len)) -} - -/// Write a record batch to the writer, writing the message size before the message -/// if the record batch is being written to a stream -pub async fn write_continuation( - mut writer: W, - total_len: i32, -) -> PolarsResult { - writer.write_all(&CONTINUATION_MARKER).await?; - writer.write_all(&total_len.to_le_bytes()[..]).await?; - Ok(8) -} - -async fn write_body_buffers( - mut writer: W, - data: &[u8], -) -> PolarsResult { - let len = data.len(); - let pad_len = pad_to_64(data.len()); - let total_len = len + pad_len; - - // write body buffer - writer.write_all(data).await?; - if pad_len > 0 { - writer.write_all(&vec![0u8; pad_len][..]).await?; - } - - Ok(total_len) -} diff --git a/crates/polars-arrow/src/io/ipc/write/file_async.rs b/crates/polars-arrow/src/io/ipc/write/file_async.rs deleted file mode 100644 index aaae101785bc..000000000000 --- a/crates/polars-arrow/src/io/ipc/write/file_async.rs +++ /dev/null @@ -1,210 +0,0 @@ -//! Async writer for IPC files. - -use std::task::Poll; - -use arrow_format::ipc::planus::Builder; -use arrow_format::ipc::{Block, Footer, MetadataVersion}; -use futures::future::BoxFuture; -use futures::{AsyncWrite, AsyncWriteExt, FutureExt, Sink}; -use polars_error::{PolarsError, PolarsResult}; - -use super::common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions}; -use super::common_async::{write_continuation, write_message}; -use super::schema::serialize_schema; -use super::{default_ipc_fields, schema_to_bytes, Record}; -use crate::datatypes::*; -use crate::io::ipc::{IpcField, ARROW_MAGIC_V2}; - -type WriteOutput = (usize, Option, Vec, Option); - -/// Sink that writes array [`chunks`](crate::record_batch::RecordBatchT) as an IPC file. -/// -/// The file header is automatically written before writing the first chunk, and the file footer is -/// automatically written when the sink is closed. -pub struct FileSink<'a, W: AsyncWrite + Unpin + Send + 'a> { - writer: Option, - task: Option>>>, - options: WriteOptions, - dictionary_tracker: DictionaryTracker, - offset: usize, - fields: Vec, - record_blocks: Vec, - dictionary_blocks: Vec, - schema: ArrowSchema, -} - -impl<'a, W> FileSink<'a, W> -where - W: AsyncWrite + Unpin + Send + 'a, -{ - /// Create a new file writer. - pub fn new( - writer: W, - schema: ArrowSchema, - ipc_fields: Option>, - options: WriteOptions, - ) -> Self { - let fields = ipc_fields.unwrap_or_else(|| default_ipc_fields(schema.iter_values())); - let encoded = EncodedData { - ipc_message: schema_to_bytes(&schema, &fields), - arrow_data: vec![], - }; - let task = Some(Self::start(writer, encoded).boxed()); - Self { - writer: None, - task, - options, - fields, - offset: 0, - schema, - dictionary_tracker: DictionaryTracker { - dictionaries: Default::default(), - cannot_replace: true, - }, - record_blocks: vec![], - dictionary_blocks: vec![], - } - } - - async fn start(mut writer: W, encoded: EncodedData) -> PolarsResult> { - writer.write_all(&ARROW_MAGIC_V2[..]).await?; - writer.write_all(&[0, 0]).await?; - let (meta, data) = write_message(&mut writer, encoded).await?; - - Ok((meta + data + 8, None, vec![], Some(writer))) - } - - async fn write( - mut writer: W, - mut offset: usize, - record: EncodedData, - dictionaries: Vec, - ) -> PolarsResult> { - let mut dict_blocks = vec![]; - for dict in dictionaries { - let (meta, data) = write_message(&mut writer, dict).await?; - let block = Block { - offset: offset as i64, - meta_data_length: meta as i32, - body_length: data as i64, - }; - dict_blocks.push(block); - offset += meta + data; - } - let (meta, data) = write_message(&mut writer, record).await?; - let block = Block { - offset: offset as i64, - meta_data_length: meta as i32, - body_length: data as i64, - }; - offset += meta + data; - Ok((offset, Some(block), dict_blocks, Some(writer))) - } - - async fn finish(mut writer: W, footer: Footer) -> PolarsResult> { - write_continuation(&mut writer, 0).await?; - let footer = { - let mut builder = Builder::new(); - builder.finish(&footer, None).to_owned() - }; - writer.write_all(&footer[..]).await?; - writer - .write_all(&(footer.len() as i32).to_le_bytes()) - .await?; - writer.write_all(&ARROW_MAGIC_V2).await?; - writer.close().await?; - - Ok((0, None, vec![], None)) - } - - fn poll_write(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { - if let Some(task) = &mut self.task { - match futures::ready!(task.poll_unpin(cx)) { - Ok((offset, record, mut dictionaries, writer)) => { - self.task = None; - self.writer = writer; - self.offset = offset; - if let Some(block) = record { - self.record_blocks.push(block); - } - self.dictionary_blocks.append(&mut dictionaries); - Poll::Ready(Ok(())) - }, - Err(error) => { - self.task = None; - Poll::Ready(Err(error)) - }, - } - } else { - Poll::Ready(Ok(())) - } - } -} - -impl<'a, W> Sink> for FileSink<'a, W> -where - W: AsyncWrite + Unpin + Send + 'a, -{ - type Error = PolarsError; - - fn poll_ready( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.get_mut().poll_write(cx) - } - - fn start_send(self: std::pin::Pin<&mut Self>, item: Record<'_>) -> PolarsResult<()> { - let this = self.get_mut(); - - if let Some(writer) = this.writer.take() { - let fields = item.fields().unwrap_or_else(|| &this.fields[..]); - - let (dictionaries, record) = encode_chunk( - item.columns(), - fields, - &mut this.dictionary_tracker, - &this.options, - )?; - - this.task = Some(Self::write(writer, this.offset, record, dictionaries).boxed()); - Ok(()) - } else { - let io_err = std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "writer is closed"); - Err(PolarsError::from(io_err)) - } - } - - fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.get_mut().poll_write(cx) - } - - fn poll_close( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let this = self.get_mut(); - match futures::ready!(this.poll_write(cx)) { - Ok(()) => { - if let Some(writer) = this.writer.take() { - let schema = serialize_schema(&this.schema, &this.fields); - let footer = Footer { - version: MetadataVersion::V5, - schema: Some(Box::new(schema)), - dictionaries: Some(std::mem::take(&mut this.dictionary_blocks)), - record_batches: Some(std::mem::take(&mut this.record_blocks)), - custom_metadata: None, - }; - this.task = Some(Self::finish(writer, footer).boxed()); - this.poll_write(cx) - } else { - Poll::Ready(Ok(())) - } - }, - Err(error) => Poll::Ready(Err(error)), - } - } -} diff --git a/crates/polars-arrow/src/io/ipc/write/mod.rs b/crates/polars-arrow/src/io/ipc/write/mod.rs index d8afc1571721..99f6fcc3f355 100644 --- a/crates/polars-arrow/src/io/ipc/write/mod.rs +++ b/crates/polars-arrow/src/io/ipc/write/mod.rs @@ -14,16 +14,6 @@ pub use writer::FileWriter; pub(crate) mod common_sync; -#[cfg(feature = "io_ipc_write_async")] -mod common_async; -#[cfg(feature = "io_ipc_write_async")] -#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_write_async")))] -pub mod stream_async; - -#[cfg(feature = "io_ipc_write_async")] -#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_write_async")))] -pub mod file_async; - use super::IpcField; use crate::datatypes::{ArrowDataType, Field}; diff --git a/crates/polars-arrow/src/io/ipc/write/stream_async.rs b/crates/polars-arrow/src/io/ipc/write/stream_async.rs deleted file mode 100644 index 3718d6f82b29..000000000000 --- a/crates/polars-arrow/src/io/ipc/write/stream_async.rs +++ /dev/null @@ -1,158 +0,0 @@ -//! `async` writing of arrow streams - -use std::pin::Pin; -use std::task::Poll; - -use futures::future::BoxFuture; -use futures::{AsyncWrite, AsyncWriteExt, FutureExt, Sink}; -use polars_error::{PolarsError, PolarsResult}; - -use super::super::IpcField; -pub use super::common::WriteOptions; -use super::common::{encode_chunk, DictionaryTracker, EncodedData}; -use super::common_async::{write_continuation, write_message}; -use super::{default_ipc_fields, schema_to_bytes, Record}; -use crate::datatypes::*; - -/// A sink that writes array [`chunks`](crate::record_batch::RecordBatchT) as an IPC stream. -/// -/// The stream header is automatically written before writing the first chunk. -pub struct StreamSink<'a, W: AsyncWrite + Unpin + Send + 'a> { - writer: Option, - task: Option>>>, - options: WriteOptions, - dictionary_tracker: DictionaryTracker, - fields: Vec, -} - -impl<'a, W> StreamSink<'a, W> -where - W: AsyncWrite + Unpin + Send + 'a, -{ - /// Create a new [`StreamSink`]. - pub fn new( - writer: W, - schema: &ArrowSchema, - ipc_fields: Option>, - write_options: WriteOptions, - ) -> Self { - let fields = ipc_fields.unwrap_or_else(|| default_ipc_fields(schema.iter_values())); - let task = Some(Self::start(writer, schema, &fields[..])); - Self { - writer: None, - task, - fields, - dictionary_tracker: DictionaryTracker { - dictionaries: Default::default(), - cannot_replace: false, - }, - options: write_options, - } - } - - fn start( - mut writer: W, - schema: &ArrowSchema, - ipc_fields: &[IpcField], - ) -> BoxFuture<'a, PolarsResult>> { - let message = EncodedData { - ipc_message: schema_to_bytes(schema, ipc_fields), - arrow_data: vec![], - }; - async move { - write_message(&mut writer, message).await?; - Ok(Some(writer)) - } - .boxed() - } - - fn write(&mut self, record: Record<'_>) -> PolarsResult<()> { - let fields = record.fields().unwrap_or(&self.fields[..]); - let (dictionaries, message) = encode_chunk( - record.columns(), - fields, - &mut self.dictionary_tracker, - &self.options, - )?; - - if let Some(mut writer) = self.writer.take() { - self.task = Some( - async move { - for d in dictionaries { - write_message(&mut writer, d).await?; - } - write_message(&mut writer, message).await?; - Ok(Some(writer)) - } - .boxed(), - ); - Ok(()) - } else { - let io_err = std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "writer closed".to_string(), - ); - Err(PolarsError::from(io_err)) - } - } - - fn poll_complete(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { - if let Some(task) = &mut self.task { - match futures::ready!(task.poll_unpin(cx)) { - Ok(writer) => { - self.writer = writer; - self.task = None; - Poll::Ready(Ok(())) - }, - Err(error) => { - self.task = None; - Poll::Ready(Err(error)) - }, - } - } else { - Poll::Ready(Ok(())) - } - } -} - -impl<'a, W> Sink> for StreamSink<'a, W> -where - W: AsyncWrite + Unpin + Send, -{ - type Error = PolarsError; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { - self.get_mut().poll_complete(cx) - } - - fn start_send(self: Pin<&mut Self>, item: Record<'_>) -> PolarsResult<()> { - self.get_mut().write(item) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { - self.get_mut().poll_complete(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { - let this = self.get_mut(); - match this.poll_complete(cx) { - Poll::Ready(Ok(())) => { - if let Some(mut writer) = this.writer.take() { - this.task = Some( - async move { - write_continuation(&mut writer, 0).await?; - writer.flush().await?; - writer.close().await?; - Ok(None) - } - .boxed(), - ); - this.poll_complete(cx) - } else { - Poll::Ready(Ok(())) - } - }, - res => res, - } - } -} diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml index ca3d313e08ae..c3ed9b93bd5c 100644 --- a/crates/polars-io/Cargo.toml +++ b/crates/polars-io/Cargo.toml @@ -108,7 +108,6 @@ async = [ "futures", "tokio", "tokio-util", - "arrow/io_ipc_write_async", "polars-error/regex", "polars-parquet?/async", ] diff --git a/crates/polars-io/src/ipc/mod.rs b/crates/polars-io/src/ipc/mod.rs index d78362f5555f..1e341de98c56 100644 --- a/crates/polars-io/src/ipc/mod.rs +++ b/crates/polars-io/src/ipc/mod.rs @@ -7,9 +7,6 @@ mod ipc_stream; #[cfg(feature = "ipc")] mod mmap; mod write; -#[cfg(all(feature = "async", feature = "ipc"))] -mod write_async; - #[cfg(feature = "ipc")] pub use ipc_file::{IpcReader, IpcScanOptions}; #[cfg(feature = "cloud")] diff --git a/crates/polars-io/src/ipc/write_async.rs b/crates/polars-io/src/ipc/write_async.rs deleted file mode 100644 index 5ed459a715d2..000000000000 --- a/crates/polars-io/src/ipc/write_async.rs +++ /dev/null @@ -1,59 +0,0 @@ -use arrow::io::ipc::write::file_async::FileSink; -use arrow::io::ipc::write::WriteOptions; -use futures::{AsyncWrite, SinkExt}; -use polars_core::prelude::*; - -use crate::ipc::IpcWriter; - -impl IpcWriter { - pub fn new_async(writer: W) -> Self { - IpcWriter { - writer, - compression: None, - compat_level: CompatLevel::oldest(), - } - } - - pub fn batched_async(self, schema: &Schema) -> PolarsResult> { - let writer = FileSink::new( - self.writer, - schema.to_arrow(CompatLevel::oldest()), - None, - WriteOptions { - compression: self.compression.map(|c| c.into()), - }, - ); - - Ok(BatchedWriterAsync { writer }) - } -} - -pub struct BatchedWriterAsync<'a, W> -where - W: AsyncWrite + Unpin + Send + 'a, -{ - writer: FileSink<'a, W>, -} - -impl<'a, W> BatchedWriterAsync<'a, W> -where - W: AsyncWrite + Unpin + Send + 'a, -{ - /// Write a batch to the parquet writer. - /// - /// # Panics - /// The caller must ensure the chunks in the given [`DataFrame`] are aligned. - pub async fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> { - let iter = df.iter_chunks(CompatLevel::oldest(), true); - for batch in iter { - self.writer.feed(batch.into()).await?; - } - Ok(()) - } - - /// Writes the footer of the IPC file. - pub async fn finish(&mut self) -> PolarsResult<()> { - self.writer.close().await?; - Ok(()) - } -}