Skip to content

Commit

Permalink
Implement lazyframe profiling and optimization toggles
Browse files Browse the repository at this point in the history
  • Loading branch information
Sicheng-Pan committed Jul 12, 2023
1 parent 67865a0 commit 2c9aac8
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 4 deletions.
18 changes: 18 additions & 0 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,24 @@ LazyFrame$rename <- function(existing, new) .Call(wrap__LazyFrame__rename, self,

# Thin R wrappers around the native `wrap__LazyFrame__*` entry points.
# `self` is not a formal argument of these functions: it is a free variable
# that is resolved from the environment the wrapper is evaluated in
# (`$.LazyFrame` re-homes each wrapper into an environment where `self` is bound).

# Schema of the LazyFrame.
LazyFrame$schema <- function() {
  .Call(wrap__LazyFrame__schema, self)
}

# Optimization toggles: each forwards `toggle` unchanged to the native side.
LazyFrame$without_optimization <- function() {
  .Call(wrap__LazyFrame__without_optimization, self)
}

LazyFrame$with_projection_pushdown <- function(toggle) {
  .Call(wrap__LazyFrame__with_projection_pushdown, self, toggle)
}

LazyFrame$with_predicate_pushdown <- function(toggle) {
  .Call(wrap__LazyFrame__with_predicate_pushdown, self, toggle)
}

LazyFrame$with_type_coercion <- function(toggle) {
  .Call(wrap__LazyFrame__with_type_coercion, self, toggle)
}

LazyFrame$with_simplify_expr <- function(toggle) {
  .Call(wrap__LazyFrame__with_simplify_expr, self, toggle)
}

LazyFrame$with_slice_pushdown <- function(toggle) {
  .Call(wrap__LazyFrame__with_slice_pushdown, self, toggle)
}

LazyFrame$with_common_subplan_elimination <- function(toggle) {
  .Call(wrap__LazyFrame__with_common_subplan_elimination, self, toggle)
}

LazyFrame$with_streaming <- function(toggle) {
  .Call(wrap__LazyFrame__with_streaming, self, toggle)
}

# Profiling entry point; takes no arguments besides the implicit `self`.
LazyFrame$profile <- function() {
  .Call(wrap__LazyFrame__profile, self)
}

#' @export
`$.LazyFrame` <- function(self, name) {
  # Fetch the wrapper registered under `name` in the LazyFrame method table.
  func <- LazyFrame[[name]]
  # Re-home the wrapper into this call's environment so that its free
  # variable `self` resolves to the object `$` was invoked on.
  environment(func) <- environment()
  func
}

Expand Down
67 changes: 63 additions & 4 deletions src/rust/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::concurrent::{handle_thread_r_requests, PolarsBackgroundHandle};
use crate::conversion::strings_to_smartstrings;
use crate::lazy::dsl::*;
use crate::rdataframe::DataFrame as RDF;
use crate::rdatatype::new_join_type;
use crate::rdatatype::new_quantile_interpolation_option;
use crate::rdatatype::new_unique_keep_strategy;
use crate::rdatatype::{new_asof_strategy, RPolarsDataType};
use crate::robj_to;
use crate::rpolarserr::RResult;
use crate::rpolarserr::{Rctx, WithRctx};
use crate::rpolarserr::{polars_to_rpolars_err, RResult, Rctx, WithRctx};
use crate::utils::wrappers::null_to_opt;
use crate::utils::{r_result_list, try_f64_into_usize};
use extendr_api::prelude::*;
Expand Down Expand Up @@ -65,7 +65,7 @@ impl LazyFrame {
PolarsBackgroundHandle::new(self)
}

pub fn collect(&self) -> Result<crate::rdataframe::DataFrame, String> {
pub fn collect(&self) -> Result<RDF, String> {
handle_thread_r_requests(self.clone().0).map_err(|err| {
//improve err messages
let err_string = match err {
Expand All @@ -79,7 +79,7 @@ impl LazyFrame {
})
}

pub fn collect_handled(&self) -> crate::rpolarserr::RResult<crate::rdataframe::DataFrame> {
pub fn collect_handled(&self) -> RResult<RDF> {
use crate::rpolarserr::WithRctx;
handle_thread_r_requests(self.clone().0).when("calling $collect() on LazyFrame")
}
Expand Down Expand Up @@ -377,6 +377,65 @@ impl LazyFrame {
pairs.map(|(name, ty)| (name, RPolarsDataType(ty.clone()))),
))
}

/// Returns a copy of this lazy frame on which `without_optimizations()` has
/// been applied to the wrapped plan.
///
/// `self` is left untouched; the wrapped value is cloned first.
fn without_optimization(&self) -> Self {
    let unoptimized = self.0.clone().without_optimizations();
    unoptimized.into()
}

/// Returns a copy of this lazy frame with the projection-pushdown setting
/// set to `toggle`.
///
/// # Errors
/// Fails if `toggle` cannot be converted to a `bool` by `robj_to!`.
fn with_projection_pushdown(&self, toggle: Robj) -> RResult<Self> {
    // Decode the R value first; on failure nothing else is built.
    let enabled = robj_to!(bool, toggle)?;
    let updated = self.0.clone().with_projection_pushdown(enabled);
    Ok(Self(updated))
}

/// Returns a copy of this lazy frame with the predicate-pushdown setting
/// set to `toggle`.
///
/// # Errors
/// Fails if `toggle` cannot be converted to a `bool` by `robj_to!`.
fn with_predicate_pushdown(&self, toggle: Robj) -> RResult<Self> {
    // Decode the R value first; on failure nothing else is built.
    let enabled = robj_to!(bool, toggle)?;
    let updated = self.0.clone().with_predicate_pushdown(enabled);
    Ok(Self(updated))
}

/// Returns a copy of this lazy frame with the type-coercion setting set to
/// `toggle`.
///
/// # Errors
/// Fails if `toggle` cannot be converted to a `bool` by `robj_to!`.
fn with_type_coercion(&self, toggle: Robj) -> RResult<Self> {
    // Decode the R value first; on failure nothing else is built.
    let enabled = robj_to!(bool, toggle)?;
    let updated = self.0.clone().with_type_coercion(enabled);
    Ok(Self(updated))
}

/// Returns a copy of this lazy frame with the expression-simplification
/// setting set to `toggle`.
///
/// # Errors
/// Fails if `toggle` cannot be converted to a `bool` by `robj_to!`.
fn with_simplify_expr(&self, toggle: Robj) -> RResult<Self> {
    // Decode the R value first; on failure nothing else is built.
    let enabled = robj_to!(bool, toggle)?;
    let updated = self.0.clone().with_simplify_expr(enabled);
    Ok(Self(updated))
}

/// Returns a copy of this lazy frame with the slice-pushdown setting set to
/// `toggle`.
///
/// # Errors
/// Fails if `toggle` cannot be converted to a `bool` by `robj_to!`.
fn with_slice_pushdown(&self, toggle: Robj) -> RResult<Self> {
    // Decode the R value first; on failure nothing else is built.
    let enabled = robj_to!(bool, toggle)?;
    let updated = self.0.clone().with_slice_pushdown(enabled);
    Ok(Self(updated))
}

/// Returns a copy of this lazy frame with the common-subplan-elimination
/// setting set to `toggle`.
///
/// # Errors
/// Fails if `toggle` cannot be converted to a `bool` by `robj_to!`.
fn with_common_subplan_elimination(&self, toggle: Robj) -> RResult<Self> {
    // Decode the R value first; on failure nothing else is built.
    let enabled = robj_to!(bool, toggle)?;
    let updated = self.0.clone().with_common_subplan_elimination(enabled);
    Ok(Self(updated))
}

/// Returns a copy of this lazy frame with the streaming setting set to
/// `toggle`.
///
/// # Errors
/// Fails if `toggle` cannot be converted to a `bool` by `robj_to!`.
fn with_streaming(&self, toggle: Robj) -> RResult<Self> {
    // Decode the R value first; on failure nothing else is built.
    let enabled = robj_to!(bool, toggle)?;
    let updated = self.0.clone().with_streaming(enabled);
    Ok(Self(updated))
}

fn profile(&self) -> RResult<Pairlist> {
self.0
.clone()
.profile()
.map(|(r, p)| pairlist!(result = RDF(r), profile = RDF(p)))
.map_err(polars_to_rpolars_err)
.when("profiling the LazyFrame")
}
}

#[derive(Clone)]
Expand Down

0 comments on commit 2c9aac8

Please sign in to comment.