From 2c9aac8852d5e1444e839fa96f858c718ecdd82a Mon Sep 17 00:00:00 2001 From: Sicheng Pan Date: Wed, 12 Jul 2023 14:46:54 -0700 Subject: [PATCH] Implement lazyframe profiling and optimization toggles --- R/extendr-wrappers.R | 18 +++++++++ src/rust/src/lazy/dataframe.rs | 67 ++++++++++++++++++++++++++++++++-- 2 files changed, 81 insertions(+), 4 deletions(-) diff --git a/R/extendr-wrappers.R b/R/extendr-wrappers.R index cde638634..26c7406f3 100644 --- a/R/extendr-wrappers.R +++ b/R/extendr-wrappers.R @@ -945,6 +945,24 @@ LazyFrame$rename <- function(existing, new) .Call(wrap__LazyFrame__rename, self, LazyFrame$schema <- function() .Call(wrap__LazyFrame__schema, self) +LazyFrame$without_optimization <- function() .Call(wrap__LazyFrame__without_optimization, self) + +LazyFrame$with_projection_pushdown <- function(toggle) .Call(wrap__LazyFrame__with_projection_pushdown, self, toggle) + +LazyFrame$with_predicate_pushdown <- function(toggle) .Call(wrap__LazyFrame__with_predicate_pushdown, self, toggle) + +LazyFrame$with_type_coercion <- function(toggle) .Call(wrap__LazyFrame__with_type_coercion, self, toggle) + +LazyFrame$with_simplify_expr <- function(toggle) .Call(wrap__LazyFrame__with_simplify_expr, self, toggle) + +LazyFrame$with_slice_pushdown <- function(toggle) .Call(wrap__LazyFrame__with_slice_pushdown, self, toggle) + +LazyFrame$with_common_subplan_elimination <- function(toggle) .Call(wrap__LazyFrame__with_common_subplan_elimination, self, toggle) + +LazyFrame$with_streaming <- function(toggle) .Call(wrap__LazyFrame__with_streaming, self, toggle) + +LazyFrame$profile <- function() .Call(wrap__LazyFrame__profile, self) + #' @export `$.LazyFrame` <- function (self, name) { func <- LazyFrame[[name]]; environment(func) <- environment(); func } diff --git a/src/rust/src/lazy/dataframe.rs b/src/rust/src/lazy/dataframe.rs index c8a622788..2cea44d8a 100644 --- a/src/rust/src/lazy/dataframe.rs +++ b/src/rust/src/lazy/dataframe.rs @@ -1,13 +1,13 @@ use 
crate::concurrent::{handle_thread_r_requests, PolarsBackgroundHandle}; use crate::conversion::strings_to_smartstrings; use crate::lazy::dsl::*; +use crate::rdataframe::DataFrame as RDF; use crate::rdatatype::new_join_type; use crate::rdatatype::new_quantile_interpolation_option; use crate::rdatatype::new_unique_keep_strategy; use crate::rdatatype::{new_asof_strategy, RPolarsDataType}; use crate::robj_to; -use crate::rpolarserr::RResult; -use crate::rpolarserr::{Rctx, WithRctx}; +use crate::rpolarserr::{polars_to_rpolars_err, RResult, Rctx, WithRctx}; use crate::utils::wrappers::null_to_opt; use crate::utils::{r_result_list, try_f64_into_usize}; use extendr_api::prelude::*; @@ -65,7 +65,7 @@ impl LazyFrame { PolarsBackgroundHandle::new(self) } - pub fn collect(&self) -> Result { + pub fn collect(&self) -> Result { handle_thread_r_requests(self.clone().0).map_err(|err| { //improve err messages let err_string = match err { @@ -79,7 +79,7 @@ impl LazyFrame { }) } - pub fn collect_handled(&self) -> crate::rpolarserr::RResult { + pub fn collect_handled(&self) -> RResult { use crate::rpolarserr::WithRctx; handle_thread_r_requests(self.clone().0).when("calling $collect() on LazyFrame") } @@ -377,6 +377,65 @@ impl LazyFrame { pairs.map(|(name, ty)| (name, RPolarsDataType(ty.clone()))), )) } + + /* Clones the wrapped lazy plan (self.0), calls without_optimizations() on the clone and converts the result back with .into(); self is only borrowed. */ fn without_optimization(&self) -> Self { + self.0.clone().without_optimizations().into() + } + + /* Clones the wrapped plan and forwards toggle, converted by robj_to!(bool, ..), to with_projection_pushdown(); a failed conversion is returned early by the ? operator. */ fn with_projection_pushdown(&self, toggle: Robj) -> RResult { + Ok(Self( + self.0 + .clone() + .with_projection_pushdown(robj_to!(bool, toggle)?), + )) + } + + /* Clones the wrapped plan and forwards toggle, converted by robj_to!(bool, ..), to with_predicate_pushdown(); a failed conversion is returned early by the ? operator. */ fn with_predicate_pushdown(&self, toggle: Robj) -> RResult { + Ok(Self( + self.0 + .clone() + .with_predicate_pushdown(robj_to!(bool, toggle)?), + )) + } + + /* Clones the wrapped plan and forwards toggle, converted by robj_to!(bool, ..), to with_type_coercion(); a failed conversion is returned early by the ? operator. */ fn with_type_coercion(&self, toggle: Robj) -> RResult { + Ok(Self( + self.0.clone().with_type_coercion(robj_to!(bool, toggle)?), + )) + } + + /* Clones the wrapped plan and forwards toggle, converted by robj_to!(bool, ..), to with_simplify_expr(); a failed conversion is returned early by the ? operator. */ fn with_simplify_expr(&self, toggle: Robj) -> RResult { + Ok(Self( + self.0.clone().with_simplify_expr(robj_to!(bool, toggle)?), + )) + 
} + + /* Clones the wrapped plan and forwards toggle, converted by robj_to!(bool, ..), to with_slice_pushdown(); a failed conversion is returned early by the ? operator. */ fn with_slice_pushdown(&self, toggle: Robj) -> RResult { + Ok(Self( + self.0.clone().with_slice_pushdown(robj_to!(bool, toggle)?), + )) + } + + /* Clones the wrapped plan and forwards toggle, converted by robj_to!(bool, ..), to with_common_subplan_elimination(); a failed conversion is returned early by the ? operator. */ fn with_common_subplan_elimination(&self, toggle: Robj) -> RResult { + Ok(Self( + self.0 + .clone() + .with_common_subplan_elimination(robj_to!(bool, toggle)?), + )) + } + + /* Clones the wrapped plan and forwards toggle, converted by robj_to!(bool, ..), to with_streaming(); a failed conversion is returned early by the ? operator. */ fn with_streaming(&self, toggle: Robj) -> RResult { + Ok(Self(self.0.clone().with_streaming(robj_to!(bool, toggle)?))) + } + + /* Clones the wrapped plan and calls profile() on it. The Ok pair (r, p) becomes a pairlist with entries named result and profile, each wrapped in RDF; an Err is mapped through polars_to_rpolars_err and tagged with the context string passed to when(). NOTE(review): r and p are presumably the collected frame and the per-node timing table - confirm against the polars documentation. */ fn profile(&self) -> RResult { + self.0 + .clone() + .profile() + .map(|(r, p)| pairlist!(result = RDF(r), profile = RDF(p))) + .map_err(polars_to_rpolars_err) + .when("profiling the LazyFrame") + } } #[derive(Clone)]