From 930bd38aa7315b883c6b195d042e6878e764b09f Mon Sep 17 00:00:00 2001 From: eitsupi <50911393+eitsupi@users.noreply.github.com> Date: Sun, 5 May 2024 12:55:36 +0900 Subject: [PATCH 1/2] feat: serialize/deserialize LazyFrame (#1073) --- NEWS.md | 4 + R/extendr-wrappers.R | 4 + R/lazyframe__lazy.R | 37 +++++++ man/LazyFrame_serialize.Rd | 28 ++++++ man/pl_deserialize_lf.Rd | 27 +++++ man/pl_pl.Rd | 2 +- src/rust/src/lazy/dataframe.rs | 12 +++ tests/testthat/_snaps/after-wrappers.md | 128 ++++++++++++------------ tests/testthat/_snaps/lazy.md | 79 +++++++++++++++ tests/testthat/test-lazy.R | 27 +++++ 10 files changed, 284 insertions(+), 64 deletions(-) create mode 100644 man/LazyFrame_serialize.Rd create mode 100644 man/pl_deserialize_lf.Rd diff --git a/NEWS.md b/NEWS.md index 1b8c38a08..0879f59a4 100644 --- a/NEWS.md +++ b/NEWS.md @@ -7,6 +7,10 @@ - `pl$read_ipc()` can read a raw vector of Apache Arrow IPC file (#1072). - New method `$to_raw_ipc()` to serialize a DataFrame to a raw vector of Apache Arrow IPC file format (#1072). +- New method `$serialize()` to serialize a LazyFrame to a character + vector of JSON representation (#1073). +- New function `pl$deserialize_lf()` to deserialize a LazyFrame from a character + vector of JSON representation (#1073). ## Polars R Package 0.16.3 diff --git a/R/extendr-wrappers.R b/R/extendr-wrappers.R index 2fe094416..f75fad6ae 100644 --- a/R/extendr-wrappers.R +++ b/R/extendr-wrappers.R @@ -1148,6 +1148,10 @@ RPolarsLazyFrame$collect <- function() .Call(wrap__RPolarsLazyFrame__collect, se RPolarsLazyFrame$collect_in_background <- function() .Call(wrap__RPolarsLazyFrame__collect_in_background, self) +RPolarsLazyFrame$serialize <- function() .Call(wrap__RPolarsLazyFrame__serialize, self) + +RPolarsLazyFrame$deserialize <- function(json) .Call(wrap__RPolarsLazyFrame__deserialize, json) + RPolarsLazyFrame$sink_parquet <- function(path, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order) .Call(wrap__RPolarsLazyFrame__sink_parquet, self, path, compression_method, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order) RPolarsLazyFrame$sink_ipc <- function(path, compression, maintain_order) .Call(wrap__RPolarsLazyFrame__sink_ipc, self, path, compression, maintain_order) diff --git a/R/lazyframe__lazy.R b/R/lazyframe__lazy.R index 7c00410fd..21faf278e 100644 --- a/R/lazyframe__lazy.R +++ b/R/lazyframe__lazy.R @@ -258,6 +258,43 @@ LazyFrame_describe_optimized_plan = function() { #' @rdname LazyFrame_describe_plan LazyFrame_describe_plan = use_extendr_wrapper + +#' Serialize the logical plan of this LazyFrame to a file or string in JSON format +#' +#' Note that not all LazyFrames can be serialized. For example, LazyFrames that +#' contain UDFs such as [`$map_elements()`][Expr_map_elements] cannot be serialized. +#' +#' @return A character of the JSON representation of the logical plan +#' @seealso +#' - [`pl$deserialize_lf()`][pl_deserialize_lf] +#' @examples +#' lf = pl$LazyFrame(a = 1:3)$sum() +#' json = lf$serialize() +#' json +#' +#' # The logical plan can later be deserialized back into a LazyFrame. +#' pl$deserialize_lf(json)$collect() +LazyFrame_serialize = function() { + .pr$LazyFrame$serialize(self) |> + unwrap("in $serialize():") +} + + +#' Read a logical plan from a JSON file to construct a LazyFrame +#' @inherit pl_LazyFrame return +#' @param json A character of the JSON representation of the logical plan. +#' @seealso +#' - [`$serialize()`][LazyFrame_serialize] +#' @examples +#' lf = pl$LazyFrame(a = 1:3)$sum() +#' json = lf$serialize() +#' pl$deserialize_lf(json)$collect() +pl_deserialize_lf = function(json) { + .pr$LazyFrame$deserialize(json) |> + unwrap("in pl$deserialize_lf():") +} + + #' @title Select and modify columns of a LazyFrame #' @inherit DataFrame_select description params #' @return A LazyFrame diff --git a/man/LazyFrame_serialize.Rd b/man/LazyFrame_serialize.Rd new file mode 100644 index 000000000..b6fb26553 --- /dev/null +++ b/man/LazyFrame_serialize.Rd @@ -0,0 +1,28 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/lazyframe__lazy.R +\name{LazyFrame_serialize} +\alias{LazyFrame_serialize} +\title{Serialize the logical plan of this LazyFrame to a file or string in JSON format} +\usage{ +LazyFrame_serialize() +} +\value{ +A character of the JSON representation of the logical plan +} +\description{ +Note that not all LazyFrames can be serialized. For example, LazyFrames that +contain UDFs such as \code{\link[=Expr_map_elements]{$map_elements()}} cannot be serialized. +} +\examples{ +lf = pl$LazyFrame(a = 1:3)$sum() +json = lf$serialize() +json + +# The logical plan can later be deserialized back into a LazyFrame. +pl$deserialize_lf(json)$collect() +} +\seealso{ +\itemize{ +\item \code{\link[=pl_deserialize_lf]{pl$deserialize_lf()}} +} +} diff --git a/man/pl_deserialize_lf.Rd b/man/pl_deserialize_lf.Rd new file mode 100644 index 000000000..e7bbe4f0f --- /dev/null +++ b/man/pl_deserialize_lf.Rd @@ -0,0 +1,27 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/lazyframe__lazy.R +\name{pl_deserialize_lf} +\alias{pl_deserialize_lf} +\title{Read a logical plan from a JSON file to construct a LazyFrame} +\usage{ +pl_deserialize_lf(json) +} +\arguments{ +\item{json}{A character of the JSON representation of the logical plan.} +} +\value{ +\link[=LazyFrame_class]{LazyFrame} +} +\description{ +Read a logical plan from a JSON file to construct a LazyFrame +} +\examples{ +lf = pl$LazyFrame(a = 1:3)$sum() +json = lf$serialize() +pl$deserialize_lf(json)$collect() +} +\seealso{ +\itemize{ +\item \code{\link[=LazyFrame_serialize]{$serialize()}} +} +} diff --git a/man/pl_pl.Rd b/man/pl_pl.Rd index 8f4582750..f324b4bf5 100644 --- a/man/pl_pl.Rd +++ b/man/pl_pl.Rd @@ -6,7 +6,7 @@ \alias{pl} \title{The complete polars public API.} \format{ -An object of class \code{pl_polars_env} (inherits from \code{environment}) of length 107. +An object of class \code{pl_polars_env} (inherits from \code{environment}) of length 108. } \usage{ pl diff --git a/src/rust/src/lazy/dataframe.rs b/src/rust/src/lazy/dataframe.rs index 17b84b9c3..37c5873dc 100644 --- a/src/rust/src/lazy/dataframe.rs +++ b/src/rust/src/lazy/dataframe.rs @@ -80,6 +80,18 @@ impl RPolarsLazyFrame { }) } + fn serialize(&self) -> RResult { + serde_json::to_string(&self.0.logical_plan) + .map_err(|err| RPolarsErr::new().plain(format!("{err:?}"))) + } + + fn deserialize(json: Robj) -> RResult { + let json = robj_to!(str, json)?; + let lp = serde_json::from_str::(&json) + .map_err(|err| RPolarsErr::new().plain(format!("{err:?}")))?; + Ok(RPolarsLazyFrame(pl::LazyFrame::from(lp))) + } + #[allow(clippy::too_many_arguments)] pub fn sink_parquet( &self, diff --git a/tests/testthat/_snaps/after-wrappers.md b/tests/testthat/_snaps/after-wrappers.md index 6cb0462cb..62de43a96 100644 --- a/tests/testthat/_snaps/after-wrappers.md +++ b/tests/testthat/_snaps/after-wrappers.md @@ -29,34 +29,34 @@ [47] "date" "date_range" [49] "date_ranges" "datetime" [51] "datetime_range" "datetime_ranges" - [53] "disable_string_cache" "dtypes" - [55] "duration" "element" - [57] "enable_string_cache" "first" - [59] "fold" "from_epoch" - [61] "get_global_rpool_cap" "head" - [63] "implode" "int_range" - [65] "int_ranges" "is_schema" - [67] "last" "len" - [69] "lit" "max" - [71] "max_horizontal" "mean" - [73] "mean_horizontal" "median" - [75] "mem_address" "min" - [77] "min_horizontal" "n_unique" - [79] "numeric_dtypes" "raw_list" - [81] "read_csv" "read_ipc" - [83] "read_ndjson" "read_parquet" - [85] "reduce" "rolling_corr" - [87] "rolling_cov" "same_outer_dt" - [89] "scan_csv" "scan_ipc" - [91] "scan_ndjson" "scan_parquet" - [93] "select" "set_global_rpool_cap" - [95] "show_all_public_functions" "show_all_public_methods" - [97] "std" "struct" - [99] "sum" "sum_horizontal" - [101] "tail" "thread_pool_size" - [103] "time" "using_string_cache" - [105] "var" "when" - [107] "with_string_cache" + [53] "deserialize_lf" "disable_string_cache" + [55] "dtypes" "duration" + [57] "element" "enable_string_cache" + [59] "first" "fold" + [61] "from_epoch" "get_global_rpool_cap" + [63] "head" "implode" + [65] "int_range" "int_ranges" + [67] "is_schema" "last" + [69] "len" "lit" + [71] "max" "max_horizontal" + [73] "mean" "mean_horizontal" + [75] "median" "mem_address" + [77] "min" "min_horizontal" + [79] "n_unique" "numeric_dtypes" + [81] "raw_list" "read_csv" + [83] "read_ipc" "read_ndjson" + [85] "read_parquet" "reduce" + [87] "rolling_corr" "rolling_cov" + [89] "same_outer_dt" "scan_csv" + [91] "scan_ipc" "scan_ndjson" + [93] "scan_parquet" "select" + [95] "set_global_rpool_cap" "show_all_public_functions" + [97] "show_all_public_methods" "std" + [99] "struct" "sum" + [101] "sum_horizontal" "tail" + [103] "thread_pool_size" "time" + [105] "using_string_cache" "var" + [107] "when" "with_string_cache" --- @@ -161,17 +161,18 @@ [33] "rename" "reverse" [35] "rolling" "schema" [37] "select" "select_seq" - [39] "set_optimization_toggle" "shift" - [41] "shift_and_fill" "sink_csv" - [43] "sink_ipc" "sink_ndjson" - [45] "sink_parquet" "slice" - [47] "sort" "sql" - [49] "std" "sum" - [51] "tail" "to_dot" - [53] "unique" "unnest" - [55] "var" "width" - [57] "with_columns" "with_columns_seq" - [59] "with_context" "with_row_index" + [39] "serialize" "set_optimization_toggle" + [41] "shift" "shift_and_fill" + [43] "sink_csv" "sink_ipc" + [45] "sink_ndjson" "sink_parquet" + [47] "slice" "sort" + [49] "sql" "std" + [51] "sum" "tail" + [53] "to_dot" "unique" + [55] "unnest" "var" + [57] "width" "with_columns" + [59] "with_columns_seq" "with_context" + [61] "with_row_index" --- @@ -181,30 +182,31 @@ [1] "clone_in_rust" "collect" [3] "collect_in_background" "debug_plan" [5] "describe_optimized_plan" "describe_plan" - [7] "drop" "drop_nulls" - [9] "explode" "fetch" - [11] "fill_nan" "fill_null" - [13] "filter" "first" - [15] "get_optimization_toggle" "group_by" - [17] "group_by_dynamic" "join" - [19] "join_asof" "last" - [21] "max" "mean" - [23] "median" "melt" - [25] "min" "print" - [27] "profile" "quantile" - [29] "rename" "reverse" - [31] "rolling" "schema" - [33] "select" "select_seq" - [35] "set_optimization_toggle" "shift" - [37] "shift_and_fill" "sink_csv" - [39] "sink_ipc" "sink_json" - [41] "sink_parquet" "slice" - [43] "sort_by_exprs" "std" - [45] "sum" "tail" - [47] "to_dot" "unique" - [49] "unnest" "var" - [51] "with_columns" "with_columns_seq" - [53] "with_context" "with_row_index" + [7] "deserialize" "drop" + [9] "drop_nulls" "explode" + [11] "fetch" "fill_nan" + [13] "fill_null" "filter" + [15] "first" "get_optimization_toggle" + [17] "group_by" "group_by_dynamic" + [19] "join" "join_asof" + [21] "last" "max" + [23] "mean" "median" + [25] "melt" "min" + [27] "print" "profile" + [29] "quantile" "rename" + [31] "reverse" "rolling" + [33] "schema" "select" + [35] "select_seq" "serialize" + [37] "set_optimization_toggle" "shift" + [39] "shift_and_fill" "sink_csv" + [41] "sink_ipc" "sink_json" + [43] "sink_parquet" "slice" + [45] "sort_by_exprs" "std" + [47] "sum" "tail" + [49] "to_dot" "unique" + [51] "unnest" "var" + [53] "with_columns" "with_columns_seq" + [55] "with_context" "with_row_index" # public and private methods of each class Expr diff --git a/tests/testthat/_snaps/lazy.md b/tests/testthat/_snaps/lazy.md index 2652ede5d..0f38e1aad 100644 --- a/tests/testthat/_snaps/lazy.md +++ b/tests/testthat/_snaps/lazy.md @@ -10,3 +10,82 @@ FILTER [(col("a")) == (2)] FROM DF ["a", "b"]; PROJECT */2 COLUMNS; SELECTION: "None" +# LazyFrame serialize/deseialize + + Code + jsonlite::prettify(json) + Output + { + "Select": { + "expr": [ + { + "Column": "b" + } + ], + "input": { + "Filter": { + "input": { + "DataFrameScan": { + "df": { + "columns": [ + { + "name": "a", + "datatype": "Int32", + "bit_settings": "", + "values": [ + 1, + 2, + 3 + ] + }, + { + "name": "b", + "datatype": "String", + "bit_settings": "", + "values": [ + "a", + "b", + "c" + ] + } + ] + }, + "schema": { + "inner": { + "a": "Int32", + "b": "String" + } + }, + "output_schema": null, + "projection": null, + "selection": null + } + }, + "predicate": { + "BinaryExpr": { + "left": { + "Column": "a" + }, + "op": "GtEq", + "right": { + "Literal": { + "Float64": 2.0 + } + } + } + } + } + }, + "schema": { + "inner": { + "b": "String" + } + }, + "options": { + "run_parallel": true, + "duplicate_check": true + } + } + } + + diff --git a/tests/testthat/test-lazy.R b/tests/testthat/test-lazy.R index 7e91f22c9..01531d261 100644 --- a/tests/testthat/test-lazy.R +++ b/tests/testthat/test-lazy.R @@ -31,6 +31,33 @@ test_that("create LazyFrame", { ) }) + +test_that("LazyFrame serialize/deseialize", { + skip_if_not_installed("jsonlite") + + df = pl$DataFrame( + a = 1:3, + b = letters[1:3] + ) + + lf = df$lazy()$filter(pl$col("a") >= 2)$select("b") + json = lf$serialize() + + expect_snapshot(jsonlite::prettify(json)) + + expect_true(lf$collect()$equals( + pl$deserialize_lf(json)$collect() + )) + + expect_grepl_error( + df$lazy()$select( + pl$col("a")$map_elements(\(x) -abs(x)) + )$serialize(), + "serialize not supported for this 'opaque' function" + ) +}) + + test_that("LazyFrame, custom schema", { df = pl$LazyFrame( iris, From 53cc0e7af471747348663068998d8bba541e4f95 Mon Sep 17 00:00:00 2001 From: eitsupi <50911393+eitsupi@users.noreply.github.com> Date: Sun, 5 May 2024 16:20:12 +0900 Subject: [PATCH 2/2] feat: allow to export polars internal types via the Arrow C Stream interface of DataFrame (#1075) --- DESCRIPTION | 4 ++-- R/extendr-wrappers.R | 4 +--- R/pkg-arrow.R | 15 +++++++++++---- R/pkg-nanoarrow.R | 26 ++++++++++++++++++++------ man/S3_as_arrow_table.Rd | 7 ++++++- man/S3_as_nanoarrow_array_stream.Rd | 7 ++++++- man/S3_as_record_batch_reader.Rd | 7 ++++++- man/S3_infer_nanoarrow_schema.Rd | 7 ++++++- src/rust/src/arrow_interop/to_rust.rs | 11 ----------- src/rust/src/rdataframe/mod.rs | 14 ++++++++------ src/rust/src/rlib.rs | 12 ------------ tests/testthat/test-pkg-arrow.R | 11 +++++++---- tests/testthat/test-pkg-nanoarrow.R | 8 ++++++++ 13 files changed, 81 insertions(+), 52 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 542490ba7..b49462c77 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -22,7 +22,7 @@ URL: https://pola-rs.github.io/r-polars/, https://github.com/pola-rs/r-polars, https://rpolars.r-universe.dev/polars Suggests: - arrow, + arrow (>= 15.0.1), bench, bit64, callr, @@ -32,7 +32,7 @@ Suggests: jsonlite, knitr, lubridate, - nanoarrow, + nanoarrow (>= 0.4.0), nycflights13, patrick, pillar, diff --git a/R/extendr-wrappers.R b/R/extendr-wrappers.R index f75fad6ae..cd880f567 100644 --- a/R/extendr-wrappers.R +++ b/R/extendr-wrappers.R @@ -64,8 +64,6 @@ arrow_stream_to_df <- function(robj_str) .Call(wrap__arrow_stream_to_df, robj_st arrow_stream_to_series <- function(robj_str) .Call(wrap__arrow_stream_to_series, robj_str) -export_df_to_arrow_stream <- function(robj_df, robj_str) .Call(wrap__export_df_to_arrow_stream, robj_df, robj_str) - mem_address <- function(robj) .Call(wrap__mem_address, robj) clone_robj <- function(robj) .Call(wrap__clone_robj, robj) @@ -204,7 +202,7 @@ RPolarsDataFrame$unnest <- function(names) .Call(wrap__RPolarsDataFrame__unnest, RPolarsDataFrame$partition_by <- function(by, maintain_order, include_key) .Call(wrap__RPolarsDataFrame__partition_by, self, by, maintain_order, include_key) -RPolarsDataFrame$export_stream <- function(stream_ptr) invisible(.Call(wrap__RPolarsDataFrame__export_stream, self, stream_ptr)) +RPolarsDataFrame$export_stream <- function(stream_ptr, pl_flavor) invisible(.Call(wrap__RPolarsDataFrame__export_stream, self, stream_ptr, pl_flavor)) RPolarsDataFrame$from_arrow_record_batches <- function(rbr) .Call(wrap__RPolarsDataFrame__from_arrow_record_batches, rbr) diff --git a/R/pkg-arrow.R b/R/pkg-arrow.R index 4d5013778..ea9910c64 100644 --- a/R/pkg-arrow.R +++ b/R/pkg-arrow.R @@ -1,5 +1,6 @@ #' Create a arrow Table from a Polars object #' +#' @inheritParams DataFrame_write_ipc #' @param x [A Polars DataFrame][DataFrame_class] #' @param ... Ignored #' @rdname S3_as_arrow_table @@ -9,8 +10,9 @@ #' pl_df = as_polars_df(mtcars) #' as_arrow_table(pl_df) # exported in zzz.R -as_arrow_table.RPolarsDataFrame = function(x, ...) { - reader = as_record_batch_reader.RPolarsDataFrame(x) +as_arrow_table.RPolarsDataFrame = function(x, ..., future = FALSE) { + reader = result(as_record_batch_reader.RPolarsDataFrame(x, future = future)) |> + unwrap("in as_arrow_table():") reader$read_table() } @@ -25,12 +27,17 @@ as_arrow_table.RPolarsDataFrame = function(x, ...) { #' pl_df = as_polars_df(mtcars) #' as_record_batch_reader(pl_df) # exported in zzz.R -as_record_batch_reader.RPolarsDataFrame = function(x, ...) { +as_record_batch_reader.RPolarsDataFrame = function(x, ..., future = FALSE) { + if (!is_scalar_bool(future)) { + Err_plain("`future` argument must be `TRUE` or `FALSE`") |> + unwrap("in as_record_batch_reader():") + } + # https://github.com/apache/arrow/issues/39793 allocate_arrow_array_stream = utils::getFromNamespace("allocate_arrow_array_stream", "arrow") external_pointer_addr_character = utils::getFromNamespace("external_pointer_addr_character", "arrow") stream = allocate_arrow_array_stream() - .pr$DataFrame$export_stream(x, external_pointer_addr_character(stream)) + .pr$DataFrame$export_stream(x, external_pointer_addr_character(stream), future) arrow::RecordBatchReader$import_from_c(stream) } diff --git a/R/pkg-nanoarrow.R b/R/pkg-nanoarrow.R index 947493277..f8b4bc540 100644 --- a/R/pkg-nanoarrow.R +++ b/R/pkg-nanoarrow.R @@ -1,6 +1,7 @@ #' Create a nanoarrow_array_stream from a Polars object #' #' @inheritParams as_arrow_table.RPolarsDataFrame +#' @inheritParams DataFrame_write_ipc #' @param schema must stay at default value NULL #' @rdname S3_as_nanoarrow_array_stream #' @examples @@ -10,18 +11,29 @@ #' nanoarrow_array_stream = as_nanoarrow_array_stream(pl_df) #' as.data.frame(nanoarrow_array_stream) # exported in zzz.R -as_nanoarrow_array_stream.RPolarsDataFrame = function(x, ..., schema = NULL) { +as_nanoarrow_array_stream.RPolarsDataFrame = function(x, ..., schema = NULL, future = FALSE) { + uw = \(res) unwrap("in as_nanoarrow_array_stream():") + # Don't support the schema argument yet - stopifnot(is.null(schema)) + if (!is.null(schema)) { + Err_plain("The `schema` argument is not supported yet") |> + uw() + } + + if (!is_scalar_bool(future)) { + Err_plain("`future` argument must be `TRUE` or `FALSE`") |> + uw() + } + stream = nanoarrow::nanoarrow_allocate_array_stream() - .pr$DataFrame$export_stream(x, nanoarrow::nanoarrow_pointer_addr_chr(stream)) + .pr$DataFrame$export_stream(x, nanoarrow::nanoarrow_pointer_addr_chr(stream), future) stream } #' Infer nanoarrow schema from a Polars object #' -#' @inheritParams as_arrow_table.RPolarsDataFrame +#' @inheritParams as_nanoarrow_array_stream.RPolarsDataFrame #' @rdname S3_infer_nanoarrow_schema #' @examples #' library(nanoarrow) @@ -29,6 +41,8 @@ as_nanoarrow_array_stream.RPolarsDataFrame = function(x, ..., schema = NULL) { #' #' infer_nanoarrow_schema(pl_df) # exported in zzz.R -infer_nanoarrow_schema.RPolarsDataFrame = function(x, ...) { - as_nanoarrow_array_stream.RPolarsDataFrame(x)$get_schema() +infer_nanoarrow_schema.RPolarsDataFrame = function(x, ..., future = FALSE) { + as_nanoarrow_array_stream.RPolarsDataFrame(x, future = future)$get_schema() |> + result() |> + unwrap("in infer_nanoarrow_schema():") } diff --git a/man/S3_as_arrow_table.Rd b/man/S3_as_arrow_table.Rd index 0eac43626..e1e2a38dd 100644 --- a/man/S3_as_arrow_table.Rd +++ b/man/S3_as_arrow_table.Rd @@ -4,12 +4,17 @@ \alias{as_arrow_table.RPolarsDataFrame} \title{Create a arrow Table from a Polars object} \usage{ -\method{as_arrow_table}{RPolarsDataFrame}(x, ...) +\method{as_arrow_table}{RPolarsDataFrame}(x, ..., future = FALSE) } \arguments{ \item{x}{\link[=DataFrame_class]{A Polars DataFrame}} \item{...}{Ignored} + +\item{future}{Setting this to \code{TRUE} will write Polars' internal data structures that +might not be available by other Arrow implementations. +This functionality is considered \strong{unstable}. +It may be changed at any point without it being considered a breaking change.} } \description{ Create a arrow Table from a Polars object diff --git a/man/S3_as_nanoarrow_array_stream.Rd b/man/S3_as_nanoarrow_array_stream.Rd index 7d7014efd..1e68603f0 100644 --- a/man/S3_as_nanoarrow_array_stream.Rd +++ b/man/S3_as_nanoarrow_array_stream.Rd @@ -4,7 +4,7 @@ \alias{as_nanoarrow_array_stream.RPolarsDataFrame} \title{Create a nanoarrow_array_stream from a Polars object} \usage{ -\method{as_nanoarrow_array_stream}{RPolarsDataFrame}(x, ..., schema = NULL) +\method{as_nanoarrow_array_stream}{RPolarsDataFrame}(x, ..., schema = NULL, future = FALSE) } \arguments{ \item{x}{\link[=DataFrame_class]{A Polars DataFrame}} @@ -12,6 +12,11 @@ \item{...}{Ignored} \item{schema}{must stay at default value NULL} + +\item{future}{Setting this to \code{TRUE} will write Polars' internal data structures that +might not be available by other Arrow implementations. +This functionality is considered \strong{unstable}. +It may be changed at any point without it being considered a breaking change.} } \description{ Create a nanoarrow_array_stream from a Polars object diff --git a/man/S3_as_record_batch_reader.Rd b/man/S3_as_record_batch_reader.Rd index c27514e02..af9de19eb 100644 --- a/man/S3_as_record_batch_reader.Rd +++ b/man/S3_as_record_batch_reader.Rd @@ -4,12 +4,17 @@ \alias{as_record_batch_reader.RPolarsDataFrame} \title{Create a arrow RecordBatchReader from a Polars object} \usage{ -\method{as_record_batch_reader}{RPolarsDataFrame}(x, ...) +\method{as_record_batch_reader}{RPolarsDataFrame}(x, ..., future = FALSE) } \arguments{ \item{x}{\link[=DataFrame_class]{A Polars DataFrame}} \item{...}{Ignored} + +\item{future}{Setting this to \code{TRUE} will write Polars' internal data structures that +might not be available by other Arrow implementations. +This functionality is considered \strong{unstable}. +It may be changed at any point without it being considered a breaking change.} } \description{ Create a arrow RecordBatchReader from a Polars object diff --git a/man/S3_infer_nanoarrow_schema.Rd b/man/S3_infer_nanoarrow_schema.Rd index afb52c98f..cb9eeab0f 100644 --- a/man/S3_infer_nanoarrow_schema.Rd +++ b/man/S3_infer_nanoarrow_schema.Rd @@ -4,12 +4,17 @@ \alias{infer_nanoarrow_schema.RPolarsDataFrame} \title{Infer nanoarrow schema from a Polars object} \usage{ -\method{infer_nanoarrow_schema}{RPolarsDataFrame}(x, ...) +\method{infer_nanoarrow_schema}{RPolarsDataFrame}(x, ..., future = FALSE) } \arguments{ \item{x}{\link[=DataFrame_class]{A Polars DataFrame}} \item{...}{Ignored} + +\item{future}{Setting this to \code{TRUE} will write Polars' internal data structures that +might not be available by other Arrow implementations. +This functionality is considered \strong{unstable}. +It may be changed at any point without it being considered a breaking change.} } \description{ Infer nanoarrow schema from a Polars object diff --git a/src/rust/src/arrow_interop/to_rust.rs b/src/rust/src/arrow_interop/to_rust.rs index ecbc73657..a38126256 100644 --- a/src/rust/src/arrow_interop/to_rust.rs +++ b/src/rust/src/arrow_interop/to_rust.rs @@ -154,14 +154,3 @@ fn consume_arrow_stream_to_series(boxed_stream: Box) -> R } Ok(s) } - -pub unsafe fn export_df_as_stream(df: pl::DataFrame, robj_str_ref: &Robj) -> RResult<()> { - let stream_ptr = - crate::utils::robj_str_ptr_to_usize(robj_str_ref)? as *mut ffi::ArrowArrayStream; - let schema = df.schema().to_arrow(true); - let data_type = pl::ArrowDataType::Struct(schema.fields); - let field = pl::ArrowField::new("", data_type, false); - let iter_boxed = Box::new(crate::rdataframe::OwnedDataFrameIterator::new(df)); - unsafe { *stream_ptr = ffi::export_iterator(iter_boxed, field) }; - Ok(()) -} diff --git a/src/rust/src/rdataframe/mod.rs b/src/rust/src/rdataframe/mod.rs index 3290f748d..9b2884119 100644 --- a/src/rust/src/rdataframe/mod.rs +++ b/src/rust/src/rdataframe/mod.rs @@ -33,11 +33,12 @@ pub struct OwnedDataFrameIterator { data_type: arrow::datatypes::ArrowDataType, idx: usize, n_chunks: usize, + pl_flavor: bool, } impl OwnedDataFrameIterator { - pub fn new(df: polars::frame::DataFrame) -> Self { - let schema = df.schema().to_arrow(false); + pub fn new(df: polars::frame::DataFrame, pl_flavor: bool) -> Self { + let schema = df.schema().to_arrow(pl_flavor); let data_type = ArrowDataType::Struct(schema.fields); let vs = df.get_columns().to_vec(); Self { @@ -45,6 +46,7 @@ impl OwnedDataFrameIterator { data_type, idx: 0, n_chunks: df.n_chunks(), + pl_flavor, } } } @@ -60,7 +62,7 @@ impl Iterator for OwnedDataFrameIterator { let batch_cols = self .columns .iter() - .map(|s| s.to_arrow(self.idx, false)) + .map(|s| s.to_arrow(self.idx, self.pl_flavor)) .collect(); self.idx += 1; @@ -346,12 +348,12 @@ impl RPolarsDataFrame { Ok(List::from_values(vec)) } - pub fn export_stream(&self, stream_ptr: &str) { - let schema = self.0.schema().to_arrow(false); + pub fn export_stream(&self, stream_ptr: &str, pl_flavor: bool) { + let schema = self.0.schema().to_arrow(pl_flavor); let data_type = ArrowDataType::Struct(schema.fields); let field = ArrowField::new("", data_type, false); - let iter_boxed = Box::new(OwnedDataFrameIterator::new(self.0.clone())); + let iter_boxed = Box::new(OwnedDataFrameIterator::new(self.0.clone(), pl_flavor)); let mut stream = arrow::ffi::export_iterator(iter_boxed, field); let stream_out_ptr_addr: usize = stream_ptr.parse().unwrap(); let stream_out_ptr = stream_out_ptr_addr as *mut arrow::ffi::ArrowArrayStream; diff --git a/src/rust/src/rlib.rs b/src/rust/src/rlib.rs index b31edb78b..377f666cb 100644 --- a/src/rust/src/rlib.rs +++ b/src/rust/src/rlib.rs @@ -218,17 +218,6 @@ fn arrow_stream_to_series(robj_str: Robj) -> RResult { Ok(RPolarsSeries(s).into_robj()) } -#[extendr] -unsafe fn export_df_to_arrow_stream(robj_df: Robj, robj_str: Robj) -> RResult { - let res: ExternalPtr = robj_df.try_into()?; - let pl_df = RPolarsDataFrame(res.0.clone()).0; - //safety robj_str must be ptr to a arrow2 stream ready to export into - unsafe { - crate::arrow_interop::to_rust::export_df_as_stream(pl_df, &robj_str)?; - } - Ok(robj_str) -} - #[extendr] pub fn dtype_str_repr(dtype: Robj) -> RResult { let dtype = robj_to!(RPolarsDataType, dtype)?.0; @@ -474,7 +463,6 @@ extendr_module! { fn new_arrow_stream; fn arrow_stream_to_df; fn arrow_stream_to_series; - fn export_df_to_arrow_stream; //robj meta fn mem_address; diff --git a/tests/testthat/test-pkg-arrow.R b/tests/testthat/test-pkg-arrow.R index 336ddc4df..c8ce5965b 100644 --- a/tests/testthat/test-pkg-arrow.R +++ b/tests/testthat/test-pkg-arrow.R @@ -6,8 +6,7 @@ test_that("as_record_batch_reader() works for DataFrame", { expect_s3_class(reader, "RecordBatchReader") expect_identical( - # two as.data.frame()s because arrow sometimes returns a tibble here - as.data.frame(as.data.frame(reader)), + as.data.frame(reader), data.frame(a = 1L, b = "two") ) }) @@ -20,8 +19,12 @@ test_that("as_arrow_table() works for DataFrame", { expect_s3_class(table, "Table") expect_identical( - # two as.data.frame()s because arrow sometimes returns a tibble here - as.data.frame(as.data.frame(table)), + as.data.frame(table), data.frame(a = 1L, b = "two") ) + + expect_identical( + arrow::as_arrow_table(df, future = TRUE)$b$type$ToString(), + "string_view" + ) }) diff --git a/tests/testthat/test-pkg-nanoarrow.R b/tests/testthat/test-pkg-nanoarrow.R index 93b5d2dbf..2216e60b6 100644 --- a/tests/testthat/test-pkg-nanoarrow.R +++ b/tests/testthat/test-pkg-nanoarrow.R @@ -8,6 +8,14 @@ test_that("as_nanoarrow_array_stream() works for DataFrame", { as.data.frame(stream), data.frame(a = 1L, b = "two") ) + + # nanoarrow does not support the string view type yet + # https://github.com/apache/arrow-nanoarrow/pull/367 + expect_grepl_error( + nanoarrow::as_nanoarrow_array_stream(df, future = TRUE) |> + as.data.frame(), + "Unknown format: 'vu'" + ) }) test_that("infer_nanoarrow_schema() works for DataFrame", {