From 4fa115675a4e5919c47c29f0120efb6d5944e8f1 Mon Sep 17 00:00:00 2001 From: sorhawell Date: Thu, 9 Nov 2023 23:43:54 +0100 Subject: [PATCH 1/9] update scan_parquet --- R/parquet.R | 49 +++++++++--------- src/rust/src/rdataframe/read_parquet.rs | 68 ++++++++++--------------- src/rust/src/rdatatype.rs | 14 +++++ src/rust/src/utils/mod.rs | 4 ++ tests/testthat/test-parquet.R | 44 ++++++++++++++++ 5 files changed, 115 insertions(+), 64 deletions(-) create mode 100644 tests/testthat/test-parquet.R diff --git a/R/parquet.R b/R/parquet.R index 6c5a2da3e..b9e955958 100644 --- a/R/parquet.R +++ b/R/parquet.R @@ -32,28 +32,23 @@ #' file.path(temp_dir, "**/*.parquet") #' )$collect() pl$scan_parquet = function( - file, # : str | Path, - n_rows = NULL, # : int | None = None, - cache = TRUE, # : bool = True, + file, + n_rows = NULL, + cache = TRUE, parallel = c( "Auto", # default "None", - "Columns", # Parallelize over the row groups - "RowGroups" # Parallelize over the columns - ), # Automatically determine over which unit to parallelize, This will choose the most occurring unit. - rechunk = TRUE, # : bool = True, - row_count_name = NULL, # : str | None = None, - row_count_offset = 0L, # : int = 0, + "Columns", + "RowGroups" + ), + rechunk = TRUE, + row_count_name = NULL, + row_count_offset = 0L, # storage_options,#: dict[str, object] | None = None, #seems fsspec specific - low_memory = FALSE, # : bool = False, + low_memory = FALSE, hive_partitioning = TRUE) { #-> LazyFrame - parallel = parallel[1L] - if (!parallel %in% c("None", "Columns", "RowGroups", "Auto")) { - stop("unknown parallel strategy") - } - - result_lf = new_from_parquet( + new_from_parquet( path = file, n_rows = n_rows, cache = cache, @@ -63,9 +58,8 @@ pl$scan_parquet = function( row_count = row_count_offset, low_memory = low_memory, hive_partitioning = hive_partitioning - ) - - unwrap(result_lf) + ) |> + unwrap("in pl$scan_parquet(): ") } @@ -85,14 +79,23 @@ pl$read_parquet = function( file, n_rows = NULL, cache = TRUE, - parallel = c("Auto", "None", "Columns", "RowGroups"), + parallel = c( + "Auto", # default + "None", + "Columns", + "RowGroups" + ), rechunk = TRUE, row_count_name = NULL, row_count_offset = 0L, - low_memory = FALSE) { + # storage_options,#: dict[str, object] | None = None, #seems fsspec specific + low_memory = FALSE, + hive_partitioning = TRUE +) { mc = match.call() - mc[[1]] = get("pl", envir = asNamespace("polars"))$scan_parquet - eval.parent(mc)$collect() + mc[[1]] = pl$scan_parquet + result(eval(mc)$collect()) |> + unwrap("in pl$read_parquet(): ") } diff --git a/src/rust/src/rdataframe/read_parquet.rs b/src/rust/src/rdataframe/read_parquet.rs index 0c53d4701..08443193c 100644 --- a/src/rust/src/rdataframe/read_parquet.rs +++ b/src/rust/src/rdataframe/read_parquet.rs @@ -1,55 +1,41 @@ -use crate::utils::r_result_list; - use crate::lazy::dataframe::LazyFrame; +use crate::robj_to; +use crate::rpolarserr::{polars_to_rpolars_err, RResult}; -//use crate::utils::wrappers::*; -use crate::utils::wrappers::null_to_opt; -use extendr_api::{extendr, prelude::*}; +use extendr_api::Rinternals; +use extendr_api::{extendr, extendr_module, Robj}; +use polars::io::RowCount; use polars::prelude::{self as pl}; -//this function is derived from polars/py-polars/src/lazy/DataFrame.rs new_from_csv - #[allow(clippy::too_many_arguments)] #[extendr] pub fn new_from_parquet( - path: String, - n_rows: Nullable, - cache: bool, - parallel: String, //Wrap, - rechunk: bool, - row_name: Nullable, - row_count: u32, - low_memory: bool, - 
hive_partitioning: bool,
-) -> List {
-    let parallel_strategy = match parallel {
-        x if x == "Auto" => pl::ParallelStrategy::Auto,
-        _ => panic!("not implemented"),
-    };
-
-    let row_name = null_to_opt(row_name);
-
-    let row_count = row_name.map(|name| polars::io::RowCount {
-        name,
-        offset: row_count,
-    });
-    let n_rows = null_to_opt(n_rows);
-
+    path: Robj,
+    n_rows: Robj,
+    cache: Robj,
+    parallel: Robj,
+    rechunk: Robj,
+    row_name: Robj,
+    row_count: Robj,
+    low_memory: Robj,
+    hive_partitioning: Robj,
+) -> RResult {
+    let offset = robj_to!(Option, u32, row_count)?.unwrap_or(0);
+    let opt_rowcount = robj_to!(Option, String, row_name)?.map(|name| RowCount { name, offset });
     let args = pl::ScanArgsParquet {
-        n_rows: n_rows.map(|x| x as usize),
-        cache,
-        parallel: parallel_strategy,
-        rechunk,
-        row_count,
-        low_memory,
+        n_rows: robj_to!(Option, usize, n_rows)?,
+        cache: robj_to!(bool, cache)?,
+        parallel: robj_to!(ParallelStrategy, parallel)?,
+        rechunk: robj_to!(bool, rechunk)?,
+        row_count: opt_rowcount,
+        low_memory: robj_to!(bool, low_memory)?,
         cloud_options: None, //TODO implement cloud options
         use_statistics: true, //TODO expose use statistics
-        hive_partitioning,
+        hive_partitioning: robj_to!(bool, hive_partitioning)?,
     };
 
-    let lf_result = pl::LazyFrame::scan_parquet(path, args)
-        .map_err(|x| x.to_string())
-        .map(LazyFrame);
-    r_result_list(lf_result)
+    pl::LazyFrame::scan_parquet(robj_to!(String, path)?, args)
+        .map_err(polars_to_rpolars_err)
+        .map(LazyFrame)
 }
 
 extendr_module! {
diff --git a/src/rust/src/rdatatype.rs b/src/rust/src/rdatatype.rs
index 79756da7d..3793e016d 100644
--- a/src/rust/src/rdatatype.rs
+++ b/src/rust/src/rdatatype.rs
@@ -516,6 +516,20 @@ pub fn robj_to_closed_window(robj: Robj) -> RResult {
     }
 }
 
+pub fn robj_to_parallel_strategy(robj: extendr_api::Robj) -> RResult {
+    use pl::ParallelStrategy as PS;
+    match robj_to_rchoice(robj)?.to_lowercase().as_str() {
+        // also accept lowercase, as is normal for most other enums
+        "auto" => Ok(PS::Auto),
+        "none" => Ok(PS::None),
+        "columns" => Ok(PS::Columns),
+        "rowgroups" => Ok(PS::RowGroups),
+        s => rerr().bad_val(format!(
+            "ParallelStrategy choice ['{s}'] should be one of 'Auto', 'None', 'Columns', 'RowGroups'"
+        )),
+    }
+}
+
 extendr_module! {
     mod rdatatype;
     impl RPolarsDataType;
diff --git a/src/rust/src/utils/mod.rs b/src/rust/src/utils/mod.rs
index bdc5e62f3..d41d43885 100644
--- a/src/rust/src/utils/mod.rs
+++ b/src/rust/src/utils/mod.rs
@@ -1059,6 +1059,10 @@ macro_rules! 
robj_to_inner { $crate::rdatatype::robj_to_join_type($a) }; + (ParallelStrategy, $a:ident) => { + $crate::rdatatype::robj_to_parallel_strategy($a) + }; + (PathBuf, $a:ident) => { $crate::utils::robj_to_pathbuf($a) }; diff --git a/tests/testthat/test-parquet.R b/tests/testthat/test-parquet.R new file mode 100644 index 000000000..4d7e64707 --- /dev/null +++ b/tests/testthat/test-parquet.R @@ -0,0 +1,44 @@ + +tmpf = tempfile() +on.exit(unlink(tmpf)) +lf_exp = pl$LazyFrame(mtcars) +lf_exp$sink_parquet(tmpf, compression = "snappy") +df_exp = lf_exp$collect()$to_data_frame() + +test_that("scan read parquet", { + + #simple scan + expect_identical( + pl$scan_parquet(tmpf)$collect()$to_data_frame(), + df_exp + ) + + # simple read + expect_identical( + pl$read_parquet(tmpf)$to_data_frame(), + df_exp + ) + + # with row count + expect_identical( + pl$read_parquet(tmpf, row_count_name = "rc",row_count_offset = 5)$to_data_frame(), + data.frame(rc = as.numeric(5:36), df_exp) + ) + + # check all parallel strategies work + for(choice in c("auto", "COLUMNS", "None", "rowGroups")) { + expect_identical( + pl$read_parquet(tmpf, parallel = choice)$to_data_frame(), + df_exp + ) + } + + # bad parallel args + ctx = pl$read_parquet(tmpf, parallel = "34") |> get_err_ctx() + expect_true(startsWith(ctx$BadValue, "ParallelStrategy choice")) + expect_identical(ctx$BadArgument, "parallel") + ctx = pl$read_parquet(tmpf, parallel = 42) |> get_err_ctx() + expect_identical(ctx$NotAChoice, "input is not a character vector") + + +}) From bf4b4105322c715d46cd8960b1bcb83f76b97b0c Mon Sep 17 00:00:00 2001 From: sorhawell Date: Thu, 9 Nov 2023 23:44:33 +0100 Subject: [PATCH 2/9] make fmt --- R/parquet.R | 3 +-- tests/testthat/test-parquet.R | 12 ++++-------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/R/parquet.R b/R/parquet.R index b9e955958..95ca5d98a 100644 --- a/R/parquet.R +++ b/R/parquet.R @@ -90,8 +90,7 @@ pl$read_parquet = function( row_count_offset = 0L, # storage_options,#: dict[str, object] | None = None, #seems fsspec specific low_memory = FALSE, - hive_partitioning = TRUE -) { + hive_partitioning = TRUE) { mc = match.call() mc[[1]] = pl$scan_parquet result(eval(mc)$collect()) |> diff --git a/tests/testthat/test-parquet.R b/tests/testthat/test-parquet.R index 4d7e64707..55e4db823 100644 --- a/tests/testthat/test-parquet.R +++ b/tests/testthat/test-parquet.R @@ -1,13 +1,11 @@ - tmpf = tempfile() on.exit(unlink(tmpf)) lf_exp = pl$LazyFrame(mtcars) lf_exp$sink_parquet(tmpf, compression = "snappy") -df_exp = lf_exp$collect()$to_data_frame() +df_exp = lf_exp$collect()$to_data_frame() test_that("scan read parquet", { - - #simple scan + # simple scan expect_identical( pl$scan_parquet(tmpf)$collect()$to_data_frame(), df_exp @@ -21,12 +19,12 @@ test_that("scan read parquet", { # with row count expect_identical( - pl$read_parquet(tmpf, row_count_name = "rc",row_count_offset = 5)$to_data_frame(), + pl$read_parquet(tmpf, row_count_name = "rc", row_count_offset = 5)$to_data_frame(), data.frame(rc = as.numeric(5:36), df_exp) ) # check all parallel strategies work - for(choice in c("auto", "COLUMNS", "None", "rowGroups")) { + for (choice in c("auto", "COLUMNS", "None", "rowGroups")) { expect_identical( pl$read_parquet(tmpf, parallel = choice)$to_data_frame(), df_exp @@ -39,6 +37,4 @@ test_that("scan read parquet", { expect_identical(ctx$BadArgument, "parallel") ctx = pl$read_parquet(tmpf, parallel = 42) |> get_err_ctx() expect_identical(ctx$NotAChoice, "input is not a character vector") - - }) From 
1bbea330e00ec7538b471e05127b2fdcfc09d036 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Havelund=20Welling?= Date: Sun, 12 Nov 2023 23:01:27 +0100 Subject: [PATCH 3/9] Update R/parquet.R Co-authored-by: eitsupi <50911393+eitsupi@users.noreply.github.com> --- R/parquet.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/parquet.R b/R/parquet.R index 95ca5d98a..070f22af0 100644 --- a/R/parquet.R +++ b/R/parquet.R @@ -92,7 +92,7 @@ pl$read_parquet = function( low_memory = FALSE, hive_partitioning = TRUE) { mc = match.call() - mc[[1]] = pl$scan_parquet + mc[[1]] = get("pl", envir = asNamespace("polars"))$scan_parquet result(eval(mc)$collect()) |> unwrap("in pl$read_parquet(): ") } From 96b7cc6a499da6583756cfd09d14554cea66dcd2 Mon Sep 17 00:00:00 2001 From: sorhawell Date: Mon, 13 Nov 2023 00:17:47 +0100 Subject: [PATCH 4/9] do.call instead of eval + tst scan library(polars) --- R/parquet.R | 20 ++++--------- tests/testthat/test-parquet.R | 13 +++++---- tests/testthat/test-without_library_polars.R | 30 ++++++++++++++++++++ 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/R/parquet.R b/R/parquet.R index 070f22af0..b1aff305f 100644 --- a/R/parquet.R +++ b/R/parquet.R @@ -62,7 +62,6 @@ pl$scan_parquet = function( unwrap("in pl$scan_parquet(): ") } - #' Read a parquet file #' @rdname IO_read_parquet #' @param file string filepath @@ -91,19 +90,10 @@ pl$read_parquet = function( # storage_options,#: dict[str, object] | None = None, #seems fsspec specific low_memory = FALSE, hive_partitioning = TRUE) { - mc = match.call() - mc[[1]] = get("pl", envir = asNamespace("polars"))$scan_parquet - result(eval(mc)$collect()) |> + + args = as.list(environment()) + result({ + do.call(pl$scan_parquet, args)$collect() + }) |> unwrap("in pl$read_parquet(): ") } - - -# -# def _prepare_row_count_args( -# row_count_name: str | None = None, -# row_count_offset: int = 0, -# ) -> tuple[str, int] | None: -# if row_count_name is not None: -# return (row_count_name, row_count_offset) -# else: -# return None diff --git a/tests/testthat/test-parquet.R b/tests/testthat/test-parquet.R index 55e4db823..81d08ce0d 100644 --- a/tests/testthat/test-parquet.R +++ b/tests/testthat/test-parquet.R @@ -1,10 +1,13 @@ -tmpf = tempfile() -on.exit(unlink(tmpf)) -lf_exp = pl$LazyFrame(mtcars) -lf_exp$sink_parquet(tmpf, compression = "snappy") -df_exp = lf_exp$collect()$to_data_frame() + test_that("scan read parquet", { + + tmpf = tempfile() + on.exit(unlink(tmpf)) + lf_exp = pl$LazyFrame(mtcars) + lf_exp$sink_parquet(tmpf, compression = "snappy") + df_exp = lf_exp$collect()$to_data_frame() + # simple scan expect_identical( pl$scan_parquet(tmpf)$collect()$to_data_frame(), diff --git a/tests/testthat/test-without_library_polars.R b/tests/testthat/test-without_library_polars.R index ace8876c6..e566a727d 100644 --- a/tests/testthat/test-without_library_polars.R +++ b/tests/testthat/test-without_library_polars.R @@ -20,4 +20,34 @@ test_that("without library(polars)", { # Positive-Negative control # This works because library(polars) puts polars in search() expect_true(polars:::test_wrong_call_pl_lit(42) |> polars:::is_ok()) + + +}) + + + + +test_that("scan read parquet from other process", { + + skip_if_not_installed("callr") + + tmpf = tempfile() + on.exit(unlink(tmpf)) + lf_exp = polars::pl$LazyFrame(mtcars) + lf_exp$sink_parquet(tmpf, compression = "snappy") + df_exp = lf_exp$collect()$to_data_frame() + + # simple scan + expect_identical( + callr::r(\(tmpf) 
polars::pl$scan_parquet(tmpf)$collect()$to_data_frame(), args = list(tmpf=tmpf)), + df_exp + ) + + # simple read + expect_identical( + callr::r(\(tmpf) polars::pl$read_parquet(tmpf)$to_data_frame(), args = list(tmpf=tmpf)), + df_exp + ) + }) + From 8113c5e8b83a291d8d7efd4292b5a31cdb86dc9c Mon Sep 17 00:00:00 2001 From: eitsupi Date: Mon, 13 Nov 2023 10:19:15 +0000 Subject: [PATCH 5/9] refactor: auto formatting --- R/parquet.R | 1 - tests/testthat/test-parquet.R | 3 --- tests/testthat/test-without_library_polars.R | 9 ++------- 3 files changed, 2 insertions(+), 11 deletions(-) diff --git a/R/parquet.R b/R/parquet.R index b1aff305f..e1d7ad932 100644 --- a/R/parquet.R +++ b/R/parquet.R @@ -90,7 +90,6 @@ pl$read_parquet = function( # storage_options,#: dict[str, object] | None = None, #seems fsspec specific low_memory = FALSE, hive_partitioning = TRUE) { - args = as.list(environment()) result({ do.call(pl$scan_parquet, args)$collect() diff --git a/tests/testthat/test-parquet.R b/tests/testthat/test-parquet.R index 81d08ce0d..55113418a 100644 --- a/tests/testthat/test-parquet.R +++ b/tests/testthat/test-parquet.R @@ -1,7 +1,4 @@ - - test_that("scan read parquet", { - tmpf = tempfile() on.exit(unlink(tmpf)) lf_exp = pl$LazyFrame(mtcars) diff --git a/tests/testthat/test-without_library_polars.R b/tests/testthat/test-without_library_polars.R index e566a727d..cfbe5d75d 100644 --- a/tests/testthat/test-without_library_polars.R +++ b/tests/testthat/test-without_library_polars.R @@ -20,15 +20,12 @@ test_that("without library(polars)", { # Positive-Negative control # This works because library(polars) puts polars in search() expect_true(polars:::test_wrong_call_pl_lit(42) |> polars:::is_ok()) - - }) test_that("scan read parquet from other process", { - skip_if_not_installed("callr") tmpf = tempfile() @@ -39,15 +36,13 @@ test_that("scan read parquet from other process", { # simple scan expect_identical( - callr::r(\(tmpf) polars::pl$scan_parquet(tmpf)$collect()$to_data_frame(), args = list(tmpf=tmpf)), + callr::r(\(tmpf) polars::pl$scan_parquet(tmpf)$collect()$to_data_frame(), args = list(tmpf = tmpf)), df_exp ) # simple read expect_identical( - callr::r(\(tmpf) polars::pl$read_parquet(tmpf)$to_data_frame(), args = list(tmpf=tmpf)), + callr::r(\(tmpf) polars::pl$read_parquet(tmpf)$to_data_frame(), args = list(tmpf = tmpf)), df_exp ) - }) - From 1556e6faa3ebde7051a335247266b4a09cedd946 Mon Sep 17 00:00:00 2001 From: sorhawell Date: Wed, 15 Nov 2023 01:30:39 +0100 Subject: [PATCH 6/9] split up utest --- tests/testthat/test-parquet.R | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/tests/testthat/test-parquet.R b/tests/testthat/test-parquet.R index 81d08ce0d..887de0238 100644 --- a/tests/testthat/test-parquet.R +++ b/tests/testthat/test-parquet.R @@ -1,7 +1,4 @@ - - -test_that("scan read parquet", { - +test_that("plain scan read parquet", { tmpf = tempfile() on.exit(unlink(tmpf)) lf_exp = pl$LazyFrame(mtcars) @@ -19,14 +16,36 @@ test_that("scan read parquet", { pl$read_parquet(tmpf)$to_data_frame(), df_exp ) +}) + + +test_that("scan read parquet - test arg rowcount", { + tmpf = tempfile() + on.exit(unlink(tmpf)) + lf_exp = pl$LazyFrame(mtcars) + lf_exp$sink_parquet(tmpf, compression = "snappy") + df_exp = lf_exp$collect()$to_data_frame() + + expect_identical( + pl$scan_parquet(tmpf, row_count_name = "rc", row_count_offset = 5)$collect()$to_data_frame(), + data.frame(rc = as.numeric(5:36), df_exp) + ) - # with row count expect_identical( 
pl$read_parquet(tmpf, row_count_name = "rc", row_count_offset = 5)$to_data_frame(), data.frame(rc = as.numeric(5:36), df_exp) ) +}) + + +test_that("scan read parquet - parallel strategies", { + tmpf = tempfile() + on.exit(unlink(tmpf)) + lf_exp = pl$LazyFrame(mtcars) + lf_exp$sink_parquet(tmpf, compression = "snappy") + df_exp = lf_exp$collect()$to_data_frame() - # check all parallel strategies work + # check all parallel strategies produce same result for (choice in c("auto", "COLUMNS", "None", "rowGroups")) { expect_identical( pl$read_parquet(tmpf, parallel = choice)$to_data_frame(), @@ -40,4 +59,5 @@ test_that("scan read parquet", { expect_identical(ctx$BadArgument, "parallel") ctx = pl$read_parquet(tmpf, parallel = 42) |> get_err_ctx() expect_identical(ctx$NotAChoice, "input is not a character vector") + }) From d8aa72a8935673e7c7a94479b87b3390e4e8d04d Mon Sep 17 00:00:00 2001 From: sorhawell Date: Wed, 15 Nov 2023 02:32:57 +0100 Subject: [PATCH 7/9] support arg use_statistics + docs --- R/extendr-wrappers.R | 2 +- R/parquet.R | 58 ++++++++++++------------- man/IO_read_parquet.Rd | 26 +++++++++-- man/IO_scan_parquet.Rd | 9 ++-- src/rust/src/rdataframe/read_parquet.rs | 7 ++- 5 files changed, 63 insertions(+), 39 deletions(-) diff --git a/R/extendr-wrappers.R b/R/extendr-wrappers.R index cd9c02d18..9f8aef986 100644 --- a/R/extendr-wrappers.R +++ b/R/extendr-wrappers.R @@ -83,7 +83,7 @@ import_arrow_ipc <- function(path, n_rows, cache, rechunk, row_name, row_count, new_from_ndjson <- function(path, infer_schema_length, batch_size, n_rows, low_memory, rechunk, row_count_name, row_count_offset) .Call(wrap__new_from_ndjson, path, infer_schema_length, batch_size, n_rows, low_memory, rechunk, row_count_name, row_count_offset) -new_from_parquet <- function(path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory, hive_partitioning) .Call(wrap__new_from_parquet, path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory, hive_partitioning) +new_from_parquet <- function(path, n_rows, cache, parallel, rechunk, row_name, row_count, use_statistics, low_memory, hive_partitioning) .Call(wrap__new_from_parquet, path, n_rows, cache, parallel, rechunk, row_name, row_count, use_statistics, low_memory, hive_partitioning) test_rpolarserr <- function() .Call(wrap__test_rpolarserr) diff --git a/R/parquet.R b/R/parquet.R index b1aff305f..daa28088e 100644 --- a/R/parquet.R +++ b/R/parquet.R @@ -3,14 +3,16 @@ #' #' @param file string filepath #' @param n_rows limit rows to scan -#' @param cache bool use cache +#' @param cache Boolean use cache #' @param parallel String either Auto, None, Columns or RowGroups. The way to parallelized the scan. -#' @param rechunk bool rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now. +#' @param rechunk Boolean rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now. #' @param row_count_name NULL or string, if a string add a rowcount column named by this string #' @param row_count_offset integer, the rowcount column can be offset by this value -#' @param low_memory bool, try reduce memory footprint +#' @param low_memory Boolean, try reduce memory footprint #' @param hive_partitioning Infer statistics and schema from hive partitioned URL #' and use them to prune reads. +#' @param use_statistics Boolean, if TRUE use statistics in the parquet to determine if pages can be +#' skipped from reading. 
#' #' @return LazyFrame #' @name scan_parquet @@ -44,9 +46,10 @@ pl$scan_parquet = function( rechunk = TRUE, row_count_name = NULL, row_count_offset = 0L, - # storage_options,#: dict[str, object] | None = None, #seems fsspec specific low_memory = FALSE, - hive_partitioning = TRUE) { #-> LazyFrame + use_statistics = TRUE, + hive_partitioning = TRUE + ) { #-> LazyFrame new_from_parquet( path = file, @@ -56,7 +59,9 @@ pl$scan_parquet = function( rechunk = rechunk, row_name = row_count_name, row_count = row_count_offset, + #storage_options = storage_options, # not supported yet low_memory = low_memory, + use_statistics = use_statistics, hive_partitioning = hive_partitioning ) |> unwrap("in pl$scan_parquet(): ") @@ -64,32 +69,24 @@ pl$scan_parquet = function( #' Read a parquet file #' @rdname IO_read_parquet -#' @param file string filepath -#' @param n_rows limit rows to scan -#' @param cache bool use cache -#' @param parallel String either Auto, None, Columns or RowGroups. The way to parallelized the scan. -#' @param rechunk bool rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now. -#' @param row_count_name NULL or string, if a string add a rowcount column named by this string -#' @param row_count_offset integer, the rowcount column can be offset by this value -#' @param low_memory bool, try reduce memory footprint +#' @inheritParams scan_parquet #' @return DataFrame -#' @name read_parquet -pl$read_parquet = function( - file, - n_rows = NULL, - cache = TRUE, - parallel = c( - "Auto", # default - "None", - "Columns", - "RowGroups" - ), - rechunk = TRUE, - row_count_name = NULL, - row_count_offset = 0L, - # storage_options,#: dict[str, object] | None = None, #seems fsspec specific - low_memory = FALSE, - hive_partitioning = TRUE) { +read_parquet = function( # remapped to pl$read_parquet, a hack to support roxygen2 @inheritsParams + file, + n_rows = NULL, + cache = TRUE, + parallel = c( + "Auto", # default + "None", + "Columns", + "RowGroups" + ), + rechunk = TRUE, + row_count_name = NULL, + row_count_offset = 0L, + low_memory = FALSE, + use_statistics = TRUE, + hive_partitioning = TRUE) { args = as.list(environment()) result({ @@ -97,3 +94,4 @@ pl$read_parquet = function( }) |> unwrap("in pl$read_parquet(): ") } +pl$read_parquet = read_parquet diff --git a/man/IO_read_parquet.Rd b/man/IO_read_parquet.Rd index 4355aced7..70f79e3e3 100644 --- a/man/IO_read_parquet.Rd +++ b/man/IO_read_parquet.Rd @@ -3,22 +3,42 @@ \name{read_parquet} \alias{read_parquet} \title{Read a parquet file} +\usage{ +read_parquet( + file, + n_rows = NULL, + cache = TRUE, + parallel = c("Auto", "None", "Columns", "RowGroups"), + rechunk = TRUE, + row_count_name = NULL, + row_count_offset = 0L, + low_memory = FALSE, + use_statistics = TRUE, + hive_partitioning = TRUE +) +} \arguments{ \item{file}{string filepath} \item{n_rows}{limit rows to scan} -\item{cache}{bool use cache} +\item{cache}{Boolean use cache} \item{parallel}{String either Auto, None, Columns or RowGroups. 
The way to parallelized the scan.} -\item{rechunk}{bool rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now.} +\item{rechunk}{Boolean rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now.} \item{row_count_name}{NULL or string, if a string add a rowcount column named by this string} \item{row_count_offset}{integer, the rowcount column can be offset by this value} -\item{low_memory}{bool, try reduce memory footprint} +\item{low_memory}{Boolean, try reduce memory footprint} + +\item{use_statistics}{Boolean, if TRUE use statistics in the parquet to determine if pages can be +skipped from reading.} + +\item{hive_partitioning}{Infer statistics and schema from hive partitioned URL +and use them to prune reads.} } \value{ DataFrame diff --git a/man/IO_scan_parquet.Rd b/man/IO_scan_parquet.Rd index 7e7a69d61..cbb0817a8 100644 --- a/man/IO_scan_parquet.Rd +++ b/man/IO_scan_parquet.Rd @@ -8,20 +8,23 @@ \item{n_rows}{limit rows to scan} -\item{cache}{bool use cache} +\item{cache}{Boolean use cache} \item{parallel}{String either Auto, None, Columns or RowGroups. The way to parallelized the scan.} -\item{rechunk}{bool rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now.} +\item{rechunk}{Boolean rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now.} \item{row_count_name}{NULL or string, if a string add a rowcount column named by this string} \item{row_count_offset}{integer, the rowcount column can be offset by this value} -\item{low_memory}{bool, try reduce memory footprint} +\item{low_memory}{Boolean, try reduce memory footprint} \item{hive_partitioning}{Infer statistics and schema from hive partitioned URL and use them to prune reads.} + +\item{use_statistics}{Boolean, if TRUE use statistics in the parquet to determine if pages can be +skipped from reading.} } \value{ LazyFrame diff --git a/src/rust/src/rdataframe/read_parquet.rs b/src/rust/src/rdataframe/read_parquet.rs index 08443193c..c351fcc78 100644 --- a/src/rust/src/rdataframe/read_parquet.rs +++ b/src/rust/src/rdataframe/read_parquet.rs @@ -16,8 +16,11 @@ pub fn new_from_parquet( rechunk: Robj, row_name: Robj, row_count: Robj, + //storage_options: Robj, // not supported yet, add provide features e.g. 
aws + use_statistics: Robj, low_memory: Robj, hive_partitioning: Robj, + //retries: Robj // not supported yet, with CloudOptions ) -> RResult { let offset = robj_to!(Option, u32, row_count)?.unwrap_or(0); let opt_rowcount = robj_to!(Option, String, row_name)?.map(|name| RowCount { name, offset }); @@ -28,8 +31,8 @@ pub fn new_from_parquet( rechunk: robj_to!(bool, rechunk)?, row_count: opt_rowcount, low_memory: robj_to!(bool, low_memory)?, - cloud_options: None, //TODO implement cloud options - use_statistics: true, //TODO expose use statistics + cloud_options: None, + use_statistics: robj_to!(bool, use_statistics)?, hive_partitioning: robj_to!(bool, hive_partitioning)?, }; From 7234e814345b4f30e28a3d5a40929a7368ad6efb Mon Sep 17 00:00:00 2001 From: Etienne Bacher <52219252+etiennebacher@users.noreply.github.com> Date: Wed, 15 Nov 2023 07:40:59 +0100 Subject: [PATCH 8/9] tweak docs --- R/parquet.R | 30 +++++++++++++++++------------- man/IO_read_parquet.Rd | 25 +++++++++++++++---------- man/IO_scan_parquet.Rd | 26 +++++++++++++++----------- 3 files changed, 47 insertions(+), 34 deletions(-) diff --git a/R/parquet.R b/R/parquet.R index 0cfab13b9..f9ff90cbb 100644 --- a/R/parquet.R +++ b/R/parquet.R @@ -1,18 +1,22 @@ #' Scan a parquet file -#' @keywords LazyFrame_new #' -#' @param file string filepath -#' @param n_rows limit rows to scan -#' @param cache Boolean use cache -#' @param parallel String either Auto, None, Columns or RowGroups. The way to parallelized the scan. -#' @param rechunk Boolean rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now. -#' @param row_count_name NULL or string, if a string add a rowcount column named by this string -#' @param row_count_offset integer, the rowcount column can be offset by this value -#' @param low_memory Boolean, try reduce memory footprint +#' @param file Path to a file. You can use globbing with `*` to scan/read multiple +#' files in the same directory (see examples). +#' @param n_rows Maximum number of rows to read. +#' @param cache Cache the result after reading. +#' @param parallel This determines the direction of parallelism. `"auto"` will +#' try to determine the optimal direction. +#' @param rechunk In case of reading multiple files via a glob pattern, rechunk +#' the final DataFrame into contiguous memory chunks. +#' @param row_count_name If not `NULL`, this will insert a row count column with +#' the given name into the DataFrame. +#' @param row_count_offset Offset to start the row_count column (only used if +#' the name is set). +#' @param low_memory Reduce memory usage (will yield a lower performance). #' @param hive_partitioning Infer statistics and schema from hive partitioned URL #' and use them to prune reads. -#' @param use_statistics Boolean, if TRUE use statistics in the parquet to determine if pages can be -#' skipped from reading. +#' @param use_statistics Use statistics in the parquet file to determine if pages +#' can be skipped from reading. 
#' #' @return LazyFrame #' @name scan_parquet @@ -49,7 +53,7 @@ pl$scan_parquet = function( low_memory = FALSE, use_statistics = TRUE, hive_partitioning = TRUE - ) { #-> LazyFrame + ) { new_from_parquet( path = file, @@ -70,7 +74,7 @@ pl$scan_parquet = function( #' Read a parquet file #' @rdname IO_read_parquet #' @inheritParams scan_parquet -#' @return DataFrames +#' @return DataFrame read_parquet = function( # remapped to pl$read_parquet, a hack to support roxygen2 @inheritsParams file, n_rows = NULL, diff --git a/man/IO_read_parquet.Rd b/man/IO_read_parquet.Rd index 70f79e3e3..dd4a13858 100644 --- a/man/IO_read_parquet.Rd +++ b/man/IO_read_parquet.Rd @@ -18,24 +18,29 @@ read_parquet( ) } \arguments{ -\item{file}{string filepath} +\item{file}{Path to a file. You can use globbing with \code{*} to scan/read multiple +files in the same directory (see examples).} -\item{n_rows}{limit rows to scan} +\item{n_rows}{Maximum number of rows to read.} -\item{cache}{Boolean use cache} +\item{cache}{Cache the result after reading.} -\item{parallel}{String either Auto, None, Columns or RowGroups. The way to parallelized the scan.} +\item{parallel}{This determines the direction of parallelism. \code{"auto"} will +try to determine the optimal direction.} -\item{rechunk}{Boolean rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now.} +\item{rechunk}{In case of reading multiple files via a glob pattern, rechunk +the final DataFrame into contiguous memory chunks.} -\item{row_count_name}{NULL or string, if a string add a rowcount column named by this string} +\item{row_count_name}{If not \code{NULL}, this will insert a row count column with +the given name into the DataFrame.} -\item{row_count_offset}{integer, the rowcount column can be offset by this value} +\item{row_count_offset}{Offset to start the row_count column (only used if +the name is set).} -\item{low_memory}{Boolean, try reduce memory footprint} +\item{low_memory}{Reduce memory usage (will yield a lower performance).} -\item{use_statistics}{Boolean, if TRUE use statistics in the parquet to determine if pages can be -skipped from reading.} +\item{use_statistics}{Use statistics in the parquet file to determine if pages +can be skipped from reading.} \item{hive_partitioning}{Infer statistics and schema from hive partitioned URL and use them to prune reads.} diff --git a/man/IO_scan_parquet.Rd b/man/IO_scan_parquet.Rd index cbb0817a8..87f74d37a 100644 --- a/man/IO_scan_parquet.Rd +++ b/man/IO_scan_parquet.Rd @@ -4,27 +4,32 @@ \alias{scan_parquet} \title{Scan a parquet file} \arguments{ -\item{file}{string filepath} +\item{file}{Path to a file. You can use globbing with \code{*} to scan/read multiple +files in the same directory (see examples).} -\item{n_rows}{limit rows to scan} +\item{n_rows}{Maximum number of rows to read.} -\item{cache}{Boolean use cache} +\item{cache}{Cache the result after reading.} -\item{parallel}{String either Auto, None, Columns or RowGroups. The way to parallelized the scan.} +\item{parallel}{This determines the direction of parallelism. 
\code{"auto"} will
+try to determine the optimal direction.}
 
-\item{rechunk}{Boolean rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now.}
+\item{rechunk}{In case of reading multiple files via a glob pattern, rechunk
+the final DataFrame into contiguous memory chunks.}
 
-\item{row_count_name}{NULL or string, if a string add a rowcount column named by this string}
+\item{row_count_name}{If not \code{NULL}, this will insert a row count column with
+the given name into the DataFrame.}
 
-\item{row_count_offset}{integer, the rowcount column can be offset by this value}
+\item{row_count_offset}{Offset to start the row_count column (only used if
+the name is set).}
 
-\item{low_memory}{Boolean, try reduce memory footprint}
+\item{low_memory}{Reduce memory usage (will yield a lower performance).}
 
 \item{hive_partitioning}{Infer statistics and schema from hive partitioned URL
 and use them to prune reads.}
 
-\item{use_statistics}{Boolean, if TRUE use statistics in the parquet to determine if pages can be
-skipped from reading.}
+\item{use_statistics}{Use statistics in the parquet file to determine if pages
+can be skipped from reading.}
 }
 \value{
 LazyFrame
@@ -51,4 +56,3 @@ pl$scan_parquet(
 )$collect()
 \dontshow{\}) # examplesIf}
 }
-\keyword{LazyFrame_new}

From c45597e808976cbbb8c23ac8ed7a63f460463eba Mon Sep 17 00:00:00 2001
From: Etienne Bacher <52219252+etiennebacher@users.noreply.github.com>
Date: Wed, 15 Nov 2023 07:53:45 +0100
Subject: [PATCH 9/9] list choices of parallel

---
 R/parquet.R            | 3 ++-
 man/IO_read_parquet.Rd | 3 ++-
 man/IO_scan_parquet.Rd | 3 ++-
 3 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/R/parquet.R b/R/parquet.R
index f9ff90cbb..19f5447f4 100644
--- a/R/parquet.R
+++ b/R/parquet.R
@@ -5,7 +5,8 @@
 #' @param n_rows Maximum number of rows to read.
 #' @param cache Cache the result after reading.
 #' @param parallel This determines the direction of parallelism. `"auto"` will
-#' try to determine the optimal direction.
+#' try to determine the optimal direction. Can be `"auto"`, `"none"`, `"columns"`,
+#' or `"rowgroups"`.
 #' @param rechunk In case of reading multiple files via a glob pattern, rechunk
 #' the final DataFrame into contiguous memory chunks.
diff --git a/man/IO_read_parquet.Rd b/man/IO_read_parquet.Rd
index dd4a13858..20d236424 100644
--- a/man/IO_read_parquet.Rd
+++ b/man/IO_read_parquet.Rd
@@ -26,7 +26,8 @@ files in the same directory (see examples).}
 \item{cache}{Cache the result after reading.}
 
 \item{parallel}{This determines the direction of parallelism. \code{"auto"} will
-try to determine the optimal direction.}
+try to determine the optimal direction. Can be \code{"auto"}, \code{"none"}, \code{"columns"},
+or \code{"rowgroups"}.}
 
 \item{rechunk}{In case of reading multiple files via a glob pattern, rechunk
 the final DataFrame into contiguous memory chunks.}
diff --git a/man/IO_scan_parquet.Rd b/man/IO_scan_parquet.Rd
index 87f74d37a..0f99bf715 100644
--- a/man/IO_scan_parquet.Rd
+++ b/man/IO_scan_parquet.Rd
@@ -12,7 +12,8 @@ files in the same directory (see examples).}
 \item{cache}{Cache the result after reading.}
 
 \item{parallel}{This determines the direction of parallelism. \code{"auto"} will
-try to determine the optimal direction.}
+try to determine the optimal direction. Can be \code{"auto"}, \code{"none"}, \code{"columns"},
+or \code{"rowgroups"}.}
 
 \item{rechunk}{In case of reading multiple files via a glob pattern, rechunk
 the final DataFrame into contiguous memory chunks.}
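
A minimal usage sketch of the API end state, assuming all nine patches above
are applied. The temp file and argument values are illustrative only; they
mirror tests/testthat/test-parquet.R:

    library(polars)

    # Write a small parquet file to read back (as the tests do).
    tmpf = tempfile(fileext = ".parquet")
    pl$LazyFrame(mtcars)$sink_parquet(tmpf, compression = "snappy")

    # Lazy scan using the new arguments: a row-count column starting at offset 5
    # and a parallel strategy, matched case-insensitively by
    # robj_to_parallel_strategy() on the Rust side.
    lf = pl$scan_parquet(
      tmpf,
      row_count_name = "rc",
      row_count_offset = 5,
      parallel = "rowgroups",
      use_statistics = TRUE
    )
    lf$collect()

    # Eager variant: pl$read_parquet() forwards its arguments to
    # pl$scan_parquet() via do.call() and collects the result.
    pl$read_parquet(tmpf, n_rows = 5)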