Skip to content

Commit

Permalink
Implement lazyframe profiling and optimization toggles (#323)
Browse files Browse the repository at this point in the history
Co-authored-by: sorhawell <[email protected]>
Co-authored-by: Etienne Bacher <[email protected]>
  • Loading branch information
3 people authored Aug 8, 2023
1 parent 35d6af0 commit b49e629
Show file tree
Hide file tree
Showing 16 changed files with 520 additions and 129 deletions.
4 changes: 3 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
into one (#349).
- New stat functions `pl$cov()`, `pl$rolling_cov()`, `pl$corr()`, `pl$rolling_corr()` (#351).
- Fix bug to allow using polars without library(polars) (#355).

- New methods `<LazyFrame>$optimization_toggle()` + `$profile()` and enable rust-polars feature
CSE: "Activate common subplan elimination optimization" (#323)

# polars 0.7.0

## BREAKING CHANGES
Expand Down
3 changes: 1 addition & 2 deletions R/expr__expr.R
Original file line number Diff line number Diff line change
Expand Up @@ -1120,8 +1120,7 @@ Expr_map_alias = function(fun) {
) {
assign(".warn_map_alias", 1L, envir = runtime_state)
# it does not seem map alias is executed multi-threaded but rather immediately during building lazy query
# if ever crashing, any lazy method like select, filter, with_columns must use something like handle_thread_r_requests()
# then handle_thread_r_requests should be rewritten to handle any type.
# if ever crashing, any lazy method like select, filter, with_columns must use something like filter_with_r_func_support()
message("map_alias function is experimentally without some thread-safeguards, please report any crashes") # TODO resolve
}
if (!is.function(fun)) pstop(err = "alias_map fun must be a function")
Expand Down
10 changes: 8 additions & 2 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,14 @@ clone_robj <- function(robj) .Call(wrap__clone_robj, robj)

# Test-helper bindings. Each one hands its single argument unchanged to the
# compiled routine of the same name (prefixed `wrap__`) through `.Call()` and
# returns whatever that routine returns; `test_print_string` additionally
# wraps the result in `invisible()`.
# NOTE(review): this file is R/extendr-wrappers.R, presumably generated by
# extendr -- confirm before hand-editing, as manual changes may be overwritten.
test_robj_to_usize <- function(robj) .Call(wrap__test_robj_to_usize, robj)

test_robj_to_f64 <- function(robj) .Call(wrap__test_robj_to_f64, robj)

test_robj_to_i64 <- function(robj) .Call(wrap__test_robj_to_i64, robj)

test_robj_to_u32 <- function(robj) .Call(wrap__test_robj_to_u32, robj)

test_robj_to_i32 <- function(robj) .Call(wrap__test_robj_to_i32, robj)

test_print_string <- function(s) invisible(.Call(wrap__test_print_string, s))

test_robj_to_expr <- function(robj) .Call(wrap__test_robj_to_expr, robj)
Expand Down Expand Up @@ -899,8 +903,6 @@ LazyFrame$collect_background <- function() .Call(wrap__LazyFrame__collect_backgr

# Zero-argument LazyFrame bindings: each passes only `self` to the compiled
# routine named `wrap__LazyFrame__<method>` via `.Call()`.
# NOTE(review): `self` is not a formal argument here, so it must be supplied
# by the calling environment -- confirm how the package binds it.
LazyFrame$collect <- function() .Call(wrap__LazyFrame__collect, self)

LazyFrame$collect_handled <- function() .Call(wrap__LazyFrame__collect_handled, self)

LazyFrame$first <- function() .Call(wrap__LazyFrame__first, self)

LazyFrame$last <- function() .Call(wrap__LazyFrame__last, self)
Expand Down Expand Up @@ -967,6 +969,10 @@ LazyFrame$rename <- function(existing, new) .Call(wrap__LazyFrame__rename, self,

# LazyFrame bindings: each forwards `self` followed by its own arguments, in
# declaration order, to the compiled routine `wrap__LazyFrame__<method>` via
# `.Call()`.
LazyFrame$schema <- function() .Call(wrap__LazyFrame__schema, self)

# Takes seven switches and passes them positionally, so callers must keep the
# order: type_coercion, predicate_pushdown, projection_pushdown,
# simplify_expr, slice_pushdown, cse, streaming.
LazyFrame$optimization_toggle <- function(type_coercion, predicate_pushdown, projection_pushdown, simplify_expr, slice_pushdown, cse, streaming) .Call(wrap__LazyFrame__optimization_toggle, self, type_coercion, predicate_pushdown, projection_pushdown, simplify_expr, slice_pushdown, cse, streaming)

LazyFrame$profile <- function() .Call(wrap__LazyFrame__profile, self)

LazyFrame$explode <- function(columns, dotdotdot_args) .Call(wrap__LazyFrame__explode, self, columns, dotdotdot_args)

LazyFrame$clone_see_me_macro <- function() .Call(wrap__LazyFrame__clone_see_me_macro, self)
Expand Down
114 changes: 108 additions & 6 deletions R/lazyframe__lazy.R
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,79 @@ LazyFrame_filter = "use_extendr_wrapper"

#' @title New DataFrame from LazyFrame_object$collect()
#' @description collect DataFrame by lazy query
#' @param type_coercion Boolean. Coerce types such that operations succeed and
#' run on minimal required memory.
#' @param predicate_pushdown Boolean. Applies filters as early as possible / at
#' scan level.
#' @param projection_pushdown Boolean. Select only the columns that are needed
#' at the scan level.
#' @param simplify_expression Boolean. Various optimizations, such as constant
#' folding and replacing expensive operations with faster alternatives.
#' @param slice_pushdown Boolean. Only load the required slice from the scan
#' level. Don't materialize sliced outputs (e.g. `join$head(10)`).
#' @param common_subplan_elimination Boolean. Cache subtrees/file scans that
#' are used by multiple subtrees in the query plan.
#' @param no_optimization Boolean. Turn off the following optimizations:
#' predicate_pushdown = FALSE
#' projection_pushdown = FALSE
#' slice_pushdown = FALSE
#' common_subplan_elimination = FALSE
#' @param streaming Boolean. Run parts of the query in a streaming fashion
#' (this is in an alpha state).
#' @param collect_in_background Boolean. Detach this query from R session.
#' Computation will start in background. Get a handle which later can be converted
#' into the resulting DataFrame. Useful in interactive mode to not lock R session.
#' @details
#' Note: use `$fetch(n)` if you want to run your query on the first `n` rows only.
#' This can be a huge time saver in debugging queries.
#' @keywords LazyFrame DataFrame_new
#' @return collected `DataFrame`
#' @examples pl$DataFrame(iris)$lazy()$filter(pl$col("Species") == "setosa")$collect()
LazyFrame_collect = function() {
unwrap(.pr$LazyFrame$collect_handled(self), "in $collect():")
#' @return A `DataFrame`
#' @examples pl$LazyFrame(iris)$filter(pl$col("Species") == "setosa")$collect()
LazyFrame_collect = function(
    type_coercion = TRUE,
    predicate_pushdown = TRUE,
    projection_pushdown = TRUE,
    simplify_expression = TRUE,
    slice_pushdown = TRUE,
    common_subplan_elimination = TRUE,
    no_optimization = FALSE,
    streaming = FALSE,
    collect_in_background = FALSE) {
  # `no_optimization` overrides four of the switches; `type_coercion` and
  # `simplify_expression` keep whatever value the caller passed.
  if (isTRUE(no_optimization)) {
    predicate_pushdown = projection_pushdown = FALSE
    slice_pushdown = common_subplan_elimination = FALSE
  }

  # Streaming always runs with common subplan elimination switched off.
  if (isTRUE(streaming)) common_subplan_elimination = FALSE

  # Choose the collector: the background variant or the plain one.
  if (isTRUE(collect_in_background)) {
    run_query = .pr$LazyFrame$collect_background
  } else {
    run_query = .pr$LazyFrame$collect
  }

  # Apply the switches first, then hand the outcome to the chosen collector.
  toggled = .pr$LazyFrame$optimization_toggle(
    self,
    type_coercion,
    predicate_pushdown,
    projection_pushdown,
    simplify_expression,
    slice_pushdown,
    common_subplan_elimination,
    streaming
  )
  unwrap(and_then(toggled, run_query), "in $collect():")
}

#' @title New DataFrame from LazyFrame_object$collect()
#' @description collect DataFrame by lazy query
#' @description collect DataFrame by lazy query (SOFT DEPRECATED)
#' @details This function is soft deprecated. Use $collect(collect_in_background = TRUE) instead
#' @keywords LazyFrame DataFrame_new
#' @return collected `DataFrame`
#' @examples pl$DataFrame(iris)$lazy()$filter(pl$col("Species") == "setosa")$collect()
Expand Down Expand Up @@ -926,6 +990,45 @@ LazyFrame_dtypes = method_as_property(function() {
unwrap("in $dtypes()")
})

#' @title Collect and profile a lazy query.
#' @description This will run the query and return a list containing the materialized DataFrame and
#' a DataFrame that contains profiling information of each node that is executed.
#' @details The units of the timings are microseconds.
#'
#' @keywords LazyFrame
#' @return List of two `DataFrame`s: one with the collected result, the other with the timings of each step.
#' @examples
#'
#' ## Simplest use case
#' pl$LazyFrame()$select(pl$lit(2) + 2)$profile()
#'
#' ## Use $profile() to compare two queries
#'
#' # -1- map each Species-group with native polars, takes ~120us only
#' pl$LazyFrame(iris)$
#' sort("Sepal.Length")$
#' groupby("Species", maintain_order = TRUE)$
#' agg(pl$col(pl$Float64)$first() + 5)$
#' profile()
#'
#' # -2- map each Species-group of each numeric column with an R function, takes ~7000us (slow!)
#'
#' # some R function, prints `.` for each time called by polars
#' r_func = \(s) {
#' cat(".")
#' s$to_r()[1] + 5
#' }
#'
#' pl$LazyFrame(iris)$
#' sort("Sepal.Length")$
#' groupby("Species", maintain_order = TRUE)$
#' agg(pl$col(pl$Float64)$apply(r_func))$
#' profile()
#'
LazyFrame_profile = function() {
  # Run the compiled profiler on this LazyFrame, then pass its outcome to
  # unwrap() with the context string used for error reporting.
  profiled = .pr$LazyFrame$profile(self)
  unwrap(profiled, "in $profile()")
}

#' @title Explode the DataFrame to long format by exploding the given columns
#' @keywords LazyFrame
#'
Expand Down Expand Up @@ -954,7 +1057,6 @@ LazyFrame_explode = function(columns = list(), ...) {
#' @return A LazyFrame
#' @examples
#' pl$LazyFrame(mtcars)$clone()

LazyFrame_clone = function() {
  # Delegates to the compiled `clone_see_me_macro` binding for this LazyFrame.
  self |> .pr$LazyFrame$clone_see_me_macro()
}
8 changes: 4 additions & 4 deletions man/Expr_pow.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 49 additions & 3 deletions man/LazyFrame_collect.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion man/LazyFrame_collect_background.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 48 additions & 0 deletions man/LazyFrame_profile.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ features = [
"repeat_by",
"interpolate",
#"list",
"cse",
"ewma",
"rank",
"diff",
Expand Down
Loading

0 comments on commit b49e629

Please sign in to comment.