Skip to content

Commit

Permalink
Support reading matching projected and filter cols from Parquet files…
Browse files Browse the repository at this point in the history
… with otherwise mismatched schemas (rapidsai#16394)

Closes rapidsai#16269.

This PR adds support for reading (matching) projected/selected and filter columns from Parquet files with otherwise mismatching schemas.

### Solution Description
We create a `std::vector<std::unordered_map<int32_t, int32_t>>`, one per file except the 0th file. We then co-walk the schema trees and populate each map with the corresponding (one-to-one mapped) `schema_idx` of each valid selected (projection and filter) column between the 0th and the remaining files. The same `unordered_map` is then used to look up the `schema_idx` of those columns across files when creating `ColumnChunkDesc` and when copying column chunk metadata into the page decoder.

### Known Limitation
- [x] Nullability across files: Each selected column must still have the same nullability (either nullable or non-nullable) in every file. See rapidsai#12702, also described in [dask/dask#9935](dask/dask#9935).

CC @wence-

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Lawrence Mitchell (https://github.com/wence-)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: rapidsai#16394
  • Loading branch information
mhaseeb123 authored Aug 28, 2024
1 parent 925530a commit fbd6114
Show file tree
Hide file tree
Showing 11 changed files with 534 additions and 30 deletions.
37 changes: 37 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class parquet_reader_options {
bool _use_pandas_metadata = true;
// Whether to read and use ARROW schema
bool _use_arrow_schema = true;
// Whether to allow reading matching select columns from mismatched Parquet files.
bool _allow_mismatched_pq_schemas = false;
// Cast timestamp columns to a specific type
data_type _timestamp_type{type_id::EMPTY};

Expand Down Expand Up @@ -138,6 +140,18 @@ class parquet_reader_options {
*/
[[nodiscard]] bool is_enabled_use_arrow_schema() const { return _use_arrow_schema; }

/**
 * @brief Indicates whether reading of matching projected and filter columns from Parquet
 * sources with otherwise mismatched schemas is enabled.
 *
 * @return `true` if matching projected and filter columns may be read from Parquet sources
 * with otherwise mismatched schemas
 */
[[nodiscard]] bool is_enabled_allow_mismatched_pq_schemas() const { return _allow_mismatched_pq_schemas; }

/**
* @brief Returns optional tree of metadata.
*
Expand Down Expand Up @@ -258,6 +272,15 @@ class parquet_reader_options {
*/
void enable_use_arrow_schema(bool val) { _use_arrow_schema = val; }

/**
 * @brief Enables or disables reading of matching projected and filter columns from Parquet
 * sources with otherwise mismatched schemas.
 *
 * @param val `true` to allow reading matching projected and filter columns from mismatched
 * Parquet sources, `false` to require identical schemas across sources
 */
void enable_allow_mismatched_pq_schemas(bool val)
{
  _allow_mismatched_pq_schemas = val;
}

/**
* @brief Sets reader column schema.
*
Expand Down Expand Up @@ -382,6 +405,20 @@ class parquet_reader_options_builder {
return *this;
}

/**
 * @brief Enables or disables reading of matching projected and filter columns from Parquet
 * sources with otherwise mismatched schemas.
 *
 * @param val `true` to allow reading matching projected and filter columns from mismatched
 * Parquet sources, `false` to require identical schemas across sources
 * @return this for chaining.
 */
parquet_reader_options_builder& allow_mismatched_pq_schemas(bool val)
{
  // Forward the flag onto the wrapped options object and return the builder for chaining.
  options._allow_mismatched_pq_schemas = val;
  return *this;
}

/**
* @brief Sets reader metadata.
*
Expand Down
13 changes: 9 additions & 4 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,10 @@ reader::impl::impl(std::size_t chunk_read_limit,
_input_pass_read_limit{pass_read_limit}
{
// Open and parse the source dataset metadata
_metadata =
std::make_unique<aggregate_reader_metadata>(_sources, options.is_enabled_use_arrow_schema());
_metadata = std::make_unique<aggregate_reader_metadata>(
_sources,
options.is_enabled_use_arrow_schema(),
options.get_columns().has_value() and options.is_enabled_allow_mismatched_pq_schemas());

// Strings may be returned as either string or categorical columns
_strings_to_categorical = options.is_enabled_convert_strings_to_categories();
Expand Down Expand Up @@ -769,11 +771,14 @@ parquet_column_schema walk_schema(aggregate_reader_metadata const* mt, int idx)

parquet_metadata read_parquet_metadata(host_span<std::unique_ptr<datasource> const> sources)
{
// do not use arrow schema when reading information from parquet metadata.
// Do not use arrow schema when reading information from parquet metadata.
static constexpr auto use_arrow_schema = false;

// Do not select any columns when only reading the parquet metadata.
static constexpr auto has_column_projection = false;

// Open and parse the source dataset metadata
auto metadata = aggregate_reader_metadata(sources, use_arrow_schema);
auto metadata = aggregate_reader_metadata(sources, use_arrow_schema, has_column_projection);

return parquet_metadata{parquet_schema{walk_schema(&metadata, 0)},
metadata.get_num_rows(),
Expand Down
174 changes: 157 additions & 17 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,17 @@ aggregate_reader_metadata::collect_keyval_metadata() const
return kv_maps;
}

std::vector<std::unordered_map<int32_t, int32_t>> aggregate_reader_metadata::init_schema_idx_maps(
  bool const has_cols_from_mismatched_srcs) const
{
  // One schema-index map is needed per data source other than the zeroth (which serves as
  // the reference schema). No maps are needed when schemas must match exactly or when there
  // is only a single source.
  auto const num_extra_sources =
    per_file_metadata.size() > 1 ? per_file_metadata.size() - 1 : std::size_t{0};

  if (not has_cols_from_mismatched_srcs or num_extra_sources == 0) { return {}; }

  // Default-construct one empty map per non-zeroth source; they are populated later while
  // co-walking the schema trees during column selection.
  return std::vector<std::unordered_map<int32_t, int32_t>>(num_extra_sources);
}

int64_t aggregate_reader_metadata::calc_num_rows() const
{
return std::accumulate(
Expand Down Expand Up @@ -539,13 +550,18 @@ void aggregate_reader_metadata::column_info_for_row_group(row_group_info& rg_inf
}

aggregate_reader_metadata::aggregate_reader_metadata(
host_span<std::unique_ptr<datasource> const> sources, bool use_arrow_schema)
host_span<std::unique_ptr<datasource> const> sources,
bool use_arrow_schema,
bool has_cols_from_mismatched_srcs)
: per_file_metadata(metadatas_from_sources(sources)),
keyval_maps(collect_keyval_metadata()),
schema_idx_maps(init_schema_idx_maps(has_cols_from_mismatched_srcs)),
num_rows(calc_num_rows()),
num_row_groups(calc_num_row_groups())
{
if (per_file_metadata.size() > 0) {
// Validate that all sources have the same schema unless we are reading select columns
// from mismatched sources, in which case, we will only check the projected columns later.
if (per_file_metadata.size() > 1 and not has_cols_from_mismatched_srcs) {
auto const& first_meta = per_file_metadata.front();
auto const num_cols =
first_meta.row_groups.size() > 0 ? first_meta.row_groups.front().columns.size() : 0;
Expand Down Expand Up @@ -632,7 +648,7 @@ arrow_schema_data_types aggregate_reader_metadata::collect_arrow_schema() const
if (field->type_type() == flatbuf::Type::Type_Duration) {
auto type_data = field->type_as_Duration();
if (type_data != nullptr) {
auto name = (field->name()) ? field->name()->str() : "";
auto name = field->name() ? field->name()->str() : "";
// set the schema_elem type to duration type
schema_elem.type = duration_from_flatbuffer(type_data);
arrow_type_col_seen |= (schema_elem.type.id() != type_id::EMPTY);
Expand Down Expand Up @@ -868,12 +884,23 @@ ColumnChunkMetaData const& aggregate_reader_metadata::get_column_metadata(size_t
size_type src_idx,
int schema_idx) const
{
// schema_idx_maps will only have > 0 size when we are reading matching column projection from
// mismatched Parquet sources.
if (src_idx and not schema_idx_maps.empty()) {
auto const& schema_idx_map = schema_idx_maps[src_idx - 1];
CUDF_EXPECTS(schema_idx_map.find(schema_idx) != schema_idx_map.end(),
"Unmapped schema index encountered in the specified source tree",
std::range_error);
schema_idx = schema_idx_map.at(schema_idx);
}

auto col =
std::find_if(per_file_metadata[src_idx].row_groups[row_group_index].columns.begin(),
per_file_metadata[src_idx].row_groups[row_group_index].columns.end(),
[schema_idx](ColumnChunk const& col) { return col.schema_idx == schema_idx; });
CUDF_EXPECTS(col != std::end(per_file_metadata[src_idx].row_groups[row_group_index].columns),
"Found no metadata for schema index");
"Found no metadata for schema index",
std::range_error);
return col->meta_data;
}

Expand Down Expand Up @@ -1041,18 +1068,19 @@ aggregate_reader_metadata::select_columns(
std::optional<std::vector<std::string>> const& filter_columns_names,
bool include_index,
bool strings_to_categorical,
type_id timestamp_type_id) const
type_id timestamp_type_id)
{
auto find_schema_child = [&](SchemaElement const& schema_elem, std::string const& name) {
auto const& col_schema_idx =
std::find_if(schema_elem.children_idx.cbegin(),
schema_elem.children_idx.cend(),
[&](size_t col_schema_idx) { return get_schema(col_schema_idx).name == name; });

return (col_schema_idx != schema_elem.children_idx.end())
? static_cast<size_type>(*col_schema_idx)
: -1;
};
auto const find_schema_child =
[&](SchemaElement const& schema_elem, std::string const& name, int const pfm_idx = 0) {
auto const& col_schema_idx = std::find_if(
schema_elem.children_idx.cbegin(),
schema_elem.children_idx.cend(),
[&](size_t col_schema_idx) { return get_schema(col_schema_idx, pfm_idx).name == name; });

return (col_schema_idx != schema_elem.children_idx.end())
? static_cast<size_type>(*col_schema_idx)
: -1;
};

std::vector<cudf::io::detail::inline_column_buffer> output_columns;
std::vector<input_column_info> input_columns;
Expand All @@ -1074,7 +1102,7 @@ aggregate_reader_metadata::select_columns(
if (schema_elem.is_stub()) {
// is this legit?
CUDF_EXPECTS(schema_elem.num_children == 1, "Unexpected number of children for stub");
auto child_col_name_info = (col_name_info) ? &col_name_info->children[0] : nullptr;
auto const child_col_name_info = col_name_info ? &col_name_info->children[0] : nullptr;
return build_column(
child_col_name_info, schema_elem.children_idx[0], out_col_array, has_list_parent);
}
Expand Down Expand Up @@ -1154,6 +1182,97 @@ aggregate_reader_metadata::select_columns(
return path_is_valid;
};

// Compares two schema elements to be equal except their number of children
auto const equal_to_except_num_children = [](SchemaElement const& lhs, SchemaElement const& rhs) {
return lhs.type == rhs.type and lhs.converted_type == rhs.converted_type and
lhs.type_length == rhs.type_length and lhs.repetition_type == rhs.repetition_type and
lhs.name == rhs.name and lhs.decimal_scale == rhs.decimal_scale and
lhs.decimal_precision == rhs.decimal_precision and lhs.field_id == rhs.field_id;
};

// Maps a projected column's schema_idx in the zeroth per_file_metadata (source) to the
// corresponding schema_idx in pfm_idx'th per_file_metadata (destination). The projected
// column's path must match across sources, else an appropriate exception is thrown.
std::function<void(column_name_info const*, int const, int const, int const)> map_column =
[&](column_name_info const* col_name_info,
int const src_schema_idx,
int const dst_schema_idx,
int const pfm_idx) {
auto const& src_schema_elem = get_schema(src_schema_idx);
auto const& dst_schema_elem = get_schema(dst_schema_idx, pfm_idx);

// Check the schema elements to be equal except their number of children as we only care about
// the specific column paths in the schema trees. Raise an invalid_argument error if the
// schema elements don't match.
CUDF_EXPECTS(equal_to_except_num_children(src_schema_elem, dst_schema_elem),
"Encountered mismatching SchemaElement properties for a column in "
"the selected path",
std::invalid_argument);

// If src_schema_elem is a stub, it does not exist in the column_name_info and column_buffer
// hierarchy. So continue on with mapping.
if (src_schema_elem.is_stub()) {
// Check if dst_schema_elem is also a stub i.e. has num_children == 1 that we didn't
// previously check. Raise an invalid_argument error if dst_schema_elem is not a stub.
CUDF_EXPECTS(dst_schema_elem.is_stub(),
"Encountered mismatching schemas for stub.",
std::invalid_argument);
auto const child_col_name_info = col_name_info ? &col_name_info->children[0] : nullptr;
return map_column(child_col_name_info,
src_schema_elem.children_idx[0],
dst_schema_elem.children_idx[0],
pfm_idx);
}

// The path ends here. If this is a list/struct col (has children), then map all its children
// which must be identical.
if (col_name_info == nullptr or col_name_info->children.empty()) {
// Check the number of children to be equal to be mapped. An out_of_range error if the
// number of children isn't equal.
CUDF_EXPECTS(src_schema_elem.num_children == dst_schema_elem.num_children,
"Encountered mismatching number of children for a "
"column in the selected path",
std::out_of_range);

std::for_each(thrust::make_counting_iterator(0),
thrust::make_counting_iterator(src_schema_elem.num_children),
[&](auto const child_idx) {
map_column(nullptr,
src_schema_elem.children_idx[child_idx],
dst_schema_elem.children_idx[child_idx],
pfm_idx);
});
}
// The path goes further down to specific child(ren) of this column so map only those
// children.
else {
std::for_each(
col_name_info->children.cbegin(),
col_name_info->children.cend(),
[&](auto const& child_col_name_info) {
// Ensure that each named child column exists in the destination schema tree for the
// paths to align up. An out_of_range error otherwise.
CUDF_EXPECTS(
find_schema_child(dst_schema_elem, child_col_name_info.name, pfm_idx) != -1,
"Encountered mismatching schema tree depths across data sources",
std::out_of_range);
map_column(&child_col_name_info,
find_schema_child(src_schema_elem, child_col_name_info.name),
find_schema_child(dst_schema_elem, child_col_name_info.name, pfm_idx),
pfm_idx);
});
}

// We're at a leaf and this is an input column (one with actual data stored) so map it.
if (src_schema_elem.num_children == 0) {
// Get the schema_idx_map for this data source (pfm)
auto& schema_idx_map = schema_idx_maps[pfm_idx - 1];

// Map the schema index from 0th tree (src) to the one in the current (dst) tree.
schema_idx_map[src_schema_idx] = dst_schema_idx;
}
};

std::vector<int> output_column_schemas;

//
Expand Down Expand Up @@ -1287,7 +1406,28 @@ aggregate_reader_metadata::select_columns(
for (auto& col : selected_columns) {
auto const& top_level_col_schema_idx = find_schema_child(root, col.name);
bool valid_column = build_column(&col, top_level_col_schema_idx, output_columns, false);
if (valid_column) output_column_schemas.push_back(top_level_col_schema_idx);
if (valid_column) {
output_column_schemas.push_back(top_level_col_schema_idx);

// Map the column's schema_idx across the rest of the data sources if required.
if (per_file_metadata.size() > 1 and not schema_idx_maps.empty()) {
std::for_each(thrust::make_counting_iterator(static_cast<size_t>(1)),
thrust::make_counting_iterator(per_file_metadata.size()),
[&](auto const pfm_idx) {
auto const& dst_root = get_schema(0, pfm_idx);
// Ensure that each top level column exists in the destination schema
// tree. An out_of_range error is thrown otherwise.
CUDF_EXPECTS(
find_schema_child(dst_root, col.name, pfm_idx) != -1,
"Encountered mismatching schema tree depths across data sources",
std::out_of_range);
map_column(&col,
top_level_col_schema_idx,
find_schema_child(dst_root, col.name, pfm_idx),
pfm_idx);
});
}
}
}
}

Expand Down
Loading

0 comments on commit fbd6114

Please sign in to comment.