Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unit test optimization toggles from LazyFrame #405

Merged
merged 13 commits into from
Oct 10, 2023
4 changes: 3 additions & 1 deletion R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,9 @@ LazyFrame$schema <- function() .Call(wrap__LazyFrame__schema, self)

LazyFrame$fetch <- function(n_rows) .Call(wrap__LazyFrame__fetch, self, n_rows)

LazyFrame$optimization_toggle <- function(type_coercion, predicate_pushdown, projection_pushdown, simplify_expr, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, streaming) .Call(wrap__LazyFrame__optimization_toggle, self, type_coercion, predicate_pushdown, projection_pushdown, simplify_expr, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, streaming)
LazyFrame$set_optimization_toggle <- function(type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, streaming) .Call(wrap__LazyFrame__set_optimization_toggle, self, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, streaming)

LazyFrame$get_optimization_toggle <- function() .Call(wrap__LazyFrame__get_optimization_toggle, self)

LazyFrame$profile <- function() .Call(wrap__LazyFrame__profile, self)

Expand Down
129 changes: 94 additions & 35 deletions R/lazyframe__lazy.R
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,20 @@ LazyFrame_with_row_count = function(name, offset = NULL) {
#' @examples pl$LazyFrame(iris)$filter(pl$col("Species") == "setosa")$collect()
LazyFrame_filter = "use_extendr_wrapper"

#' @title Collect a query into a DataFrame
#' @description `$collect()` performs the query on the LazyFrame. It returns a
#' DataFrame
#' @title Get optimization settings
#' @description Get the current optimization toggles for the lazy query
#' @keywords LazyFrame
#' @return List of optimization toggles
#' @examples
#' pl$LazyFrame(mtcars)$get_optimization_toggle()
LazyFrame_get_optimization_toggle = function() {
self |>
.pr$LazyFrame$get_optimization_toggle()
}

#' @title Configure optimization toggles
#' @description Configure the optimization toggles for the lazy query
#' @keywords LazyFrame
#' @param type_coercion Boolean. Coerce types such that operations succeed and
#' run on minimal required memory.
#' @param predicate_pushdown Boolean. Applies filters as early as possible at
Expand All @@ -293,11 +304,42 @@ LazyFrame_filter = "use_extendr_wrapper"
#' occur on self-joins or unions.
#' @param comm_subexpr_elim Boolean. Common subexpressions will be cached and
#' reused.
#' @param streaming Boolean. Run parts of the query in a streaming fashion
#' (this is in an alpha state).
#' @return LazyFrame with specified optimization toggles
#' @examples
#' pl$LazyFrame(mtcars)$set_optimization_toggle(type_coercion = FALSE)
LazyFrame_set_optimization_toggle = function(
type_coercion = TRUE,
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
comm_subplan_elim = TRUE,
comm_subexpr_elim = TRUE,
streaming = FALSE) {
self |>
.pr$LazyFrame$set_optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
simplify_expression,
slice_pushdown,
comm_subplan_elim,
comm_subexpr_elim,
streaming
)
}

