From 0e8bcad1073d7709c703e177c8118c51de1909c1 Mon Sep 17 00:00:00 2001
From: coastalwhite
Date: Fri, 25 Oct 2024 09:40:18 +0200
Subject: [PATCH] feat: Add CSV sink to new streaming engine

---
 .../polars-stream/src/nodes/io_sinks/csv.rs   | 130 ++++++++++++++++++
 .../polars-stream/src/nodes/io_sinks/mod.rs   |   1 +
 .../src/physical_plan/lower_ir.rs             |   8 ++
 .../src/physical_plan/to_graph.rs             |   4 +
 4 files changed, 143 insertions(+)
 create mode 100644 crates/polars-stream/src/nodes/io_sinks/csv.rs

diff --git a/crates/polars-stream/src/nodes/io_sinks/csv.rs b/crates/polars-stream/src/nodes/io_sinks/csv.rs
new file mode 100644
index 000000000000..3aa499f96d28
--- /dev/null
+++ b/crates/polars-stream/src/nodes/io_sinks/csv.rs
@@ -0,0 +1,130 @@
+use std::fs::{File, OpenOptions};
+use std::path::Path;
+
+use polars_core::schema::SchemaRef;
+use polars_error::PolarsResult;
+use polars_expr::state::ExecutionState;
+use polars_io::csv::write::BatchedWriter;
+use polars_io::prelude::{CsvWriter, CsvWriterOptions, SerializeOptions};
+use polars_io::SerWriter;
+
+use crate::nodes::{ComputeNode, JoinHandle, PortState, TaskPriority, TaskScope};
+use crate::pipe::{RecvPort, SendPort};
+
+/// Sink node that writes every received morsel to a CSV file.
+pub struct CsvSinkNode {
+    /// Set once the writer has been finished, so that it is finished only once.
+    is_finished: bool,
+    writer: BatchedWriter<File>,
+}
+
+impl CsvSinkNode {
+    /// Opens `path` for writing and builds a batched CSV writer configured by `write_options`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the file cannot be opened or the batched writer cannot be created.
+    pub fn new(
+        input_schema: SchemaRef,
+        path: &Path,
+        write_options: &CsvWriterOptions,
+    ) -> PolarsResult<Self> {
+        // Create the file if it is missing and truncate it if it exists: `write(true)` alone
+        // fails on a new path and leaves stale trailing bytes in a longer existing file.
+        let file = OpenOptions::new()
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(path)?;
+
+        let CsvWriterOptions {
+            include_bom,
+            include_header,
+            batch_size,
+            maintain_order: _, // @TODO
+            serialize_options,
+        } = write_options;
+
+        let SerializeOptions {
+            date_format,
+            time_format,
+            datetime_format,
+            float_scientific,
+            float_precision,
+            separator,
+            quote_char,
+            null,
+            line_terminator,
+            quote_style,
+        } = serialize_options;
+
+        let writer = CsvWriter::new(file)
+            .include_bom(*include_bom)
+            .include_header(*include_header)
+            .with_batch_size(*batch_size)
+            .with_date_format(date_format.clone())
+            .with_time_format(time_format.clone())
+            .with_datetime_format(datetime_format.clone())
+            .with_float_scientific(*float_scientific)
+            .with_float_precision(*float_precision)
+            .with_separator(*separator)
+            .with_quote_char(*quote_char)
+            .with_null_value(null.clone())
+            .with_line_terminator(line_terminator.clone())
+            .with_quote_style(*quote_style)
+            .batched(&input_schema)?;
+
+        Ok(Self {
+            is_finished: false,
+            writer,
+        })
+    }
+}
+
+impl ComputeNode for CsvSinkNode {
+    fn name(&self) -> &str {
+        "csv_sink"
+    }
+
+    fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> {
+        assert!(send.is_empty());
+        assert!(recv.len() == 1);
+
+        if recv[0] == PortState::Done && !self.is_finished {
+            // @NOTE: This function can be called afterwards multiple times. So make sure to only
+            // finish the writer once.
+            self.is_finished = true;
+            self.writer.finish()?;
+        }
+
+        // We are always ready to receive, unless the sender is done, then we're
+        // also done.
+        if recv[0] != PortState::Done {
+            recv[0] = PortState::Ready;
+        }
+
+        Ok(())
+    }
+
+    fn spawn<'env, 's>(
+        &'env mut self,
+        scope: &'s TaskScope<'s, 'env>,
+        recv: &mut [Option<RecvPort<'_>>],
+        send: &mut [Option<SendPort<'_>>],
+        _state: &'s ExecutionState,
+        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
+    ) {
+        assert!(send.is_empty());
+        assert!(recv.len() == 1);
+        let mut receiver = recv[0].take().unwrap().serial();
+
+        join_handles.push(scope.spawn_task(TaskPriority::High, async move {
+            // Write each morsel as one batch until `recv` stops returning `Ok`.
+            while let Ok(morsel) = receiver.recv().await {
+                self.writer.write_batch(&morsel.into_df())?;
+            }
+
+            Ok(())
+        }));
+    }
+}
diff --git a/crates/polars-stream/src/nodes/io_sinks/mod.rs b/crates/polars-stream/src/nodes/io_sinks/mod.rs
index ce14ad3b0f7a..90ce4c976c12 100644
--- a/crates/polars-stream/src/nodes/io_sinks/mod.rs
+++ b/crates/polars-stream/src/nodes/io_sinks/mod.rs
@@ -1 +1,2 @@
+pub mod csv;
 pub mod ipc;
diff --git a/crates/polars-stream/src/physical_plan/lower_ir.rs b/crates/polars-stream/src/physical_plan/lower_ir.rs
index 485bbf03a7fe..c9ef6421b5c7 100644
--- a/crates/polars-stream/src/physical_plan/lower_ir.rs
+++ b/crates/polars-stream/src/physical_plan/lower_ir.rs
@@ -220,6 +220,14 @@ pub fn lower_ir(
                             input: phys_input,
                         }
                     },
+                    FileType::Csv(_) => {
+                        let phys_input = lower_ir!(*input)?;
+                        PhysNodeKind::FileSink {
+                            path,
+                            file_type,
+                            input: phys_input,
+                        }
+                    },
                     _ => todo!(),
                 }
             },
diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs
index d9253e48dfa5..473f99b68d0c 100644
--- a/crates/polars-stream/src/physical_plan/to_graph.rs
+++ b/crates/polars-stream/src/physical_plan/to_graph.rs
@@ -217,6 +217,10 @@ fn to_graph_rec<'a>(
                     nodes::io_sinks::ipc::IpcSinkNode::new(input_schema, path, ipc_writer_options)?,
                     [input_key],
                 ),
+                FileType::Csv(csv_writer_options) => ctx.graph.add_node(
+                    nodes::io_sinks::csv::CsvSinkNode::new(input_schema, path, csv_writer_options)?,
+                    [input_key],
+                ),
                 _ => todo!(),
             }
         },