diff --git a/NEWS.md b/NEWS.md index a67789baf..d03062584 100644 --- a/NEWS.md +++ b/NEWS.md @@ -13,7 +13,9 @@ into one (#349). - New stat functions `pl$cov()`, `pl$rolling_cov()` `pl$corr()`, `pl$rolling_corr()` (#351). - Fix bug to allow using polars without library(polars) (#355). - +- New methods `$optimization_toggle()` + `$profile()` and enable rust-polars feature + CSE: "Activate common subplan elimination optimization" (#323) + # polars 0.7.0 ## BREAKING CHANGES diff --git a/R/expr__expr.R b/R/expr__expr.R index fb7225016..9713f1a38 100644 --- a/R/expr__expr.R +++ b/R/expr__expr.R @@ -1120,8 +1120,7 @@ Expr_map_alias = function(fun) { ) { assign(".warn_map_alias", 1L, envir = runtime_state) # it does not seem map alias is executed multi-threaded but rather immediately during building lazy query - # if ever crashing, any lazy method like select, filter, with_columns must use something like handle_thread_r_requests() - # then handle_thread_r_requests should be rewritten to handle any type. 
+ # if ever crashing, any lazy method like select, filter, with_columns must use something like filter_with_r_func_support() message("map_alias function is experimentally without some thread-safeguards, please report any crashes") # TODO resolve } if (!is.function(fun)) pstop(err = "alias_map fun must be a function") diff --git a/R/extendr-wrappers.R b/R/extendr-wrappers.R index 8cdbd706f..93bc1aca9 100644 --- a/R/extendr-wrappers.R +++ b/R/extendr-wrappers.R @@ -59,10 +59,14 @@ clone_robj <- function(robj) .Call(wrap__clone_robj, robj) test_robj_to_usize <- function(robj) .Call(wrap__test_robj_to_usize, robj) +test_robj_to_f64 <- function(robj) .Call(wrap__test_robj_to_f64, robj) + test_robj_to_i64 <- function(robj) .Call(wrap__test_robj_to_i64, robj) test_robj_to_u32 <- function(robj) .Call(wrap__test_robj_to_u32, robj) +test_robj_to_i32 <- function(robj) .Call(wrap__test_robj_to_i32, robj) + test_print_string <- function(s) invisible(.Call(wrap__test_print_string, s)) test_robj_to_expr <- function(robj) .Call(wrap__test_robj_to_expr, robj) @@ -899,8 +903,6 @@ LazyFrame$collect_background <- function() .Call(wrap__LazyFrame__collect_backgr LazyFrame$collect <- function() .Call(wrap__LazyFrame__collect, self) -LazyFrame$collect_handled <- function() .Call(wrap__LazyFrame__collect_handled, self) - LazyFrame$first <- function() .Call(wrap__LazyFrame__first, self) LazyFrame$last <- function() .Call(wrap__LazyFrame__last, self) @@ -967,6 +969,10 @@ LazyFrame$rename <- function(existing, new) .Call(wrap__LazyFrame__rename, self, LazyFrame$schema <- function() .Call(wrap__LazyFrame__schema, self) +LazyFrame$optimization_toggle <- function(type_coercion, predicate_pushdown, projection_pushdown, simplify_expr, slice_pushdown, cse, streaming) .Call(wrap__LazyFrame__optimization_toggle, self, type_coercion, predicate_pushdown, projection_pushdown, simplify_expr, slice_pushdown, cse, streaming) + +LazyFrame$profile <- function() .Call(wrap__LazyFrame__profile, self) + 
LazyFrame$explode <- function(columns, dotdotdot_args) .Call(wrap__LazyFrame__explode, self, columns, dotdotdot_args) LazyFrame$clone_see_me_macro <- function() .Call(wrap__LazyFrame__clone_see_me_macro, self) diff --git a/R/lazyframe__lazy.R b/R/lazyframe__lazy.R index c5b318db9..13cb2905d 100644 --- a/R/lazyframe__lazy.R +++ b/R/lazyframe__lazy.R @@ -265,15 +265,79 @@ LazyFrame_filter = "use_extendr_wrapper" #' @title New DataFrame from LazyFrame_object$collect() #' @description collect DataFrame by lazy query +#' @param type_coercion Boolean. Coerce types such that operations succeed and +#' run on minimal required memory. +#' @param predicate_pushdown Boolean. Applies filters as early as possible / at +#' scan level. +#' @param projection_pushdown Boolean. Select only the columns that are needed +#' at the scan level. +#' @param simplify_expression Boolean. Various optimizations, such as constant +#' folding and replacing expensive operations with faster alternatives. +#' @param slice_pushdown Boolean. Only load the required slice from the scan +#' level. Don't materialize sliced outputs (e.g. `join$head(10)`). +#' @param common_subplan_elimination Boolean. Cache subtrees/file scans that +#' are used by multiple subtrees in the query plan. +#' @param no_optimization Boolean. Turn off the following optimizations: +#' predicate_pushdown = FALSE +#' projection_pushdown = FALSE +#' slice_pushdown = FALSE +#' common_subplan_elimination = FALSE +#' @param streaming Boolean. Run parts of the query in a streaming fashion +#' (this is in an alpha state). +#' @param collect_in_background Boolean. Detach this query from R session. +#' Computation will start in background. Get a handle which later can be converted +#' into the resulting DataFrame. Useful in interactive mode to not lock R session. +#' @details +#' Note: use `$fetch(n)` if you want to run your query on the first `n` rows only. +#' This can be a huge time saver in debugging queries. 
#' @keywords LazyFrame DataFrame_new -#' @return collected `DataFrame` -#' @examples pl$DataFrame(iris)$lazy()$filter(pl$col("Species") == "setosa")$collect() -LazyFrame_collect = function() { - unwrap(.pr$LazyFrame$collect_handled(self), "in $collect():") +#' @return A `DataFrame` +#' @examples pl$LazyFrame(iris)$filter(pl$col("Species") == "setosa")$collect() +LazyFrame_collect = function( + type_coercion = TRUE, + predicate_pushdown = TRUE, + projection_pushdown = TRUE, + simplify_expression = TRUE, + slice_pushdown = TRUE, + common_subplan_elimination = TRUE, + no_optimization = FALSE, + streaming = FALSE, + collect_in_background = FALSE) { + + if (isTRUE(no_optimization)) { + predicate_pushdown = FALSE + projection_pushdown = FALSE + slice_pushdown = FALSE + common_subplan_elimination = FALSE + } + + if (isTRUE(streaming)) { + common_subplan_elimination = FALSE + } + + collect_f = if (isTRUE(collect_in_background)) { + .pr$LazyFrame$collect_background + } else { + .pr$LazyFrame$collect + } + + self |> + .pr$LazyFrame$optimization_toggle( + type_coercion, + predicate_pushdown, + projection_pushdown, + simplify_expression, + slice_pushdown, + common_subplan_elimination, + streaming + ) |> + and_then(collect_f) |> + unwrap("in $collect():") } #' @title New DataFrame from LazyFrame_object$collect() -#' @description collect DataFrame by lazy query +#' @description collect DataFrame by lazy query (SOFT DEPRECATED) +#' @details This function is soft deprecated. Use $collect(collect_in_background = TRUE) instead #' @keywords LazyFrame DataFrame_new #' @return collected `DataFrame` #' @examples pl$DataFrame(iris)$lazy()$filter(pl$col("Species") == "setosa")$collect() @@ -926,6 +990,45 @@ LazyFrame_dtypes = method_as_property(function() { unwrap("in $dtypes()") }) +#' @title Collect and profile a lazy query. 
+#' @description This will run the query and return a list containing the materialized DataFrame and +#' a DataFrame that contains profiling information of each node that is executed. +#' @details The units of the timings are microseconds. +#' +#' @keywords LazyFrame +#' @return List of two `DataFrame`s: one with the collected result, the other with the timings of each step. +#' @examples +#' +#' ## Simplest use case +#' pl$LazyFrame()$select(pl$lit(2) + 2)$profile() +#' +#' ## Use $profile() to compare two queries +#' +#' # -1- map each Species-group with native polars, takes ~120us only +#' pl$LazyFrame(iris)$ +#' sort("Sepal.Length")$ +#' groupby("Species", maintain_order = TRUE)$ +#' agg(pl$col(pl$Float64)$first() + 5)$ +#' profile() +#' +#' # -2- map each Species-group of each numeric column with an R function, takes ~7000us (slow!) +#' +#' # some R function, prints `.` for each time called by polars +#' r_func = \(s) { +#' cat(".") +#' s$to_r()[1] + 5 +#' } +#' +#' pl$LazyFrame(iris)$ +#' sort("Sepal.Length")$ +#' groupby("Species", maintain_order = TRUE)$ +#' agg(pl$col(pl$Float64)$apply(r_func))$ +#' profile() +#' +LazyFrame_profile = function() { + .pr$LazyFrame$profile(self) |> unwrap("in $profile()") +} + #' @title Explode the DataFrame to long format by exploding the given columns #' @keywords LazyFrame #' @@ -954,7 +1057,6 @@ LazyFrame_explode = function(columns = list(), ...) { #' @return A LazyFrame #' @examples #' pl$LazyFrame(mtcars)$clone() - LazyFrame_clone = function() { .pr$LazyFrame$clone_see_me_macro(self) } diff --git a/man/Expr_pow.Rd b/man/Expr_pow.Rd index b41b32a85..b818b4fbf 100644 --- a/man/Expr_pow.Rd +++ b/man/Expr_pow.Rd @@ -29,15 +29,15 @@ exponentiation operator. 
pl$DataFrame(a = -1:3)$select( pl$lit(2)$pow(pl$col("a"))$alias("with $pow()"), 2^pl$lit(-2:2), # brief use - pl$lit(2)$alias("left hand side name") ^ pl$lit(-3:1)$alias("right hand side name") + pl$lit(2)$alias("left hand side name")^pl$lit(-3:1)$alias("right hand side name") ) # Example on the R behavior of the `**`-'quasi operator' -2^1 # normal use +2^1 # normal use 2**1 # this works because ** is converted to the `^`-operator by the R interpreter -get("^")(2,1) #this works because there exists a function called "^" +get("^")(2, 1) # this works because there exists a function called "^" # the R interpreter will not convert "**" to "^" and there is no function named "**" -tryCatch(get("**")(2,1), error = as.character) +tryCatch(get("**")(2, 1), error = as.character) } \keyword{Expr} diff --git a/man/LazyFrame_collect.Rd b/man/LazyFrame_collect.Rd index 1de6d2e41..1ef627a86 100644 --- a/man/LazyFrame_collect.Rd +++ b/man/LazyFrame_collect.Rd @@ -4,16 +4,62 @@ \alias{LazyFrame_collect} \title{New DataFrame from LazyFrame_object$collect()} \usage{ -LazyFrame_collect() +LazyFrame_collect( + type_coercion = TRUE, + predicate_pushdown = TRUE, + projection_pushdown = TRUE, + simplify_expression = TRUE, + slice_pushdown = TRUE, + common_subplan_elimination = TRUE, + no_optimization = FALSE, + streaming = FALSE, + collect_in_background = FALSE +) +} +\arguments{ +\item{type_coercion}{Boolean. Coerce types such that operations succeed and +run on minimal required memory.} + +\item{predicate_pushdown}{Boolean. Applies filters as early as possible / at +scan level.} + +\item{projection_pushdown}{Boolean. Select only the columns that are needed +at the scan level.} + +\item{simplify_expression}{Boolean. Various optimizations, such as constant +folding and replacing expensive operations with faster alternatives.} + +\item{slice_pushdown}{Boolean. Only load the required slice from the scan +level. Don't materialize sliced outputs (e.g. \code{join$head(10)}).} + +\item{common_subplan_elimination}{Boolean. 
Cache subtrees/file scans that +are used by multiple subtrees in the query plan.} + +\item{no_optimization}{Boolean. Turn off the following optimizations: +predicate_pushdown = FALSE +projection_pushdown = FALSE +slice_pushdown = FALSE +common_subplan_elimination = FALSE} + +\item{streaming}{Boolean. Run parts of the query in a streaming fashion +(this is in an alpha state).} + +\item{collect_in_background}{Boolean. Detach this query from R session. +Computation will start in background. Get a handle which later can be converted +into the resulting DataFrame. Useful in interactive mode to not lock R session.} } \value{ -collected \code{DataFrame} +A \code{DataFrame} } \description{ collect DataFrame by lazy query } +\details{ +Note: use \verb{$fetch(n)} if you want to run your query on the first \code{n} rows only. +This can be a huge time saver in debugging queries. +} \examples{ -pl$DataFrame(iris)$lazy()$filter(pl$col("Species") == "setosa")$collect() +pl$LazyFrame(iris)$filter(pl$col("Species") == "setosa")$collect() } \keyword{DataFrame_new} \keyword{LazyFrame} diff --git a/man/LazyFrame_collect_background.Rd b/man/LazyFrame_collect_background.Rd index 4506b06fd..9710c97b1 100644 --- a/man/LazyFrame_collect_background.Rd +++ b/man/LazyFrame_collect_background.Rd @@ -10,7 +10,10 @@ LazyFrame_collect_background() collected \code{DataFrame} } \description{ -collect DataFrame by lazy query +collect DataFrame by lazy query (SOFT DEPRECATED) +} +\details{ +This function is soft deprecated. 
Use $collect(collect_in_background = TRUE) instead } \examples{ pl$DataFrame(iris)$lazy()$filter(pl$col("Species") == "setosa")$collect() diff --git a/man/LazyFrame_profile.Rd b/man/LazyFrame_profile.Rd new file mode 100644 index 000000000..ae4c6a0aa --- /dev/null +++ b/man/LazyFrame_profile.Rd @@ -0,0 +1,48 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/lazyframe__lazy.R +\name{LazyFrame_profile} +\alias{LazyFrame_profile} +\title{Collect and profile a lazy query.} +\usage{ +LazyFrame_profile() +} +\value{ +List of two \code{DataFrame}s: one with the collected result, the other with the timings of each step. +} +\description{ +This will run the query and return a list containing the materialized DataFrame and +a DataFrame that contains profiling information of each node that is executed. +} +\details{ +The units of the timings are microseconds. +} +\examples{ + +## Simplest use case +pl$LazyFrame()$select(pl$lit(2) + 2)$profile() + +## Use $profile() to compare two queries + +# -1- map each Species-group with native polars, takes ~120us only +pl$LazyFrame(iris)$ + sort("Sepal.Length")$ + groupby("Species", maintain_order = TRUE)$ + agg(pl$col(pl$Float64)$first() + 5)$ + profile() + +# -2- map each Species-group of each numeric column with an R function, takes ~7000us (slow!) 
+ +# some R function, prints `.` for each time called by polars +r_func = \(s) { + cat(".") + s$to_r()[1] + 5 +} + +pl$LazyFrame(iris)$ + sort("Sepal.Length")$ + groupby("Species", maintain_order = TRUE)$ + agg(pl$col(pl$Float64)$apply(r_func))$ + profile() + +} +\keyword{LazyFrame} diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index 85e6b81e9..5a8ec4e9b 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -80,6 +80,7 @@ features = [ "repeat_by", "interpolate", #"list", + "cse", "ewma", "rank", "diff", diff --git a/src/rust/src/concurrent.rs b/src/rust/src/concurrent.rs index f39c538a9..2c90e5d11 100644 --- a/src/rust/src/concurrent.rs +++ b/src/rust/src/concurrent.rs @@ -9,76 +9,75 @@ use crate::CONFIG; use polars::prelude as pl; use crate::rdataframe::Series; +use crate::rpolarserr::*; use extendr_api::prelude::*; use extendr_api::Conversions; use std::result::Result; use std::thread::JoinHandle; -// This functions allows to call .collect() on polars lazy frame. A lazy frame may contain user defined functions -// which could call R from any spawned thread by polars. 
This function is a bridge between multithraedded polars -// and mostly single threaded only R +// This is the standard way the main thread which can call the R session, +// should process a request from a polars thread worker to run an R function +fn serve_r((probj, s): (ParRObj, pl::Series)) -> Result> { + //unpack user-R-function + let f = probj.0.as_function().ok_or_else(|| { + extendr_api::error::Error::Other(format!( + "provided input is not an R function but a: {:?}", + probj.0 + )) + })?; + + // call user-R-function with Series as input, return Robj (likeliy as Series) + let rseries_robj = f.call(pairlist!(Series(s)))?; -//handle_thread_r_request() is a special case of the concurrent_handler() -//handle_thread_r_request passes 2 closures to concurrent_handler() + 1 closure to lazyframe -//1: What to run (polars collect), 2: what handler to execute code -//3: Expr.map or such, passes an R user defined function in a closure to the lazyframe. This closure describes -//how to request R functions run on the mainthread with a ThreadCom object. + // return of user-R-function may not be Series, return Err if so + let s = Series::any_robj_to_pl_series_result(rseries_robj)?; -pub fn handle_thread_r_requests(lazy_df: pl::LazyFrame) -> pl::PolarsResult { - let res_res_df = concurrent_handler( - // Closure 1: start concurrent handler and get final result - // This what we want to do in the first place ... to run LazyFrame.collect() thread-safe + Ok(s) +} - //this closure gets spawned in a child of main thred, tc is a ThreadCom struct +// This functions allows to call .collect() on polars lazy frame. A lazy frame may contain user defined functions +// which could call R from any spawned thread by polars. 
This function is a bridge between multithreaded polars +// and mostly single threaded only R +pub fn collect_with_r_func_support(lazy_df: pl::LazyFrame) -> RResult { + let new_df = concurrent_handler( + // closure 1: spawned by main thread + // tc is a ThreadCom which any child thread can use to submit R jobs to main thread move |tc| { - //start polars .collect, which it self can spawn many new threads + // get return value let retval = lazy_df.collect(); - //collect done, we're all done know - //drop local and global ThreadCom clone, signals to main/R-serving thread to shut down. + // drop the last two ThreadCom clones, signals to main/R-serving thread to shut down. ThreadCom::kill_global(&CONFIG); drop(tc); - //taadaaa return value retval }, - //closure 2 - //how should the R-serving mainthread handle a user function requst? - //synopsis: run the user-R-function, input/ouput is a Series - |(probj, s): (ParRObj, pl::Series)| -> Result> { - //unpack user-R-function - let f = probj.0.as_function().ok_or_else(|| { - extendr_api::error::Error::Other(format!( - "provided input is not an R function but a: {:?}", - probj.0 - )) - })?; - - // call user-R-function with Series as input, return Robj (likeliy as Series) - let rseries_robj = f.call(pairlist!(Series(s)))?; - - // return of user-R-function may not be Series, return Err if so - let s = Series::any_robj_to_pl_series_result(rseries_robj)?; - - Ok(s) - }, - //CONFIG is "global variable" where threads can request a new ThreadCom + // closure 2: how to serve polars worker R job request in main thread + serve_r, + //CONFIG is "global variable" where any new thread can request a clone of ThreadCom to establish contact with main thread &CONFIG, - ); - //concurrent handling complete - - //bubble on concurrent errors - let res_df = res_res_df.map_err(|err| { - pl::polars_err!( - ComputeError: "error via polars concurrent R handler {:?}", err, - ) - })?; - - //bubble polars errors - let new_df = res_df?; + ) + 
.map_err(|err| RPolarsErr::new().plain(err.to_string()))? + .map_err(polars_to_rpolars_err); //wrap ok - Ok(DataFrame(new_df)) + Ok(DataFrame(new_df?)) +} + +pub fn profile_with_r_func_support(lazy_df: pl::LazyFrame) -> RResult<(DataFrame, DataFrame)> { + concurrent_handler( + move |tc| { + let retval = lazy_df.profile(); + ThreadCom::kill_global(&CONFIG); + drop(tc); + retval + }, + serve_r, + &CONFIG, + ) + .map_err(|err| RPolarsErr::new().plain(err.to_string()))? + .map_err(polars_to_rpolars_err) + .map(|(result_df, profile_df)| (DataFrame(result_df), DataFrame(profile_df))) } #[derive(Debug, Default)] diff --git a/src/rust/src/lazy/dataframe.rs b/src/rust/src/lazy/dataframe.rs index b17dd3d6f..76f0d103c 100644 --- a/src/rust/src/lazy/dataframe.rs +++ b/src/rust/src/lazy/dataframe.rs @@ -1,14 +1,15 @@ -use crate::concurrent::{handle_thread_r_requests, PolarsBackgroundHandle}; +use crate::concurrent::{ + collect_with_r_func_support, profile_with_r_func_support, PolarsBackgroundHandle, +}; use crate::conversion::strings_to_smartstrings; use crate::lazy::dsl::*; +use crate::rdataframe::DataFrame as RDF; use crate::rdatatype::new_join_type; use crate::rdatatype::new_quantile_interpolation_option; use crate::rdatatype::new_unique_keep_strategy; use crate::rdatatype::{new_asof_strategy, RPolarsDataType}; use crate::robj_to; -use crate::rpolarserr::rerr; -use crate::rpolarserr::RResult; -use crate::rpolarserr::{Rctx, WithRctx}; +use crate::rpolarserr::{rerr, RResult, Rctx, WithRctx}; use crate::utils::wrappers::null_to_opt; use crate::utils::{r_result_list, try_f64_into_usize}; use extendr_api::prelude::*; @@ -66,23 +67,8 @@ impl LazyFrame { PolarsBackgroundHandle::new(self) } - pub fn collect(&self) -> Result { - handle_thread_r_requests(self.clone().0).map_err(|err| { - //improve err messages - let err_string = match err { - pl::PolarsError::InvalidOperation(x) => { - format!("Something (Likely a Column) named {:?} was not found", x) - } - x => format!("{:?}", x), - 
}; - - format!("when calling $collect() on LazyFrame:\n{}", err_string) - }) - } - - pub fn collect_handled(&self) -> crate::rpolarserr::RResult { - use crate::rpolarserr::WithRctx; - handle_thread_r_requests(self.clone().0).when("calling $collect() on LazyFrame") + pub fn collect(&self) -> RResult { + collect_with_r_func_support(self.clone().0) } fn first(&self) -> Self { @@ -392,6 +378,35 @@ impl LazyFrame { )) } + #[allow(clippy::too_many_arguments)] + fn optimization_toggle( + &self, + type_coercion: Robj, + predicate_pushdown: Robj, + projection_pushdown: Robj, + simplify_expr: Robj, + slice_pushdown: Robj, + cse: Robj, + streaming: Robj, + ) -> RResult { + let ldf = self + .0 + .clone() + .with_type_coercion(robj_to!(bool, type_coercion)?) + .with_predicate_pushdown(robj_to!(bool, predicate_pushdown)?) + .with_simplify_expr(robj_to!(bool, simplify_expr)?) + .with_slice_pushdown(robj_to!(bool, slice_pushdown)?) + .with_streaming(robj_to!(bool, streaming)?) + .with_projection_pushdown(robj_to!(bool, projection_pushdown)?) 
+ .with_common_subplan_elimination(robj_to!(bool, cse)?); + + Ok(ldf.into()) + } + + fn profile(&self) -> RResult { + profile_with_r_func_support(self.0.clone()).map(|(r, p)| list!(result = r, profile = p)) + } + fn explode(&self, columns: Robj, dotdotdot_args: Robj) -> RResult { let mut columns: Vec = robj_to!(Vec, PLExprCol, columns)?; let mut ddd_args: Vec = robj_to!(Vec, PLExprCol, dotdotdot_args)?; diff --git a/src/rust/src/rdataframe/mod.rs b/src/rust/src/rdataframe/mod.rs index b9090fa52..bb509f04b 100644 --- a/src/rust/src/rdataframe/mod.rs +++ b/src/rust/src/rdataframe/mod.rs @@ -279,7 +279,7 @@ impl DataFrame { } pub fn select(&mut self, exprs: Robj) -> RResult { - self.lazy().select(exprs)?.collect_handled() + self.lazy().select(exprs)?.collect() } //used in GroupBy, not DataFrame @@ -288,7 +288,7 @@ impl DataFrame { group_exprs: Robj, agg_exprs: Robj, maintain_order: Robj, - ) -> Result { + ) -> RResult { let group_exprs: Vec = robj_to!(VecPLExprCol, group_exprs)?; let agg_exprs: Vec = robj_to!(VecPLExprCol, agg_exprs)?; let maintain_order = robj_to!(Option, bool, maintain_order)?.unwrap_or(false); diff --git a/src/rust/src/rlib.rs b/src/rust/src/rlib.rs index 1c2606fb3..2f864d79f 100644 --- a/src/rust/src/rlib.rs +++ b/src/rust/src/rlib.rs @@ -250,6 +250,12 @@ pub fn clone_robj(robj: Robj) -> Robj { fn test_robj_to_usize(robj: Robj) -> RResult { robj_to!(usize, robj).map(rdbg) } + +#[extendr] +fn test_robj_to_f64(robj: Robj) -> RResult { + robj_to!(f64, robj).map(rdbg) +} + #[extendr] fn test_robj_to_i64(robj: Robj) -> RResult { robj_to!(i64, robj).map(rdbg) @@ -260,6 +266,11 @@ fn test_robj_to_u32(robj: Robj) -> RResult { robj_to!(u32, robj).map(rdbg) } +#[extendr] +fn test_robj_to_i32(robj: Robj) -> RResult { + robj_to!(i32, robj).map(rdbg) +} + #[extendr] fn test_print_string(s: String) { rprintln!("{}", s); @@ -303,8 +314,10 @@ extendr_module! 
{ fn clone_robj; fn test_robj_to_usize; + fn test_robj_to_f64; fn test_robj_to_i64; fn test_robj_to_u32; + fn test_robj_to_i32; fn test_print_string; fn test_robj_to_expr; fn test_wrong_call_pl_lit; diff --git a/src/rust/src/utils/mod.rs b/src/rust/src/utils/mod.rs index 2fe5ac2d0..93345d221 100644 --- a/src/rust/src/utils/mod.rs +++ b/src/rust/src/utils/mod.rs @@ -248,6 +248,8 @@ const R_MIN_INTEGERISH: f64 = -4503599627370496.0; //const I64_MAX_INTO_F64: f64 = i64::MAX as f64; const USIZE_MAX_INTO_F64: f64 = usize::MAX as f64; const U32_MAX_INTO_F64: f64 = u32::MAX as f64; +const I32_MIN_INTO_F64: f64 = i32::MIN as f64; +const I32_MAX_INTO_F64: f64 = i32::MAX as f64; pub const BIT64_NA_ECODING: i64 = -9223372036854775808i64; const WITHIN_INT_MAX: &str = @@ -256,6 +258,8 @@ const WITHIN_INT_MIN: &str = "cannot exceeds double->integer unambigious conversion bound of -(2^52)= -4503599627370496.0"; const WITHIN_USIZE_MAX: &str = "cannot exceed the upper bound for usize"; const WITHIN_U32_MAX: &str = "cannot exceed the upper bound for u32 of 4294967295"; +const WITHIN_I32_MIN: &str = "cannot exceed the upper bound for i32 of 2147483647"; +const WITHIN_I32_MAX: &str = "cannot exceed the upper lower for i32 of -2147483648"; const WITHIN_U8_MAX: &str = "cannot exceed the upper bound for u8 of 255"; const NOT_NAN: &str = "cannot be NaN"; const NO_LESS_THAN_ONE: &str = "cannot be less than one"; @@ -299,17 +303,6 @@ pub fn try_f64_into_i64(x: f64) -> RResult { _ if x.is_nan() => base_err.misvalued(NOT_NAN), _ if x < R_MIN_INTEGERISH => base_err.misvalued(WITHIN_INT_MIN), _ if x > R_MAX_INTEGERISH => base_err.misvalued(WITHIN_INT_MAX), - // should not matter - // _ if x > I64_MAX_INTO_F64 => Err(format!( - // "the value {} cannot exceed i64::MAX {}", - // x, - // i64::MAX - // )), - // _ if x < I64_MIN_INTO_F64 => Err(format!( - // "the value {} cannot exceed i64::MIN {}", - // x, - // i64::MIN - // )) _ => Ok(x as i64), } } @@ -324,6 +317,16 @@ pub fn try_f64_into_u32(x: 
f64) -> RResult { } } +pub fn try_f64_into_i32(x: f64) -> RResult { + let f_base_err = || rerr().bad_val(rdbg(x)); + match x { + _ if x.is_nan() => f_base_err().misvalued(NOT_NAN), + _ if x < I32_MIN_INTO_F64 => f_base_err().misvalued(WITHIN_I32_MIN), + _ if x > I32_MAX_INTO_F64 => f_base_err().misvalued(WITHIN_I32_MAX), + _ => Ok(x as i32), + } +} + pub fn try_i64_into_u64(x: i64) -> RResult { let base_err = rerr().bad_val(rdbg(x)); match x { @@ -349,6 +352,15 @@ pub fn try_i64_into_u32(x: i64) -> RResult { } } +pub fn try_i64_into_i32(x: i64) -> RResult { + let f_base_err = || rerr().bad_val(rdbg(x)); + match x { + _ if x < i32::MIN as i64 => f_base_err().misvalued(WITHIN_I32_MIN), + _ if x > i32::MAX as i64 => f_base_err().misvalued(WITHIN_I32_MAX), + _ => Ok(x as i32), + } +} + pub fn try_i64_into_u8(x: i64) -> RResult { let base_err = rerr().bad_val(rdbg(x)); match x { @@ -466,6 +478,21 @@ pub fn unpack_r_result_list(robj: extendr_api::Robj) -> RResult { res } +//None if not real or Na. +pub fn robj_bit64_to_opt_i64(robj: Robj) -> Option { + robj.as_real() + .and_then(|v| i64::try_from(v.to_bits()).ok()) + .filter(|val| *val != crate::utils::BIT64_NA_ECODING) +} + +pub fn robj_parse_str_to_t(robj: Robj) -> RResult +where + T: std::str::FromStr, + ::Err: std::error::Error, +{ + Ok(robj.as_str().unwrap_or("").parse::()?) 
+} + pub fn robj_to_char(robj: extendr_api::Robj) -> RResult { let robj = unpack_r_result_list(robj)?; let mut fchar_iter = if let Some(char_str) = robj.as_str() { @@ -501,28 +528,79 @@ pub fn robj_to_usize(robj: extendr_api::Robj) -> RResult { robj_to_u64(robj).and_then(try_u64_into_usize) } +fn err_no_nan() -> RResult { + rerr().plain("any NA value is not allowed here".to_string()) +} + +fn err_no_scalar() -> RResult { + rerr().plain("only a scalar value is allowed here (length = 1)") +} + pub fn robj_to_i64(robj: extendr_api::Robj) -> RResult { let robj = unpack_r_result_list(robj)?; use extendr_api::*; + + return match (robj.rtype(), robj.len()) { + (_, 0 | 2..) => Some(err_no_scalar()), + (Rtype::Strings, 1) => Some(robj_parse_str_to_t(robj.clone())), + (Rtype::Doubles, 1) if robj.inherits("integer64") => { + robj_bit64_to_opt_i64(robj.clone()).map(Ok) + } + (Rtype::Doubles, 1) => robj.as_real().map(try_f64_into_i64), + (Rtype::Integers, 1) => robj.as_integer().map(i64::from).map(Ok), + + (_, _) => { + Some(rerr().plain("does not support this R type for this conversion".to_string())) + } + } + .unwrap_or_else(err_no_nan) + .bad_robj(&robj) + .mistyped(tn::()) + .when("converting into type"); +} + +pub fn robj_to_i32(robj: extendr_api::Robj) -> RResult { + let robj = unpack_r_result_list(robj)?; + use extendr_api::*; + return match (robj.rtype(), robj.len()) { - (Rtype::Strings, 1) => robj - .as_str() - .unwrap_or("") - .parse::() - .ok(), - //specialized integer64 conversion - (Rtype::Doubles, 1) if robj.inherits("integer64") => robj - .as_real() - .and_then(|v| i64::try_from(v.to_bits()).ok()) - .filter(|val| *val != crate::utils::BIT64_NA_ECODING), - //from R doubles or integers - (Rtype::Doubles, 1) => robj.as_real().and_then(|v| try_f64_into_i64(v).ok()), - (Rtype::Integers, 1) => robj.as_integer().map(i64::from), - (_, _) => None, + (_, 0 | 2..) 
=> Some(err_no_scalar()), + (Rtype::Strings, 1) => Some(robj_parse_str_to_t(robj.clone())), + (Rtype::Doubles, 1) if robj.inherits("integer64") => { + robj_bit64_to_opt_i64(robj.clone()).map(try_i64_into_i32) + } + (Rtype::Doubles, 1) => robj.as_real().map(try_f64_into_i32), + (Rtype::Integers, 1) => robj.as_integer().map(i32::from).map(Ok), + (_, _) => { + Some(rerr().plain("does not support this R type for this conversion".to_string())) + } } - .ok_or(RPolarsErr::new()) + .unwrap_or_else(err_no_nan) .bad_robj(&robj) - .mistyped(tn::()); + .mistyped(tn::()) + .when("converting into type"); +} + +pub fn robj_to_f64(robj: extendr_api::Robj) -> RResult { + let robj = unpack_r_result_list(robj)?; + use extendr_api::*; + + return match (robj.rtype(), robj.len()) { + (_, 0 | 2..) => Some(err_no_scalar()), + (Rtype::Strings, 1) => Some(robj_parse_str_to_t(robj.clone())), + (Rtype::Doubles, 1) if robj.inherits("integer64") => { + robj_bit64_to_opt_i64(robj.clone()).map(|v| Ok(v as f64)) + } + (Rtype::Doubles, 1) => robj.as_real().map(Ok), + (Rtype::Integers, 1) => robj.as_integer().map(|v| Ok(v as f64)), + (_, _) => { + Some(rerr().plain("does not support this R type for this conversion".to_string())) + } + } + .unwrap_or_else(err_no_nan) + .bad_robj(&robj) + .mistyped(tn::()) + .when("converting into type"); } pub fn robj_to_u64(robj: extendr_api::Robj) -> RResult { @@ -692,10 +770,18 @@ macro_rules! robj_to_inner { $crate::utils::robj_to_usize($a) }; + (f64, $a:ident) => { + $crate::utils::robj_to_f64($a) + }; + (i64, $a:ident) => { $crate::utils::robj_to_i64($a) }; + (i32, $a:ident) => { + $crate::utils::robj_to_i32($a) + }; + (u64, $a:ident) => { $crate::utils::robj_to_u64($a) }; diff --git a/tests/testthat/test-bit64.R b/tests/testthat/test-bit64.R index 26b704ba7..cf32654bc 100644 --- a/tests/testthat/test-bit64.R +++ b/tests/testthat/test-bit64.R @@ -23,11 +23,22 @@ test_that("from r to series and reverse", { test_that("robj_to! 
from bit64", { testthat::skip_if_not_installed("bit64") + + expect_identical( + unwrap(test_robj_to_f64(bit64::as.integer64(1))), + "1.0" + ) + expect_identical( unwrap(test_robj_to_i64(bit64::as.integer64(1))), as.character(bit64::as.integer64(1)) ) + expect_identical( + unwrap(test_robj_to_i32(bit64::as.integer64(2^27))), + as.character(2^27) + ) + expect_identical( unwrap(test_robj_to_u32(bit64::as.integer64(2^27))), as.character(2^27) @@ -44,34 +55,57 @@ test_that("robj_to! from bit64", { ) # NO NA + + expect_rpolarserr( + unwrap(test_robj_to_f64(bit64::NA_integer64_), call = NULL), + c("BadArgument", "When", "TypeMismatch", "BadValue", "PlainErrorMessage") + ) + + expect_rpolarserr( + unwrap(test_robj_to_i64(bit64::NA_integer64_), call = NULL), + c("BadArgument", "When", "TypeMismatch", "BadValue", "PlainErrorMessage") + ) + expect_rpolarserr( - unwrap(test_robj_to_i64(bit64::as.integer64(NA)), call = NULL), - c("BadArgument", "TypeMismatch", "BadValue") + unwrap(test_robj_to_i32(bit64::NA_integer64_), call = NULL), + c("BadArgument", "When", "TypeMismatch", "BadValue", "PlainErrorMessage") ) + expect_rpolarserr( - unwrap(test_robj_to_usize(bit64::as.integer64(NA)), call = NULL), - c("BadArgument", "TypeMismatch", "BadValue") + unwrap(test_robj_to_usize(bit64::NA_integer64_), call = NULL), + c("BadArgument", "When", "TypeMismatch", "BadValue", "PlainErrorMessage") ) # NO OVERFLOW expect_rpolarserr( - unwrap(test_robj_to_u32(2^57), call = NULL), - c("BadArgument", "TypeMismatch", "BadValue") + unwrap(test_robj_to_u32(2^57)), + c("BadArgument", "When", "TypeMismatch", "BadValue", "ValueOutOfScope", "BadValue") + ) + + expect_rpolarserr( + unwrap(test_robj_to_i32(2^37), call = NULL), + c("BadArgument", "When", "TypeMismatch", "BadValue", "ValueOutOfScope", "BadValue") ) # NO NEGATIVE expect_rpolarserr( unwrap(test_robj_to_usize(bit64::as.integer64(-1)), call = NULL), - c("BadArgument", "TypeMismatch", "BadValue") + c("BadArgument", "When", "TypeMismatch", 
"BadValue", "PlainErrorMessage") ) expect_rpolarserr( unwrap(test_robj_to_u32(bit64::as.integer64(-1)), call = NULL), - c("BadArgument", "TypeMismatch", "BadValue") + c("BadArgument", "When", "TypeMismatch", "BadValue", "PlainErrorMessage") ) # NO length>1 expect_rpolarserr( unwrap(test_robj_to_usize(bit64::as.integer64(c(1:2))), call = NULL), - c("BadArgument", "TypeMismatch", "BadValue") + c("BadArgument", "When", "TypeMismatch", "BadValue", "PlainErrorMessage") + ) + + # NO length<1 + expect_rpolarserr( + unwrap(test_robj_to_usize(bit64::as.integer64(numeric(0))), call = NULL), + c("BadArgument", "When", "TypeMismatch", "BadValue", "PlainErrorMessage") ) }) diff --git a/tests/testthat/test-lazy_profile.R b/tests/testthat/test-lazy_profile.R new file mode 100644 index 000000000..90a53e5a7 --- /dev/null +++ b/tests/testthat/test-lazy_profile.R @@ -0,0 +1,37 @@ +test_that("can modify lazy profile settings", { + # TODO: some way to check if .pr$LazyFrame$optimization_toggle works + + # toggle settings + # read back settings + # compare + expect_identical(1, 1) +}) + +test_that("$profile", { + # profile minimal test + p0 = pl$LazyFrame()$select(pl$lit(1:3)$alias("x"))$profile() + expect_true(inherits(p0, "list")) + expect_identical(p0$result$to_list(), list(x = 1:3)) + expect_identical(p0$profile$columns, c("node", "start", "end")) + + + # profile supports with and without R functions + p1 = pl$LazyFrame(iris)$ + sort("Sepal.Length")$ + groupby("Species", maintain_order = TRUE)$ + agg(pl$col(pl$Float64)$first()$add(5)$suffix("_apply"))$ + profile() + + r_func = \(s) s$to_r()[1] + 5 + p2 = pl$LazyFrame(iris)$ + sort("Sepal.Length")$ # for no specific reason + groupby("Species", maintain_order = TRUE)$ + agg(pl$col(pl$Float64)$apply(r_func))$ + profile() + + # map each Species-group with native polars, takes ~120us better + expect_identical( + p2$result$as_data_frame(), + p1$result$as_data_frame() + ) +})