Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
Merge branch 'main' into str-head-tail

# Conflicts:
#	NEWS.md
  • Loading branch information
etiennebacher committed May 5, 2024
2 parents 5c4ac1d + 53cc0e7 commit c0f48e2
Show file tree
Hide file tree
Showing 22 changed files with 365 additions and 116 deletions.
4 changes: 2 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ URL: https://pola-rs.github.io/r-polars/,
https://github.com/pola-rs/r-polars,
https://rpolars.r-universe.dev/polars
Suggests:
arrow,
arrow (>= 15.0.1),
bench,
bit64,
callr,
Expand All @@ -32,7 +32,7 @@ Suggests:
jsonlite,
knitr,
lubridate,
nanoarrow,
nanoarrow (>= 0.4.0),
nycflights13,
patrick,
pillar,
Expand Down
4 changes: 4 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
- `pl$read_ipc()` can read a raw vector of Apache Arrow IPC file (#1072).
- New method `<DataFrame>$to_raw_ipc()` to serialize a DataFrame to a raw vector
of Apache Arrow IPC file format (#1072).
- New method `<LazyFrame>$serialize()` to serialize a LazyFrame to a character
vector of JSON representation (#1073).
- New function `pl$deserialize_lf()` to deserialize a LazyFrame from a character
vector of JSON representation (#1073).
- New methods `$str$head()` and `$str$tail()` (#1074).

## Polars R Package 0.16.3
Expand Down
8 changes: 5 additions & 3 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ arrow_stream_to_df <- function(robj_str) .Call(wrap__arrow_stream_to_df, robj_st

arrow_stream_to_series <- function(robj_str) .Call(wrap__arrow_stream_to_series, robj_str)

export_df_to_arrow_stream <- function(robj_df, robj_str) .Call(wrap__export_df_to_arrow_stream, robj_df, robj_str)

mem_address <- function(robj) .Call(wrap__mem_address, robj)

clone_robj <- function(robj) .Call(wrap__clone_robj, robj)
Expand Down Expand Up @@ -204,7 +202,7 @@ RPolarsDataFrame$unnest <- function(names) .Call(wrap__RPolarsDataFrame__unnest,

RPolarsDataFrame$partition_by <- function(by, maintain_order, include_key) .Call(wrap__RPolarsDataFrame__partition_by, self, by, maintain_order, include_key)

RPolarsDataFrame$export_stream <- function(stream_ptr) invisible(.Call(wrap__RPolarsDataFrame__export_stream, self, stream_ptr))
RPolarsDataFrame$export_stream <- function(stream_ptr, pl_flavor) invisible(.Call(wrap__RPolarsDataFrame__export_stream, self, stream_ptr, pl_flavor))

RPolarsDataFrame$from_arrow_record_batches <- function(rbr) .Call(wrap__RPolarsDataFrame__from_arrow_record_batches, rbr)

Expand Down Expand Up @@ -1152,6 +1150,10 @@ RPolarsLazyFrame$collect <- function() .Call(wrap__RPolarsLazyFrame__collect, se

RPolarsLazyFrame$collect_in_background <- function() .Call(wrap__RPolarsLazyFrame__collect_in_background, self)

RPolarsLazyFrame$serialize <- function() .Call(wrap__RPolarsLazyFrame__serialize, self)

RPolarsLazyFrame$deserialize <- function(json) .Call(wrap__RPolarsLazyFrame__deserialize, json)

RPolarsLazyFrame$sink_parquet <- function(path, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order) .Call(wrap__RPolarsLazyFrame__sink_parquet, self, path, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order)

RPolarsLazyFrame$sink_ipc <- function(path, compression, maintain_order) .Call(wrap__RPolarsLazyFrame__sink_ipc, self, path, compression, maintain_order)
Expand Down
37 changes: 37 additions & 0 deletions R/lazyframe__lazy.R
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,43 @@ LazyFrame_describe_optimized_plan = function() {
#' @rdname LazyFrame_describe_plan
LazyFrame_describe_plan = use_extendr_wrapper


#' Serialize the logical plan of this LazyFrame to a file or string in JSON format
#'
#' Note that not all LazyFrames can be serialized. For example, LazyFrames that
#' contain UDFs such as [`$map_elements()`][Expr_map_elements] cannot be serialized.
#'
#' @return A character of the JSON representation of the logical plan
#' @seealso
#' - [`pl$deserialize_lf()`][pl_deserialize_lf]
#' @examples
#' lf = pl$LazyFrame(a = 1:3)$sum()
#' json = lf$serialize()
#' json
#'
#' # The logical plan can later be deserialized back into a LazyFrame.
#' pl$deserialize_lf(json)$collect()
LazyFrame_serialize = function() {
.pr$LazyFrame$serialize(self) |>
unwrap("in $serialize():")
}


#' Read a logical plan from a JSON file to construct a LazyFrame
#' @inherit pl_LazyFrame return
#' @param json A character of the JSON representation of the logical plan.
#' @seealso
#' - [`<LazyFrame>$serialize()`][LazyFrame_serialize]
#' @examples
#' lf = pl$LazyFrame(a = 1:3)$sum()
#' json = lf$serialize()
#' pl$deserialize_lf(json)$collect()
pl_deserialize_lf = function(json) {
.pr$LazyFrame$deserialize(json) |>
unwrap("in pl$deserialize_lf():")
}


#' @title Select and modify columns of a LazyFrame
#' @inherit DataFrame_select description params
#' @return A LazyFrame
Expand Down
15 changes: 11 additions & 4 deletions R/pkg-arrow.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#' Create a arrow Table from a Polars object
#'
#' @inheritParams DataFrame_write_ipc
#' @param x [A Polars DataFrame][DataFrame_class]
#' @param ... Ignored
#' @rdname S3_as_arrow_table
Expand All @@ -9,8 +10,9 @@
#' pl_df = as_polars_df(mtcars)
#' as_arrow_table(pl_df)
# exported in zzz.R
as_arrow_table.RPolarsDataFrame = function(x, ...) {
reader = as_record_batch_reader.RPolarsDataFrame(x)
as_arrow_table.RPolarsDataFrame = function(x, ..., future = FALSE) {
reader = result(as_record_batch_reader.RPolarsDataFrame(x, future = future)) |>
unwrap("in as_arrow_table(<RPolarsDataFrame>):")
reader$read_table()
}

Expand All @@ -25,12 +27,17 @@ as_arrow_table.RPolarsDataFrame = function(x, ...) {
#' pl_df = as_polars_df(mtcars)
#' as_record_batch_reader(pl_df)
# exported in zzz.R
as_record_batch_reader.RPolarsDataFrame = function(x, ...) {
as_record_batch_reader.RPolarsDataFrame = function(x, ..., future = FALSE) {
if (!is_scalar_bool(future)) {
Err_plain("`future` argument must be `TRUE` or `FALSE`") |>
unwrap("in as_record_batch_reader(<RPolarsDataFrame>):")
}

# https://github.com/apache/arrow/issues/39793
allocate_arrow_array_stream = utils::getFromNamespace("allocate_arrow_array_stream", "arrow")
external_pointer_addr_character = utils::getFromNamespace("external_pointer_addr_character", "arrow")

stream = allocate_arrow_array_stream()
.pr$DataFrame$export_stream(x, external_pointer_addr_character(stream))
.pr$DataFrame$export_stream(x, external_pointer_addr_character(stream), future)
arrow::RecordBatchReader$import_from_c(stream)
}
26 changes: 20 additions & 6 deletions R/pkg-nanoarrow.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#' Create a nanoarrow_array_stream from a Polars object
#'
#' @inheritParams as_arrow_table.RPolarsDataFrame
#' @inheritParams DataFrame_write_ipc
#' @param schema must stay at default value NULL
#' @rdname S3_as_nanoarrow_array_stream
#' @examples
Expand All @@ -10,25 +11,38 @@
#' nanoarrow_array_stream = as_nanoarrow_array_stream(pl_df)
#' as.data.frame(nanoarrow_array_stream)
# exported in zzz.R
as_nanoarrow_array_stream.RPolarsDataFrame = function(x, ..., schema = NULL) {
as_nanoarrow_array_stream.RPolarsDataFrame = function(x, ..., schema = NULL, future = FALSE) {
uw = \(res) unwrap("in as_nanoarrow_array_stream(<RPolarsDataFrame>):")

# Don't support the schema argument yet
stopifnot(is.null(schema))
if (!is.null(schema)) {
Err_plain("The `schema` argument is not supported yet") |>
uw()
}

if (!is_scalar_bool(future)) {
Err_plain("`future` argument must be `TRUE` or `FALSE`") |>
uw()
}

stream = nanoarrow::nanoarrow_allocate_array_stream()
.pr$DataFrame$export_stream(x, nanoarrow::nanoarrow_pointer_addr_chr(stream))
.pr$DataFrame$export_stream(x, nanoarrow::nanoarrow_pointer_addr_chr(stream), future)
stream
}


#' Infer nanoarrow schema from a Polars object
#'
#' @inheritParams as_arrow_table.RPolarsDataFrame
#' @inheritParams as_nanoarrow_array_stream.RPolarsDataFrame
#' @rdname S3_infer_nanoarrow_schema
#' @examples
#' library(nanoarrow)
#' pl_df = as_polars_df(mtcars)
#'
#' infer_nanoarrow_schema(pl_df)
# exported in zzz.R
infer_nanoarrow_schema.RPolarsDataFrame = function(x, ...) {
as_nanoarrow_array_stream.RPolarsDataFrame(x)$get_schema()
infer_nanoarrow_schema.RPolarsDataFrame = function(x, ..., future = FALSE) {
as_nanoarrow_array_stream.RPolarsDataFrame(x, future = future)$get_schema() |>
result() |>
unwrap("in infer_nanoarrow_schema(<RPolarsDataFrame>):")
}
28 changes: 28 additions & 0 deletions man/LazyFrame_serialize.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion man/S3_as_arrow_table.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion man/S3_as_nanoarrow_array_stream.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion man/S3_as_record_batch_reader.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion man/S3_infer_nanoarrow_schema.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions man/pl_deserialize_lf.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/pl_pl.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions src/rust/src/arrow_interop/to_rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,3 @@ fn consume_arrow_stream_to_series(boxed_stream: Box<ffi::ArrowArrayStream>) -> R
}
Ok(s)
}

pub unsafe fn export_df_as_stream(df: pl::DataFrame, robj_str_ref: &Robj) -> RResult<()> {
let stream_ptr =
crate::utils::robj_str_ptr_to_usize(robj_str_ref)? as *mut ffi::ArrowArrayStream;
let schema = df.schema().to_arrow(true);
let data_type = pl::ArrowDataType::Struct(schema.fields);
let field = pl::ArrowField::new("", data_type, false);
let iter_boxed = Box::new(crate::rdataframe::OwnedDataFrameIterator::new(df));
unsafe { *stream_ptr = ffi::export_iterator(iter_boxed, field) };
Ok(())
}
12 changes: 12 additions & 0 deletions src/rust/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ impl RPolarsLazyFrame {
})
}

fn serialize(&self) -> RResult<String> {
serde_json::to_string(&self.0.logical_plan)
.map_err(|err| RPolarsErr::new().plain(format!("{err:?}")))
}

fn deserialize(json: Robj) -> RResult<Self> {
let json = robj_to!(str, json)?;
let lp = serde_json::from_str::<pl::LogicalPlan>(&json)
.map_err(|err| RPolarsErr::new().plain(format!("{err:?}")))?;
Ok(RPolarsLazyFrame(pl::LazyFrame::from(lp)))
}

#[allow(clippy::too_many_arguments)]
pub fn sink_parquet(
&self,
Expand Down
Loading

0 comments on commit c0f48e2

Please sign in to comment.