#' @title Collect a query into a DataFrame
#' @description `$collect()` performs the query on the LazyFrame. It returns a
#' DataFrame
#' @inheritParams LazyFrame_set_optimization_toggle
#' @param no_optimization Boolean. Sets the following parameters to `FALSE`:
#' `predicate_pushdown`, `projection_pushdown`, `slice_pushdown`,
#' `comm_subplan_elim`, `comm_subexpr_elim`.
#' @param streaming Boolean. Run parts of the query in a streaming fashion
#' (this is in an alpha state).
#' @param inherit_optimization Boolean. Use existing optimization settings
#' regardless the settings specified in this function call.
#' @param collect_in_background Boolean. Detach this query from R session.
#' Computation will start in background. Get a handle which later can be converted
#' into the resulting DataFrame. Useful in interactive mode to not lock R session.
Expand Down Expand Up @@ -326,8 +368,9 @@ LazyFrame_collect = function(
slice_pushdown = TRUE,
comm_subplan_elim = TRUE,
comm_subexpr_elim = TRUE,
no_optimization = FALSE,
streaming = FALSE,
no_optimization = FALSE,
inherit_optimization = FALSE,
collect_in_background = FALSE) {
if (isTRUE(no_optimization)) {
predicate_pushdown = FALSE
Expand All @@ -341,14 +384,12 @@ LazyFrame_collect = function(
comm_subplan_elim = FALSE
}

collect_f = if (isTRUE(collect_in_background)) {
\(...) Ok(.pr$LazyFrame$collect_in_background(...))
} else {
.pr$LazyFrame$collect
}
collect_f = ifelse(isTRUE(collect_in_background), \(...) Ok(.pr$LazyFrame$collect_in_background(...)), .pr$LazyFrame$collect)

self |>
.pr$LazyFrame$optimization_toggle(
lf = self

if (isFALSE(inherit_optimization)) {
lf = self$set_optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
Expand All @@ -357,8 +398,11 @@ LazyFrame_collect = function(
comm_subplan_elim,
comm_subexpr_elim,
streaming
) |>
and_then(collect_f) |>
) |> unwrap("in $collect():")
}

lf |>
collect_f() |>
unwrap("in $collect():")
}

Expand Down Expand Up @@ -459,26 +503,31 @@ LazyFrame_sink_parquet = function(
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
no_optimization = FALSE,
slice_pushdown = TRUE) {
inherit_optimization = FALSE) {
if (isTRUE(no_optimization)) {
predicate_pushdown = FALSE
projection_pushdown = FALSE
slice_pushdown = FALSE
}
call_ctx = "in $sink_parquet(...)"
self |>
.pr$LazyFrame$optimization_toggle(

lf = self

if (isFALSE(inherit_optimization)) {
lf = self$set_optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
simplify_expression,
slice_pushdown,
comm_subplan_elim = FALSE,
comm_subexpr_elim = FALSE,
streaming = TRUE
) |>
unwrap(call_ctx) |>
streaming = FALSE
) |> unwrap("in $sink_parquet()")
}

lf |>
.pr$LazyFrame$sink_parquet(
path,
compression,
Expand All @@ -488,7 +537,7 @@ LazyFrame_sink_parquet = function(
data_pagesize_limit,
maintain_order
) |>
unwrap(call_ctx) |>
unwrap("in $sink_parquet()") |>
invisible()
}

Expand Down Expand Up @@ -526,32 +575,37 @@ LazyFrame_sink_ipc = function(
predicate_pushdown = TRUE,
projection_pushdown = TRUE,
simplify_expression = TRUE,
slice_pushdown = TRUE,
no_optimization = FALSE,
slice_pushdown = TRUE) {
inherit_optimization = FALSE) {
if (isTRUE(no_optimization)) {
predicate_pushdown = FALSE
projection_pushdown = FALSE
slice_pushdown = FALSE
}

self |>
.pr$LazyFrame$optimization_toggle(
lf = self

if (isFALSE(inherit_optimization)) {
lf = self$set_optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
simplify_expression,
slice_pushdown,
comm_subplan_elim = FALSE,
comm_subexpr_elim = FALSE,
streaming = TRUE
) |>
unwrap("in $sink_ipc(...)") |>
streaming = FALSE
) |> unwrap("in $sink_ipc()")
}

lf |>
.pr$LazyFrame$sink_ipc(
path,
compression,
maintain_order
) |>
unwrap("in LazyFrame$sink_ipc(...)") |>
unwrap("in $sink_ipc()") |>
invisible()
}

Expand Down Expand Up @@ -1179,8 +1233,9 @@ LazyFrame_fetch = function(
slice_pushdown = TRUE,
comm_subplan_elim = TRUE,
comm_subexpr_elim = TRUE,
streaming = FALSE,
no_optimization = FALSE,
streaming = FALSE) {
inherit_optimization = FALSE) {
if (isTRUE(no_optimization)) {
predicate_pushdown = FALSE
projection_pushdown = FALSE
Expand All @@ -1193,8 +1248,10 @@ LazyFrame_fetch = function(
comm_subplan_elim = FALSE
}

self |>
.pr$LazyFrame$optimization_toggle(
lf = self

if (isFALSE(inherit_optimization)) {
lf = self$set_optimization_toggle(
type_coercion,
predicate_pushdown,
projection_pushdown,
Expand All @@ -1203,8 +1260,10 @@ LazyFrame_fetch = function(
comm_subplan_elim,
comm_subexpr_elim,
streaming
) |>
and_then(\(self) .pr$LazyFrame$fetch(self, n_rows)) |>
) |> unwrap("in $fetch()")
}

.pr$LazyFrame$fetch(lf, n_rows) |>
unwrap("in $fetch()")
}

Expand Down
4 changes: 2 additions & 2 deletions R/rust_result.R
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Err = function(x) {
#' @param f a closure that takes the err part as input
#' @return same R object wrapped in a Err-result
map_err = function(x, f) {
if (is_err(x)) x$err = f(x$err)
if (is_err(x)) x$err <- f(x$err)
x
}

Expand All @@ -66,7 +66,7 @@ map_err = function(x, f) {
#' @return same R object wrapped in a Err-result
#' @noRd
map = function(x, f) {
if (is_ok(x)) x$ok = f(x$ok)
if (is_ok(x)) x$ok <- f(x$ok)
x
}

Expand Down
30 changes: 15 additions & 15 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions man/Expr_map.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions man/IO_sink_ipc.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading