Skip to content

Commit

Permalink
Unit test optimization toggles from LazyFrame (#405)
Browse files Browse the repository at this point in the history
Co-authored-by: sorhawell <[email protected]>
  • Loading branch information
Sicheng-Pan and sorhawell authored Oct 10, 2023
1 parent abb4bd1 commit b64a700
Show file tree
Hide file tree
Showing 14 changed files with 283 additions and 75 deletions.
4 changes: 3 additions & 1 deletion R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,9 @@ LazyFrame$schema <- function() .Call(wrap__LazyFrame__schema, self)

# Thin binding: forwards `self` and `n_rows` to the native symbol
# `wrap__LazyFrame__fetch` via .Call(). `self` is not a parameter here; it is
# resolved from the calling scope (presumably supplied by the package's method
# dispatch -- confirm).
LazyFrame$fetch <- function(n_rows) .Call(wrap__LazyFrame__fetch, self, n_rows)

# NOTE(review): this binding and `set_optimization_toggle` directly below differ
# only in the method/symbol name and in `simplify_expr` vs `simplify_expression`.
# This looks like the pre-rename version shown next to its replacement --
# confirm which native symbol the compiled library actually exports.
LazyFrame$optimization_toggle <- function(type_coercion, predicate_pushdown, projection_pushdown, simplify_expr, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, streaming) .Call(wrap__LazyFrame__optimization_toggle, self, type_coercion, predicate_pushdown, projection_pushdown, simplify_expr, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, streaming)
# Thin binding: passes `self` plus the eight toggle arguments, positionally and
# in declaration order, to `wrap__LazyFrame__set_optimization_toggle`.
LazyFrame$set_optimization_toggle <- function(type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, streaming) .Call(wrap__LazyFrame__set_optimization_toggle, self, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, streaming)

# Thin binding: takes no arguments besides `self` and calls
# `wrap__LazyFrame__get_optimization_toggle`.
LazyFrame$get_optimization_toggle <- function() .Call(wrap__LazyFrame__get_optimization_toggle, self)

# Thin binding: takes no arguments besides `self` and calls
# `wrap__LazyFrame__profile`.
LazyFrame$profile <- function() .Call(wrap__LazyFrame__profile, self)

Expand Down
129 changes: 94 additions & 35 deletions R/lazyframe__lazy.R
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,20 @@ LazyFrame_with_row_count = function(name, offset = NULL) {
#' @examples pl$LazyFrame(iris)$filter(pl$col("Species") == "setosa")$collect()
LazyFrame_filter = "use_extendr_wrapper"

#' @title Collect a query into a DataFrame
#' @description `$collect()` performs the query on the LazyFrame. It returns a
#' DataFrame
#' @title Get optimization settings
#' @description Get the current optimization toggles for the lazy query
#' @keywords LazyFrame
#' @return List of optimization toggles
#' @examples
#' pl$LazyFrame(mtcars)$get_optimization_toggle()
LazyFrame_get_optimization_toggle = function() {
  # `self` is a free variable here (not an argument); it is handed straight to
  # the private binding, whose return value is passed back unchanged.
  .pr$LazyFrame$get_optimization_toggle(self)
}

#' @title Configure optimization toggles
#' @description Configure the optimization toggles for the lazy query
#' @keywords LazyFrame
#' @param type_coercion Boolean. Coerce types such that operations succeed and
#' run on minimal required memory.
#' @param predicate_pushdown Boolean. Applies filters as early as possible at
Expand All @@ -293,11 +304,42 @@ LazyFrame_filter = "use_extendr_wrapper"
#' occur on self-joins or unions.
#' @param comm_subexpr_elim Boolean. Common subexpressions will be cached and
#' reused.
#' @param streaming Boolean. Run parts of the query in a streaming fashion
#' (this is in an alpha state).
#' @return LazyFrame with specified optimization toggles
#' @examples
#' pl$LazyFrame(mtcars)$set_optimization_toggle(type_coercion = FALSE)
LazyFrame_set_optimization_toggle = function(
    type_coercion = TRUE,
    predicate_pushdown = TRUE,
    projection_pushdown = TRUE,
    simplify_expression = TRUE,
    slice_pushdown = TRUE,
    comm_subplan_elim = TRUE,
    comm_subexpr_elim = TRUE,
    streaming = FALSE) {
  # Hand `self` (a free variable, not an argument) and every toggle to the
  # private binding. Arguments are passed positionally, so their order must
  # match the binding's signature. The binding's return value is passed back
  # as-is; callers visible in this file pipe it into unwrap().
  .pr$LazyFrame$set_optimization_toggle(
    self,
    type_coercion,
    predicate_pushdown,
    projection_pushdown,
    simplify_expression,
    slice_pushdown,
    comm_subplan_elim,
    comm_subexpr_elim,
    streaming
  )
}

#' @title Collect a query into a DataFrame
#' @description `$collect()` performs the query on the LazyFrame. It returns a
#' DataFrame
#' @inheritParams LazyFrame_set_optimization_toggle
#' @param no_optimization Boolean. Sets the following parameters to `FALSE`:
#' `predicate_pushdown`, `projection_pushdown`, `slice_pushdown`,
#' `comm_subplan_elim`, `comm_subexpr_elim`.
#' @param streaming Boolean. Run parts of the query in a streaming fashion
#' (this is in an alpha state).
#' @param inherit_optimization Boolean. Use existing optimization settings
#' regardless of the settings specified in this function call.
#' @param collect_in_background Boolean. Detach this query from the R session.
#' Computation will start in the background. Returns a handle which can later be
#' converted into the resulting DataFrame. Useful in interactive mode to avoid
#' locking the R session.
Expand Down Expand Up @@ -326,8 +368,9 @@ LazyFrame_collect = function(
slice_pushdown = TRUE,
comm_subplan_elim = TRUE,
comm_subexpr_elim = TRUE,
no_optimization = FALSE,
streaming = FALSE,
no_optimization = FALSE,
inherit_optimization = FALSE,
collect_in_background = FALSE) {
if (isTRUE(no_optimization)) {
predicate_pushdown = FALSE
Expand All @@ -341,14 +384,12 @@ LazyFrame_collect = function(
comm_subplan_elim = FALSE
}

collect_f = if (isTRUE(collect_in_background)) {
\(...) Ok(.pr$LazyFrame$collect_in_background(...))
} else {
.pr$LazyFrame$collect
}
collect_f = ifelse(isTRUE(collect_in_background), \(...) Ok(.pr$LazyFrame$collect_in_background(...)), .pr$LazyFrame$collect)

self |>
.pr$LazyFrame$optimization_toggle(
lf = self

if (isFALSE(inherit_optimization)) {
lf = self$set_optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
Expand All @@ -357,8 +398,11 @@ LazyFrame_collect = function(
comm_subplan_elim,
comm_subexpr_elim,
streaming
) |>
and_then(collect_f) |>
) |> unwrap("in $collect():")
}

lf |>
collect_f() |>
unwrap("in $collect():")
}

Expand Down Expand Up @@ -459,26 +503,31 @@ LazyFrame_sink_parquet = function(
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
no_optimization = FALSE,
slice_pushdown = TRUE) {
inherit_optimization = FALSE) {
if (isTRUE(no_optimization)) {
predicate_pushdown = FALSE
projection_pushdown = FALSE
slice_pushdown = FALSE
}
call_ctx = "in $sink_parquet(...)"
self |>
.pr$LazyFrame$optimization_toggle(

lf = self

if (isFALSE(inherit_optimization)) {
lf = self$set_optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
simplify_expression,
slice_pushdown,
comm_subplan_elim = FALSE,
comm_subexpr_elim = FALSE,
streaming = TRUE
) |>
unwrap(call_ctx) |>
streaming = FALSE
) |> unwrap("in $sink_parquet()")
}

lf |>
.pr$LazyFrame$sink_parquet(
path,
compression,
Expand All @@ -488,7 +537,7 @@ LazyFrame_sink_parquet = function(
data_pagesize_limit,
maintain_order
) |>
unwrap(call_ctx) |>
unwrap("in $sink_parquet()") |>
invisible()
}

Expand Down Expand Up @@ -526,32 +575,37 @@ LazyFrame_sink_ipc = function(
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
no_optimization = FALSE,
slice_pushdown = TRUE) {
inherit_optimization = FALSE) {
if (isTRUE(no_optimization)) {
predicate_pushdown = FALSE
projection_pushdown = FALSE
slice_pushdown = FALSE
}

self |>
.pr$LazyFrame$optimization_toggle(
lf = self

if (isFALSE(inherit_optimization)) {
lf = self$set_optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
simplify_expression,
slice_pushdown,
comm_subplan_elim = FALSE,
comm_subexpr_elim = FALSE,
streaming = TRUE
) |>
unwrap("in $sink_ipc(...)") |>
streaming = FALSE
) |> unwrap("in $sink_ipc()")
}

lf |>
.pr$LazyFrame$sink_ipc(
path,
compression,
maintain_order
) |>
unwrap("in LazyFrame$sink_ipc(...)") |>
unwrap("in $sink_ipc()") |>
invisible()
}

Expand Down Expand Up @@ -1179,8 +1233,9 @@ LazyFrame_fetch = function(
slice_pushdown = TRUE,
comm_subplan_elim = TRUE,
comm_subexpr_elim = TRUE,
streaming = FALSE,
no_optimization = FALSE,
streaming = FALSE) {
inherit_optimization = FALSE) {
if (isTRUE(no_optimization)) {
predicate_pushdown = FALSE
projection_pushdown = FALSE
Expand All @@ -1193,8 +1248,10 @@ LazyFrame_fetch = function(
comm_subplan_elim = FALSE
}

self |>
.pr$LazyFrame$optimization_toggle(
lf = self

if (isFALSE(inherit_optimization)) {
lf = self$set_optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
Expand All @@ -1203,8 +1260,10 @@ LazyFrame_fetch = function(
comm_subplan_elim,
comm_subexpr_elim,
streaming
) |>
and_then(\(self) .pr$LazyFrame$fetch(self, n_rows)) |>
) |> unwrap("in $fetch()")
}

.pr$LazyFrame$fetch(lf, n_rows) |>
unwrap("in $fetch()")
}

Expand Down
4 changes: 2 additions & 2 deletions R/rust_result.R
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Err = function(x) {
#' @param f a closure that takes the err part as input
#' @return the same result object, with `f` applied to its err part when `x` is an Err-result
map_err = function(x, f) {
  # Transform the error payload exactly once. The block previously carried the
  # assignment twice (`=` and `<-` forms), which ran `f` on its own output.
  # Results that are not errors pass through untouched.
  if (is_err(x)) {
    x$err <- f(x$err)
  }
  x
}

Expand All @@ -66,7 +66,7 @@ map_err = function(x, f) {
#' @return the same result object, with `f` applied to its ok part when `x` is an Ok-result
#' @noRd
map = function(x, f) {
  # Transform the ok payload exactly once. The block previously carried the
  # assignment twice (`=` and `<-` forms), which ran `f` on its own output.
  # Results that are not ok pass through untouched.
  if (is_ok(x)) {
    x$ok <- f(x$ok)
  }
  x
}

Expand Down
30 changes: 15 additions & 15 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions man/Expr_map.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions man/IO_sink_ipc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b64a700

Please sign in to comment.