From cc124b37b89dbf6896fbdca172338b237f3ea9fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Havelund=20Welling?= Date: Sat, 2 Sep 2023 23:53:24 +0200 Subject: [PATCH] support extendr_polars (#326) --- NEWS.md | 3 + R/extendr-wrappers.R | 10 ++- src/rust/src/arrow_interop/to_rust.rs | 88 +++++++++++++++------- src/rust/src/lazy/dsl.rs | 17 ----- src/rust/src/rdataframe/mod.rs | 2 +- src/rust/src/rlib.rs | 43 ++++++++++- src/rust/src/utils/mod.rs | 16 ++++ tests/testthat/test-arrow_extendr_polars.R | 71 +++++++++++++++++ 8 files changed, 199 insertions(+), 51 deletions(-) create mode 100644 tests/testthat/test-arrow_extendr_polars.R diff --git a/NEWS.md b/NEWS.md index a1e030651..60a952db7 100644 --- a/NEWS.md +++ b/NEWS.md @@ -83,6 +83,9 @@ features. Unrelated breaking changes and new features are put in separate sectio - Added an S3 generic `as_polars_series()` where users or developers of extensions can define a custom way to convert their format to Polars format. This generic must return a Polars series. See #368 for an example (#369). +- Private API Support for Arrow Stream import/export of DataFrame between two R packages that uses + rust-polars. [See R package example here](https://github.com/rpolars/extendrpolarsexamples) + (#326). # polars 0.7.0 diff --git a/R/extendr-wrappers.R b/R/extendr-wrappers.R index dc1cd2682..0613f37f5 100644 --- a/R/extendr-wrappers.R +++ b/R/extendr-wrappers.R @@ -43,10 +43,16 @@ struct_ <- function(exprs, eager, schema) .Call(wrap__struct_, exprs, eager, sch rb_list_to_df <- function(r_batches, names) .Call(wrap__rb_list_to_df, r_batches, names) -arrow_stream_to_rust <- function(rbr) invisible(.Call(wrap__arrow_stream_to_rust, rbr)) - dtype_str_repr <- function(dtype) .Call(wrap__dtype_str_repr, dtype) +new_arrow_stream <- function() .Call(wrap__new_arrow_stream) + +arrow_stream_to_df <- function(robj_str) .Call(wrap__arrow_stream_to_df, robj_str) + +arrow_stream_to_series <- function(robj_str) .Call(wrap__arrow_stream_to_series, robj_str) + +export_df_to_arrow_stream <- function(robj_df, robj_str) .Call(wrap__export_df_to_arrow_stream, robj_df, robj_str) + mem_address <- function(robj) .Call(wrap__mem_address, robj) clone_robj <- function(robj) .Call(wrap__clone_robj, robj) diff --git a/src/rust/src/arrow_interop/to_rust.rs b/src/rust/src/arrow_interop/to_rust.rs index 8c46c690c..4ec71243a 100644 --- a/src/rust/src/arrow_interop/to_rust.rs +++ b/src/rust/src/arrow_interop/to_rust.rs @@ -1,3 +1,4 @@ +use crate::rpolarserr::*; use extendr_api::prelude::*; use polars::prelude as pl; use polars_core::export::rayon::prelude::*; @@ -39,33 +40,6 @@ unsafe fn wrap_make_external_ptr(t: &mut T) -> Robj { //use extendr_api::{Integers, Rinternals}; unsafe { ::make_external_ptr(t, r!(extendr_api::NULL)) } } -//does not support chunked array -pub fn arrow_array_stream_to_rust( - arrow_stream_reader: Robj, - opt_f: Option<&Function>, -) -> Result { - let mut stream = Box::new(ffi::ArrowArrayStream::empty()); - //let mut schema = Box::new(ffi::ArrowSchema::empty()); - let ext_stream = unsafe { wrap_make_external_ptr(&mut *stream) }; - - if let Some(f) = opt_f { - f.call(pairlist!(arrow_stream_reader, ext_stream))?; - } else { - call!(r"\(x,y) x$export_to_c(y)", arrow_stream_reader, ext_stream)?; - }; - dbg!("after export"); - - let mut iter = - unsafe { ffi::ArrowArrayStreamReader::try_new(stream) }.map_err(|err| err.to_string())?; - dbg!("after reader"); - - while let Some(array_res) = unsafe { iter.next() } { - let array = array_res.map_err(|err| err.to_string())?; - dbg!(&array); - } - - todo!("not more for now"); -} pub fn rb_to_rust_df(r_rb_columns: List, names: &[String]) -> Result { let n_col = r_rb_columns.len(); @@ -160,3 +134,63 @@ pub fn to_rust_df(rb: Robj) -> Result { let dfs = crate::utils::collect_hinted_result(rb_len, dfs_iter)?; Ok(accumulate_dataframes_vertical_unchecked(dfs)) } + +// r-polars as consumer 1: create a new stream and wrap pointer in Robj as str. +pub fn new_arrow_stream_internal() -> Robj { + let aas = Box::new(ffi::ArrowArrayStream::empty()); + let x = Box::leak(aas); // leak box to make lifetime static + let x = x as *mut ffi::ArrowArrayStream; + crate::utils::usize_to_robj_str(x as usize) +} + +// r-polars as consumer 2: recieve to pointer to own stream, which producer has exported to. Consume it. Return Series. +pub fn arrow_stream_to_series_internal(robj_str: Robj) -> RResult { + // reclaim ownership of leaked box, and then drop/release it when consumed. + let us = crate::utils::robj_str_ptr_to_usize(&robj_str)?; + let boxed_stream = unsafe { Box::from_raw(us as *mut ffi::ArrowArrayStream) }; + + //consume stream and produce a r-polars Series return as Robj + let s = consume_arrow_stream_to_series(boxed_stream)?; + Ok(s) +} + +// implementation of consuming stream to Series. Stream is drop/released hereafter. +fn consume_arrow_stream_to_series(boxed_stream: Box) -> RResult { + let mut iter = unsafe { ffi::ArrowArrayStreamReader::try_new(boxed_stream) }?; + + //import first array into pl::Series + let mut s = if let Some(array_res) = unsafe { iter.next() } { + let array = array_res?; + let series_res: pl::PolarsResult = + std::convert::TryFrom::try_from(("df", array)); + let series = series_res.map_err(polars_to_rpolars_err)?; + series + } else { + rerr() + .plain("Arrow array stream was empty") + .hint("producer did not export to stream") + .when("consuming arrow array stream")?; + unreachable!(); + }; + + // append any other arrays to Series + while let Some(array_res) = unsafe { iter.next() } { + let array = array_res?; + let series_res: pl::PolarsResult = + std::convert::TryFrom::try_from(("df", array)); + let series = series_res.map_err(polars_to_rpolars_err)?; + s.append(&series).map_err(polars_to_rpolars_err)?; + } + Ok(s) +} + +pub unsafe fn export_df_as_stream(df: pl::DataFrame, robj_str_ref: &Robj) -> RResult<()> { + let stream_ptr = + crate::utils::robj_str_ptr_to_usize(robj_str_ref)? as *mut ffi::ArrowArrayStream; + let schema = df.schema().to_arrow(); + let data_type = pl::ArrowDataType::Struct(schema.fields); + let field = pl::ArrowField::new("", data_type, false); + let iter_boxed = Box::new(crate::rdataframe::OwnedDataFrameIterator::new(df)); + unsafe { *stream_ptr = ffi::export_iterator(iter_boxed, field) }; + Ok(()) +} diff --git a/src/rust/src/lazy/dsl.rs b/src/rust/src/lazy/dsl.rs index 36efd2954..5668f4386 100644 --- a/src/rust/src/lazy/dsl.rs +++ b/src/rust/src/lazy/dsl.rs @@ -1987,23 +1987,6 @@ impl Expr { let infer_schema_len = robj_to!(Option, usize, infer_schema_len)?; Ok(self .0 - // ======= - // pub fn str_json_extract(&self, dtype: Nullable<&RPolarsDataType>) -> Self { - // let dtype = null_to_opt(dtype).map(|dt| dt.0.clone()); - // use pl::*; - // let output_type = match dtype.clone() { - // Some(dtype) => pl::GetOutput::from_type(dtype), - // None => pl::GetOutput::from_type(DataType::Unknown), - // }; - // let function = move |s: Series| { - // let ca = s.utf8()?; - // match ca.json_extract(dtype.clone()) { - // Ok(ca) => Ok(Some(ca.into_series())), - // Err(e) => Err(PolarsError::ComputeError(format!("{e:?}").into())), - // } - // }; - // self.0 - // >>>>>>> origin/main .clone() .str() .json_extract(dtype, infer_schema_len) diff --git a/src/rust/src/rdataframe/mod.rs b/src/rust/src/rdataframe/mod.rs index e23b63fc9..3c23d132e 100644 --- a/src/rust/src/rdataframe/mod.rs +++ b/src/rust/src/rdataframe/mod.rs @@ -35,7 +35,7 @@ pub struct OwnedDataFrameIterator { } impl OwnedDataFrameIterator { - fn new(df: polars::frame::DataFrame) -> Self { + pub fn new(df: polars::frame::DataFrame) -> Self { let schema = df.schema().to_arrow(); let data_type = DataType::Struct(schema.fields); let vs = df.get_columns().to_vec(); diff --git a/src/rust/src/rlib.rs b/src/rust/src/rlib.rs index 90d111127..07a25529c 100644 --- a/src/rust/src/rlib.rs +++ b/src/rust/src/rlib.rs @@ -4,6 +4,7 @@ use crate::rdataframe::DataFrame; use crate::robj_to; use crate::rpolarserr::{rdbg, RResult}; +use crate::series::Series; use crate::{rdataframe::VecDataFrame, utils::r_result_list}; use extendr_api::prelude::*; use polars::prelude as pl; @@ -154,9 +155,37 @@ fn struct_(exprs: Robj, eager: Robj, schema: Robj) -> Result { } #[extendr] -fn arrow_stream_to_rust(rbr: Robj) { - let x = crate::arrow_interop::to_rust::arrow_array_stream_to_rust(rbr, None).unwrap(); - dbg!(x); +fn new_arrow_stream() -> Robj { + crate::arrow_interop::to_rust::new_arrow_stream_internal() +} +use crate::rpolarserr::*; +#[extendr] +fn arrow_stream_to_df(robj_str: Robj) -> RResult { + let s = crate::arrow_interop::to_rust::arrow_stream_to_series_internal(robj_str)?; + let ca = s + .struct_() + .map_err(polars_to_rpolars_err) + .when("unpack struct from producer") + .hint("producer exported a plain Series not a Struct series")?; + let df: pl::DataFrame = ca.clone().into(); + Ok(DataFrame(df).into_robj()) +} + +#[extendr] +fn arrow_stream_to_series(robj_str: Robj) -> RResult { + let s = crate::arrow_interop::to_rust::arrow_stream_to_series_internal(robj_str)?; + Ok(Series(s).into_robj()) +} + +#[extendr] +unsafe fn export_df_to_arrow_stream(robj_df: Robj, robj_str: Robj) -> RResult { + let res: ExternalPtr = robj_df.try_into()?; + let pl_df = DataFrame(res.0.clone()).0; + //safety robj_str must be ptr to a arrow2 stream ready to export into + unsafe { + crate::arrow_interop::to_rust::export_df_as_stream(pl_df, &robj_str)?; + } + Ok(robj_str) } #[extendr] @@ -256,9 +285,15 @@ extendr_module! { //fn series_from_arrow; //fn rb_to_df; fn rb_list_to_df; - fn arrow_stream_to_rust; + fn dtype_str_repr; + // arrow conversions + fn new_arrow_stream; + fn arrow_stream_to_df; + fn arrow_stream_to_series; + fn export_df_to_arrow_stream; + //robj meta fn mem_address; fn clone_robj; diff --git a/src/rust/src/utils/mod.rs b/src/rust/src/utils/mod.rs index 0779f2444..5b3a0e87b 100644 --- a/src/rust/src/utils/mod.rs +++ b/src/rust/src/utils/mod.rs @@ -1006,3 +1006,19 @@ pub fn collect_hinted_result_rerr( } Ok(new_vec) } + +//keep error simple to interface with other libs +pub fn robj_str_ptr_to_usize(robj: &Robj) -> RResult { + || -> RResult { + let str: &str = robj + .as_str() + .ok_or(RPolarsErr::new().plain("robj str ptr not a str".into()))?; + let us: usize = str.parse()?; + Ok(us) + }() + .when("converting robj str pointer to usize") +} + +pub fn usize_to_robj_str(us: usize) -> Robj { + format!("{us}").into() +} diff --git a/tests/testthat/test-arrow_extendr_polars.R b/tests/testthat/test-arrow_extendr_polars.R new file mode 100644 index 000000000..73a970cec --- /dev/null +++ b/tests/testthat/test-arrow_extendr_polars.R @@ -0,0 +1,71 @@ +test_that("rust-polars DataFrame import/export via arrow stream", { + # this round trip conversion is only a unit test, not an integration test. + # Arrow export/import of DataFrame is mainly useful to interface with other R packages using + # rust-polars + + # see https://github.com/rpolars/extendrpolarsexamples/blob/main/src/rust/src/lib.rs + # for simple example of use to import/export polars DataFrames to another rust-polars + # compilation unit in another R package. Version of rust-polars does not have to match. + + # These function are not a part of the public user API. But package developer can use them to + # import/export df's. + + # ARROW STREAM HAS AN CONTRACT TO UPHOLD BY PRODUCER AND CONSUMER. WRONG BEHAVOIR CAUSES SEGFAULT. + # SEE OUTCOMMENTED EXAMPLES OF ILLEGAL BEHAVIOR LEADING TO SEGFAULT BELOW. + + # PRODUCER has some df which could be chunked as here. Categoricals with global string cache + # are also ok. + pl$with_string_cache({ + df_export = pl$concat(lapply(1:3, \(i) pl$DataFrame(iris))) + }) + + # CONSUMER creates a new arrow stream and return ptr which is passed to PRODUCER + str_ptr = new_arrow_stream() + + # PRODUCER exports the df into CONSUMERs stream + export_df_to_arrow_stream(df_export, str_ptr) |> unwrap() + + # CONSUMER can now import the df from stream + pl$with_string_cache({ + df_import = arrow_stream_to_df(str_ptr) |> unwrap() + }) + + # check imported/exported df's are identical + expect_identical(df_import$to_list(), df_export$to_list()) + + ##UNSAFE / Undefined behavior / will blow up eventually / STUFF NOT TO DO + # examples below of did segfault ~every 5-10th time, during development + + # 1: DO NOT EXPORT TO STREAM MORE THAN ONCE + # new DataFrame can be exported to stream, but only the latest # BUT THIS SEGFAULTs sometimes + # export_df_to_arrow_stream(df_export, str_ptr) |> unwrap() + # export_df_to_arrow_stream(pl$DataFrame(mtcars), str_ptr) |> unwrap() + # mtcars_import = arrow_stream_to_df(str_ptr) |> unwrap() + + # 2: DO NOT IMPORT FROM STREAM MORE THAN ONCE + # reading from released(exhuasted) stream results in error most times + # BUT THIS SEGFAULTs sometimes + #ctx = arrow_stream_to_df(str_ptr)$err$contexts() + #expect_equal( + # ctx$PlainErrorMessage, + # r"{InvalidArgumentError("The C stream was already released")}" + # ) + + # 3: DO NOT IMPORT/EXPORT ARROW STREAM ACROSS PROCESSES (use IPC for that, see $map() docs) + # background process willSEGFAULT HERE + # str_ptr = new_arrow_stream() + # rsess = callr::r_bg(func = \(str_ptr) { + # library(polars) + # pl$with_string_cache({ + # df_export = pl$concat(lapply(1:3, \(i) pl$DataFrame(iris))) + # }) + # polars:::export_df_to_arrow_stream(df_export, str_ptr) + # },args = list(str_ptr=str_ptr)) + # + # Sys.sleep(3) + # df_import = arrow_stream_to_df(str_ptr) + # print(df_import) + # str_ptr = new_arrow_stream() + # rsess$get_result() + +})