diff --git a/R/extendr-wrappers.R b/R/extendr-wrappers.R
index ae16e4f33..a44108162 100644
--- a/R/extendr-wrappers.R
+++ b/R/extendr-wrappers.R
@@ -89,7 +89,7 @@ import_arrow_ipc <- function(path, n_rows, cache, rechunk, row_name, row_count,
 new_from_ndjson <- function(path, infer_schema_length, batch_size, n_rows, low_memory, rechunk, row_count_name, row_count_offset) .Call(wrap__new_from_ndjson, path, infer_schema_length, batch_size, n_rows, low_memory, rechunk, row_count_name, row_count_offset)
 
-new_from_parquet <- function(path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory, hive_partitioning) .Call(wrap__new_from_parquet, path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory, hive_partitioning)
+new_from_parquet <- function(path, n_rows, cache, parallel, rechunk, row_name, row_count, use_statistics, low_memory, hive_partitioning) .Call(wrap__new_from_parquet, path, n_rows, cache, parallel, rechunk, row_name, row_count, use_statistics, low_memory, hive_partitioning)
 
 test_rpolarserr <- function() .Call(wrap__test_rpolarserr)
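The regenerated wrapper above threads the new `use_statistics` argument through to the Rust side. As a minimal sketch of what this enables from R (assuming a parquet file written beforehand; toggling the flag should change only whether row-group statistics are used for page skipping, never the result):

```r
pf = tempfile(fileext = ".parquet")
pl$LazyFrame(mtcars)$sink_parquet(pf, compression = "snappy")

# Same data either way; use_statistics only affects how much gets read.
lf1 = pl$scan_parquet(pf, use_statistics = TRUE) # default
lf2 = pl$scan_parquet(pf, use_statistics = FALSE)
identical(lf1$collect()$to_data_frame(), lf2$collect()$to_data_frame()) # TRUE
```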
diff --git a/R/parquet.R b/R/parquet.R
index 6c5a2da3e..19f5447f4 100644
--- a/R/parquet.R
+++ b/R/parquet.R
@@ -1,16 +1,23 @@
 #' Scan a parquet file
-#' @keywords LazyFrame_new
 #'
-#' @param file string filepath
-#' @param n_rows limit rows to scan
-#' @param cache bool use cache
-#' @param parallel String either Auto, None, Columns or RowGroups. The way to parallelized the scan.
-#' @param rechunk bool rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now.
-#' @param row_count_name NULL or string, if a string add a rowcount column named by this string
-#' @param row_count_offset integer, the rowcount column can be offset by this value
-#' @param low_memory bool, try reduce memory footprint
+#' @param file Path to a file. You can use globbing with `*` to scan/read multiple
+#' files in the same directory (see examples).
+#' @param n_rows Maximum number of rows to read.
+#' @param cache Cache the result after reading.
+#' @param parallel This determines the direction of parallelism. `"auto"` will
+#' try to determine the optimal direction. Can be `"auto"`, `"none"`, `"columns"`,
+#' or `"rowgroups"`.
+#' @param rechunk In case of reading multiple files via a glob pattern, rechunk
+#' the final DataFrame into contiguous memory chunks.
+#' @param row_count_name If not `NULL`, this will insert a row count column with
+#' the given name into the DataFrame.
+#' @param row_count_offset Offset to start the row_count column (only used if
+#' the name is set).
+#' @param low_memory Reduce memory usage (at the expense of performance).
 #' @param hive_partitioning Infer statistics and schema from hive partitioned URL
 #' and use them to prune reads.
+#' @param use_statistics Use statistics in the parquet file to determine if pages
+#' can be skipped from reading.
 #'
 #' @return LazyFrame
 #' @name scan_parquet
@@ -32,28 +39,24 @@
 #'   file.path(temp_dir, "**/*.parquet")
 #' )$collect()
 pl$scan_parquet = function(
-    file, # : str | Path,
-    n_rows = NULL, # : int | None = None,
-    cache = TRUE, # : bool = True,
+    file,
+    n_rows = NULL,
+    cache = TRUE,
     parallel = c(
       "Auto", # default
       "None",
-      "Columns", # Parallelize over the row groups
-      "RowGroups" # Parallelize over the columns
-    ), # Automatically determine over which unit to parallelize, This will choose the most occurring unit.
-    rechunk = TRUE, # : bool = True,
-    row_count_name = NULL, # : str | None = None,
-    row_count_offset = 0L, # : int = 0,
-    # storage_options,#: dict[str, object] | None = None, #seems fsspec specific
-    low_memory = FALSE, # : bool = False,
-    hive_partitioning = TRUE) { #-> LazyFrame
-
-  parallel = parallel[1L]
-  if (!parallel %in% c("None", "Columns", "RowGroups", "Auto")) {
-    stop("unknown parallel strategy")
-  }
+      "Columns",
+      "RowGroups"
+    ),
+    rechunk = TRUE,
+    row_count_name = NULL,
+    row_count_offset = 0L,
+    low_memory = FALSE,
+    use_statistics = TRUE,
+    hive_partitioning = TRUE
+  ) {
 
-  result_lf = new_from_parquet(
+  new_from_parquet(
     path = file,
     n_rows = n_rows,
     cache = cache,
@@ -61,47 +64,39 @@ pl$scan_parquet = function(
     rechunk = rechunk,
     row_name = row_count_name,
     row_count = row_count_offset,
+    #storage_options = storage_options, # not supported yet
     low_memory = low_memory,
+    use_statistics = use_statistics,
     hive_partitioning = hive_partitioning
-  )
-
-  unwrap(result_lf)
+  ) |>
+    unwrap("in pl$scan_parquet(): ")
 }
 
-
 #' Read a parquet file
 #' @rdname IO_read_parquet
-#' @param file string filepath
-#' @param n_rows limit rows to scan
-#' @param cache bool use cache
-#' @param parallel String either Auto, None, Columns or RowGroups. The way to parallelized the scan.
-#' @param rechunk bool rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now.
-#' @param row_count_name NULL or string, if a string add a rowcount column named by this string
-#' @param row_count_offset integer, the rowcount column can be offset by this value
-#' @param low_memory bool, try reduce memory footprint
+#' @inheritParams scan_parquet
 #' @return DataFrame
-#' @name read_parquet
-pl$read_parquet = function(
-    file,
-    n_rows = NULL,
-    cache = TRUE,
-    parallel = c("Auto", "None", "Columns", "RowGroups"),
-    rechunk = TRUE,
-    row_count_name = NULL,
-    row_count_offset = 0L,
-    low_memory = FALSE) {
-  mc = match.call()
-  mc[[1]] = get("pl", envir = asNamespace("polars"))$scan_parquet
-  eval.parent(mc)$collect()
-}
+read_parquet = function( # remapped to pl$read_parquet, a hack to support roxygen2 @inheritParams
+    file,
+    n_rows = NULL,
+    cache = TRUE,
+    parallel = c(
+      "Auto", # default
+      "None",
+      "Columns",
+      "RowGroups"
+    ),
+    rechunk = TRUE,
+    row_count_name = NULL,
+    row_count_offset = 0L,
+    low_memory = FALSE,
+    use_statistics = TRUE,
+    hive_partitioning = TRUE) {
 
-
-#
-# def _prepare_row_count_args(
-#     row_count_name: str | None = None,
-#     row_count_offset: int = 0,
-# ) -> tuple[str, int] | None:
-#     if row_count_name is not None:
-#         return (row_count_name, row_count_offset)
-#     else:
-#         return None
+  args = as.list(environment())
+  result({
+    do.call(pl$scan_parquet, args)$collect()
+  }) |>
+    unwrap("in pl$read_parquet(): ")
+}
+pl$read_parquet = read_parquet
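With `result()` piped into `unwrap("in pl$read_parquet(): ")` above, errors raised in the Rust layer should surface with the calling context prepended. A hedged sketch of that behavior (the exact message format depends on the rpolarserr renderer):

```r
pf = tempfile(fileext = ".parquet")
pl$LazyFrame(mtcars)$sink_parquet(pf, compression = "snappy")

# An invalid `parallel` choice is rejected on the Rust side and re-thrown
# by unwrap() with the "in pl$read_parquet(): " context prefix.
msg = tryCatch(
  pl$read_parquet(pf, parallel = "bogus"),
  error = conditionMessage
)
grepl("in pl\\$read_parquet\\(\\):", msg) # expected TRUE under these assumptions
```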
diff --git a/man/IO_read_parquet.Rd b/man/IO_read_parquet.Rd
index 4355aced7..20d236424 100644
--- a/man/IO_read_parquet.Rd
+++ b/man/IO_read_parquet.Rd
@@ -3,22 +3,48 @@
 \name{read_parquet}
 \alias{read_parquet}
 \title{Read a parquet file}
+\usage{
+read_parquet(
+  file,
+  n_rows = NULL,
+  cache = TRUE,
+  parallel = c("Auto", "None", "Columns", "RowGroups"),
+  rechunk = TRUE,
+  row_count_name = NULL,
+  row_count_offset = 0L,
+  low_memory = FALSE,
+  use_statistics = TRUE,
+  hive_partitioning = TRUE
+)
+}
 \arguments{
-\item{file}{string filepath}
+\item{file}{Path to a file. You can use globbing with \code{*} to scan/read multiple
+files in the same directory (see examples).}
+
+\item{n_rows}{Maximum number of rows to read.}
+
+\item{cache}{Cache the result after reading.}
 
-\item{n_rows}{limit rows to scan}
+\item{parallel}{This determines the direction of parallelism. \code{"auto"} will
+try to determine the optimal direction. Can be \code{"auto"}, \code{"none"}, \code{"columns"},
+or \code{"rowgroups"}.}
 
-\item{cache}{bool use cache}
+\item{rechunk}{In case of reading multiple files via a glob pattern, rechunk
+the final DataFrame into contiguous memory chunks.}
 
-\item{parallel}{String either Auto, None, Columns or RowGroups. The way to parallelized the scan.}
+\item{row_count_name}{If not \code{NULL}, this will insert a row count column with
+the given name into the DataFrame.}
 
-\item{rechunk}{bool rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now.}
+\item{row_count_offset}{Offset to start the row_count column (only used if
+the name is set).}
 
-\item{row_count_name}{NULL or string, if a string add a rowcount column named by this string}
+\item{low_memory}{Reduce memory usage (at the expense of performance).}
 
-\item{row_count_offset}{integer, the rowcount column can be offset by this value}
+\item{use_statistics}{Use statistics in the parquet file to determine if pages
+can be skipped from reading.}
 
-\item{low_memory}{bool, try reduce memory footprint}
+\item{hive_partitioning}{Infer statistics and schema from hive partitioned URL
+and use them to prune reads.}
 }
 \value{
 DataFrame
diff --git a/man/IO_scan_parquet.Rd b/man/IO_scan_parquet.Rd
index 7e7a69d61..0f99bf715 100644
--- a/man/IO_scan_parquet.Rd
+++ b/man/IO_scan_parquet.Rd
@@ -4,24 +4,33 @@
 \alias{scan_parquet}
 \title{Scan a parquet file}
 \arguments{
-\item{file}{string filepath}
+\item{file}{Path to a file. You can use globbing with \code{*} to scan/read multiple
+files in the same directory (see examples).}
 
-\item{n_rows}{limit rows to scan}
+\item{n_rows}{Maximum number of rows to read.}
 
-\item{cache}{bool use cache}
+\item{cache}{Cache the result after reading.}
 
-\item{parallel}{String either Auto, None, Columns or RowGroups. The way to parallelized the scan.}
+\item{parallel}{This determines the direction of parallelism. \code{"auto"} will
+try to determine the optimal direction. Can be \code{"auto"}, \code{"none"}, \code{"columns"},
+or \code{"rowgroups"}.}
 
-\item{rechunk}{bool rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now.}
+\item{rechunk}{In case of reading multiple files via a glob pattern, rechunk
+the final DataFrame into contiguous memory chunks.}
 
-\item{row_count_name}{NULL or string, if a string add a rowcount column named by this string}
+\item{row_count_name}{If not \code{NULL}, this will insert a row count column with
+the given name into the DataFrame.}
 
-\item{row_count_offset}{integer, the rowcount column can be offset by this value}
+\item{row_count_offset}{Offset to start the row_count column (only used if
+the name is set).}
 
-\item{low_memory}{bool, try reduce memory footprint}
+\item{low_memory}{Reduce memory usage (at the expense of performance).}
 
 \item{hive_partitioning}{Infer statistics and schema from hive partitioned URL
 and use them to prune reads.}
+
+\item{use_statistics}{Use statistics in the parquet file to determine if pages
+can be skipped from reading.}
 }
 \value{
 LazyFrame
@@ -48,4 +57,3 @@ pl$scan_parquet(
 )$collect()
 \dontshow{\}) # examplesIf}
 }
-\keyword{LazyFrame_new}
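The `row_count_name`/`row_count_offset` pair documented in both man pages can be illustrated with a short sketch, mirroring the test expectations further down (`mtcars` has 32 rows, so an offset of 5 yields counts 5 through 36):

```r
pf = tempfile(fileext = ".parquet")
pl$LazyFrame(mtcars)$sink_parquet(pf, compression = "snappy")

# Prepends a row count column named "idx", starting at the offset.
df = pl$read_parquet(pf, row_count_name = "idx", row_count_offset = 5)$to_data_frame()
head(df$idx) # 5 6 7 8 9 10
```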
diff --git a/src/rust/src/rdataframe/read_parquet.rs b/src/rust/src/rdataframe/read_parquet.rs
index 0c53d4701..c351fcc78 100644
--- a/src/rust/src/rdataframe/read_parquet.rs
+++ b/src/rust/src/rdataframe/read_parquet.rs
@@ -1,55 +1,44 @@
-use crate::utils::r_result_list;
-
 use crate::lazy::dataframe::LazyFrame;
+use crate::robj_to;
+use crate::rpolarserr::{polars_to_rpolars_err, RResult};
 
-//use crate::utils::wrappers::*;
-use crate::utils::wrappers::null_to_opt;
-use extendr_api::{extendr, prelude::*};
+use extendr_api::Rinternals;
+use extendr_api::{extendr, extendr_module, Robj};
+use polars::io::RowCount;
 use polars::prelude::{self as pl};
 
-//this function is derived from polars/py-polars/src/lazy/DataFrame.rs new_from_csv
-
 #[allow(clippy::too_many_arguments)]
 #[extendr]
 pub fn new_from_parquet(
-    path: String,
-    n_rows: Nullable<f64>,
-    cache: bool,
-    parallel: String, //Wrap<pl::ParallelStrategy>,
-    rechunk: bool,
-    row_name: Nullable<String>,
-    row_count: u32,
-    low_memory: bool,
-    hive_partitioning: bool,
-) -> List {
-    let parallel_strategy = match parallel {
-        x if x == "Auto" => pl::ParallelStrategy::Auto,
-        _ => panic!("not implemented"),
-    };
-
-    let row_name = null_to_opt(row_name);
-
-    let row_count = row_name.map(|name| polars::io::RowCount {
-        name,
-        offset: row_count,
-    });
-    let n_rows = null_to_opt(n_rows);
-
+    path: Robj,
+    n_rows: Robj,
+    cache: Robj,
+    parallel: Robj,
+    rechunk: Robj,
+    row_name: Robj,
+    row_count: Robj,
+    //storage_options: Robj, // not supported yet, would require cloud features, e.g. aws
+    use_statistics: Robj,
+    low_memory: Robj,
+    hive_partitioning: Robj,
+    //retries: Robj // not supported yet, requires CloudOptions
+) -> RResult<LazyFrame> {
+    let offset = robj_to!(Option, u32, row_count)?.unwrap_or(0);
+    let opt_rowcount = robj_to!(Option, String, row_name)?.map(|name| RowCount { name, offset });
     let args = pl::ScanArgsParquet {
-        n_rows: n_rows.map(|x| x as usize),
-        cache,
-        parallel: parallel_strategy,
-        rechunk,
-        row_count,
-        low_memory,
-        cloud_options: None, //TODO implement cloud options
-        use_statistics: true, //TODO expose use statistics
-        hive_partitioning,
+        n_rows: robj_to!(Option, usize, n_rows)?,
+        cache: robj_to!(bool, cache)?,
+        parallel: robj_to!(ParallelStrategy, parallel)?,
+        rechunk: robj_to!(bool, rechunk)?,
+        row_count: opt_rowcount,
+        low_memory: robj_to!(bool, low_memory)?,
+        cloud_options: None,
+        use_statistics: robj_to!(bool, use_statistics)?,
+        hive_partitioning: robj_to!(bool, hive_partitioning)?,
     };
 
-    let lf_result = pl::LazyFrame::scan_parquet(path, args)
-        .map_err(|x| x.to_string())
-        .map(LazyFrame);
-    r_result_list(lf_result)
+    pl::LazyFrame::scan_parquet(robj_to!(String, path)?, args)
+        .map_err(polars_to_rpolars_err)
+        .map(LazyFrame)
 }
 
 extendr_module! {
diff --git a/src/rust/src/rdatatype.rs b/src/rust/src/rdatatype.rs
index 79756da7d..3793e016d 100644
--- a/src/rust/src/rdatatype.rs
+++ b/src/rust/src/rdatatype.rs
@@ -516,6 +516,20 @@ pub fn robj_to_closed_window(robj: Robj) -> RResult<pl::ClosedWindow> {
     }
 }
 
+pub fn robj_to_parallel_strategy(robj: extendr_api::Robj) -> RResult<pl::ParallelStrategy> {
+    use pl::ParallelStrategy as PS;
+    match robj_to_rchoice(robj)?.to_lowercase().as_str() {
+        // accept also lowercase as normal for most other enums
+        "auto" => Ok(PS::Auto),
+        "none" => Ok(PS::None),
+        "columns" => Ok(PS::Columns),
+        "rowgroups" => Ok(PS::RowGroups),
+        s => rerr().bad_val(format!(
+            "ParallelStrategy choice ['{s}'] should be one of 'Auto', 'None', 'Columns', 'RowGroups'"
+        )),
+    }
+}
+
 extendr_module! {
     mod rdatatype;
     impl RPolarsDataType;
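Since `robj_to_parallel_strategy()` lower-cases its input before matching, the R-facing `parallel` argument is effectively case-insensitive. A minimal sketch of the accepted spellings and the failure mode (error fields as exercised by the tests below):

```r
pf = tempfile(fileext = ".parquet")
pl$LazyFrame(mtcars)$sink_parquet(pf, compression = "snappy")

# Any casing of the four variants maps to the same ParallelStrategy.
for (choice in c("auto", "Auto", "COLUMNS", "rowGroups", "none")) {
  pl$read_parquet(pf, parallel = choice) # no error expected
}

# Anything else is rejected with a BadValue error naming the bad argument.
tryCatch(pl$read_parquet(pf, parallel = "diagonal"), error = conditionMessage)
```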
robj_to_inner { $crate::rdatatype::robj_to_join_type($a) }; + (ParallelStrategy, $a:ident) => { + $crate::rdatatype::robj_to_parallel_strategy($a) + }; + (PathBuf, $a:ident) => { $crate::utils::robj_to_pathbuf($a) }; diff --git a/tests/testthat/test-parquet.R b/tests/testthat/test-parquet.R new file mode 100644 index 000000000..887de0238 --- /dev/null +++ b/tests/testthat/test-parquet.R @@ -0,0 +1,63 @@ +test_that("plain scan read parquet", { + tmpf = tempfile() + on.exit(unlink(tmpf)) + lf_exp = pl$LazyFrame(mtcars) + lf_exp$sink_parquet(tmpf, compression = "snappy") + df_exp = lf_exp$collect()$to_data_frame() + + # simple scan + expect_identical( + pl$scan_parquet(tmpf)$collect()$to_data_frame(), + df_exp + ) + + # simple read + expect_identical( + pl$read_parquet(tmpf)$to_data_frame(), + df_exp + ) +}) + + +test_that("scan read parquet - test arg rowcount", { + tmpf = tempfile() + on.exit(unlink(tmpf)) + lf_exp = pl$LazyFrame(mtcars) + lf_exp$sink_parquet(tmpf, compression = "snappy") + df_exp = lf_exp$collect()$to_data_frame() + + expect_identical( + pl$scan_parquet(tmpf, row_count_name = "rc", row_count_offset = 5)$collect()$to_data_frame(), + data.frame(rc = as.numeric(5:36), df_exp) + ) + + expect_identical( + pl$read_parquet(tmpf, row_count_name = "rc", row_count_offset = 5)$to_data_frame(), + data.frame(rc = as.numeric(5:36), df_exp) + ) +}) + + +test_that("scan read parquet - parallel strategies", { + tmpf = tempfile() + on.exit(unlink(tmpf)) + lf_exp = pl$LazyFrame(mtcars) + lf_exp$sink_parquet(tmpf, compression = "snappy") + df_exp = lf_exp$collect()$to_data_frame() + + # check all parallel strategies produce same result + for (choice in c("auto", "COLUMNS", "None", "rowGroups")) { + expect_identical( + pl$read_parquet(tmpf, parallel = choice)$to_data_frame(), + df_exp + ) + } + + # bad parallel args + ctx = pl$read_parquet(tmpf, parallel = "34") |> get_err_ctx() + expect_true(startsWith(ctx$BadValue, "ParallelStrategy choice")) + expect_identical(ctx$BadArgument, "parallel") + ctx = pl$read_parquet(tmpf, parallel = 42) |> get_err_ctx() + expect_identical(ctx$NotAChoice, "input is not a character vector") + +}) diff --git a/tests/testthat/test-without_library_polars.R b/tests/testthat/test-without_library_polars.R index ace8876c6..cfbe5d75d 100644 --- a/tests/testthat/test-without_library_polars.R +++ b/tests/testthat/test-without_library_polars.R @@ -21,3 +21,28 @@ test_that("without library(polars)", { # This works because library(polars) puts polars in search() expect_true(polars:::test_wrong_call_pl_lit(42) |> polars:::is_ok()) }) + + + + +test_that("scan read parquet from other process", { + skip_if_not_installed("callr") + + tmpf = tempfile() + on.exit(unlink(tmpf)) + lf_exp = polars::pl$LazyFrame(mtcars) + lf_exp$sink_parquet(tmpf, compression = "snappy") + df_exp = lf_exp$collect()$to_data_frame() + + # simple scan + expect_identical( + callr::r(\(tmpf) polars::pl$scan_parquet(tmpf)$collect()$to_data_frame(), args = list(tmpf = tmpf)), + df_exp + ) + + # simple read + expect_identical( + callr::r(\(tmpf) polars::pl$read_parquet(tmpf)$to_data_frame(), args = list(tmpf = tmpf)), + df_exp + ) +})