Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Hive hash computation for nested types #2720

Draft
wants to merge 8 commits into
base: branch-25.02
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/cpp/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,8 @@ ConfigureBench(BLOOM_FILTER_BENCH
ConfigureBench(GET_JSON_OBJECT_BENCH
get_json_object.cu)

ConfigureBench(HASH_BENCH
hash.cu)

ConfigureBench(PARSE_URI_BENCH
parse_uri.cpp)
67 changes: 67 additions & 0 deletions src/main/cpp/benchmarks/hash.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <benchmarks/common/generate_input.hpp>

#include <cudf_test/column_utilities.hpp>

#include <cudf/io/types.hpp>

#include <hash.hpp>
#include <nvbench/nvbench.cuh>

constexpr auto min_width = 10;
constexpr auto max_width = 10;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this two be removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The optimization effect is related to the schema, and I'm not sure how to write the benchmark for a good comparison.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The benchmark should support running either (or both) lists and structs. You can switch benchmark types using a boolean flag. That flag should not be constexpr so we can compile all the code to verify syntax.

auto const bench_structs = false;
auto const data_profile = [&] {
  if (bench_structs) {
    ...
    .struct_types(...);
    return ...;
  } else {
    ...
    .list_type(...);
    return ...;
  }();
   


static void hash(nvbench::state& state)
{
std::size_t const size_bytes = static_cast<cudf::size_type>(state.get_int64("size_bytes"));
// cudf::size_type const list_depth = static_cast<cudf::size_type>(state.get_int64("list_depth"));

data_profile const table_profile =
data_profile_builder()
.no_validity()
//.distribution(cudf::type_id::LIST, distribution_id::NORMAL, min_width, max_width)
//.list_depth(list_depth)
//.list_type(cudf::type_id::INT32);
.struct_types(std::vector<cudf::type_id>{
cudf::type_id::BOOL8, cudf::type_id::INT32, cudf::type_id::FLOAT32});

auto const input_table = create_random_table(
std::vector<cudf::type_id>{cudf::type_id::STRUCT}, table_size_bytes{size_bytes}, table_profile);

auto const stream = cudf::get_default_stream();
state.set_cuda_stream(nvbench::make_cuda_stream_view(stream.value()));
state.exec(nvbench::exec_tag::timer | nvbench::exec_tag::sync,
[&](nvbench::launch& launch, auto& timer) {
timer.start();
auto const output = spark_rapids_jni::hive_hash(*input_table);
stream.synchronize();
timer.stop();
});

auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
state.add_global_memory_reads<nvbench::int8_t>(size_bytes);
}

NVBENCH_BENCH(hash).set_name("hash").add_int64_axis(
"size_bytes",
{50'000'000,
100'000'000,
250'000'000,
500'000'000,
1'000'000'000}); // 50MB, 100MB, 250MB, 500MB, 1GB
//.add_int64_axis("list_depth", {1, 2, 4});
ustcfy marked this conversation as resolved.
Show resolved Hide resolved
179 changes: 139 additions & 40 deletions src/main/cpp/src/hive_hash.cu
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <cudf/column/column_factories.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/structs/structs_column_view.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table_device_view.cuh>

Expand Down Expand Up @@ -161,8 +162,16 @@ hive_hash_value_t __device__ inline hive_hash_function<cudf::timestamp_us>::oper
template <template <typename> class hash_function, typename Nullate>
class hive_device_row_hasher {
public:
CUDF_HOST_DEVICE hive_device_row_hasher(Nullate check_nulls, cudf::table_device_view t) noexcept
: _check_nulls{check_nulls}, _table{t}
CUDF_HOST_DEVICE hive_device_row_hasher(Nullate check_nulls,
cudf::table_device_view t,
cudf::column_device_view* flattened_column_views,
cudf::size_type* first_child_index,
cudf::size_type* nested_column_map) noexcept
: _check_nulls{check_nulls},
_table{t},
_flattened_column_views{flattened_column_views},
_first_child_index{first_child_index},
_nested_column_map{nested_column_map}
{
// Error out if passed an unsupported hash_function
static_assert(std::is_base_of_v<hive_hash_function<int>, hash_function<int>>,
Expand All @@ -182,9 +191,19 @@ class hive_device_row_hasher {
_table.end(),
HIVE_INIT_HASH,
cuda::proclaim_return_type<hive_hash_value_t>(
[row_index, nulls = this->_check_nulls] __device__(auto hash, auto const& column) {
auto cur_hash =
cudf::type_dispatcher(column.type(), element_hasher_adapter{nulls}, column, row_index);
[row_index,
nulls = this->_check_nulls,
table = this->_table,
flattened_column_views = this->_flattened_column_views,
first_child_index = this->_first_child_index,
nested_column_map = this->_nested_column_map] __device__(auto hash, auto const& column) {
cudf::size_type col_idx = &column - table.begin();
auto cur_hash = cudf::type_dispatcher(
column.type(),
element_hasher_adapter{
nulls, col_idx, flattened_column_views, first_child_index, nested_column_map},
column,
row_index);
return HIVE_HASH_FACTOR * hash + cur_hash;
}));
}
Expand All @@ -197,8 +216,16 @@ class hive_device_row_hasher {
public:
using hash_functor_t = cudf::experimental::row::hash::element_hasher<hash_function, Nullate>;

__device__ element_hasher_adapter(Nullate check_nulls) noexcept
: hash_functor{check_nulls, HIVE_INIT_HASH, HIVE_INIT_HASH}
__device__ element_hasher_adapter(Nullate check_nulls,
cudf::size_type col_idx,
cudf::column_device_view* flattened_column_views,
cudf::size_type* first_child_index,
cudf::size_type* nested_column_map) noexcept
: hash_functor{check_nulls, HIVE_INIT_HASH, HIVE_INIT_HASH},
_col_idx{col_idx},
_flattened_column_views{flattened_column_views},
_first_child_index{first_child_index},
_nested_column_map{nested_column_map}
{
}

Expand All @@ -214,16 +241,16 @@ class hive_device_row_hasher {
*/
struct col_stack_frame {
private:
cudf::column_device_view _column; // the column has only one row
hive_hash_value_t _cur_hash; // current hash value of the column
int _idx_to_process; // the index of child or element to process next
cudf::size_type _col_idx; // the column has only one row
ustcfy marked this conversation as resolved.
Show resolved Hide resolved
cudf::size_type _row_idx; // the index of the row in the column
int _idx_to_process; // the index of child or element to process next
hive_hash_value_t _cur_hash; // current hash value of the column

public:
__device__ col_stack_frame() =
delete; // Because the default constructor of `cudf::column_device_view` is deleted
__device__ col_stack_frame() = default;

__device__ col_stack_frame(cudf::column_device_view col)
: _column(std::move(col)), _idx_to_process(0), _cur_hash(HIVE_INIT_HASH)
__device__ col_stack_frame(cudf::size_type col_idx, cudf::size_type row_idx)
: _col_idx(col_idx), _row_idx(row_idx), _cur_hash(HIVE_INIT_HASH), _idx_to_process(0)
{
}

Expand All @@ -238,7 +265,9 @@ class hive_device_row_hasher {

__device__ int get_idx_to_process() { return _idx_to_process; }

__device__ cudf::column_device_view get_column() { return _column; }
__device__ cudf::size_type get_col_idx() { return _col_idx; }

__device__ cudf::size_type get_row_idx() { return _row_idx; }
};

/**
Expand Down Expand Up @@ -363,53 +392,57 @@ class hive_device_row_hasher {
__device__ hive_hash_value_t operator()(cudf::column_device_view const& col,
cudf::size_type row_index) const noexcept
{
cudf::column_device_view curr_col = col.slice(row_index, 1);
// The default constructor of `col_stack_frame` is deleted, so it can not allocate an array
// of `col_stack_frame` directly.
// Instead leverage the byte array to create the col_stack_frame array.
alignas(col_stack_frame) char stack_wrapper[sizeof(col_stack_frame) * MAX_STACK_DEPTH];
auto col_stack = reinterpret_cast<col_stack_frame*>(stack_wrapper);
int stack_size = 0;
cudf::size_type curr_col_idx = _nested_column_map[this->_col_idx];
cudf::size_type curr_row_idx = row_index;

col_stack[stack_size++] = col_stack_frame(curr_col);
col_stack_frame col_stack[MAX_STACK_DEPTH];
int stack_size = 0;
col_stack[stack_size++] = col_stack_frame(curr_col_idx, curr_row_idx);

while (stack_size > 0) {
col_stack_frame& top = col_stack[stack_size - 1];
curr_col = top.get_column();
curr_col_idx = top.get_col_idx();
curr_row_idx = top.get_row_idx();
auto const& curr_col = _flattened_column_views[curr_col_idx];
// Do not pop it until it is processed. The definition of `processed` is:
// - For structs, it is when all child columns are processed.
// - For lists, it is when all elements in the list are processed.
if (curr_col.type().id() == cudf::type_id::STRUCT) {
if (top.get_idx_to_process() == curr_col.num_child_columns()) {
if (--stack_size > 0) { col_stack[stack_size - 1].update_cur_hash(top.get_hash()); }
} else {
auto const structcv = cudf::detail::structs_column_device_view(curr_col);
while (top.get_idx_to_process() < curr_col.num_child_columns()) {
auto idx = top.get_and_inc_idx_to_process();
auto const child_col = structcv.get_sliced_child(idx);
auto idx = top.get_and_inc_idx_to_process();
auto child_col_idx = _first_child_index[curr_col_idx] + idx;
auto const& child_col = _flattened_column_views[child_col_idx];
// If the child is of primitive type, accumulate child hash into struct hash
if (child_col.type().id() != cudf::type_id::LIST &&
child_col.type().id() != cudf::type_id::STRUCT) {
auto child_hash =
cudf::type_dispatcher<cudf::experimental::dispatch_void_if_nested>(
child_col.type(), this->hash_functor, child_col, 0);
child_col.type(), this->hash_functor, child_col, curr_row_idx);
top.update_cur_hash(child_hash);
} else {
col_stack[stack_size++] = col_stack_frame(child_col);
col_stack[stack_size++] = col_stack_frame(child_col_idx, curr_row_idx);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, it is better to update the array element(col_stack_frame) inplacely instead of creating a new object.

break;
}
}
}
} else if (curr_col.type().id() == cudf::type_id::LIST) {
// Get the child column of the list column
cudf::column_device_view child_col =
cudf::detail::lists_column_device_view(curr_col).get_sliced_child();
auto offset_col_idx = _first_child_index[curr_col_idx];
auto child_col_idx = offset_col_idx + 1;
auto const& offsets_col = _flattened_column_views[offset_col_idx];
auto const& child_col = _flattened_column_views[child_col_idx];
auto child_row_idx_begin = offsets_col.element<cudf::size_type>(curr_row_idx);
auto child_row_idx_end = offsets_col.element<cudf::size_type>(curr_row_idx + 1);

// If the child column is of primitive type, directly compute the hash value of the list
if (child_col.type().id() != cudf::type_id::LIST &&
child_col.type().id() != cudf::type_id::STRUCT) {
auto single_level_list_hash = cudf::detail::accumulate(
thrust::counting_iterator(0),
thrust::counting_iterator(child_col.size()),
thrust::counting_iterator(child_row_idx_begin),
thrust::counting_iterator(child_row_idx_end),
HIVE_INIT_HASH,
[child_col, hasher = this->hash_functor] __device__(auto hash, auto element_index) {
auto cur_hash = cudf::type_dispatcher<cudf::experimental::dispatch_void_if_nested>(
Expand All @@ -419,12 +452,12 @@ class hive_device_row_hasher {
top.update_cur_hash(single_level_list_hash);
sperlingxx marked this conversation as resolved.
Show resolved Hide resolved
if (--stack_size > 0) { col_stack[stack_size - 1].update_cur_hash(top.get_hash()); }
} else {
if (top.get_idx_to_process() == child_col.size()) {
if (top.get_idx_to_process() == child_row_idx_end - child_row_idx_begin) {
if (--stack_size > 0) { col_stack[stack_size - 1].update_cur_hash(top.get_hash()); }
} else {
// Push the next element into the stack
col_stack[stack_size++] =
col_stack_frame(child_col.slice(top.get_and_inc_idx_to_process(), 1));
col_stack[stack_size++] = col_stack_frame(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

child_col_idx, child_row_idx_begin + top.get_and_inc_idx_to_process());
}
}
}
Expand All @@ -434,10 +467,17 @@ class hive_device_row_hasher {

private:
hash_functor_t const hash_functor;
cudf::size_type const _col_idx;
cudf::column_device_view* _flattened_column_views;
cudf::size_type* _first_child_index;
cudf::size_type* _nested_column_map;
};

Nullate const _check_nulls;
cudf::table_device_view const _table;
cudf::column_device_view* _flattened_column_views;
cudf::size_type* _first_child_index;
cudf::size_type* _nested_column_map;
};

void check_nested_depth(cudf::table_view const& input)
Expand Down Expand Up @@ -486,15 +526,74 @@ std::unique_ptr<cudf::column> hive_hash(cudf::table_view const& input,

check_nested_depth(input);

// `flattened_column_views` only contains nested columns and columns that result from flattening
// nested columns
std::vector<cudf::column_view> flattened_column_views;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The column view constructor will calculate null count which is time consuming.
The original approach does not need to calculate null count.
We may need to find a way to avoid this column view array.
Please help check if the contiguous_copy_column_device_views is helpful?

// `first_child_index` has the same size as `flattened_column_views`
std::vector<cudf::size_type> first_child_index;
// `nested_column_map` has the same size as `input.num_columns()`, and it maps the column index in
// `input` to the index in `flattened_column_views`
std::vector<cudf::size_type> nested_column_map;
// Construct the `flattened_column_views` by level order traversal
for (auto i = 0; i < input.num_columns(); i++) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extract a function like get_flattened_device_views

cudf::column_view col = input.column(i);
if (col.type().id() == cudf::type_id::LIST || col.type().id() == cudf::type_id::STRUCT) {
nested_column_map.push_back(flattened_column_views.size());
flattened_column_views.push_back(col);
// flattened_column_views[idx] is the next column to process
for (auto idx = nested_column_map.back();
idx < static_cast<cudf::size_type>(flattened_column_views.size());
idx++) {
col = flattened_column_views[idx];
if (col.type().id() == cudf::type_id::LIST) {
first_child_index.push_back(flattened_column_views.size());
flattened_column_views.push_back(cudf::lists_column_view(col).offsets());
flattened_column_views.push_back(cudf::lists_column_view(col).get_sliced_child(stream));
} else if (col.type().id() == cudf::type_id::STRUCT) {
first_child_index.push_back(flattened_column_views.size());
for (auto child_idx = 0; child_idx < col.num_children(); child_idx++) {
flattened_column_views.push_back(
cudf::structs_column_view(col).get_sliced_child(child_idx, stream));
}
} else {
first_child_index.push_back(-1);
}
}
} else {
nested_column_map.push_back(-1);
}
}

std::vector<cudf::column_device_view> device_flattened_column_views;
device_flattened_column_views.reserve(flattened_column_views.size());

for (auto const& col : flattened_column_views) {
auto const device_view = cudf::column_device_view::create(col, stream);
device_flattened_column_views.push_back(*device_view);
}

auto flattened_column_device_views =
cudf::detail::make_device_uvector_async(device_flattened_column_views, stream, mr);
auto first_child_index_view =
cudf::detail::make_device_uvector_async(first_child_index, stream, mr);
auto nested_column_map_view =
cudf::detail::make_device_uvector_async(nested_column_map, stream, mr);
Comment on lines +572 to +575
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The memory resource mr is only passed to the function that creates the the final output (or part of it). These variables are not returned thus we only pass in cudf::get_current_device_resource_ref().

Currently they are the same but this is required for future evolution of libcudf.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And please check if there are similar situations in the other places that need to be changed.

stream.synchronize();

bool const nullable = has_nested_nulls(input);
auto const input_view = cudf::table_device_view::create(input, stream);
auto output_view = output->mutable_view();

// Compute the hash value for each row
thrust::tabulate(rmm::exec_policy(stream),
output_view.begin<hive_hash_value_t>(),
output_view.end<hive_hash_value_t>(),
hive_device_row_hasher<hive_hash_function, bool>(nullable, *input_view));
thrust::tabulate(
rmm::exec_policy(stream),
output_view.begin<hive_hash_value_t>(),
output_view.end<hive_hash_value_t>(),
hive_device_row_hasher<hive_hash_function, bool>(nullable,
*input_view,
flattened_column_device_views.data(),
first_child_index_view.data(),
nested_column_map_view.data()));

return output;
}
Expand Down
Loading