diff --git a/DESCRIPTION b/DESCRIPTION index 0c528e251..b3926db59 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -7,7 +7,7 @@ Authors@R: c(person("Ritchie", "Vink", , "ritchie46@gmail.com", role = c("aut")), person("Soren", "Welling", , "sorhawell@gmail.com", role = c("aut", "cre")), person("Tatsuya", "Shima", , "ts1s1andn@gmail.com", role = c("aut")), - person("Etienne", "Bacher", , "etienne.bacher@protonmail.com", role = c("ctb"))) + person("Etienne", "Bacher", , "etienne.bacher@protonmail.com", role = c("aut"))) Description: Lightning-fast 'DataFrame' library written in 'Rust'. Convert R data to 'Polars' data and vice versa. Perform fast, lazy, larger-than-memory and optimized data queries. 'Polars' is interoperable with the package 'arrow', diff --git a/NEWS.md b/NEWS.md index 84ba3446a..9201a7a69 100644 --- a/NEWS.md +++ b/NEWS.md @@ -11,13 +11,16 @@ ## What's changed +- `pl$concat()` now also supports `Series`, `Expr` and `LazyFrame` (#407). - New method `$unnest()` for `LazyFrame` (#397). - New method `$sample()` for `DataFrame` (#399). - New method `$meta$tree_format()` to display an `Expr` as a tree (#401). - New argument `schema` in `pl$DataFrame()` and `pl$LazyFrame()` to override the automatic type detection (#385). -- Fix bug when calling R from polars via e.g. `$map()` where query would not complete in one edge - case (#409). +- Fix bug when calling R from polars via e.g. `$map()` where query would not + complete in one edge case (#409). +- New method `$cat$get_categories()` to list unique values of categorical + variables (#412). # polars 0.8.1 diff --git a/R/dataframe__frame.R b/R/dataframe__frame.R index d658ea8f4..bc3517100 100644 --- a/R/dataframe__frame.R +++ b/R/dataframe__frame.R @@ -133,7 +133,6 @@ DataFrame #' #' # custom schema #' pl$DataFrame(iris, schema = list(Sepal.Length = pl$Float32, Species = pl$Utf8)) - pl$DataFrame = function(..., make_names_unique = TRUE, schema = NULL) { largs = unpack_list(...) 
@@ -181,9 +180,9 @@ pl$DataFrame = function(..., make_names_unique = TRUE, schema = NULL) { names(largs) = keys lapply(seq_along(largs), \(x) { varname = keys[x] - out <- pl$lit(largs[[x]]) + out = pl$lit(largs[[x]]) if (!is.null(schema) && varname %in% names(schema)) { - out <- out$cast(schema[[varname]], strict = TRUE) + out = out$cast(schema[[varname]], strict = TRUE) } out$alias(varname) }) |> @@ -1033,6 +1032,88 @@ DataFrame_first = function() { self$lazy()$first()$collect() } + +#' @title Number of chunks of the Series in a DataFrame +#' @description +#' Number of chunks (memory allocations) for all or first Series in a DataFrame. +#' @details +#' A DataFrame is a vector of Series. Each Series in rust-polars is a wrapper +#' around a ChunkedArray, which is like a virtual contiguous vector physically +#' backed by an ordered set of chunks. Each chunk of values has a contiguous +#' memory layout and is an arrow array. Arrow arrays are a fast, thread-safe and +#' cross-platform memory layout. +#' +#' In R, combining with `c()` or `rbind()` requires immediate vector re-allocation +#' to place vector values in contiguous memory. This is slow and memory consuming, +#' and it is why repeatedly appending to a vector in R is discouraged. +#' +#' In polars, when we concatenate or append to Series or DataFrame, the +#' re-allocation can be avoided or delayed by simply appending chunks to each +#' individual Series. However, if chunks become many and small or are misaligned +#' across Series, this can hurt the performance of subsequent operations. +#' +#' In most places in the polars API where chunking could occur, the user typically has to +#' actively opt out of rechunking by setting an argument `rechunk = FALSE`. +#' +#' @keywords DataFrame +#' @param strategy Either `"all"` or `"first"`. `"first"` only returns chunks +#' for the first Series. +#' @return A real vector of chunk counts per Series. 
+#' @seealso [`$rechunk()`][DataFrame_rechunk] +#' @examples +#' # create DataFrame with misaligned chunks +#' df = pl$concat( +#' 1:10, # single chunk +#' pl$concat(1:5, 1:5, rechunk = FALSE, how = "vertical")$rename("b"), # two chunks +#' how = "horizontal" +#' ) +#' df +#' df$n_chunks() +#' +#' # rechunk a chunked DataFrame +#' df$rechunk()$n_chunks() +#' +#' # rechunk is not an in-place operation +#' df$n_chunks() +#' +#' # The following toy example emulates the Series "chunkyness" in R. Here it is an +#' # S3-classed list of vectors of the same type, with all relevant S3 +#' # generics implemented to make it behave as if it were a regular vector. +#' "+.chunked_vector" = \(x, y) structure(list(unlist(x) + unlist(y)), class = "chunked_vector") +#' print.chunked_vector = \(x, ...) print(unlist(x), ...) +#' c.chunked_vector = \(...) { +#' structure(do.call(c, lapply(list(...), unclass)), class = "chunked_vector") +#' } +#' rechunk = \(x) structure(unlist(x), class = "chunked_vector") +#' x = structure(list(1:4, 5L), class = "chunked_vector") +#' x +#' x + 5:1 +#' lapply(x, tracemem) # trace chunks to verify no re-allocation +#' z = c(x, x) +#' z # looks like a plain vector +#' lapply(z, tracemem) # memory allocations in z are the same as in x +#' str(z) +#' z = rechunk(z) +#' str(z) +DataFrame_n_chunks = function(strategy = "all") { + .pr$DataFrame$n_chunks(self, strategy) |> + unwrap("in n_chunks():") +} + + +#' @title Rechunk a DataFrame +#' @description Rechunking re-allocates any "chunked" memory allocations to +#' speed up e.g. vectorized operations. +#' @inherit DataFrame_n_chunks details examples +#' +#' @keywords DataFrame +#' @return A DataFrame +#' @seealso [`$n_chunks()`][DataFrame_n_chunks] +DataFrame_rechunk = function() { + .pr$DataFrame$rechunk(self) +} + + +#' @title Get the last row of the DataFrame. #' @keywords DataFrame #' @return A DataFrame with one row. 
diff --git a/R/error__rpolarserr.R b/R/error__rpolarserr.R index 37a86b041..b851e3bb9 100644 --- a/R/error__rpolarserr.R +++ b/R/error__rpolarserr.R @@ -53,12 +53,19 @@ bad_robj = function(r) { .pr$RPolarsErr$new()$bad_robj(r) } -Err_plain = function(x) { - Err(.pr$RPolarsErr$new()$plain(x)) +Err_plain = function(...) { + Err(.pr$RPolarsErr$new()$plain(paste(..., collapse = " "))) } # short hand for extracting an error context in unit testing, will raise error if not an RPolarsErr -get_err_ctx = \(x) unwrap_err(result(x))$contexts() +get_err_ctx = \(x, select = NULL) { + ctx = unwrap_err(result(x))$contexts() + if (is.null(select)) { + ctx + } else { + ctx[[match.arg(select, names(ctx))]] + } +} # wrapper to return Result diff --git a/R/expr__categorical.R b/R/expr__categorical.R index 465ccc348..4190aec93 100644 --- a/R/expr__categorical.R +++ b/R/expr__categorical.R @@ -10,13 +10,35 @@ #' @return bool: TRUE if equal #' @examples #' df = pl$DataFrame( -#' cats = c("z", "z", "k", "a", "b"), +#' cats = factor(c("z", "z", "k", "a", "b")), #' vals = c(3, 1, 2, 2, 3) #' )$with_columns( -#' pl$col("cats")$cast(pl$Categorical)$cat$set_ordering("physical") +#' pl$col("cats")$cat$set_ordering("physical") #' ) #' df$select(pl$all()$sort()) ExprCat_set_ordering = function(ordering) { .pr$Expr$cat_set_ordering(self, ordering) |> unwrap("in $cat$set_ordering:") } # TODO use df$sort(c("cats","vals")) when implemented + + +#' Get the categories stored in this data type +#' @name ExprCat_get_categories +#' @keywords ExprCat +#' @return A polars DataFrame with the categories for each categorical Series. 
+#' @examples +#' df = pl$DataFrame( +#' cats = factor(c("z", "z", "k", "a", "b")), +#' vals = factor(c(3, 1, 2, 2, 3)) +#' ) +#' df +#' +#' df$select( +#' pl$col("cats")$cat$get_categories() +#' ) +#' df$select( +#' pl$col("vals")$cat$get_categories() +#' ) +ExprCat_get_categories = function() { + .pr$Expr$cat_get_categories(self) +} diff --git a/R/expr__expr.R b/R/expr__expr.R index 8e69e7348..42c99911e 100644 --- a/R/expr__expr.R +++ b/R/expr__expr.R @@ -683,8 +683,8 @@ construct_ProtoExprArray = function(...) { #' This is used to inform schema of the actual return type of the R function. Setting this wrong #' could theoretically have some downstream implications to the query. #' @param agg_list Aggregate list. Map from vector to group in groupby context. -#' @param in_background Boolean. Whether to execute the map in a background R process. Combined wit -#' setting e.g. `pl$set_global_rpool_cap(4)` it can speed up some slow R functions as they can run +#' @param in_background Boolean. Whether to execute the map in a background R process. Combined with +#' setting e.g. `pl$set_options(rpool_cap = 4)` it can speed up some slow R functions as they can run #' in parallel R sessions. The communication speed between processes is quite slower than between #' threads. Will likely only give a speed-up in a "low IO - high CPU" usecase. A single map will not #' be paralleled, only in case of multiple `$map`(s) in the query these can be run in parallel. @@ -697,7 +697,7 @@ construct_ProtoExprArray = function(...) { #' variable of the R session. But all R maps in the query sequentially share the same main R #' session. Any native polars computations can still be executed meanwhile. In #' `in_background = TRUE` the map will run in one or more other R sessions and will not have access -#' to global variables. Use `pl$set_global_rpool_cap(4)` and `pl$get_global_rpool_cap()` to see and +#' to global variables. 
Use `pl$set_options(rpool_cap = 4)` and `pl$options$rpool_cap` to see and #' view number of parallel R sessions. #' @name Expr_map #' @examples @@ -716,9 +716,9 @@ construct_ProtoExprArray = function(...) { #' )$collect() |> system.time() #' #' # map in parallel 1: Overhead to start up extra R processes / sessions -#' pl$set_global_rpool_cap(0) # drop any previous processes, just to show start-up overhead -#' pl$set_global_rpool_cap(4) # set back to 4, the default -#' pl$get_global_rpool_cap() +#' pl$set_options(rpool_cap = 0) # drop any previous processes, just to show start-up overhead +#' pl$set_options(rpool_cap = 4) # set back to 4, the default +#' pl$options$rpool_cap #' pl$LazyFrame(a = 1, b = 2, c = 3, d = 4)$select( #' pl$all()$map(\(s) { #' Sys.sleep(.5) @@ -727,7 +727,7 @@ construct_ProtoExprArray = function(...) { #' )$collect() |> system.time() #' #' # map in parallel 2: Reuse R processes in "polars global_rpool". -#' pl$get_global_rpool_cap() +#' pl$options$rpool_cap #' pl$LazyFrame(a = 1, b = 2, c = 3, d = 4)$select( #' pl$all()$map(\(s) { #' Sys.sleep(.5) @@ -759,7 +759,7 @@ Expr_map = function(f, output_type = NULL, agg_list = FALSE, in_background = FAL #' @param allow_fail_eval bool (default FALSE), if TRUE will not raise user function error #' but convert result to a polars Null and carry on. #' @param in_background Boolean. Whether to execute the map in a background R process. Combined wit -#' setting e.g. `pl$set_global_rpool_cap(4)` it can speed up some slow R functions as they can run +#' setting e.g. `pl$set_options(rpool_cap = 4)` it can speed up some slow R functions as they can run #' in parallel R sessions. The communication speed between processes is quite slower than between #' threads. Will likely only give a speed-up in a "low IO - high CPU" usecase. A single map will not #' be paralleled, only in case of multiple `$map`(s) in the query these can be run in parallel. 
@@ -865,9 +865,9 @@ Expr_map = function(f, output_type = NULL, agg_list = FALSE, in_background = FAL #' )$collect() |> system.time() #' #' # map in parallel 1: Overhead to start up extra R processes / sessions -#' pl$set_global_rpool_cap(0) # drop any previous processes, just to show start-up overhead here -#' pl$set_global_rpool_cap(4) # set back to 4, the default -#' pl$get_global_rpool_cap() +#' pl$set_options(rpool_cap = 0) # drop any previous processes, just to show start-up overhead here +#' pl$set_options(rpool_cap = 4) # set back to 4, the default +#' pl$options$rpool_cap #' pl$LazyFrame(iris)$groupby("Species")$agg( #' pl$all()$apply(\(s) { #' Sys.sleep(.1) @@ -876,7 +876,7 @@ Expr_map = function(f, output_type = NULL, agg_list = FALSE, in_background = FAL #' )$collect() |> system.time() #' #' # map in parallel 2: Reuse R processes in "polars global_rpool". -#' pl$get_global_rpool_cap() +#' pl$options$rpool_cap #' pl$LazyFrame(iris)$groupby("Species")$agg( #' pl$all()$apply(\(s) { #' Sys.sleep(.1) @@ -929,7 +929,6 @@ Expr_apply = function(f, return_type = NULL, strict_return_type = TRUE, allow_fa #' #' # vectors to literal implicitly #' (pl$lit(2) + 1:4) / 4:1 - Expr_lit = function(x) { # use .call reduces eval from 22us to 15us, not a bottle-next anyways .Call(wrap__Expr__lit, x) |> @@ -953,7 +952,6 @@ Expr_lit = function(x) { #' pl$col("mpg")$suffix("_foo"), #' pl$col("cyl", "drat")$suffix("_bar") #' ) - Expr_suffix = function(suffix) { .pr$Expr$suffix(self, suffix) } @@ -975,7 +973,6 @@ Expr_suffix = function(suffix) { #' pl$col("mpg")$prefix("foo_"), #' pl$col("cyl", "drat")$prefix("bar_") #' ) - Expr_prefix = function(prefix) { .pr$Expr$prefix(self, prefix) } @@ -987,7 +984,6 @@ Expr_prefix = function(prefix) { #' @name Expr_reverse #' @examples #' pl$DataFrame(list(a = 1:5))$select(pl$col("a")$reverse()) - Expr_reverse = function() { .pr$Expr$reverse(self) } diff --git a/R/expr__meta.R b/R/expr__meta.R index 76a6e7d8d..d88f7026c 100644 --- 
a/R/expr__meta.R +++ b/R/expr__meta.R @@ -166,9 +166,8 @@ ExprMeta_is_regex_projection = function() { #' @examples #' my_expr = (pl$col("foo") * pl$col("bar"))$sum()$over(pl$col("ham")) / 2 #' my_expr$meta$tree_format() - ExprMeta_tree_format = function(return_as_string = FALSE) { - out <- .pr$Expr$meta_tree_format(self) |> + out = .pr$Expr$meta_tree_format(self) |> unwrap("in $tree_format():") if (isTRUE(return_as_string)) { out diff --git a/R/extendr-wrappers.R b/R/extendr-wrappers.R index 89fcc6af6..b60928d9f 100644 --- a/R/extendr-wrappers.R +++ b/R/extendr-wrappers.R @@ -11,18 +11,6 @@ #' @useDynLib polars, .registration = TRUE NULL -rlazy_csv_reader <- function(path, sep, has_header, ignore_errors, skip_rows, n_rows, cache, overwrite_dtype, low_memory, comment_char, quote_char, null_values, infer_schema_length, skip_rows_after_header, encoding, row_count_name, row_count_offset, parse_dates) .Call(wrap__rlazy_csv_reader, path, sep, has_header, ignore_errors, skip_rows, n_rows, cache, overwrite_dtype, low_memory, comment_char, quote_char, null_values, infer_schema_length, skip_rows_after_header, encoding, row_count_name, row_count_offset, parse_dates) - -import_arrow_ipc <- function(path, n_rows, cache, rechunk, row_name, row_count, memmap) .Call(wrap__import_arrow_ipc, path, n_rows, cache, rechunk, row_name, row_count, memmap) - -new_from_parquet <- function(path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory) .Call(wrap__new_from_parquet, path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory) - -concat_df <- function(vdf) .Call(wrap__concat_df, vdf) - -hor_concat_df <- function(dfs) .Call(wrap__hor_concat_df, dfs) - -diag_concat_df <- function(dfs) .Call(wrap__diag_concat_df, dfs) - min_exprs <- function(exprs) .Call(wrap__min_exprs, exprs) max_exprs <- function(exprs) .Call(wrap__max_exprs, exprs) @@ -75,6 +63,20 @@ test_wrong_call_pl_lit <- function(robj) .Call(wrap__test_wrong_call_pl_lit, rob polars_features <- 
function() .Call(wrap__polars_features) +concat_lf <- function(l, rechunk, parallel, to_supertypes) .Call(wrap__concat_lf, l, rechunk, parallel, to_supertypes) + +diag_concat_lf <- function(l, rechunk, parallel) .Call(wrap__diag_concat_lf, l, rechunk, parallel) + +hor_concat_df <- function(l) .Call(wrap__hor_concat_df, l) + +concat_series <- function(l, rechunk, to_supertypes) .Call(wrap__concat_series, l, rechunk, to_supertypes) + +rlazy_csv_reader <- function(path, sep, has_header, ignore_errors, skip_rows, n_rows, cache, overwrite_dtype, low_memory, comment_char, quote_char, null_values, infer_schema_length, skip_rows_after_header, encoding, row_count_name, row_count_offset, parse_dates) .Call(wrap__rlazy_csv_reader, path, sep, has_header, ignore_errors, skip_rows, n_rows, cache, overwrite_dtype, low_memory, comment_char, quote_char, null_values, infer_schema_length, skip_rows_after_header, encoding, row_count_name, row_count_offset, parse_dates) + +import_arrow_ipc <- function(path, n_rows, cache, rechunk, row_name, row_count, memmap) .Call(wrap__import_arrow_ipc, path, n_rows, cache, rechunk, row_name, row_count, memmap) + +new_from_parquet <- function(path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory) .Call(wrap__new_from_parquet, path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory) + test_rpolarserr <- function() .Call(wrap__test_rpolarserr) setup_renv <- function() .Call(wrap__setup_renv) @@ -111,6 +113,10 @@ DataFrame <- new.env(parent = emptyenv()) DataFrame$shape <- function() .Call(wrap__DataFrame__shape, self) +DataFrame$n_chunks <- function(strategy) .Call(wrap__DataFrame__n_chunks, self, strategy) + +DataFrame$rechunk <- function() .Call(wrap__DataFrame__rechunk, self) + DataFrame$clone_see_me_macro <- function() .Call(wrap__DataFrame__clone_see_me_macro, self) DataFrame$default <- function() .Call(wrap__DataFrame__default) @@ -923,6 +929,8 @@ Expr$meta_tree_format <- function() 
.Call(wrap__Expr__meta_tree_format, self) Expr$cat_set_ordering <- function(ordering) .Call(wrap__Expr__cat_set_ordering, self, ordering) +Expr$cat_get_categories <- function() .Call(wrap__Expr__cat_get_categories, self) + Expr$new_count <- function() .Call(wrap__Expr__new_count) Expr$new_first <- function() .Call(wrap__Expr__new_first) diff --git a/R/functions__eager.R b/R/functions__eager.R index 7e98573c8..d925d8582 100644 --- a/R/functions__eager.R +++ b/R/functions__eager.R @@ -1,13 +1,24 @@ #' Concat polars objects #' @name pl_concat -#' @param l list of DataFrame, or Series, LazyFrame or Expr -#' @param rechunk perform a rechunk at last -#' @param how choice of bind direction "vertical"(rbind) "horizontal"(cbind) "diagonal" diagonally -#' @param parallel BOOL default TRUE, only used for LazyFrames +#' @param ... Either individual unpacked args or args wrapped in list(). Args can +#' be eager as DataFrame, Series and R vectors, or lazy as LazyFrame and Expr. +#' The first element determines the output of `$concat()`: if the first element +#' is lazy, a LazyFrame is returned; otherwise, a DataFrame is returned (note +#' that if the first element is eager, all other elements have to be eager to +#' avoid implicit collect). +#' @param rechunk Perform a rechunk at the end. +#' @param how Bind direction. Can be "vertical" (like `rbind()`), "horizontal" +#' (like `cbind()`), or "diagonal". +#' @param parallel Only used for LazyFrames. If `TRUE` (default), lazy +#' computations may be executed in parallel. +#' @param to_supertypes If `TRUE` (default is `FALSE`), cast columns to their shared supertype, if +#' any. For example, if we try to vertically concatenate two columns of types `i32` +#' and `f64`, the column of type `i32` will be cast to `f64` beforehand. This +#' argument is equivalent to the "_relaxed" operations in Python polars. 
#' #' @details -#' Categorical columns/Series must have been constructed while global string cache enabled -#' [`pl$enable_string_cache()`][pl_enable_string_cache] +#' Categorical columns/Series must have been constructed while global string +#' cache enabled. See [`pl$enable_string_cache()`][pl_enable_string_cache]. #' #' #' @return DataFrame, or Series, LazyFrame or Expr @@ -23,7 +34,6 @@ #' }) #' pl$concat(l_ver, how = "vertical") #' -#' #' # horizontal #' l_hor = lapply(1:10, function(i) { #' l_internal = list( @@ -34,44 +44,116 @@ #' pl$DataFrame(l_internal) #' }) #' pl$concat(l_hor, how = "horizontal") +#' #' # diagonal #' pl$concat(l_hor, how = "diagonal") +#' +#' # if two columns don't share the same type, concat() will error unless we use +#' # `to_supertypes = TRUE`: +#' test = pl$DataFrame(x = 1L) # i32 +#' test2 = pl$DataFrame(x = 1.0) #f64 +#' +#' pl$concat(test, test2, to_supertypes = TRUE) pl$concat = function( - l, # list of DataFrames or Series or lazyFrames or expr + ..., # list of DataFrames or Series or lazyFrames or expr rechunk = TRUE, how = c("vertical", "horizontal", "diagonal"), - parallel = TRUE # not used yet - ) { + parallel = TRUE, + to_supertypes = FALSE) { + # unpack arg list + l = unpack_list(..., skip_classes = "data.frame") + + # nothing becomes NULL + if (length(l) == 0L) { + return(NULL) + } + ## Check inputs - how = match.arg(how[1L], c("vertical", "horizontal", "diagonal")) + how_args = c("vertical", "horizontal", "diagonal") # , "vertical_relaxed", "diangonal_relaxed") + + how = match.arg(how[1L], how_args) |> + result() |> + unwrap("in pl$concat()") - # dispatch on item class and how first = l[[1L]] - result = pcase( - inherits(first, "DataFrame"), + eager = !inherits(first, "LazyFrame") + args_modified = names(as.list(sys.call()[-1L])) + + # check not using any mixing of types which could lead to implicit collect + if (eager) { + for (i in seq_along(l)) { + if (inherits(l[[i]], c("LazyFrame", "Expr"))) { + 
.pr$RPolarsErr$new()$ + plain("tip: explicitly collect lazy inputs first, e.g. pl$concat(dataframe, lazyframe$collect())")$ + plain("LazyFrame or Expr not allowed if first arg is a DataFrame, to avoid implicit collect")$ + bad_robj(l[[i]])$ + bad_arg(paste("of those to concatenate, number", i)) |> + Err() |> + unwrap("in pl$concat()") + } + } + } + + # dispatch on item class and how + Result_out = pcase( + how == "vertical" && (inherits(first, "Series") || is.vector(first)), { - vdf = l_to_vdf(l) - pcase( - how == "vertical", concat_df(vdf), - how == "diagonal", diag_concat_df(vdf), - how == "horizontal", hor_concat_df(vdf), - or_else = stopf("Internal error") - ) + if (any(args_modified %in% c("parallel"))) { + warning( + "in pl$concat(): argument `parallel` is not used when concatenating Series", + call. = FALSE + ) + } + concat_series(l, rechunk, to_supertypes) }, - inherits(first, "Series"), + how == "vertical", + concat_lf(l, rechunk, parallel, to_supertypes), + how == "diagonal", { - stopf("not implemented Series") + if (any(args_modified %in% c("to_supertypes"))) { + warning( + "Argument `to_supertypes` is not used when how=='diagonal'", + call. = FALSE + ) + } + diag_concat_lf(l, rechunk, parallel) }, - inherits(first, "Expr"), + how == "horizontal" && !eager, { - stopf("not implemented Expr") + Err_plain( + "how=='horizontal' is not supported for lazy (first element is LazyFrame).", + "Try e.g. $join() to get Lazy join or pl$concat(lf1$collect(), lf2, lf3).", + "to get a eager horizontal concatenation" + ) + }, + how == "horizontal", + { + if (any(args_modified %in% c("parallel", "to_supertypes"))) { + warning( + "Arguments `parallel`, `rechunk`, `eager` and `to_supertypes` are not used when how=='horizontal'", + call. 
= FALSE + ) + } + hor_concat_df(l) }, # TODO implement Series, Expr, Lazy etc - or_else = stopf(paste0("type of first list element: '", class(first), "' is not supported")) + or_else = Err_plain("internal error:", how, "not handled") ) - unwrap(result) + # convert back from lazy if eager + and_then(Result_out, \(x) { + pcase( + # run-time assertion for future changes + inherits(x, "DataFrame") && !eager, Err_plain("internal logical error in pl$concat()"), + + # must collect as in rust side only lazy concat is implemented. Eager inputs are wrapped in + # lazy and then collected again. This does not mean any user input is collected. + inherits(x, "LazyFrame") && eager, Ok(x$collect()), + or_else = Ok(x) + ) + }) |> + unwrap("in pl$concat()") } @@ -136,8 +218,7 @@ pl$date_range = function( name = NULL, # : str | None = None, time_unit = "us", time_zone = NULL, # : str | None = None - explode = TRUE - ) { + explode = TRUE) { if (missing(end)) { end = start interval = "1h" diff --git a/R/lazyframe__lazy.R b/R/lazyframe__lazy.R index 223feb709..6b556b001 100644 --- a/R/lazyframe__lazy.R +++ b/R/lazyframe__lazy.R @@ -146,7 +146,6 @@ LazyFrame #' iris, #' schema = list(Sepal.Length = pl$Float32, Species = pl$Utf8) #' )$collect() - pl$LazyFrame = function(...) 
{ pl$DataFrame(...)$lazy() } @@ -1326,10 +1325,10 @@ LazyFrame_clone = function() { #' b = c("one", "two", "three", "four", "five"), #' c = 6:10 #' )$ -#' select( -#' pl$col("b")$to_struct(), -#' pl$col("a", "c")$to_struct()$alias("a_and_c") -#' ) +#' select( +#' pl$col("b")$to_struct(), +#' pl$col("a", "c")$to_struct()$alias("a_and_c") +#' ) #' lf$collect() #' #' # by default, all struct columns are unnested @@ -1337,10 +1336,9 @@ LazyFrame_clone = function() { #' #' # we can specify specific columns to unnest #' lf$unnest("a_and_c")$collect() - LazyFrame_unnest = function(names = NULL) { if (is.null(names)) { - names <- names(which(dtypes_are_struct(.pr$LazyFrame$schema(self)$ok))) + names = names(which(dtypes_are_struct(.pr$LazyFrame$schema(self)$ok))) } unwrap(.pr$LazyFrame$unnest(self, names), "in $unnest():") } diff --git a/R/options.R b/R/options.R index a659ae686..f954a4315 100644 --- a/R/options.R +++ b/R/options.R @@ -8,22 +8,25 @@ polars_optreq = list() # Requirements will be used to validate inputs passed in pl$set_options() polars_optenv$strictly_immutable = TRUE -polars_optreq$strictly_immutable = list(is_bool) +polars_optreq$strictly_immutable = list(must_be_bool = is_bool) polars_optenv$no_messages = FALSE -polars_optreq$no_messages = list(is_bool) +polars_optreq$no_messages = list(must_be_bool = is_bool) polars_optenv$do_not_repeat_call = FALSE -polars_optreq$do_not_repeat_call = list(is_bool) +polars_optreq$do_not_repeat_call = list(must_be_bool = is_bool) polars_optenv$maintain_order = FALSE -polars_optreq$maintain_order = list(is_bool) +polars_optreq$maintain_order = list(must_be_bool = is_bool) polars_optenv$debug_polars = FALSE -polars_optreq$debug_polars = list(is_bool) +polars_optreq$debug_polars = list(must_be_bool = is_bool) + +#polars_optenv$rpool_cap # active binding for getting value, not for +polars_optreq$rpool_cap = list() # rust-side options already check args -## END OF DEFINED OPTIONS +## END OF DEFINED OPTIONS #' Set polars 
options @@ -40,16 +43,37 @@ polars_optreq$debug_polars = list(is_bool) #' messages. The default (`FALSE`) is to show them. #' @param debug_polars Print additional information to debug Polars. #' @param no_messages Hide messages. +#' @param rpool_cap The maximum number of R sessions that can be used to process +#' R code in the background. See Details. #' #' @rdname polars_options #' @name set_options #' #' @docType NULL #' +#' @details +#' `pl$options$rpool_active` indicates the number of R sessions already +#' spawned in the pool. `pl$options$rpool_cap` indicates the maximum number of new R +#' sessions that can be spawned. Anytime a polars thread worker needs a background +#' R session specifically to run R code embedded in a query via +#' `$map(..., in_background = TRUE)` or `$apply(..., in_background = TRUE)`, it +#' will obtain any R session idling in rpool, or spawn a new R session (process) +#' and add it to the rpool if `rpool_cap` is not already reached. If `rpool_cap` +#' is already reached, the thread worker will sleep until an R session is idling. +#' +#' Background R sessions communicate via polars arrow IPC (series/vectors) or R +#' serialize + shared memory buffers via the rust crate `ipc-channel`. +#' Multi-process communication has overhead because all data must be +#' serialized/de-serialized and sent via buffers. Using multiple R sessions +#' will likely only give a speed-up in a `low io - high cpu` scenario. Native +#' polars query syntax runs in threads and has no such overhead. +#' #' @return #' `pl$options` returns a named list with the value (`TRUE` or `FALSE`) of #' each option. +#' #' `pl$set_options()` silently modifies the options values. +#' #' `pl$reset_options()` silently resets the options to their default values. 
#' #' @examples @@ -64,15 +88,13 @@ polars_optreq$debug_polars = list(is_bool) #' #' # reset options to their default value #' pl$reset_options() - pl$set_options = function( strictly_immutable = TRUE, maintain_order = FALSE, do_not_repeat_call = FALSE, debug_polars = FALSE, - no_messages = FALSE - ) { - + no_messages = FALSE, + rpool_cap = 4) { # only modify arguments that were explicitly written in the function call # (otherwise calling set_options() twice in a row would reset the args # modified in the first call) @@ -82,15 +104,32 @@ pl$set_options = function( value = get(args_modified[i]) # each argument has its own input requirements - validation <- c() + validation = c() for (fun in seq_along(polars_optreq[[args_modified[i]]])) { - validation[fun] <- do.call(polars_optreq[[args_modified[i]]][[fun]], list(value)) + validation[fun] = do.call( + polars_optreq[[args_modified[i]]][[fun]], + list(value) + ) } + names(validation) = names(polars_optreq[[args_modified[i]]]) if (!all(validation)) { - stop(paste0("Incorrect input for argument `", args_modified, "`.\n")) + failures = names(which(!validation)) + failures = translate_failures(failures) + err = .pr$RPolarsErr$new() + {for(fail in failures) err = err$plain(fail)} + err$ + bad_robj(value)$ + bad_arg(args_modified[i]) |> + Err() |> + unwrap("in pl$set_options") } - assign(args_modified[i], value, envir = polars_optenv) + assign(args_modified[i], value, envir = polars_optenv) |> + result() |> + map_err(\(err) err$bad_arg(args_modified[i])) |> + unwrap("in pl$set_options") |> + invisible() + } } @@ -103,9 +142,21 @@ pl$reset_options = function() { assign("do_not_repeat_call", FALSE, envir = polars_optenv) assign("debug_polars", FALSE, envir = polars_optenv) assign("no_messages", FALSE, envir = polars_optenv) + assign("rpool_cap", 4, envir = polars_optenv) } +translate_failures = \(x) { + lookups = c( + "must_be_scalar" = "Input must be of length one.", + "must_be_integer" = "Input must be an integer.", + 
"must_be_bool" = "Input must be TRUE or FALSE" + ) + trans = lookups[x] + trans[is.na(trans)] = x[is.na(trans)] + unname(trans) +} + #' internal keeping of state at runtime #' @name polars_runtime_flags @@ -146,7 +197,6 @@ subtimer_ms = function(cap_name = NULL, cap = 9999) { #' @examples #' pl$enable_string_cache(TRUE) #' pl$using_string_cache() - pl$enable_string_cache = function(toggle) { enable_string_cache(toggle) |> unwrap("in pl$enable_string_cache()") |> @@ -170,7 +220,6 @@ pl$enable_string_cache = function(toggle) { #' pl$using_string_cache() #' pl$enable_string_cache(FALSE) #' pl$using_string_cache() - pl$using_string_cache = function() { using_string_cache() } @@ -194,7 +243,6 @@ pl$using_string_cache = function() { #' df2 = pl$DataFrame(tail(iris, 2)) #' }) #' pl$concat(list(df1, df2)) - pl$with_string_cache = function(expr) { increment_string_cache_counter(TRUE) on.exit(increment_string_cache_counter(FALSE)) @@ -203,9 +251,8 @@ pl$with_string_cache = function(expr) { - -#' Get/set global R session pool capacity -#' +#' Get/set global R session pool capacity (DEPRECATED) +#' @description Deprecated. Use pl$options to get, and pl$set_options() to set. #' @name global_rpool_cap #' @param n Integer, the capacity limit R sessions to process R code. #' @@ -236,12 +283,20 @@ pl$with_string_cache = function(expr) { #' pl$get_global_rpool_cap() #' pl$set_global_rpool_cap(default$capacity) pl$get_global_rpool_cap = function() { + warning( + "in pl$get_global_rpool_cap(): Deprecated. Use pl$options$rpool_cap instead.", + .Call = NULL + ) get_global_rpool_cap() |> unwrap() } #' @rdname global_rpool_cap #' @name set_global_rpool_cap pl$set_global_rpool_cap = function(n) { + warning( + "in pl$get_global_rpool_cap(): Deprecated. Use pl$set_options(rpool_cap = ?) 
instead.", + .Call = NULL + ) set_global_rpool_cap(n) |> unwrap() |> invisible() diff --git a/R/rbackground.R b/R/rbackground.R index 2417d41aa..8f222000c 100644 --- a/R/rbackground.R +++ b/R/rbackground.R @@ -49,7 +49,7 @@ print.RThreadHandle = function(x, ...) as.character(x) |> cat("\n") #' NOTICE: #' The background thread cannot use the main R session, but can access the pool of extra R sessions #' to process R code embedded in polars query via `$map(...,background = TRUE)` or -#' `$apply(background=TRUE)`. Use [`pl$set_global_rpool_cap()`][global_rpool_cap] to limit number of +#' `$apply(background=TRUE)`. Use [`pl$set_options(rpool_cap = XX)`][set_options] to limit number of #' parallel R sessions. #' Starting polars [`$collect_in_background()`][LazyFrame_collect_in_background] with #' e.g. some `$map(...,background = FALSE)` will raise an Error as the main R session is not diff --git a/R/series__series.R b/R/series__series.R index c06c97e8b..62f121ca8 100644 --- a/R/series__series.R +++ b/R/series__series.R @@ -759,7 +759,7 @@ Series_dtype = method_as_property(function() { #' @keywords Series #' @return DataType #' @aliases Series_flags -#' @name Series_dtype +#' @name Series_flags #' @details property sorted flags are not settable, use set_sorted #' @examples #' pl$Series(1:4)$sort()$flags diff --git a/R/utils.R b/R/utils.R index 93f9982a7..cd1fe86fb 100644 --- a/R/utils.R +++ b/R/utils.R @@ -93,6 +93,7 @@ list2 = list #' Internal unpack list #' @noRd #' @param l any list +#' @param skip_classes Character vector of class names, or `NULL`. A single list argument that inherits from any of these classes is not unpacked. #' @details py-polars syntax only allows e.g. `df.select([expr1, expr2,])` and not #' `df.select(expr1, expr2,)`. r-polars also allows user to directly write #' `df$select(expr1, expr2)` or `df$select(list(expr1,expr2))`. Unpack list @@ -103,9 +104,13 @@ list2 = list #' f = \(...) 
unpack_list(list(...)) #' identical(f(list(1L, 2L, 3L)), f(1L, 2L, 3L)) # is TRUE #' identical(f(list(1L, 2L), 3L), f(1L, 2L, 3L)) # is FALSE -unpack_list = function(...) { +unpack_list = function(..., skip_classes = NULL) { l = list2(...) - if (length(l) == 1L && is.list(l[[1L]])) { + if ( + length(l) == 1L && + is.list(l[[1L]]) && + !(!is.null(skip_classes) && inherits(l[[1L]], skip_classes)) + ) { l[[1L]] } else { l diff --git a/R/zzz.R b/R/zzz.R index fb291ef7f..3d2bad0a3 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -143,12 +143,26 @@ pl$mem_address = mem_address # create the binding for options on loading, otherwise its values are frozen # to what the default values were at build time + makeActiveBinding("options", \() as.list(polars_optenv), env = pl) makeActiveBinding( - "options", - function() { - as.list(polars_optenv) - }, - env = pl + "rpool_cap", + \(arg) { + if(missing(arg)) { + unwrap(get_global_rpool_cap())$capacity + } else { + unwrap(set_global_rpool_cap(arg)) + } + }, env = polars_optenv + ) + makeActiveBinding( + "rpool_active", + \(arg) { + if(missing(arg)) { + unwrap(get_global_rpool_cap())$active + } else { + unwrap(stop("internal error: polars_optenv$rpool_active cannot be set directly")) + } + }, env = polars_optenv ) setup_renv() diff --git a/inst/misc/benchmark_rbackground.R b/inst/misc/benchmark_rbackground.R index 4070b8d62..d43e19896 100644 --- a/inst/misc/benchmark_rbackground.R +++ b/inst/misc/benchmark_rbackground.R @@ -16,7 +16,7 @@ time_print = \(expr, name) { # many io's - low bitrate - low cpu print("test 1a - sequential") library(polars) -pl$set_global_rpool_cap(1) +pl$set_options(rpool_cap = 1) regular_lf = pl$LazyFrame(data.frame(val = 1:1e1)) long_map_fg = pl$col("val") long_map_bg = pl$col("val") @@ -41,7 +41,7 @@ long_compute_bg$collect_in_background()$join() |> time_print("- 1a +io %bitrate # many io's - high bitrate - low cpu print("test 1b - sequential") library(polars) -pl$set_global_rpool_cap(1) +pl$set_options(rpool_cap = 1) 
regular_lf = pl$LazyFrame(data.frame(val = 1:2e6)) long_map_fg = pl$col("val") long_map_bg = pl$col("val") @@ -65,7 +65,7 @@ long_compute_bg$collect_in_background()$join() |> time_print("- 1b -io +bitrate # low io - med bitrate - high cpu print("test 2a - sequential") library(polars) -pl$set_global_rpool_cap(1) +pl$set_options(rpool_cap = 1) regular_lf = pl$LazyFrame(data.frame(val = 1:1e5)) long_map_fg = pl$col("val") long_map_bg = pl$col("val") @@ -104,17 +104,17 @@ f_sum_all_cols = \(lf, ...) lf$select(pl$all()$map(\(x) { f_sum_all_cols(lf)$collect() |> time_print("- 3a +io %bitrate %cpu foreground") -pl$set_global_rpool_cap(8) +pl$set_options(rpool_cap = 8) f_sum_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3a +io %bitrate %cpu pool=8 background burn-in ") # burn-in start processes f_sum_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3a +io %bitrate %cpu pool=8 background") -pl$set_global_rpool_cap(4) +pl$set_options(rpool_cap = 4) f_sum_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3a +io %bitrate %cpu pool=4 background") -pl$set_global_rpool_cap(2) +pl$set_options(rpool_cap = 2) f_sum_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3a +io %bitrate %cpu pool=2 background") -pl$set_global_rpool_cap(1) +pl$set_options(rpool_cap = 1) f_sum_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3a +io %bitrate %cpu pool=1 background") @@ -131,21 +131,21 @@ f_all_cols = \(lf, ...) 
lf$select(pl$all()$map(\(x) { f_all_cols(lf, in_background = FALSE)$collect() |> time_print("- 3b %io %bitrate +cpu foreground") -pl$set_global_rpool_cap(8) +pl$set_options(rpool_cap = 8) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3b %io %bitrate +cpu pool=8 background burn-in ") f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3b %io %bitrate +cpu pool=8 background") -pl$set_global_rpool_cap(6) +pl$set_options(rpool_cap = 6) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3b %io %bitrate +cpu pool=6 background") -pl$set_global_rpool_cap(4) +pl$set_options(rpool_cap = 4) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3b %io %bitrate +cpu pool=4 background") -pl$set_global_rpool_cap(2) +pl$set_options(rpool_cap = 2) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3b %io %bitrate +cpu pool=2 background") -pl$set_global_rpool_cap(1) +pl$set_options(rpool_cap = 1) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3b %io %bitrate +cpu pool=1 background") @@ -161,21 +161,21 @@ f_all_cols = \(lf, ...) 
lf$select(pl$all()$map(\(x) { f_all_cols(lf, in_background = FALSE)$collect() |> time_print("- 3c %io +bitrate +cpu foreground ") -pl$set_global_rpool_cap(8) +pl$set_options(rpool_cap = 8) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3c %io +bitrate +cpu pool=8 background burn-in ") f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3c %io +bitrate +cpu pool=8 background") -pl$set_global_rpool_cap(6) +pl$set_options(rpool_cap = 6) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3c %io +bitrate +cpu pool=6 background") -pl$set_global_rpool_cap(4) +pl$set_options(rpool_cap = 4) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3c %io +bitrate +cpu pool=4 background") -pl$set_global_rpool_cap(2) +pl$set_options(rpool_cap = 2) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3c %io +bitrate +cpu pool=2 background") -pl$set_global_rpool_cap(1) +pl$set_options(rpool_cap = 1) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3c %io +bitrate +cpu pool=1 background") @@ -190,19 +190,19 @@ f_all_cols = \(lf, ...) 
lf$select(pl$all()$map(\(x) { f_all_cols(lf, in_background = FALSE)$collect() |> time_print("- 3d %io +bitrate +cpu foreground ") -pl$set_global_rpool_cap(8) +pl$set_options(rpool_cap = 8) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3d %io +bitrate +cpu pool=8 background burn-in ") f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3d %io +bitrate +cpu pool=8 background") -pl$set_global_rpool_cap(6) +pl$set_options(rpool_cap = 6) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3d %io +bitrate +cpu pool=6 background") -pl$set_global_rpool_cap(4) +pl$set_options(rpool_cap = 4) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3d %io +bitrate +cpu pool=4 background") -pl$set_global_rpool_cap(2) +pl$set_options(rpool_cap = 2) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3d %io +bitrate +cpu pool=2 background") -pl$set_global_rpool_cap(1) +pl$set_options(rpool_cap = 1) f_all_cols(lf, in_background = TRUE)$collect() |> time_print("- 3d %io +bitrate +cpu pool=1 background") diff --git a/man/DataFrame_n_chunks.Rd b/man/DataFrame_n_chunks.Rd new file mode 100644 index 000000000..4fe2e50cc --- /dev/null +++ b/man/DataFrame_n_chunks.Rd @@ -0,0 +1,77 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dataframe__frame.R +\name{DataFrame_n_chunks} +\alias{DataFrame_n_chunks} +\title{Number of chunks of the Series in a DataFrame} +\usage{ +DataFrame_n_chunks(strategy = "all") +} +\arguments{ +\item{strategy}{Either \code{"all"} or \code{"first"}. \code{"first"} only returns chunks +for the first Series.} +} +\value{ +A real vector of chunk counts per Series. +} +\description{ +Number of chunks (memory allocations) for all or first Series in a DataFrame. +} +\details{ +A DataFrame is a vector of Series. Each Series in rust-polars is a wrapper +around a ChunkedArray, which is like a virtual contiguous vector physically +backed by an ordered set of chunks. 
Each chunk of values has a contiguous +memory layout and is an arrow array. Arrow arrays are a fast, thread-safe and +cross-platform memory layout. + +In R, combining with \code{c()} or \code{rbind()} requires immediate vector re-allocation +to place vector values in contiguous memory. This is slow and memory consuming, +and it is why repeatedly appending to a vector in R is discouraged. + +In polars, when we concatenate or append to Series or DataFrame, the +re-allocation can be avoided or delayed by simply appending chunks to each +individual Series. However, if chunks become many and small or are misaligned +across Series, this can hurt the performance of subsequent operations. + +In most places in the polars API where chunking could occur, the user typically +has to actively opt out by setting an argument \code{rechunk = FALSE}. +} +\examples{ +# create DataFrame with misaligned chunks +df = pl$concat( + 1:10, # single chunk + pl$concat(1:5, 1:5, rechunk = FALSE, how = "vertical")$rename("b"), # two chunks + how = "horizontal" +) +df +df$n_chunks() + +# rechunk a chunked DataFrame +df$rechunk()$n_chunks() + +# rechunk is not an in-place operation +df$n_chunks() + +# The following toy example emulates the Series "chunkyness" in R. Here it is an +# S3-classed list of vectors of the same type, with all relevant S3 +# generics implemented to make it behave as if it were a regular vector. +"+.chunked_vector" = \(x, y) structure(list(unlist(x) + unlist(y)), class = "chunked_vector") +print.chunked_vector = \(x, ...) print(unlist(x), ...) +c.chunked_vector = \(...) 
{ + structure(do.call(c, lapply(list(...), unclass)), class = "chunked_vector") +} +rechunk = \(x) structure(unlist(x), class = "chunked_vector") +x = structure(list(1:4, 5L), class = "chunked_vector") +x +x + 5:1 +lapply(x, tracemem) # trace chunks to verify no re-allocation +z = c(x, x) +z # looks like a plain vector +lapply(z, tracemem) # mem allocation in z are the same from x +str(z) +z = rechunk(z) +str(z) +} +\seealso{ +\code{\link[=DataFrame_rechunk]{$rechunk()}} +} +\keyword{DataFrame} diff --git a/man/DataFrame_rechunk.Rd b/man/DataFrame_rechunk.Rd new file mode 100644 index 000000000..25606064c --- /dev/null +++ b/man/DataFrame_rechunk.Rd @@ -0,0 +1,74 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dataframe__frame.R +\name{DataFrame_rechunk} +\alias{DataFrame_rechunk} +\title{Rechunk a DataFrame} +\usage{ +DataFrame_rechunk() +} +\value{ +A DataFrame +} +\description{ +Rechunking re-allocates any "chunked" memory allocations to +speed-up e.g. vectorized operations. +} +\details{ +A DataFrame is a vector of Series. Each Series in rust-polars is a wrapper +around a ChunkedArray, which is like a virtual contiguous vector physically +backed by an ordered set of chunks. Each chunk of values has a contiguous +memory layout and is an arrow array. Arrow arrays are a fast, thread-safe and +cross-platform memory layout. + +In R, combining with \code{c()} or \code{rbind()} requires immediate vector re-allocation +to place vector values in contiguous memory. This is slow and memory consuming, +and it is why repeatedly appending to a vector in R is discouraged. + +In polars, when we concatenate or append to Series or DataFrame, the +re-allocation can be avoided or delayed by simply appending chunks to each +individual Series. However, if chunks become many and small or are misaligned +across Series, this can hurt the performance of subsequent operations. 
+ +In most places in the polars API where chunking could occur, the user typically +has to actively opt out by setting an argument \code{rechunk = FALSE}. +} +\examples{ +# create DataFrame with misaligned chunks +df = pl$concat( + 1:10, # single chunk + pl$concat(1:5, 1:5, rechunk = FALSE, how = "vertical")$rename("b"), # two chunks + how = "horizontal" +) +df +df$n_chunks() + +# rechunk a chunked DataFrame +df$rechunk()$n_chunks() + +# rechunk is not an in-place operation +df$n_chunks() + +# The following toy example emulates the Series "chunkyness" in R. Here it is an +# S3-classed list of vectors of the same type, with all relevant S3 +# generics implemented to make it behave as if it were a regular vector. +"+.chunked_vector" = \(x, y) structure(list(unlist(x) + unlist(y)), class = "chunked_vector") +print.chunked_vector = \(x, ...) print(unlist(x), ...) +c.chunked_vector = \(...) { + structure(do.call(c, lapply(list(...), unclass)), class = "chunked_vector") +} +rechunk = \(x) structure(unlist(x), class = "chunked_vector") +x = structure(list(1:4, 5L), class = "chunked_vector") +x +x + 5:1 +lapply(x, tracemem) # trace chunks to verify no re-allocation +z = c(x, x) +z # looks like a plain vector +lapply(z, tracemem) # mem allocation in z are the same from x +str(z) +z = rechunk(z) +str(z) +} +\seealso{ +\code{\link[=DataFrame_n_chunks]{$n_chunks()}} +} +\keyword{DataFrame} diff --git a/man/ExprCat_get_categories.Rd b/man/ExprCat_get_categories.Rd new file mode 100644 index 000000000..f39c7c009 --- /dev/null +++ b/man/ExprCat_get_categories.Rd @@ -0,0 +1,26 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/expr__categorical.R +\name{ExprCat_get_categories} +\alias{ExprCat_get_categories} +\title{Get the categories stored in this data type} +\value{ +A polars DataFrame with the categories for each categorical Series. 
+} +\description{ +Get the categories stored in this data type +} +\examples{ +df = pl$DataFrame( + cats = factor(c("z", "z", "k", "a", "b")), + vals = factor(c(3, 1, 2, 2, 3)) +) +df + +df$select( + pl$col("cats")$cat$get_categories() +) +df$select( + pl$col("vals")$cat$get_categories() +) +} +\keyword{ExprCat} diff --git a/man/ExprCat_set_ordering.Rd b/man/ExprCat_set_ordering.Rd index 0f92d762b..157827cd3 100644 --- a/man/ExprCat_set_ordering.Rd +++ b/man/ExprCat_set_ordering.Rd @@ -20,10 +20,10 @@ Determine how this categorical series should be sorted. } \examples{ df = pl$DataFrame( - cats = c("z", "z", "k", "a", "b"), + cats = factor(c("z", "z", "k", "a", "b")), vals = c(3, 1, 2, 2, 3) )$with_columns( - pl$col("cats")$cast(pl$Categorical)$cat$set_ordering("physical") + pl$col("cats")$cat$set_ordering("physical") ) df$select(pl$all()$sort()) } diff --git a/man/Expr_apply.Rd b/man/Expr_apply.Rd index e2ba1c7f8..bf85cf374 100644 --- a/man/Expr_apply.Rd +++ b/man/Expr_apply.Rd @@ -24,7 +24,7 @@ if FALSE will convert to a Polars Null and carry on.} but convert result to a polars Null and carry on.} \item{in_background}{Boolean. Whether to execute the map in a background R process. Combined wit -setting e.g. \code{pl$set_global_rpool_cap(4)} it can speed up some slow R functions as they can run +setting e.g. \code{pl$set_options(rpool_cap = 4)} it can speed up some slow R functions as they can run in parallel R sessions. The communication speed between processes is quite slower than between threads. Will likely only give a speed-up in a "low IO - high CPU" usecase. 
A single map will not be paralleled, only in case of multiple \verb{$map}(s) in the query these can be run in parallel.} @@ -134,9 +134,9 @@ pl$LazyFrame(iris)$groupby("Species")$agg( )$collect() |> system.time() # map in parallel 1: Overhead to start up extra R processes / sessions -pl$set_global_rpool_cap(0) # drop any previous processes, just to show start-up overhead here -pl$set_global_rpool_cap(4) # set back to 4, the default -pl$get_global_rpool_cap() +pl$set_options(rpool_cap = 0) # drop any previous processes, just to show start-up overhead here +pl$set_options(rpool_cap = 4) # set back to 4, the default +pl$options$rpool_cap pl$LazyFrame(iris)$groupby("Species")$agg( pl$all()$apply(\(s) { Sys.sleep(.1) @@ -145,7 +145,7 @@ pl$LazyFrame(iris)$groupby("Species")$agg( )$collect() |> system.time() # map in parallel 2: Reuse R processes in "polars global_rpool". -pl$get_global_rpool_cap() +pl$options$rpool_cap pl$LazyFrame(iris)$groupby("Species")$agg( pl$all()$apply(\(s) { Sys.sleep(.1) diff --git a/man/Expr_map.Rd b/man/Expr_map.Rd index cd8636c37..c1dd44d40 100644 --- a/man/Expr_map.Rd +++ b/man/Expr_map.Rd @@ -15,8 +15,8 @@ could theoretically have some downstream implications to the query.} \item{agg_list}{Aggregate list. Map from vector to group in groupby context.} -\item{in_background}{Boolean. Whether to execute the map in a background R process. Combined wit -setting e.g. \code{pl$set_global_rpool_cap(4)} it can speed up some slow R functions as they can run +\item{in_background}{Boolean. Whether to execute the map in a background R process. Combined with +setting e.g. \code{pl$set_options(rpool_cap = 4)} it can speed up some slow R functions as they can run in parallel R sessions. The communication speed between processes is quite slower than between threads. Will likely only give a speed-up in a "low IO - high CPU" usecase. 
A single map will not be paralleled, only in case of multiple \verb{$map}(s) in the query these can be run in parallel.} @@ -35,7 +35,7 @@ Map fully supports \code{browser()}. If \code{in_background = FALSE} the functio variable of the R session. But all R maps in the query sequentially share the same main R session. Any native polars computations can still be executed meanwhile. In \code{in_background = TRUE} the map will run in one or more other R sessions and will not have access -to global variables. Use \code{pl$set_global_rpool_cap(4)} and \code{pl$get_global_rpool_cap()} to see and +to global variables. Use \code{pl$set_options(rpool_cap = 4)} and \code{pl$options$rpool_cap} to set and view number of parallel R sessions. } \examples{ @@ -54,9 +54,9 @@ pl$LazyFrame(a = 1, b = 2, c = 3, d = 4)$select( )$collect() |> system.time() # map in parallel 1: Overhead to start up extra R processes / sessions -pl$set_global_rpool_cap(0) # drop any previous processes, just to show start-up overhead -pl$set_global_rpool_cap(4) # set back to 4, the default -pl$get_global_rpool_cap() +pl$set_options(rpool_cap = 0) # drop any previous processes, just to show start-up overhead +pl$set_options(rpool_cap = 4) # set back to 4, the default +pl$options$rpool_cap pl$LazyFrame(a = 1, b = 2, c = 3, d = 4)$select( pl$all()$map(\(s) { Sys.sleep(.5) @@ -65,7 +65,7 @@ pl$LazyFrame(a = 1, b = 2, c = 3, d = 4)$select( )$collect() |> system.time() # map in parallel 2: Reuse R processes in "polars global_rpool". 
-pl$get_global_rpool_cap() +pl$options$rpool_cap pl$LazyFrame(a = 1, b = 2, c = 3, d = 4)$select( pl$all()$map(\(s) { Sys.sleep(.5) diff --git a/man/LazyFrame_unnest.Rd b/man/LazyFrame_unnest.Rd index d8414d15a..0f884bbdb 100644 --- a/man/LazyFrame_unnest.Rd +++ b/man/LazyFrame_unnest.Rd @@ -23,10 +23,10 @@ lf = pl$LazyFrame( b = c("one", "two", "three", "four", "five"), c = 6:10 )$ - select( - pl$col("b")$to_struct(), - pl$col("a", "c")$to_struct()$alias("a_and_c") - ) + select( + pl$col("b")$to_struct(), + pl$col("a", "c")$to_struct()$alias("a_and_c") +) lf$collect() # by default, all struct columns are unnested diff --git a/man/RThreadHandle_RThreadHandle_class.Rd b/man/RThreadHandle_RThreadHandle_class.Rd index 2ef35666b..95a7bb512 100644 --- a/man/RThreadHandle_RThreadHandle_class.Rd +++ b/man/RThreadHandle_RThreadHandle_class.Rd @@ -20,7 +20,7 @@ query detached from the R session and return an \code{RThreadHandle} immediately NOTICE: The background thread cannot use the main R session, but can access the pool of extra R sessions to process R code embedded in polars query via \verb{$map(...,background = TRUE)} or -\verb{$apply(background=TRUE)}. Use \code{\link[=global_rpool_cap]{pl$set_global_rpool_cap()}} to limit number of +\verb{$apply(background=TRUE)}. Use \code{\link[=set_options]{pl$set_options(rpool_cap = XX)}} to limit number of parallel R sessions. Starting polars \code{\link[=LazyFrame_collect_in_background]{$collect_in_background()}} with e.g. 
some \verb{$map(...,background = FALSE)} will raise an Error as the main R session is not diff --git a/man/Series_dtype.Rd b/man/Series_dtype.Rd index a29fa59a8..8d591f6e6 100644 --- a/man/Series_dtype.Rd +++ b/man/Series_dtype.Rd @@ -2,30 +2,19 @@ % Please edit documentation in R/series__series.R \name{Series_dtype} \alias{Series_dtype} -\alias{Series_flags} \title{Get data type of Series} \usage{ Series_dtype() - -Series_flags() } \value{ -DataType - DataType } \description{ Get data type of Series - -Get data type of Series -} -\details{ -property sorted flags are not settable, use set_sorted } \examples{ pl$Series(1:4)$dtype pl$Series(c(1, 2))$dtype pl$Series(letters)$dtype -pl$Series(1:4)$sort()$flags } \keyword{Series} diff --git a/man/Series_flags.Rd b/man/Series_flags.Rd new file mode 100644 index 000000000..d909d2c6a --- /dev/null +++ b/man/Series_flags.Rd @@ -0,0 +1,21 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/series__series.R +\name{Series_flags} +\alias{Series_flags} +\title{Get data type of Series} +\usage{ +Series_flags() +} +\value{ +DataType +} +\description{ +Get data type of Series +} +\details{ +property sorted flags are not settable, use set_sorted +} +\examples{ +pl$Series(1:4)$sort()$flags +} +\keyword{Series} diff --git a/man/global_rpool_cap.Rd b/man/global_rpool_cap.Rd index f8e0c587e..b99b4bbb6 100644 --- a/man/global_rpool_cap.Rd +++ b/man/global_rpool_cap.Rd @@ -3,7 +3,7 @@ \name{global_rpool_cap} \alias{global_rpool_cap} \alias{set_global_rpool_cap} -\title{Get/set global R session pool capacity} +\title{Get/set global R session pool capacity (DEPRECATED)} \arguments{ \item{n}{Integer, the capacity limit R sessions to process R code.} } @@ -19,7 +19,7 @@ is not already reached. If \code{capacity} is already reached, the thread worker will sleep until an R session is idling. } \description{ -Get/set global R session pool capacity +Deprecated. Use pl$options to get, and pl$set_options() to set. 
} \details{ Background R sessions communicate via polars arrow IPC (series/vectors) or R diff --git a/man/pl_concat.Rd b/man/pl_concat.Rd index 288a30d37..227bebb70 100644 --- a/man/pl_concat.Rd +++ b/man/pl_concat.Rd @@ -4,13 +4,25 @@ \alias{pl_concat} \title{Concat polars objects} \arguments{ -\item{l}{list of DataFrame, or Series, LazyFrame or Expr} +\item{...}{Either individual unpacked args or args wrapped in list(). Args can +be eager as DataFrame, Series and R vectors, or lazy as LazyFrame and Expr. +The first element determines the output of \verb{$concat()}: if the first element +is lazy, a LazyFrame is returned; otherwise, a DataFrame is returned (note +that if the first element is eager, all other elements have to be eager to +avoid implicit collect).} -\item{rechunk}{perform a rechunk at last} +\item{rechunk}{Perform a rechunk at last.} -\item{how}{choice of bind direction "vertical"(rbind) "horizontal"(cbind) "diagonal" diagonally} +\item{how}{Bind direction. Can be "vertical" (like \code{rbind()}), "horizontal" +(like \code{cbind()}), or "diagonal".} -\item{parallel}{BOOL default TRUE, only used for LazyFrames} +\item{parallel}{Only used for LazyFrames. If \code{TRUE} (default), lazy +computations may be executed in parallel.} + +\item{to_supertypes}{If \code{TRUE} (default), cast columns to their shared supertype, if +any. For example, if we try to vertically concatenate two columns of types \code{i32} +and \code{f64}, the column of type \code{i32} will be cast to \code{f64} beforehand. This +argument is equivalent to the "_relaxed" operations in Python polars.} } \value{ DataFrame, or Series, LazyFrame or Expr @@ -19,8 +31,8 @@ DataFrame, or Series, LazyFrame or Expr Concat polars objects } \details{ -Categorical columns/Series must have been constructed while global string cache enabled -\code{\link[=pl_enable_string_cache]{pl$enable_string_cache()}} +Categorical columns/Series must have been constructed while the global string +cache was enabled. 
See \code{\link[=pl_enable_string_cache]{pl$enable_string_cache()}}. } \examples{ # vertical @@ -33,7 +45,6 @@ l_ver = lapply(1:10, function(i) { }) pl$concat(l_ver, how = "vertical") - # horizontal l_hor = lapply(1:10, function(i) { l_internal = list( @@ -44,6 +55,14 @@ l_hor = lapply(1:10, function(i) { pl$DataFrame(l_internal) }) pl$concat(l_hor, how = "horizontal") + # diagonal pl$concat(l_hor, how = "diagonal") + +# if two columns don't share the same type, concat() will error unless we use +# `to_supertypes = TRUE`: +test = pl$DataFrame(x = 1L) # i32 +test2 = pl$DataFrame(x = 1.0) #f64 + +pl$concat(test, test2, to_supertypes = TRUE) } diff --git a/man/polars_options.Rd b/man/polars_options.Rd index fc48a2208..ea3394215 100644 --- a/man/polars_options.Rd +++ b/man/polars_options.Rd @@ -18,17 +18,39 @@ messages. The default (\code{FALSE}) is to show them.} \item{debug_polars}{Print additional information to debug Polars.} \item{no_messages}{Hide messages.} + +\item{rpool_cap}{The maximum number of R sessions that can be used to process +R code in the background. See Details.} } \value{ \code{pl$options} returns a named list with the value (\code{TRUE} or \code{FALSE}) of each option. + \code{pl$set_options()} silently modifies the options values. + \code{pl$reset_options()} silently resets the options to their default values. } \description{ Get and set polars options. See sections "Value" and "Examples" below for more details. } +\details{ +\code{pl$options$rpool_active} indicates the number of R sessions already +spawned in pool. \code{pl$options$rpool_cap} indicates the maximum number of new R +sessions that can be spawned. 
Anytime a polars thread worker needs a background +R session specifically to run R code embedded in a query via +\verb{$map(..., in_background = TRUE)} or \verb{$apply(..., in_background = TRUE)}, it +will obtain any R session idling in rpool, or spawn a new R session (process) +and add it to the rpool if \code{rpool_cap} is not already reached. If \code{rpool_cap} +is already reached, the thread worker will sleep until an R session is idling. + +Background R sessions communicate via polars arrow IPC (series/vectors) or R +serialize + shared memory buffers via the rust crate \code{ipc-channel}. +Multi-process communication has overhead because all data must be +serialized/de-serialized and sent via buffers. Using multiple R sessions +will likely only give a speed-up in a \verb{low io - high cpu} scenario. Native +polars query syntax runs in threads and has no such overhead. +} \examples{ pl$set_options(maintain_order = TRUE, strictly_immutable = FALSE) pl$options diff --git a/src/rust/src/concat.rs b/src/rust/src/concat.rs new file mode 100644 index 000000000..7ac7e873b --- /dev/null +++ b/src/rust/src/concat.rs @@ -0,0 +1,94 @@ +use crate::rdataframe::DataFrame; +use crate::robj_to; + +use crate::rdataframe::LazyFrame; +use crate::rpolarserr::*; +use crate::series::Series; +use extendr_api::prelude::*; +use polars::lazy::dsl; +use polars::prelude as pl; +use polars_core; +use polars_core::functions as pl_functions; +use std::result::Result; + +#[extendr] +fn concat_lf(l: Robj, rechunk: bool, parallel: bool, to_supertypes: bool) -> RResult { + let vlf = robj_to!(Vec, PLLazyFrame, l)?; + dsl::concat( + vlf, + pl::UnionArgs { + parallel, + rechunk, + to_supertypes, + }, + ) + .map_err(polars_to_rpolars_err) + .map(LazyFrame) +} + +#[extendr] +fn diag_concat_lf(l: Robj, rechunk: bool, parallel: bool) -> RResult { + let vlf = robj_to!(Vec, PLLazyFrame, l)?; + dsl::diag_concat_lf(vlf, rechunk, parallel) + .map_err(polars_to_rpolars_err) + .map(LazyFrame) +} + +#[extendr] 
+pub fn hor_concat_df(l: Robj) -> RResult { + let df_vec = robj_to!(Vec, PLDataFrame, l)?; + pl_functions::hor_concat_df(&df_vec) + .map_err(polars_to_rpolars_err) + .map(DataFrame) +} + +#[extendr] +pub fn concat_series(l: Robj, rechunk: Robj, to_supertypes: Robj) -> RResult { + let to_supertypes = robj_to!(bool, to_supertypes)?; + let mut s_vec = robj_to!(Vec, PLSeries, l)?; + + // find any common supertype and cast to it + if to_supertypes { + let shared_supertype: RResult> = s_vec + .iter() + .map(|s| s.dtype().clone()) + .fold(Ok(None), |acc, x| match acc { + Err(err) => Err(err), + Ok(None) => Ok(Some(x)), // first fold, acc is None, just us x, + Ok(Some(acc)) => polars_core::utils::get_supertype(&acc, &x) + .ok_or(RPolarsErr::new().plain("Series' have no common supertype".to_string())) + .map(|dt| Some(dt)), + }); + let shared_supertype = shared_supertype?.expect("cannot be None, unless empty s_vec"); + + for i in 0..s_vec.len() { + if *s_vec[i].dtype() != shared_supertype { + s_vec[i] = s_vec[i] + .cast(&shared_supertype) + .map_err(polars_to_rpolars_err)?; + }; + } + } + + let mut iter = s_vec.into_iter(); + let mut first_s = iter + .next() + .ok_or(RPolarsErr::new().plain("no series found to concatenate".into()))?; + for next_s in iter { + first_s.append(&next_s).map_err(polars_to_rpolars_err)?; + } + + if robj_to!(bool, rechunk)? { + Ok(first_s.rechunk().into()) + } else { + Ok(first_s.into()) + } +} + +extendr_module! { + mod concat; + fn concat_lf; + fn diag_concat_lf; + fn hor_concat_df; + fn concat_series; +} diff --git a/src/rust/src/lazy/dsl.rs b/src/rust/src/lazy/dsl.rs index 75a4797ff..c4cb95068 100644 --- a/src/rust/src/lazy/dsl.rs +++ b/src/rust/src/lazy/dsl.rs @@ -2313,7 +2313,6 @@ impl Expr { Ok(format!("{e}")) } - //the only cat ns function from dsl.rs fn cat_set_ordering(&self, ordering: Robj) -> Result { let ordering = robj_to!(Map, str, ordering, |s| { Ok(crate::rdatatype::new_categorical_ordering(s).map_err(Rctx::Plain)?) 
@@ -2321,6 +2320,10 @@ impl Expr { Ok(self.0.clone().cat().set_ordering(ordering).into()) } + fn cat_get_categories(&self) -> Expr { + self.0.clone().cat().get_categories().into() + } + // external expression function which typically starts a new expression chain // to avoid name space collisions in R, these static methods are not free functions // as in py-polars. prefix with new_ to not collide with other methods in class diff --git a/src/rust/src/lib.rs b/src/rust/src/lib.rs index 35cf71fdf..798b7a84e 100644 --- a/src/rust/src/lib.rs +++ b/src/rust/src/lib.rs @@ -13,6 +13,7 @@ pub mod concurrent; pub mod lazy; pub mod arrow_interop; +pub mod concat; pub mod conversion; pub mod conversion_r_to_s; pub mod conversion_s_to_r; @@ -40,6 +41,8 @@ pub use crate::rbackground::RBGPOOL; // Macro to generate exports extendr_module! { mod polars; + use rlib; + use concat; use rdataframe; use rpolarserr; use rbackground; diff --git a/src/rust/src/rbackground.rs b/src/rust/src/rbackground.rs index 8d0be8e8a..9c1f1a2cc 100644 --- a/src/rust/src/rbackground.rs +++ b/src/rust/src/rbackground.rs @@ -396,7 +396,7 @@ impl RBackgroundPool { println!("lease fail cap <0 "); rerr() .plain("cannot run background R process with zero capacity") - .hint("try increase cap e.g. pl$set_global_rpool_cap(4)") + .hint("try increase cap e.g. 
pl$set_options(rpool_cap = 4)") } None if pool_guard.active < pool_guard.cap => { #[cfg(feature = "rpolars_debug_print")] @@ -521,7 +521,7 @@ pub fn set_global_rpool_cap(c: Robj) -> RResult<()> { pub fn get_global_rpool_cap() -> RResult { let pool_guard = RBGPOOL.0.lock()?; Ok(list!( - available = pool_guard.active, + active = pool_guard.active, capacity = pool_guard.cap )) } diff --git a/src/rust/src/rdataframe/mod.rs b/src/rust/src/rdataframe/mod.rs index 34c09ebac..b48e8f3b2 100644 --- a/src/rust/src/rdataframe/mod.rs +++ b/src/rust/src/rdataframe/mod.rs @@ -8,9 +8,8 @@ use crate::conversion_r_to_s::robjname2series; use crate::lazy; use crate::rdatatype; use crate::rdatatype::RPolarsDataType; -use crate::rlib; use crate::robj_to; -use crate::rpolarserr::{polars_to_rpolars_err, RResult}; +use crate::rpolarserr::*; use polars::prelude::{CsvWriter, QuoteStyle, SerWriter}; @@ -88,6 +87,29 @@ impl DataFrame { r!([shp.0, shp.1]) } + pub fn n_chunks(&self, strategy: Robj) -> RResult> { + let nchks: Vec<_> = self.0.iter().map(|s| s.n_chunks() as f64).collect(); + + match robj_to!(str, strategy)? { + "all" => Ok(nchks), + "first" => { + if nchks.is_empty() { + Ok(vec![]) + } else { + Ok(vec![nchks.into_iter().next().expect("has atleast len 1")]) + } + } + _ => { + Err(RPolarsErr::new() + .plain("strategy not recognized, neither 'all' or 'first'".into())) + } + } + } + + pub fn rechunk(&self) -> Self { + self.0.agg_chunks().into() + } + //renamed back to clone pub fn clone_see_me_macro(&self) -> DataFrame { self.clone() @@ -515,7 +537,7 @@ extendr_module! 
{ use read_ipc; use read_parquet; use rdatatype; - use rlib; + impl DataFrame; impl VecDataFrame; } diff --git a/src/rust/src/rlib.rs b/src/rust/src/rlib.rs index 76ab78da4..116c4352f 100644 --- a/src/rust/src/rlib.rs +++ b/src/rust/src/rlib.rs @@ -2,61 +2,14 @@ use crate::lazy::dsl::Expr; use crate::lazy::dsl::ProtoExprArray; use crate::rdataframe::DataFrame; use crate::robj_to; - use crate::rpolarserr::{rdbg, RResult}; use crate::series::Series; -use crate::{rdataframe::VecDataFrame, utils::r_result_list}; use extendr_api::prelude::*; use polars::prelude as pl; use polars_core::functions as pl_functions; use std::result::Result; -#[extendr] -fn concat_df(vdf: &VecDataFrame) -> List { - //-> PyResult { - - use polars_core::error::PolarsResult; - use polars_core::utils::rayon::prelude::*; - - let identity_df = (*vdf.0.iter().peekable().peek().unwrap()) - .clone() - .slice(0, 0); - let rdfs: Vec> = - vdf.0.iter().map(|df| Ok(df.clone())).collect(); - let identity = || Ok(identity_df.clone()); - - let result = polars_core::POOL - .install(|| { - rdfs.into_par_iter() - .fold(identity, |acc: PolarsResult, df| { - let mut acc = acc?; - acc.vstack_mut(&df?)?; - Ok(acc) - }) - .reduce(identity, |acc, df| { - let mut acc = acc?; - acc.vstack_mut(&df?)?; - Ok(acc) - }) - }) - .map(DataFrame); - - r_result_list(result.map_err(|err| format!("{:?}", err))) -} - -#[extendr] -fn diag_concat_df(dfs: &VecDataFrame) -> List { - let df = pl_functions::diag_concat_df(&dfs.0[..]).map(DataFrame); - r_result_list(df.map_err(|err| format!("{:?}", err))) -} - -#[extendr] -pub fn hor_concat_df(dfs: &VecDataFrame) -> List { - let df = pl_functions::hor_concat_df(&dfs.0[..]).map(DataFrame); - r_result_list(df.map_err(|err| format!("{:?}", err))) -} - #[extendr] fn min_exprs(exprs: &ProtoExprArray) -> Expr { let exprs = exprs.to_vec("select"); @@ -278,9 +231,7 @@ fn polars_features() -> List { extendr_module! 
{ mod rlib; - fn concat_df; - fn hor_concat_df; - fn diag_concat_df; + fn min_exprs; fn max_exprs; fn coalesce_exprs; diff --git a/src/rust/src/utils/mod.rs b/src/rust/src/utils/mod.rs index 70dfa5ce3..aa28bb6eb 100644 --- a/src/rust/src/utils/mod.rs +++ b/src/rust/src/utils/mod.rs @@ -3,16 +3,21 @@ pub mod extendr_concurrent; pub mod extendr_helpers; pub mod wrappers; +use crate::conversion_r_to_s::robjname2series; use crate::lazy::dsl::Expr; use crate::rdatatype::RPolarsDataType; -use crate::rpolarserr::{rdbg, rerr, RPolarsErr, RResult, WithRctx}; +use crate::rpolarserr::{polars_to_rpolars_err, rdbg, rerr, RPolarsErr, RResult, WithRctx}; +use crate::series::Series; use extendr_api::prelude::list; use std::any::type_name as tn; //use std::intrinsics::read_via_copy; use crate::lazy::dsl::robj_to_col; +use crate::rdataframe::{DataFrame, LazyFrame}; +use extendr_api::eval_string_with_params; use extendr_api::Attributes; use extendr_api::ExternalPtr; use extendr_api::Result as ExtendrResult; +use extendr_api::R; use polars::prelude as pl; //macro to translate polars NULLs and emulate R NA value of any type @@ -739,13 +744,78 @@ fn internal_rust_wrap_e(robj: Robj, str_to_lit: bool) -> RResult { } } -pub fn robj_to_lazyframe(robj: extendr_api::Robj) -> RResult { +pub fn robj_to_lazyframe(robj: extendr_api::Robj) -> RResult { let robj = unpack_r_result_list(robj)?; let rv = rdbg(&robj); - use crate::rdataframe::LazyFrame; - let res: Result, _> = robj.try_into(); - let ext_ldf = res.bad_val(rv).mistyped(tn::())?; - Ok(LazyFrame(ext_ldf.0.clone())) + + // closure to allow ?-convert extendr::Result to RResult + let res = || -> RResult { + match () { + // allow input as a DataFrame + _ if robj.inherits("DataFrame") => { + let extptr_df: ExternalPtr = robj.try_into()?; + Ok(extptr_df.lazy()) + } + _ if robj.inherits("LazyFrame") => { + let lf: ExternalPtr = robj.try_into()?; + let lf = LazyFrame(lf.0.clone()); + Ok(lf) + } + _ if robj.inherits("data.frame") => { + let df = 
unpack_r_eval(R!("polars:::result(pl$DataFrame({{robj}}))"))?; + let extptr_df: ExternalPtr = df.try_into()?; + Ok(extptr_df.lazy()) + } + _ => Ok(DataFrame::new_with_capacity(1) + .lazy() + .0 + .select(&[robj_to_rexpr(robj, true)?.0])) + .map(LazyFrame), + } + }(); + + res.bad_val(rv).mistyped(tn::()) +} + +pub fn robj_to_dataframe(robj: extendr_api::Robj) -> RResult { + let robj = unpack_r_result_list(robj)?; + let robj_clone = robj.clone(); + + // closure to allow ?-convert extendr::Result to RResult + let res = || -> RResult { + match () { + // allow input as a DataFrame + _ if robj.inherits("DataFrame") => { + let extptr_df: ExternalPtr = robj.try_into()?; + Ok(extptr_df.0.clone()) + } + _ if robj.inherits("data.frame") => { + let df = unpack_r_eval(R!("polars:::result(pl$DataFrame({{robj}}))"))?; + let extptr_df: ExternalPtr = df.try_into()?; + Ok(extptr_df.0.clone()) + } + _ => DataFrame::new_with_capacity(1) + .lazy() + .0 + .select(&[robj_to_rexpr(robj, true)?.0]) + .collect(), + } + .map(DataFrame) + .map_err(polars_to_rpolars_err) + }(); + + res.bad_val(rdbg(robj_clone)) + .plain("could not be converted into a DataFrame") +} + +pub fn robj_to_series(robj: extendr_api::Robj) -> RResult { + let robj = unpack_r_result_list(robj)?; + let robj_clone = robj.clone(); + robjname2series(robj, "") + .map(Series) + .map_err(polars_to_rpolars_err) + .bad_val(rdbg(robj_clone)) + .plain("could not be converted into a DataFrame") } pub fn list_expr_to_vec_pl_expr( @@ -848,6 +918,14 @@ macro_rules! robj_to_inner { $crate::utils::robj_to_binary_vec($a) }; + (Series, $a:ident) => { + $crate::utils::robj_to_series($a) + }; + + (PLSeries, $a:ident) => { + $crate::utils::robj_to_series($a).map(|ok| ok.0) + }; + (Expr, $a:ident) => { $crate::utils::robj_to_rexpr($a, true) }; @@ -895,6 +973,18 @@ macro_rules! 
robj_to_inner { $crate::utils::robj_to_lazyframe($a) }; + (PLLazyFrame, $a:ident) => { + $crate::utils::robj_to_lazyframe($a).map(|lf| lf.0) + }; + + (DataFrame, $a:ident) => { + $crate::utils::robj_to_dataframe($a) + }; + + (PLDataFrame, $a:ident) => { + $crate::utils::robj_to_dataframe($a).map(|lf| lf.0) + }; + (RArrow_schema, $a:ident) => { $crate::utils::robj_to_rarrow_schema($a) }; diff --git a/tests/testthat/test-concat.R b/tests/testthat/test-concat.R index f448a692f..90e028d89 100644 --- a/tests/testthat/test-concat.R +++ b/tests/testthat/test-concat.R @@ -1,20 +1,73 @@ test_that("concat dataframe", { - # vertical - l_ver = lapply(1:10, function(i) { + # mixing lazy with first eager not allowed + ctx = pl$concat(pl$DataFrame(mtcars), pl$LazyFrame(mtcars), how = "vertical") |> get_err_ctx() + expect_true(endsWith(ctx$BadArgument, "number 2")) + expect_true(endsWith(ctx$PlainErrorMessage, "avoid implicit collect")) + + ctx = pl$concat(pl$DataFrame(mtcars), mtcars$hp, pl$lit(mtcars$mpg), how = "horizontal") |> + get_err_ctx() + expect_true(endsWith(ctx$BadArgument, "number 3")) + expect_true(endsWith(ctx$PlainErrorMessage, "avoid implicit collect")) + + # mixing eager with first lazy is allowd + df_ref = rbind(mtcars, mtcars) + row.names(df_ref) = 1:64 + expect_identical( + pl$concat(pl$LazyFrame(mtcars), pl$DataFrame(mtcars), how = "vertical")$ + collect()$ + to_data_frame(), + df_ref + ) + + # vertical dfs + l_ver = lapply(1:3, function(i) { l_internal = list( a = 1:5, b = letters[1:5] ) pl$DataFrame(l_internal) }) + df_ver = pl$concat(l_ver, how = "vertical") expect_equal( df_ver$to_data_frame(), do.call(rbind, lapply(l_ver, function(df) df$to_data_frame())) ) + # unpack args allowed + df_ver_2 = pl$concat(l_ver[[1L]], l_ver[[2L]], l_ver[[3L]], how = "vertical") + expect_identical(df_ver$to_list(), df_ver_2$to_list()) + + # use supertypes + expect_identical( + pl$concat(l_ver[[1L]], pl$DataFrame(a = 2, b = 42L), how = "vertical", to_supertypes = 
TRUE)$to_list(), + pl$DataFrame(rbind(data.frame(a = 1:5, b = letters[1:5]), data.frame(a = 2, b = 42L)))$to_list() + ) + + # type 'relaxed' vertical concatenation is not allowed by default + expect_true( + pl$concat(l_ver[[1L]], pl$DataFrame(a = 2, b = 42L), how = "vertical") |> + get_err_ctx() |> + (\(ctx) ctx$PolarsError)() |> + grepl(pat = "dtypes for column", fixed = TRUE) + ) + + + # check lazy eager is identical + l_ver_lazy = lapply(l_ver, \(df) df$lazy()) + expect_identical( + pl$concat(l_ver_lazy)$collect()$to_list(), + pl$concat(l_ver)$to_list() + ) + + # check rechunk works + expect_identical(pl$concat(mtcars, mtcars, rechunk = TRUE)$n_chunks(), rep(1, 11)) + expect_identical(pl$concat(mtcars, mtcars, rechunk = FALSE)$n_chunks(), rep(2, 11)) + + + # horizontal - l_hor = lapply(1:10, function(i) { + l_hor = lapply(1:5, function(i) { l_internal = list( 1:5, letters[1:5] @@ -28,8 +81,27 @@ test_that("concat dataframe", { do.call(cbind, lapply(l_hor, function(df) df$to_data_frame())) ) - # diagonal + pl$concat(pl$LazyFrame(a = 1:3), how = "horizontal") |> + get_err_ctx("Plain") |> + startsWith("how=='horizontal' is not supported for lazy") |> + expect_true() + + # can concat Series + expect_identical( + pl$concat(1:5, pl$Series(5:1, "b"), how = "horizontal")$to_list(), + list(1:5, b = 5:1) + ) + + + # diagonal eager df_dia = pl$concat(l_hor, how = "diagonal") - expect_equal(df_dia$shape, c(50, 20)) - expect_equal(mean(is.na(df_dia$to_data_frame())), 9 / 10) + expect_equal(df_dia$shape, c(25, 10)) + expect_equal(mean(is.na(df_dia$to_data_frame())), 8 / 10) + + # diagonal lazy + lf_dia = pl$concat(l_hor |> lapply(pl$LazyFrame), how = "diagonal") + expect_identical( + lf_dia$collect()$to_list(), + df_dia$to_list() + ) }) diff --git a/tests/testthat/test-dataframe.R b/tests/testthat/test-dataframe.R index 1c292f7f9..4b159916c 100644 --- a/tests/testthat/test-dataframe.R +++ b/tests/testthat/test-dataframe.R @@ -158,7 +158,7 @@ test_that("DataFrame, custom 
schema", { # works fine if a variable is called "schema" expect_no_error( - pl$DataFrame(list(schema = 1), schema = list(schema = pl$Float32)) + pl$DataFrame(list(schema = 1), schema = list(schema = pl$Float32)) ) # errors if incorrect datatype expect_error(pl$DataFrame(x = 1, schema = list(schema = foo))) @@ -808,6 +808,24 @@ test_that("join_asof_simple", { ) }) +test_that("n_chunks", { + df = pl$concat( + 1:10, + pl$concat(1:5, 1:5, rechunk = FALSE, how = "vertical")$rename("b"), + how = "horizontal" + ) + + expect_identical(df$n_chunks(), c(1, 2)) + expect_identical(df$n_chunks("first"), c(1)) + expect_identical(pl$DataFrame()$n_chunks(), numeric()) + expect_identical(pl$DataFrame()$n_chunks("first"), numeric()) + + pl$DataFrame()$n_chunks("wrong strat") |> + get_err_ctx("Plain") |> + grepl(pat = "strategy") |> + expect_true() +}) + test_that("melt example", { df = pl$DataFrame( diff --git a/tests/testthat/test-expr_arr.R b/tests/testthat/test-expr_arr.R index e11138021..fc4d26c31 100644 --- a/tests/testthat/test-expr_arr.R +++ b/tests/testthat/test-expr_arr.R @@ -436,4 +436,3 @@ test_that("eval", { ) ) }) - diff --git a/tests/testthat/test-expr_categorical.R b/tests/testthat/test-expr_categorical.R index 38060b34e..fb1f1d431 100644 --- a/tests/testthat/test-expr_categorical.R +++ b/tests/testthat/test-expr_categorical.R @@ -10,3 +10,13 @@ test_that("set_ordering", { # https://github.com/pola-rs/polars/issues/6513 e$cat$set_ordering("lexical")$sort()$lit_to_s() }) + +test_that("get_categories", { + df = pl$DataFrame( + cats = factor(c("z", "z", "k", "a", "b")) + ) + expect_identical( + df$select(pl$col("cats")$cat$get_categories())$to_data_frame(), + data.frame(cats = c("z", "k", "a", "b")) + ) +}) diff --git a/tests/testthat/test-lazy.R b/tests/testthat/test-lazy.R index 8402c786a..b5688e81b 100644 --- a/tests/testthat/test-lazy.R +++ b/tests/testthat/test-lazy.R @@ -774,9 +774,9 @@ test_that("unnest", { df2 = df$ select( - pl$col("a", "b", 
"c")$to_struct()$alias("first_struct"), - pl$col("d", "e", "f")$to_struct()$alias("second_struct") - ) + pl$col("a", "b", "c")$to_struct()$alias("first_struct"), + pl$col("d", "e", "f")$to_struct()$alias("second_struct") + ) expect_identical( df2$unnest()$collect()$to_data_frame(), @@ -787,9 +787,9 @@ test_that("unnest", { df2$unnest("first_struct")$collect()$to_data_frame(), df$ select( - pl$col("a", "b", "c"), - pl$col("d", "e", "f")$to_struct()$alias("second_struct") - )$ + pl$col("a", "b", "c"), + pl$col("d", "e", "f")$to_struct()$alias("second_struct") + )$ collect()$ to_data_frame() ) diff --git a/tests/testthat/test-options.R b/tests/testthat/test-options.R index 4fbb439c1..b0e0f1bb4 100644 --- a/tests/testthat/test-options.R +++ b/tests/testthat/test-options.R @@ -24,15 +24,13 @@ test_that("pl$options$ read-write", { expect_true(pl$options$maintain_order) # set_options() only accepts booleans - expect_error( - pl$set_options(maintain_order = 42), - "Incorrect input" - ) + ctx = pl$set_options(maintain_order = 42) |> get_err_ctx() + expect_identical(ctx$BadArgument, "maintain_order") + expect_identical(ctx$PlainErrorMessage, "Input must be TRUE or FALSE") - expect_error( - pl$set_options(strictly_immutable = c(TRUE, TRUE)), - "Incorrect input" - ) + ctx = pl$set_options(strictly_immutable = c(TRUE, TRUE)) |> get_err_ctx() + expect_identical(ctx$BadArgument, "strictly_immutable") + expect_identical(ctx$PlainErrorMessage, "Input must be TRUE or FALSE") # reset_options() works pl$reset_options() diff --git a/tests/testthat/test-rbackground.R b/tests/testthat/test-rbackground.R index 7d06f986a..8e2a866c9 100644 --- a/tests/testthat/test-rbackground.R +++ b/tests/testthat/test-rbackground.R @@ -12,29 +12,35 @@ test_that("Test collecting LazyFrame in background", { test_that("Test using $map() in background", { # change capacity - pl$set_global_rpool_cap(0) - expect_equal(pl$get_global_rpool_cap(), list(available = 0, capacity = 0)) - pl$set_global_rpool_cap(1) - 
expect_equal(pl$get_global_rpool_cap(), list(available = 0, capacity = 1)) - + pl$set_options(rpool_cap = 0) + expect_equal(pl$options$rpool_cap, 0) + expect_equal(pl$options$rpool_active, 0) + pl$set_options(rpool_cap = 1) + expect_equal(pl$options$rpool_cap, 1) + expect_equal(pl$options$rpool_active, 0) compute = lf$select(pl$col("y")$map(\(x) x * x, in_background = FALSE)) - compute_bg = lf$select(pl$col("y")$map(\(x) x * x, in_background = TRUE)) + compute_bg = lf$select(pl$col("y")$map(\(x) { + Sys.sleep(2) + x * x + }, in_background = TRUE)) res_ref = compute$collect()$to_data_frame() # no process spawned yet - expect_equal(pl$get_global_rpool_cap(), list(available = 0, capacity = 1)) + expect_equal(pl$options$rpool_cap, 1) + expect_equal(pl$options$rpool_active, 0) # process was spawned res_fg_map_bg = compute_bg$collect()$to_data_frame() - expect_equal(pl$get_global_rpool_cap(), list(available = 1, capacity = 1)) + expect_equal(pl$options$rpool_cap, 1) + expect_equal(pl$options$rpool_active, 1) # same result expect_identical(res_ref, res_fg_map_bg) # cannot collect in background without a cap - pl$set_global_rpool_cap(0) + pl$set_options(rpool_cap = 0) handle = compute_bg$collect_in_background() res = result(handle$join()) expect_rpolarserr(unwrap(res), c("When", "Hint", "PlainErrorMessage")) @@ -43,7 +49,6 @@ test_that("Test using $map() in background", { "cannot run background R process with zero capacity" ) - # can print handle after exhausted handle |> as.character() |> @@ -55,3 +60,28 @@ test_that("Test using $map() in background", { # gives correct err message expect_rpolarserr(handle$join(), "Handled") }) + + +test_that("reset rpool_cap", { + pl$reset_options() + orig = pl$options$rpool_cap + pl$set_options(rpool_cap = orig + 1) + expect_different(pl$options$rpool_cap, orig) + pl$reset_options() + expect_identical(pl$options$rpool_cap, orig) +}) + + +test_that("rpool errors", { + + ctx = pl$set_options(rpool_cap = c(1, 2)) |> get_err_ctx() + 
expect_identical(ctx$BadArgument, "rpool_cap") + expect_true(startsWith(ctx$TypeMismatch,"i64")) + + ctx = pl$set_options(rpool_cap = -1) |> get_err_ctx() + expect_identical(ctx$ValueOutOfScope, "cannot be less than zero") + + ctx = {polars_optenv$rpool_active <- 0} |> get_err_ctx() + expect_true(endsWith(ctx$PlainErrorMessage,"rpool_active cannot be set directly")) + +}) diff --git a/tests/testthat/test-sink_stream.R b/tests/testthat/test-sink_stream.R index 1866f2acf..decdb3ac6 100644 --- a/tests/testthat/test-sink_stream.R +++ b/tests/testthat/test-sink_stream.R @@ -39,7 +39,7 @@ test_that("Test sinking data to parquet file", { collect()$ to_series() } - pl$set_global_rpool_cap(4) + pl$set_options(rpool_cap = 4) rdf_in_bg = pl$LazyFrame()$ select(pl$lit(tmpf)$map(f_ipc_to_s, in_background = TRUE))$ collect()$