Skip to content

Commit

Permalink
Merge branch 'main' into profile-plot
Browse files Browse the repository at this point in the history
  • Loading branch information
etiennebacher authored Oct 19, 2023
2 parents 87e313d + f8d6ce4 commit e95c274
Show file tree
Hide file tree
Showing 7 changed files with 840 additions and 0 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
encoded as `NULL` to aid conversion to polars binary Series. Support back and forth conversion
from polars binary literal and Series to R raw (#417).
- New method `$write_csv()` for `DataFrame` (#414).
- New method `$sink_csv()` for `LazyFrame` (#432).
- New method `$dt$time()` to extract the time from a `datetime` variable (#428).

# polars 0.8.1
Expand Down
2 changes: 2 additions & 0 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,8 @@ LazyFrame$sink_parquet <- function(path, compression_method, compression_level,

LazyFrame$sink_ipc <- function(path, compression_method, maintain_order) .Call(wrap__LazyFrame__sink_ipc, self, path, compression_method, maintain_order)

LazyFrame$sink_csv <- function(path, has_header, separator, line_terminator, quote, batch_size, datetime_format, date_format, time_format, float_precision, null_values, quote_style, maintain_order) .Call(wrap__LazyFrame__sink_csv, self, path, has_header, separator, line_terminator, quote, batch_size, datetime_format, date_format, time_format, float_precision, null_values, quote_style, maintain_order)

LazyFrame$first <- function() .Call(wrap__LazyFrame__first, self)

LazyFrame$last <- function() .Call(wrap__LazyFrame__last, self)
Expand Down
86 changes: 86 additions & 0 deletions R/lazyframe__lazy.R
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,92 @@ LazyFrame_sink_ipc = function(
}


#' @title Stream the output of a query to a CSV file
#' @description
#' This writes the output of a query directly to a CSV file without collecting
#' it in the R session first. This is useful if the output of the query is still
#' larger than RAM as it would crash the R session if it was collected into R.
#'
#' @inheritParams DataFrame_write_csv
#' @inheritParams LazyFrame_collect
#' @inheritParams DataFrame_unique
#'
#' @rdname IO_sink_csv
#'
#' @examples
#' # sink table 'mtcars' from mem to CSV
#' tmpf = tempfile()
#' pl$LazyFrame(mtcars)$sink_csv(tmpf)
#'
#' # stream a query end-to-end
#' tmpf2 = tempfile()
#' pl$scan_csv(tmpf)$select(pl$col("cyl") * 2)$sink_csv(tmpf2)
#'
#' # load parquet directly into a DataFrame / memory
#' pl$scan_csv(tmpf2)$collect()
LazyFrame_sink_csv = function(
path,
has_header = TRUE,
separator = ",",
line_terminator = "\n",
quote = '"',
batch_size = 1024,
datetime_format = NULL,
date_format = NULL,
time_format = NULL,
float_precision = NULL,
null_values = "",
quote_style = "necessary",
maintain_order = TRUE,
type_coercion = TRUE,
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
no_optimization = FALSE,
inherit_optimization = FALSE) {
if (isTRUE(no_optimization)) {
predicate_pushdown = FALSE
projection_pushdown = FALSE
slice_pushdown = FALSE
}

lf = self

if (isFALSE(inherit_optimization)) {
lf = self$set_optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
simplify_expression,
slice_pushdown,
comm_subplan_elim = FALSE,
comm_subexpr_elim = FALSE,
streaming = FALSE
) |> unwrap("in $sink_csv()")
}

lf |>
.pr$LazyFrame$sink_csv(
path,
has_header,
separator,
line_terminator,
quote,
batch_size,
datetime_format,
date_format,
time_format,
float_precision,
null_values,
quote_style,
maintain_order
) |>
unwrap("in $sink_csv()") |>
invisible()
}


#' @title Limit a LazyFrame
#' @inherit DataFrame_limit description params details
#' @return A `LazyFrame`
Expand Down
116 changes: 116 additions & 0 deletions man/IO_sink_csv.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 58 additions & 0 deletions src/rust/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use polars::frame::hash_join::JoinType;
use polars::prelude as pl;
use polars::prelude::AsOfOptions;

use polars::io::csv::SerializeOptions;
use polars_lazy::prelude::CsvWriterOptions;

#[allow(unused_imports)]
use std::result::Result;

Expand Down Expand Up @@ -114,6 +117,61 @@ impl LazyFrame {
.map_err(polars_to_rpolars_err)
}

fn sink_csv(
&self,
path: Robj,
has_header: Robj,
separator: Robj,
line_terminator: Robj,
quote: Robj,
batch_size: Robj,
datetime_format: Robj,
date_format: Robj,
time_format: Robj,
float_precision: Robj,
null_value: Robj,
quote_style: Robj,
maintain_order: Robj,
) -> RResult<()> {
// using robj_to!() directly in SerializeOptions doesn't work
let date_format = robj_to!(Option, String, date_format)?;
let time_format = robj_to!(Option, String, time_format)?;
let datetime_format = robj_to!(Option, String, datetime_format)?;
let float_precision = robj_to!(Option, usize, float_precision)?;
let separator = robj_to!(Utf8Byte, separator)?;
let quote = robj_to!(Utf8Byte, quote)?;
let null_value = robj_to!(String, null_value)?;
let line_terminator = robj_to!(String, line_terminator)?;
let quote_style = robj_to!(QuoteStyle, quote_style)?;
let has_header = robj_to!(bool, has_header)?;
let maintain_order = robj_to!(bool, maintain_order)?;
let batch_size = robj_to!(usize, batch_size)?;

let serialize_options = SerializeOptions {
date_format,
time_format,
datetime_format,
float_precision,
delimiter: separator,
quote,
null: null_value,
line_terminator,
quote_style,
};

let options = CsvWriterOptions {
has_header,
maintain_order,
batch_size,
serialize_options,
};

self.0
.clone()
.sink_csv(robj_to!(String, path)?.into(), options)
.map_err(polars_to_rpolars_err)
}

fn first(&self) -> Self {
self.0.clone().first().into()
}
Expand Down
Loading

0 comments on commit e95c274

Please sign in to comment.