Better handling of hive partitions in $scan_parquet() (#1191)
etiennebacher authored Aug 21, 2024
1 parent 3c91eb4 commit d26074e
Showing 7 changed files with 144 additions and 67 deletions.
6 changes: 5 additions & 1 deletion NEWS.md
@@ -20,6 +20,8 @@
and `TRUE` for the newest one (less compatible). It can also take an integer
determining a specific compatibility level when more are added in the future.
For now, `future = FALSE` can be replaced by `compat_level = FALSE` (#1183).
- In `$scan_parquet()` and `$read_parquet()`, the default value of
`hive_partitioning` is now `NULL` (#1189).
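
A minimal sketch of what the `NULL` default means in practice (illustration only, not part of the diff; it reuses the `write_parquet(partition_by = ...)` pattern from the updated examples further down, and the paths are placeholders):

```r
library(polars)

# write a hive-partitioned dataset, as in the updated docs below
temp_dir = withr::local_tempdir()
pl$DataFrame(mtcars)$write_parquet(temp_dir, partition_by = c("cyl", "gear"))

# hive_partitioning = NULL (the new default): because the source is a
# directory, the partition columns `cyl` and `gear` are detected and
# added to the output automatically
pl$scan_parquet(temp_dir)$collect()

# a single file has no partition directories, so nothing extra is added
temp_file = withr::local_tempfile(fileext = ".parquet")
pl$DataFrame(mtcars)$write_parquet(temp_file)
pl$scan_parquet(temp_file)$collect()
```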

### New features

@@ -35,8 +37,10 @@
- `$scan_parquet()`, `$scan_ipc()` and `$read_parquet()` have a new argument
`include_file_paths` to automatically add a column containing the path to the
source file(s) (#1183).
- `$scan_ipc` can read a hive-partitioned directory with its new arguments
- `$scan_ipc()` can read a hive-partitioned directory with its new arguments
`hive_partitioning`, `hive_schema`, and `try_parse_hive_dates` (#1183).
- `$scan_parquet()` and `$read_parquet()` gain two new arguments for more control
  over importing hive partitions: `hive_schema` and `try_parse_hive_dates` (#1189).
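
A minimal sketch of how the two new arguments can be combined (illustration only; the dtypes mirror the hive-schema test added below, whose TODO notes that the `hive_schema` override was still pending an upstream polars fix at the time of this commit, and writing the date-partitioned directory with `partition_by` is an assumption, since the new test uses `arrow::write_dataset()` for that step):

```r
library(polars)

# hive_schema overrides the dtypes inferred for the partition columns
temp_dir = withr::local_tempdir()
pl$DataFrame(mtcars)$write_parquet(temp_dir, partition_by = c("cyl", "gear"))
pl$scan_parquet(
  temp_dir,
  hive_schema = list(cyl = pl$String, gear = pl$Int32)
)$collect()

# try_parse_hive_dates controls whether partition values that look like dates
# (e.g. "dt=2020-01-02") are parsed to Date (the default) or kept as strings
dates_dir = withr::local_tempdir()
dates_df = pl$DataFrame(
  dt = as.Date(c("2020-01-01", "2020-01-01", "2020-01-02")),
  y = 1:3
)
dates_df$write_parquet(dates_dir, partition_by = "dt")
pl$scan_parquet(dates_dir, try_parse_hive_dates = FALSE)$collect()
```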

### Other changes

2 changes: 1 addition & 1 deletion R/extendr-wrappers.R
@@ -98,7 +98,7 @@ import_arrow_ipc <- function(path, n_rows, cache, rechunk, row_name, row_index,

new_from_ndjson <- function(path, infer_schema_length, batch_size, n_rows, low_memory, rechunk, row_index_name, row_index_offset, ignore_errors) .Call(wrap__new_from_ndjson, path, infer_schema_length, batch_size, n_rows, low_memory, rechunk, row_index_name, row_index_offset, ignore_errors)

new_from_parquet <- function(path, n_rows, cache, parallel, rechunk, row_name, row_index, storage_options, use_statistics, low_memory, hive_partitioning, glob, include_file_paths) .Call(wrap__new_from_parquet, path, n_rows, cache, parallel, rechunk, row_name, row_index, storage_options, use_statistics, low_memory, hive_partitioning, glob, include_file_paths)
new_from_parquet <- function(path, n_rows, cache, parallel, rechunk, row_name, row_index, storage_options, use_statistics, low_memory, hive_partitioning, hive_schema, try_parse_hive_dates, glob, include_file_paths) .Call(wrap__new_from_parquet, path, n_rows, cache, parallel, rechunk, row_name, row_index, storage_options, use_statistics, low_memory, hive_partitioning, hive_schema, try_parse_hive_dates, glob, include_file_paths)

test_rpolarserr <- function() .Call(wrap__test_rpolarserr)

60 changes: 32 additions & 28 deletions R/io_parquet.R
@@ -50,22 +50,21 @@
#' a valid service account. Be sure to always include a service account in the
#' `storage_options` argument.
#'
#' @examplesIf requireNamespace("arrow", quietly = TRUE) && arrow::arrow_with_dataset() && arrow::arrow_with_parquet()
#' temp_dir = tempfile()
#' @examplesIf requireNamespace("withr", quietly = TRUE)
#' # Write a Parquet file that we can then import as a DataFrame
#' temp_file = withr::local_tempfile(fileext = ".parquet")
#' pl$DataFrame(mtcars)$write_parquet(temp_file)
#'
#' pl$scan_parquet(temp_file)$collect()
#'
#' # Write a hive-style partitioned parquet dataset
#' arrow::write_dataset(
#' mtcars,
#' temp_dir,
#' partitioning = c("cyl", "gear"),
#' format = "parquet",
#' hive_style = TRUE
#' )
#' temp_dir = withr::local_tempdir()
#' pl$DataFrame(mtcars)$write_parquet(temp_dir, partition_by = c("cyl", "gear"))
#' list.files(temp_dir, recursive = TRUE)
#'
#' # Read the dataset
#' pl$scan_parquet(
#' file.path(temp_dir, "**/*.parquet")
#' )$collect()
#' # If the path is a folder, Polars automatically tries to detect partitions
#' # and includes them in the output
#' pl$scan_parquet(temp_dir)$collect()
pl_scan_parquet = function(
source,
...,
@@ -78,7 +77,9 @@ pl_scan_parquet = function(
"row_groups",
"none"
),
hive_partitioning = TRUE,
hive_partitioning = NULL,
hive_schema = NULL,
try_parse_hive_dates = TRUE,
glob = TRUE,
rechunk = FALSE,
low_memory = FALSE,
@@ -97,6 +98,8 @@ pl_scan_parquet = function(
low_memory = low_memory,
use_statistics = use_statistics,
hive_partitioning = hive_partitioning,
hive_schema = hive_schema,
try_parse_hive_dates = try_parse_hive_dates,
storage_options = storage_options,
glob = glob,
include_file_paths = include_file_paths
@@ -108,22 +111,21 @@
#' @rdname IO_read_parquet
#' @inherit pl_read_csv return
#' @inherit pl_scan_parquet params details
#' @examplesIf requireNamespace("arrow", quietly = TRUE) && arrow::arrow_with_dataset() && arrow::arrow_with_parquet()
#' temp_dir = tempfile()
#' @examplesIf requireNamespace("withr", quietly = TRUE)
#' # Write a Parquet file that we can then import as a DataFrame
#' temp_file = withr::local_tempfile(fileext = ".parquet")
#' pl$DataFrame(mtcars)$write_parquet(temp_file)
#'
#' pl$read_parquet(temp_file)
#'
#' # Write a hive-style partitioned parquet dataset
#' arrow::write_dataset(
#' mtcars,
#' temp_dir,
#' partitioning = c("cyl", "gear"),
#' format = "parquet",
#' hive_style = TRUE
#' )
#' temp_dir = withr::local_tempdir()
#' pl$DataFrame(mtcars)$write_parquet(temp_dir, partition_by = c("cyl", "gear"))
#' list.files(temp_dir, recursive = TRUE)
#'
#' # Read the dataset
#' pl$read_parquet(
#' file.path(temp_dir, "**/*.parquet")
#' )
#' # If the path is a folder, Polars automatically tries to detect partitions
#' # and includes them in the output
#' pl$read_parquet(temp_dir)
pl_read_parquet = function(
source,
...,
@@ -136,7 +138,9 @@
"row_groups",
"none"
),
hive_partitioning = TRUE,
hive_partitioning = NULL,
hive_schema = NULL,
try_parse_hive_dates = TRUE,
glob = TRUE,
rechunk = TRUE,
low_memory = FALSE,
37 changes: 23 additions & 14 deletions man/IO_read_parquet.Rd


37 changes: 23 additions & 14 deletions man/IO_scan_parquet.Rd


17 changes: 10 additions & 7 deletions src/rust/src/rdataframe/read_parquet.rs
@@ -5,7 +5,7 @@ use crate::rpolarserr::{polars_to_rpolars_err, RResult};

use extendr_api::Rinternals;
use extendr_api::{extendr, extendr_module, Robj};
use polars::io::RowIndex;
use polars::io::{HiveOptions, RowIndex};
use polars::prelude::{self as pl, Arc};

#[allow(clippy::too_many_arguments)]
@@ -22,6 +22,8 @@ pub fn new_from_parquet(
use_statistics: Robj,
low_memory: Robj,
hive_partitioning: Robj,
hive_schema: Robj,
try_parse_hive_dates: Robj,
glob: Robj,
include_file_paths: Robj,
//retries: Robj // not supported yet, with CloudOptions
@@ -33,6 +35,12 @@
name: name.into(),
offset,
});
let hive_options = HiveOptions {
enabled: robj_to!(Option, bool, hive_partitioning)?,
hive_start_idx: 0,
schema: robj_to!(Option, WrapSchema, hive_schema)?.map(|x| Arc::new(x.0)),
try_parse_dates: robj_to!(bool, try_parse_hive_dates)?,
};
let args = pl::ScanArgsParquet {
n_rows: robj_to!(Option, usize, n_rows)?,
cache: robj_to!(bool, cache)?,
@@ -42,12 +50,7 @@ pub fn new_from_parquet(
low_memory: robj_to!(bool, low_memory)?,
cloud_options,
use_statistics: robj_to!(bool, use_statistics)?,
hive_options: polars::io::HiveOptions {
enabled: robj_to!(Option, bool, hive_partitioning)?,
hive_start_idx: 0, // TODO: is it actually 0?
schema: None, // TODO: implement a option to set this
try_parse_dates: true,
},
hive_options,
glob: robj_to!(bool, glob)?,
include_file_paths: robj_to!(Option, String, include_file_paths)?.map(Arc::from),
};
52 changes: 50 additions & 2 deletions tests/testthat/test-parquet.R
@@ -64,8 +64,6 @@ test_that("scan read parquet - parallel strategies", {
)
})



test_that("scanning from hive partition works", {
skip_if_not_installed("withr")
temp_dir = withr::local_tempdir()
@@ -79,6 +77,56 @@
mtcars[order(mtcars$cyl, mtcars$gear), c("mpg", "gear")],
ignore_attr = TRUE
)

# TODO: uncomment when https://github.com/pola-rs/polars/issues/18293 is resolved

# hive_partitioning controls whether partitioning columns are included
# expect_identical(
# pl$scan_parquet(temp_dir, hive_partitioning = FALSE)$collect() |> dim(),
# c(32L, 9L)
# )

# TODO: uncomment when https://github.com/pola-rs/polars/issues/18294 is resolved

# can use hive_schema for more fine-grained control over partitioning columns
# sch = pl$scan_parquet(temp_dir, hive_schema = list(cyl = pl$String, gear = pl$Int32))$
# collect()$schema
# expect_true(sch$gear$is_integer())
# expect_true(sch$cyl$is_string())
expect_grepl_error(
pl$scan_parquet(temp_dir, hive_schema = list(cyl = "a"))
)

# cannot get a subset of partitioning columns
expect_grepl_error(
pl$scan_parquet(temp_dir, hive_schema = list(cyl = pl$String))$collect(),
r"(path contains column not present in the given Hive schema: "gear")"
)
})

test_that("try_parse_hive_dates works", {
skip_if_not_installed("arrow")
skip_if_not_installed("withr")
temp_dir = withr::local_tempdir()
test = data.frame(dt = as.Date(c("2020-01-01", "2020-01-01", "2020-01-02")), y = 1:3)
arrow::write_dataset(
test,
temp_dir,
partitioning = "dt",
format = "parquet",
hive_style = TRUE
)

# default is to parse dates
expect_identical(
pl$scan_parquet(temp_dir)$select("dt")$collect()$to_list(),
list(dt = as.Date(c("2020-01-01", "2020-01-01", "2020-01-02")))
)

expect_identical(
pl$scan_parquet(temp_dir, try_parse_hive_dates = FALSE)$select("dt")$collect()$to_list(),
list(dt = c("2020-01-01", "2020-01-01", "2020-01-02"))
)
})

test_that("scan_parquet can include file path", {
