From a97cf818bf879edd78512b1bdeddf29a0672973f Mon Sep 17 00:00:00 2001 From: sorhawell Date: Mon, 9 Oct 2023 22:13:15 +0300 Subject: [PATCH 1/8] improve pl$mem_address --- R/zzz.R | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/R/zzz.R b/R/zzz.R index 3d2bad0a3..8ee5b72db 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -105,7 +105,8 @@ move_env_elements(Expr, pl, c("lit"), remove = FALSE) #' Get Memory Address #' @name pl_mem_address -#' @description mimics pl$mem_address +#' @description Get underlying mem address a rust object (via ExtPtr). Expert use only. +#' @details Does not give meaningful answers for regular R objects. #' @param robj an R object #' @aliases mem_address #' @return String of mem address From ca515ca4623fe5da1f372088c1a4a29f0b4aace7 Mon Sep 17 00:00:00 2001 From: sorhawell Date: Mon, 9 Oct 2023 22:48:53 +0300 Subject: [PATCH 2/8] reduce sys.sleep in examples --- R/expr__expr.R | 6 +++--- R/lazyframe__lazy.R | 2 +- R/rbackground.R | 2 +- man/Expr_map.Rd | 14 +++++++------- man/LazyFrame_collect_in_background.Rd | 2 +- man/RThreadHandle_RThreadHandle_class.Rd | 2 +- man/nanoarrow.Rd | 8 ++++---- man/pl_mem_address.Rd | 5 ++++- tests/testthat/test-rbackground.R | 2 +- 9 files changed, 23 insertions(+), 20 deletions(-) diff --git a/R/expr__expr.R b/R/expr__expr.R index 06a0c0ca2..27c2eeb93 100644 --- a/R/expr__expr.R +++ b/R/expr__expr.R @@ -701,7 +701,7 @@ construct_ProtoExprArray = function(...) { #' # map a,b,c,d sequentially #' pl$LazyFrame(a = 1, b = 2, c = 3, d = 4)$select( #' pl$all()$map(\(s) { -#' Sys.sleep(.5) +#' Sys.sleep(.1) #' s * 2 #' }) #' )$collect() |> system.time() @@ -712,7 +712,7 @@ construct_ProtoExprArray = function(...) { #' pl$options$rpool_cap #' pl$LazyFrame(a = 1, b = 2, c = 3, d = 4)$select( #' pl$all()$map(\(s) { -#' Sys.sleep(.5) +#' Sys.sleep(.1) #' s * 2 #' }, in_background = TRUE) #' )$collect() |> system.time() @@ -721,7 +721,7 @@ construct_ProtoExprArray = function(...) { #' pl$options$rpool_cap #' pl$LazyFrame(a = 1, b = 2, c = 3, d = 4)$select( #' pl$all()$map(\(s) { -#' Sys.sleep(.5) +#' Sys.sleep(.1) #' s * 2 #' }, in_background = TRUE) #' )$collect() |> system.time() diff --git a/R/lazyframe__lazy.R b/R/lazyframe__lazy.R index 527997b93..4d9fb8fb0 100644 --- a/R/lazyframe__lazy.R +++ b/R/lazyframe__lazy.R @@ -384,7 +384,7 @@ LazyFrame_collect = function( #' # Some expression which does contain a map #' expr = pl$col("mpg")$map( #' \(x) { -#' Sys.sleep(.5) +#' Sys.sleep(.1) #' x * 0.43 #' }, #' in_background = TRUE # set TRUE if collecting in background queries with $map or $apply diff --git a/R/rbackground.R b/R/rbackground.R index 8f222000c..a9491c7a4 100644 --- a/R/rbackground.R +++ b/R/rbackground.R @@ -65,7 +65,7 @@ print.RThreadHandle = function(x, ...) as.character(x) |> cat("\n") #' [`$apply()`][Expr_apply] #' @examples #' prexpr = pl$col("mpg")$map(\(x) { -#' Sys.sleep(1.5) +#' Sys.sleep(.1) #' x * 0.43 #' }, in_background = TRUE)$alias("kml") #' handle = pl$LazyFrame(mtcars)$with_columns(prexpr)$collect_in_background() diff --git a/man/Expr_map.Rd b/man/Expr_map.Rd index 6a693d74a..348d3ab21 100644 --- a/man/Expr_map.Rd +++ b/man/Expr_map.Rd @@ -49,10 +49,10 @@ to see and view number of parallel R sessions. \examples{ pl$DataFrame(iris)$ select( - pl$col("Sepal.Length")$map(\(x) { - paste("cheese", as.character(x$to_vector())) - }, pl$dtypes$Utf8) - ) + pl$col("Sepal.Length")$map(\(x) { + paste("cheese", as.character(x$to_vector())) + }, pl$dtypes$Utf8) +) # R parallel process example, use Sys.sleep() to imitate some CPU expensive # computation. @@ -60,7 +60,7 @@ pl$DataFrame(iris)$ # map a,b,c,d sequentially pl$LazyFrame(a = 1, b = 2, c = 3, d = 4)$select( pl$all()$map(\(s) { - Sys.sleep(.5) + Sys.sleep(.1) s * 2 }) )$collect() |> system.time() @@ -71,7 +71,7 @@ pl$set_options(rpool_cap = 4) # set back to 4, the default pl$options$rpool_cap pl$LazyFrame(a = 1, b = 2, c = 3, d = 4)$select( pl$all()$map(\(s) { - Sys.sleep(.5) + Sys.sleep(.1) s * 2 }, in_background = TRUE) )$collect() |> system.time() @@ -80,7 +80,7 @@ pl$LazyFrame(a = 1, b = 2, c = 3, d = 4)$select( pl$options$rpool_cap pl$LazyFrame(a = 1, b = 2, c = 3, d = 4)$select( pl$all()$map(\(s) { - Sys.sleep(.5) + Sys.sleep(.1) s * 2 }, in_background = TRUE) )$collect() |> system.time() diff --git a/man/LazyFrame_collect_in_background.Rd b/man/LazyFrame_collect_in_background.Rd index 75822e26a..6dba86dbe 100644 --- a/man/LazyFrame_collect_in_background.Rd +++ b/man/LazyFrame_collect_in_background.Rd @@ -28,7 +28,7 @@ R session is not available for polars execution. See also examples below. # Some expression which does contain a map expr = pl$col("mpg")$map( \(x) { - Sys.sleep(.5) + Sys.sleep(.1) x * 0.43 }, in_background = TRUE # set TRUE if collecting in background queries with $map or $apply diff --git a/man/RThreadHandle_RThreadHandle_class.Rd b/man/RThreadHandle_RThreadHandle_class.Rd index 95a7bb512..b1afd3588 100644 --- a/man/RThreadHandle_RThreadHandle_class.Rd +++ b/man/RThreadHandle_RThreadHandle_class.Rd @@ -29,7 +29,7 @@ session. } \examples{ prexpr = pl$col("mpg")$map(\(x) { - Sys.sleep(1.5) + Sys.sleep(.1) x * 0.43 }, in_background = TRUE)$alias("kml") handle = pl$LazyFrame(mtcars)$with_columns(prexpr)$collect_in_background() diff --git a/man/nanoarrow.Rd b/man/nanoarrow.Rd index 3ecaf02a4..7af2018a2 100644 --- a/man/nanoarrow.Rd +++ b/man/nanoarrow.Rd @@ -16,13 +16,13 @@ \alias{as_record_batch_reader.DataFrame} \title{polars to nanoarrow and arrow} \usage{ -\method{as_nanoarrow_array_stream}{DataFrame}(x, ..., schema = NULL) +as_nanoarrow_array_stream.DataFrame(x, ..., schema = NULL) -\method{infer_nanoarrow_schema}{DataFrame}(x, ...) +infer_nanoarrow_schema.DataFrame(x, ...) -\method{as_arrow_table}{DataFrame}(x, ...) +as_arrow_table.DataFrame(x, ...) -\method{as_record_batch_reader}{DataFrame}(x, ..., schema = NULL) +as_record_batch_reader.DataFrame(x, ..., schema = NULL) } \arguments{ \item{x}{a polars DataFrame} diff --git a/man/pl_mem_address.Rd b/man/pl_mem_address.Rd index c64f49955..2a0e31b31 100644 --- a/man/pl_mem_address.Rd +++ b/man/pl_mem_address.Rd @@ -11,7 +11,10 @@ String of mem address } \description{ -mimics pl$mem_address +Get underlying mem address a rust object (via ExtPtr). Expert use only. +} +\details{ +Does not give meaningful answers for regular R objects. } \examples{ pl$mem_address(pl$Series(1:3)) diff --git a/tests/testthat/test-rbackground.R b/tests/testthat/test-rbackground.R index 8e2a866c9..fb354df22 100644 --- a/tests/testthat/test-rbackground.R +++ b/tests/testthat/test-rbackground.R @@ -22,7 +22,7 @@ test_that("Test using $map() in background", { compute = lf$select(pl$col("y")$map(\(x) x * x, in_background = FALSE)) compute_bg = lf$select(pl$col("y")$map(\(x) { - Sys.sleep(2) + Sys.sleep(.3) x * x }, in_background = TRUE)) res_ref = compute$collect()$to_data_frame() From 8e0ce7ba3293f4f168acdb8c84b437063da76bc9 Mon Sep 17 00:00:00 2001 From: sorhawell Date: Mon, 9 Oct 2023 23:13:00 +0300 Subject: [PATCH 3/8] restore nanoarrow docs --- man/nanoarrow.Rd | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/man/nanoarrow.Rd b/man/nanoarrow.Rd index 7af2018a2..3ecaf02a4 100644 --- a/man/nanoarrow.Rd +++ b/man/nanoarrow.Rd @@ -16,13 +16,13 @@ \alias{as_record_batch_reader.DataFrame} \title{polars to nanoarrow and arrow} \usage{ -as_nanoarrow_array_stream.DataFrame(x, ..., schema = NULL) +\method{as_nanoarrow_array_stream}{DataFrame}(x, ..., schema = NULL) -infer_nanoarrow_schema.DataFrame(x, ...) +\method{infer_nanoarrow_schema}{DataFrame}(x, ...) -as_arrow_table.DataFrame(x, ...) +\method{as_arrow_table}{DataFrame}(x, ...) -as_record_batch_reader.DataFrame(x, ..., schema = NULL) +\method{as_record_batch_reader}{DataFrame}(x, ..., schema = NULL) } \arguments{ \item{x}{a polars DataFrame} From 4d0d7510c530d22f34bd096939e13597e1d90389 Mon Sep 17 00:00:00 2001 From: sorhawell Date: Mon, 9 Oct 2023 23:13:16 +0300 Subject: [PATCH 4/8] roxygen with previous --- man/global_rpool_cap.Rd | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/man/global_rpool_cap.Rd b/man/global_rpool_cap.Rd index b99b4bbb6..aea602d42 100644 --- a/man/global_rpool_cap.Rd +++ b/man/global_rpool_cap.Rd @@ -8,15 +8,15 @@ \item{n}{Integer, the capacity limit R sessions to process R code.} } \value{ -\code{pl$get_global_rpool_cap()} returns a list with two elements \code{available} -and \code{capacity}. \code{available} is the number of R sessions are already spawned -in pool. \code{capacity} is the limit of new R sessions to spawn. Anytime a polars +\code{pl$options$rpool_cap} returns the capacity ("limit") of co-running external R sessions / +processes. \code{pl$options$rpool_active} is the number of R sessions are already spawned +in the pool. \code{rpool_cap} is the limit of new R sessions to spawn. Anytime a polars thread worker needs a background R session specifically to run R code embedded in a query via \verb{$map(..., in_background = TRUE)} or \verb{$apply(..., in_background = TRUE)}, it will obtain any R session idling in -rpool, or spawn a new R session (process) and add it to pool if \code{capacity} +rpool, or spawn a new R session (process) if \code{capacity} is not already reached. If \code{capacity} is already reached, the thread worker -will sleep until an R session is idling. +will sleep and in a R job queue until an R session is idle. } \description{ Deprecated. Use pl$options to get, and pl$set_options() to set. @@ -27,13 +27,15 @@ serialize + shared memory buffers via the rust crate \code{ipc-channel}. Multi-process communication has overhead because all data must be serialized/de-serialized and sent via buffers. Using multiple R sessions will likely only give a speed-up in a \verb{low io - high cpu} scenario. Native -polars query syntax runs in threads and have no overhead. +polars query syntax runs in threads and have no overhead. Polars has as default +double as many thread workers as cores. If any worker are queuing for or using R sessions, +other workers can still continue any native polars parts as much as possible. } \examples{ -default = pl$get_global_rpool_cap() -print(default) -pl$set_global_rpool_cap(8) -pl$get_global_rpool_cap() -pl$set_global_rpool_cap(default$capacity) +default = pl$options$rpool_cap |> print() +pl$set_options(rpool_cap = 8) +pl$options$rpool_cap +pl$set_options(rpool_cap = default) +pl$options$rpool_cap } \keyword{options} From 578370626eb64ac2e4e44de33d13449ba59dcb08 Mon Sep 17 00:00:00 2001 From: sorhawell Date: Mon, 9 Oct 2023 23:14:10 +0300 Subject: [PATCH 5/8] disallow pl$set_options(42), silently did nothing --- R/options.R | 29 ++++++++++++++++++----------- tests/testthat/test-options.R | 11 +++++++++++ 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/R/options.R b/R/options.R index f954a4315..052c4426b 100644 --- a/R/options.R +++ b/R/options.R @@ -100,6 +100,11 @@ pl$set_options = function( # modified in the first call) args_modified = names(as.list(sys.call()[-1])) + if (is.null(args_modified) || any(nchar(args_modified) == 0L)) { + Err_plain("all args must be named")|> + unwrap("in pl$set_options") + } + for (i in seq_along(args_modified)) { value = get(args_modified[i]) @@ -262,26 +267,28 @@ pl$with_string_cache = function(expr) { #' Multi-process communication has overhead because all data must be #' serialized/de-serialized and sent via buffers. Using multiple R sessions #' will likely only give a speed-up in a `low io - high cpu` scenario. Native -#' polars query syntax runs in threads and have no overhead. +#' polars query syntax runs in threads and have no overhead. Polars has as default +#' double as many thread workers as cores. If any worker are queuing for or using R sessions, +#' other workers can still continue any native polars parts as much as possible. #' #' @return -#' `pl$get_global_rpool_cap()` returns a list with two elements `available` -#' and `capacity`. `available` is the number of R sessions are already spawned -#' in pool. `capacity` is the limit of new R sessions to spawn. Anytime a polars +#' `pl$options$rpool_cap` returns the capacity ("limit") of co-running external R sessions / +#' processes. `pl$options$rpool_active` is the number of R sessions are already spawned +#' in the pool. `rpool_cap` is the limit of new R sessions to spawn. Anytime a polars #' thread worker needs a background R session specifically to run R code embedded #' in a query via `$map(..., in_background = TRUE)` or #' `$apply(..., in_background = TRUE)`, it will obtain any R session idling in -#' rpool, or spawn a new R session (process) and add it to pool if `capacity` +#' rpool, or spawn a new R session (process) if `capacity` #' is not already reached. If `capacity` is already reached, the thread worker -#' will sleep until an R session is idling. +#' will sleep and in a R job queue until an R session is idle. #' #' @keywords options #' @examples -#' default = pl$get_global_rpool_cap() -#' print(default) -#' pl$set_global_rpool_cap(8) -#' pl$get_global_rpool_cap() -#' pl$set_global_rpool_cap(default$capacity) +#' default = pl$options$rpool_cap |> print() +#' pl$set_options(rpool_cap = 8) +#' pl$options$rpool_cap +#' pl$set_options(rpool_cap = default) +#' pl$options$rpool_cap pl$get_global_rpool_cap = function() { warning( "in pl$get_global_rpool_cap(): Deprecated. Use pl$options$rpool_cap instead.", diff --git a/tests/testthat/test-options.R b/tests/testthat/test-options.R index b0e0f1bb4..f9620fe1a 100644 --- a/tests/testthat/test-options.R +++ b/tests/testthat/test-options.R @@ -35,4 +35,15 @@ test_that("pl$options$ read-write", { # reset_options() works pl$reset_options() expect_identical(pl$options, old_options) + + # all set_options args must be named + expect_identical( + pl$set_options(42) |> get_err_ctx("Plain"), + "all args must be named" + ) + expect_identical( + pl$set_options(rpool_cap = 42, 42)|> get_err_ctx("Plain"), + "all args must be named" + ) + }) From 105d7bcee178093784dff380013b3c32e7511ddf Mon Sep 17 00:00:00 2001 From: sorhawell Date: Mon, 9 Oct 2023 23:53:17 +0300 Subject: [PATCH 6/8] set_options: improve error for incorrect arg --- R/options.R | 10 +++++++++- man/polars_options.Rd | 2 ++ tests/testthat/test-options.R | 6 ++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/R/options.R b/R/options.R index 052c4426b..ad99a6c75 100644 --- a/R/options.R +++ b/R/options.R @@ -52,6 +52,8 @@ polars_optreq$rpool_cap = list() # rust-side options already check args #' @docType NULL #' #' @details +#' All args must be explicitly and fully named. +#' #' `pl$options$rpool_active` indicates the number of R sessions already #' spawned in pool. `pl$options$rpool_cap` indicates the maximum number of new R #' sessions that can be spawned. Anytime a polars thread worker needs a background @@ -106,7 +108,13 @@ pl$set_options = function( } for (i in seq_along(args_modified)) { - value = get(args_modified[i]) + value = result(get(args_modified[i])) |> + map_err(\(rp_err) { + rp_err$ + hint("arg-name does not match any defined args of `?set_options`")$ + bad_arg(args_modified[i]) + }) |> + unwrap("in pl$set_options") # each argument has its own input requirements validation = c() diff --git a/man/polars_options.Rd b/man/polars_options.Rd index ea3394215..78365d810 100644 --- a/man/polars_options.Rd +++ b/man/polars_options.Rd @@ -35,6 +35,8 @@ Get and set polars options. See sections "Value" and "Examples" below for more details. } \details{ +All args must be explicitly and fully named. + \code{pl$options$rpool_active} indicates the number of R sessions already spawned in pool. \code{pl$options$rpool_cap} indicates the maximum number of new R sessions that can be spawned. Anytime a polars thread worker needs a background diff --git a/tests/testthat/test-options.R b/tests/testthat/test-options.R index f9620fe1a..531097060 100644 --- a/tests/testthat/test-options.R +++ b/tests/testthat/test-options.R @@ -46,4 +46,10 @@ test_that("pl$options$ read-write", { "all args must be named" ) + # incomplete/misspelled name not allowed + expect_identical( + pl$set_options(rpo = 42) |> get_err_ctx("Hint"), + "arg-name does not match any options" + ) + }) From 6c5cd173ba435cd70922374747b7de076cfd4a10 Mon Sep 17 00:00:00 2001 From: sorhawell Date: Mon, 23 Oct 2023 22:21:02 +0200 Subject: [PATCH 7/8] fix a utest --- tests/testthat/test-options.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/testthat/test-options.R b/tests/testthat/test-options.R index 531097060..5c3ddb49e 100644 --- a/tests/testthat/test-options.R +++ b/tests/testthat/test-options.R @@ -49,7 +49,7 @@ test_that("pl$options$ read-write", { # incomplete/misspelled name not allowed expect_identical( pl$set_options(rpo = 42) |> get_err_ctx("Hint"), - "arg-name does not match any options" + "arg-name does not match any defined args of `?set_options`" ) }) From a6c5bd7e059c185bc8e4cf0ab3cc89928f70cb87 Mon Sep 17 00:00:00 2001 From: eitsupi Date: Thu, 26 Oct 2023 13:49:13 +0000 Subject: [PATCH 8/8] auto formatting --- R/lazyframe__lazy.R | 27 +++++++++++++-------------- R/options.R | 2 +- R/zzz.R | 10 ++++++---- tests/testthat/test-options.R | 3 +-- 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/R/lazyframe__lazy.R b/R/lazyframe__lazy.R index ffff5366f..95a8d9d60 100644 --- a/R/lazyframe__lazy.R +++ b/R/lazyframe__lazy.R @@ -1408,20 +1408,19 @@ LazyFrame_fetch = function( #' agg(pl$col(pl$Float64)$apply(r_func))$ #' profile() LazyFrame_profile = function( - type_coercion = TRUE, - predicate_pushdown = TRUE, - projection_pushdown = TRUE, - simplify_expression = TRUE, - slice_pushdown = TRUE, - comm_subplan_elim = TRUE, - comm_subexpr_elim = TRUE, - streaming = FALSE, - no_optimization = FALSE, - inherit_optimization = FALSE, - collect_in_background = FALSE, - show_plot = FALSE, - truncate_nodes = 0) { - + type_coercion = TRUE, + predicate_pushdown = TRUE, + projection_pushdown = TRUE, + simplify_expression = TRUE, + slice_pushdown = TRUE, + comm_subplan_elim = TRUE, + comm_subexpr_elim = TRUE, + streaming = FALSE, + no_optimization = FALSE, + inherit_optimization = FALSE, + collect_in_background = FALSE, + show_plot = FALSE, + truncate_nodes = 0) { if (isTRUE(no_optimization)) { predicate_pushdown = FALSE projection_pushdown = FALSE diff --git a/R/options.R b/R/options.R index d95c685c1..b2f86d37a 100644 --- a/R/options.R +++ b/R/options.R @@ -103,7 +103,7 @@ pl$set_options = function( args_modified = names(as.list(sys.call()[-1])) if (is.null(args_modified) || any(nchar(args_modified) == 0L)) { - Err_plain("all args must be named")|> + Err_plain("all args must be named") |> unwrap("in pl$set_options") } diff --git a/R/zzz.R b/R/zzz.R index 8ee5b72db..383f78724 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -148,22 +148,24 @@ pl$mem_address = mem_address makeActiveBinding( "rpool_cap", \(arg) { - if(missing(arg)) { + if (missing(arg)) { unwrap(get_global_rpool_cap())$capacity } else { unwrap(set_global_rpool_cap(arg)) } - }, env = polars_optenv + }, + env = polars_optenv ) makeActiveBinding( "rpool_active", \(arg) { - if(missing(arg)) { + if (missing(arg)) { unwrap(get_global_rpool_cap())$active } else { unwrap(stop("internal error: polars_optenv$rpool_active cannot be set directly")) } - }, env = polars_optenv + }, + env = polars_optenv ) setup_renv() diff --git a/tests/testthat/test-options.R b/tests/testthat/test-options.R index 5c3ddb49e..e21224c29 100644 --- a/tests/testthat/test-options.R +++ b/tests/testthat/test-options.R @@ -42,7 +42,7 @@ test_that("pl$options$ read-write", { "all args must be named" ) expect_identical( - pl$set_options(rpool_cap = 42, 42)|> get_err_ctx("Plain"), + pl$set_options(rpool_cap = 42, 42) |> get_err_ctx("Plain"), "all args must be named" ) @@ -51,5 +51,4 @@ test_that("pl$options$ read-write", { pl$set_options(rpo = 42) |> get_err_ctx("Hint"), "arg-name does not match any defined args of `?set_options`" ) - })