Refactor and improve $scan/read_parquet() #492

Merged (12 commits, Nov 15, 2023)
2 changes: 1 addition & 1 deletion R/extendr-wrappers.R
@@ -89,7 +89,7 @@ import_arrow_ipc <- function(path, n_rows, cache, rechunk, row_name, row_count,

new_from_ndjson <- function(path, infer_schema_length, batch_size, n_rows, low_memory, rechunk, row_count_name, row_count_offset) .Call(wrap__new_from_ndjson, path, infer_schema_length, batch_size, n_rows, low_memory, rechunk, row_count_name, row_count_offset)

new_from_parquet <- function(path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory, hive_partitioning) .Call(wrap__new_from_parquet, path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory, hive_partitioning)
new_from_parquet <- function(path, n_rows, cache, parallel, rechunk, row_name, row_count, use_statistics, low_memory, hive_partitioning) .Call(wrap__new_from_parquet, path, n_rows, cache, parallel, rechunk, row_name, row_count, use_statistics, low_memory, hive_partitioning)

test_rpolarserr <- function() .Call(wrap__test_rpolarserr)

121 changes: 58 additions & 63 deletions R/parquet.R
@@ -1,16 +1,23 @@
#' Scan a parquet file
#' @keywords LazyFrame_new
#'
#' @param file string filepath
#' @param n_rows limit rows to scan
#' @param cache bool use cache
#' @param parallel String either Auto, None, Columns or RowGroups. The way to parallelized the scan.
#' @param rechunk bool rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now.
#' @param row_count_name NULL or string, if a string add a rowcount column named by this string
#' @param row_count_offset integer, the rowcount column can be offset by this value
#' @param low_memory bool, try reduce memory footprint
#' @param file Path to a file. You can use globbing with `*` to scan/read multiple
#' files in the same directory (see examples).
#' @param n_rows Maximum number of rows to read.
#' @param cache Cache the result after reading.
#' @param parallel This determines the direction of parallelism. `"auto"` will
#' try to determine the optimal direction. Can be `"auto"`, `"none"`, `"columns"`,
#' or `"rowgroups"`.
#' @param rechunk In case of reading multiple files via a glob pattern, rechunk
#' the final DataFrame into contiguous memory chunks.
#' @param row_count_name If not `NULL`, this will insert a row count column with
#' the given name into the DataFrame.
#' @param row_count_offset Offset to start the row_count column (only used if
#' the name is set).
#' @param low_memory Reduce memory usage (will yield a lower performance).
#' @param hive_partitioning Infer statistics and schema from hive partitioned URL
#' and use them to prune reads.
#' @param use_statistics Use statistics in the parquet file to determine if pages
#' can be skipped from reading.
#'
#' @return LazyFrame
#' @name scan_parquet
@@ -32,76 +39,64 @@
#' file.path(temp_dir, "**/*.parquet")
#' )$collect()
pl$scan_parquet = function(
file, # : str | Path,
n_rows = NULL, # : int | None = None,
cache = TRUE, # : bool = True,
file,
n_rows = NULL,
cache = TRUE,
parallel = c(
"Auto", # default
"None",
"Columns", # Parallelize over the row groups
"RowGroups" # Parallelize over the columns
), # Automatically determine over which unit to parallelize, This will choose the most occurring unit.
rechunk = TRUE, # : bool = True,
row_count_name = NULL, # : str | None = None,
row_count_offset = 0L, # : int = 0,
# storage_options,#: dict[str, object] | None = None, #seems fsspec specific
low_memory = FALSE, # : bool = False,
hive_partitioning = TRUE) { #-> LazyFrame

parallel = parallel[1L]
if (!parallel %in% c("None", "Columns", "RowGroups", "Auto")) {
stop("unknown parallel strategy")
}
"Columns",
"RowGroups"
),
rechunk = TRUE,
row_count_name = NULL,
row_count_offset = 0L,
low_memory = FALSE,
use_statistics = TRUE,
hive_partitioning = TRUE
) {

result_lf = new_from_parquet(
new_from_parquet(
path = file,
n_rows = n_rows,
cache = cache,
parallel = parallel,
rechunk = rechunk,
row_name = row_count_name,
row_count = row_count_offset,
#storage_options = storage_options, # not supported yet
low_memory = low_memory,
use_statistics = use_statistics,
hive_partitioning = hive_partitioning
)

unwrap(result_lf)
) |>
unwrap("in pl$scan_parquet(): ")
}
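
For reference, a minimal usage sketch of the refactored scanner. This is illustrative only: it assumes `$write_parquet()` is available to create a test file, and the `row_nr` column name is arbitrary.

```r
library(polars)

# write a small parquet file to scan
tmp = tempfile(fileext = ".parquet")
pl$DataFrame(x = 1:5, y = letters[1:5])$write_parquet(tmp)

lf = pl$scan_parquet(
  tmp,
  parallel = "columns",      # lowercase is accepted by the new Rust-side conversion
  row_count_name = "row_nr", # insert a row-count column with this name
  use_statistics = TRUE      # new argument exposed by this PR
)
lf$collect()
```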


#' Read a parquet file
#' @rdname IO_read_parquet
#' @param file string filepath
#' @param n_rows limit rows to scan
#' @param cache bool use cache
#' @param parallel String either Auto, None, Columns or RowGroups. The way to parallelized the scan.
#' @param rechunk bool rechunk reorganize memory layout, potentially make future operations faster , however perform reallocation now.
#' @param row_count_name NULL or string, if a string add a rowcount column named by this string
#' @param row_count_offset integer, the rowcount column can be offset by this value
#' @param low_memory bool, try reduce memory footprint
#' @inheritParams scan_parquet
#' @return DataFrame
#' @name read_parquet
pl$read_parquet = function(
file,
n_rows = NULL,
cache = TRUE,
parallel = c("Auto", "None", "Columns", "RowGroups"),
rechunk = TRUE,
row_count_name = NULL,
row_count_offset = 0L,
low_memory = FALSE) {
mc = match.call()
mc[[1]] = get("pl", envir = asNamespace("polars"))$scan_parquet
eval.parent(mc)$collect()
}
read_parquet = function( # remapped to pl$read_parquet, a hack to support roxygen2 @inheritParams
file,
n_rows = NULL,
cache = TRUE,
parallel = c(
"Auto", # default
"None",
"Columns",
"RowGroups"
),
rechunk = TRUE,
row_count_name = NULL,
row_count_offset = 0L,
low_memory = FALSE,
use_statistics = TRUE,
hive_partitioning = TRUE) {


args = as.list(environment())
result({
do.call(pl$scan_parquet, args)$collect()
}) |>
unwrap("in pl$read_parquet(): ")
}
pl$read_parquet = read_parquet
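
The eager reader above no longer rewrites the call with `match.call()`; it captures its formals with `as.list(environment())` and forwards them to the lazy scanner with `do.call()`, so the two signatures stay in sync and errors carry the `"in pl$read_parquet(): "` context. A self-contained toy sketch of that pattern (the `scan_fake`/`read_fake` names are hypothetical, not part of the package):

```r
scan_fake = function(file, n_rows = NULL) {
  sprintf("scanning %s (n_rows = %s)", file, if (is.null(n_rows)) "all" else n_rows)
}

read_fake = function(file, n_rows = NULL) {
  # as.list(environment()) captures every formal argument, defaults included,
  # so the eager wrapper never has to restate the lazy function's defaults
  args = as.list(environment())
  do.call(scan_fake, args)
}

read_fake("data.parquet")
#> [1] "scanning data.parquet (n_rows = all)"
```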
42 changes: 34 additions & 8 deletions man/IO_read_parquet.Rd


26 changes: 17 additions & 9 deletions man/IO_scan_parquet.Rd


75 changes: 32 additions & 43 deletions src/rust/src/rdataframe/read_parquet.rs
@@ -1,55 +1,44 @@
use crate::utils::r_result_list;

use crate::lazy::dataframe::LazyFrame;
use crate::robj_to;
use crate::rpolarserr::{polars_to_rpolars_err, RResult};

//use crate::utils::wrappers::*;
use crate::utils::wrappers::null_to_opt;
use extendr_api::{extendr, prelude::*};
use extendr_api::Rinternals;
use extendr_api::{extendr, extendr_module, Robj};
use polars::io::RowCount;
use polars::prelude::{self as pl};
//this function is derived from polars/py-polars/src/lazy/DataFrame.rs new_from_csv

#[allow(clippy::too_many_arguments)]
#[extendr]
pub fn new_from_parquet(
path: String,
n_rows: Nullable<i32>,
cache: bool,
parallel: String, //Wrap<ParallelStrategy>,
rechunk: bool,
row_name: Nullable<String>,
row_count: u32,
low_memory: bool,
hive_partitioning: bool,
) -> List {
let parallel_strategy = match parallel {
x if x == "Auto" => pl::ParallelStrategy::Auto,
_ => panic!("not implemented"),
};

let row_name = null_to_opt(row_name);

let row_count = row_name.map(|name| polars::io::RowCount {
name,
offset: row_count,
});
let n_rows = null_to_opt(n_rows);

path: Robj,
n_rows: Robj,
cache: Robj,
parallel: Robj,
rechunk: Robj,
row_name: Robj,
row_count: Robj,
//storage_options: Robj, // not supported yet; would provide features, e.g. aws
use_statistics: Robj,
low_memory: Robj,
hive_partitioning: Robj,
//retries: Robj // not supported yet, with CloudOptions
) -> RResult<LazyFrame> {
let offset = robj_to!(Option, u32, row_count)?.unwrap_or(0);
let opt_rowcount = robj_to!(Option, String, row_name)?.map(|name| RowCount { name, offset });
let args = pl::ScanArgsParquet {
n_rows: n_rows.map(|x| x as usize),
cache,
parallel: parallel_strategy,
rechunk,
row_count,
low_memory,
cloud_options: None, //TODO implement cloud options
use_statistics: true, //TODO expose use statistics
hive_partitioning,
n_rows: robj_to!(Option, usize, n_rows)?,
cache: robj_to!(bool, cache)?,
parallel: robj_to!(ParallelStrategy, parallel)?,
rechunk: robj_to!(bool, rechunk)?,
row_count: opt_rowcount,
low_memory: robj_to!(bool, low_memory)?,
cloud_options: None,
use_statistics: robj_to!(bool, use_statistics)?,
hive_partitioning: robj_to!(bool, hive_partitioning)?,
};

let lf_result = pl::LazyFrame::scan_parquet(path, args)
.map_err(|x| x.to_string())
.map(LazyFrame);
r_result_list(lf_result)
pl::LazyFrame::scan_parquet(robj_to!(String, path)?, args)
.map_err(polars_to_rpolars_err)
.map(LazyFrame)
}
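
With every argument now passed as `Robj` and converted through `robj_to!`, a bad value surfaces as an `RResult` error that the R-side `unwrap()` turns into an informative R error instead of a Rust panic. A hedged sketch of what that looks like from R (the exact message wording may differ):

```r
tryCatch(
  pl$scan_parquet("data.parquet", n_rows = -1), # negative values cannot convert to usize
  error = function(e) conditionMessage(e)
)
#> something like: "in pl$scan_parquet(): the argument [n_rows] ... cannot be less than zero"
```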

extendr_module! {
14 changes: 14 additions & 0 deletions src/rust/src/rdatatype.rs
@@ -516,6 +516,20 @@ pub fn robj_to_closed_window(robj: Robj) -> RResult<pl::ClosedWindow> {
}
}

pub fn robj_to_parallel_strategy(robj: extendr_api::Robj) -> RResult<pl::ParallelStrategy> {
use pl::ParallelStrategy as PS;
match robj_to_rchoice(robj)?.to_lowercase().as_str() {
//accept lowercase too, as for most other enums
"auto" => Ok(PS::Auto),
"none" => Ok(PS::None),
"columns" => Ok(PS::Columns),
"rowgroups" => Ok(PS::RowGroups),
s => rerr().bad_val(format!(
"ParallelStrategy choice ['{s}'] should be one of 'Auto', 'None', 'Columns', 'RowGroups'"
)),
}
}
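
Because the input is lowercased before matching, the R-level choices are case-insensitive, and an unknown value comes back as a structured error rather than the old R-side `stop("unknown parallel strategy")`. A sketch, assuming a `data.parquet` file exists:

```r
# equivalent spellings after this change
pl$scan_parquet("data.parquet", parallel = "RowGroups")
pl$scan_parquet("data.parquet", parallel = "rowgroups")

# an unrecognized choice is rejected with the message built above:
# pl$scan_parquet("data.parquet", parallel = "diagonal")
#> Error: in pl$scan_parquet(): ParallelStrategy choice ['diagonal'] should be
#> one of 'Auto', 'None', 'Columns', 'RowGroups'
```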

extendr_module! {
mod rdatatype;
impl RPolarsDataType;
4 changes: 4 additions & 0 deletions src/rust/src/utils/mod.rs
@@ -1059,6 +1059,10 @@ macro_rules! robj_to_inner {
$crate::rdatatype::robj_to_join_type($a)
};

(ParallelStrategy, $a:ident) => {
$crate::rdatatype::robj_to_parallel_strategy($a)
};

(PathBuf, $a:ident) => {
$crate::utils::robj_to_pathbuf($a)
};