Skip to content

Commit

Permalink
feat: Add CSV sink to new streaming engine
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite committed Oct 25, 2024
1 parent 425e251 commit 0e8bcad
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 0 deletions.
116 changes: 116 additions & 0 deletions crates/polars-stream/src/nodes/io_sinks/csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use std::fs::{File, OpenOptions};
use std::path::Path;

use polars_core::schema::SchemaRef;
use polars_error::PolarsResult;
use polars_expr::state::ExecutionState;
use polars_io::csv::write::BatchedWriter;
use polars_io::prelude::{CsvWriter, CsvWriterOptions, SerializeOptions};
use polars_io::SerWriter;

use crate::nodes::{ComputeNode, JoinHandle, PortState, TaskPriority, TaskScope};
use crate::pipe::{RecvPort, SendPort};

pub struct CsvSinkNode {
is_finished: bool,
writer: BatchedWriter<File>,
}

impl CsvSinkNode {
pub fn new(
input_schema: SchemaRef,
path: &Path,
write_options: &CsvWriterOptions,
) -> PolarsResult<Self> {
let file = OpenOptions::new().write(true).open(path)?;

let CsvWriterOptions {
include_bom,
include_header,
batch_size,
maintain_order: _, // @TODO
serialize_options,
} = write_options;

let SerializeOptions {
date_format,
time_format,
datetime_format,
float_scientific,
float_precision,
separator,
quote_char,
null,
line_terminator,
quote_style,
} = serialize_options;

let writer = CsvWriter::new(file)
.include_bom(*include_bom)
.include_header(*include_header)
.with_batch_size(*batch_size)
.with_date_format(date_format.clone())
.with_time_format(time_format.clone())
.with_datetime_format(datetime_format.clone())
.with_float_scientific(*float_scientific)
.with_float_precision(*float_precision)
.with_separator(*separator)
.with_quote_char(*quote_char)
.with_null_value(null.clone())
.with_line_terminator(line_terminator.clone())
.with_quote_style(*quote_style)
.batched(&input_schema)?;

Ok(Self {
is_finished: false,
writer,
})
}
}

impl ComputeNode for CsvSinkNode {
fn name(&self) -> &str {
"csv_sink"
}

fn update_state(&mut self, recv: &mut [PortState], send: &mut [PortState]) -> PolarsResult<()> {
assert!(send.is_empty());
assert!(recv.len() == 1);

if recv[0] == PortState::Done && !self.is_finished {
// @NOTE: This function can be called afterwards multiple times. So make sure to only
// finish the writer once.
self.is_finished = true;
self.writer.finish()?;
}

// We are always ready to receive, unless the sender is done, then we're
// also done.
if recv[0] != PortState::Done {
recv[0] = PortState::Ready;
}

Ok(())
}

fn spawn<'env, 's>(
&'env mut self,
scope: &'s TaskScope<'s, 'env>,
recv: &mut [Option<RecvPort<'_>>],
send: &mut [Option<SendPort<'_>>],
_state: &'s ExecutionState,
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
) {
assert!(send.is_empty());
assert!(recv.len() == 1);
let mut receiver = recv[0].take().unwrap().serial();

join_handles.push(scope.spawn_task(TaskPriority::High, async move {
while let Ok(morsel) = receiver.recv().await {
self.writer.write_batch(&morsel.into_df())?;
}

Ok(())
}));
}
}
1 change: 1 addition & 0 deletions crates/polars-stream/src/nodes/io_sinks/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod csv;
pub mod ipc;
8 changes: 8 additions & 0 deletions crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ pub fn lower_ir(
input: phys_input,
}
},
FileType::Csv(_) => {
let phys_input = lower_ir!(*input)?;
PhysNodeKind::FileSink {
path,
file_type,
input: phys_input,
}
},
_ => todo!(),
}
},
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-stream/src/physical_plan/to_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ fn to_graph_rec<'a>(
nodes::io_sinks::ipc::IpcSinkNode::new(input_schema, path, ipc_writer_options)?,
[input_key],
),
FileType::Csv(csv_writer_options) => ctx.graph.add_node(
nodes::io_sinks::csv::CsvSinkNode::new(input_schema, path, csv_writer_options)?,
[input_key],
),
_ => todo!(),
}
},
Expand Down

0 comments on commit 0e8bcad

Please sign in to comment.