This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[DNM] hashjoin opt2 #1124

Draft · wants to merge 3 commits into base: main
@@ -84,7 +84,7 @@ class GazellePluginConfig(conf: SQLConf) extends Logging {
     conf.getConfString("spark.oap.sql.columnar.rowtocolumnar", "true").toBoolean && enableCpu

   val forceShuffledHashJoin: Boolean =
-    conf.getConfString("spark.oap.sql.columnar.forceshuffledhashjoin", "false").toBoolean &&
+    conf.getConfString("spark.oap.sql.columnar.forceshuffledhashjoin", "true").toBoolean &&
       enableCpu

   val resizeShuffledHashJoinInputPartitions: Boolean =
@@ -311,19 +311,19 @@ case class ColumnarShuffledHashJoinExec(
       hash_relation_function,
       Field.nullable("result", new ArrowType.Int(32, true)))
     hashRelationKernel.build(hash_relation_schema, Lists.newArrayList(hash_relation_expr), true)
+    val beforeEval = System.nanoTime()
     while (depIter.hasNext) {
       val dep_cb = depIter.next()
       (0 until dep_cb.numCols).toList.foreach(i =>
         dep_cb.column(i).asInstanceOf[ArrowWritableColumnVector].retain())
       hashRelationBatchHolder += dep_cb
-      val beforeEval = System.nanoTime()
       val dep_rb = ConverterUtils.createArrowRecordBatch(dep_cb)
       hashRelationKernel.evaluate(dep_rb)
       ConverterUtils.releaseArrowRecordBatch(dep_rb)
-      build_elapse += System.nanoTime() - beforeEval
     }
     val hashRelationResultIterator = hashRelationKernel.finishByIterator()
+    build_elapse += System.nanoTime() - beforeEval

     val native_function = TreeBuilder.makeFunction(
       s"standalone",
       Lists.newArrayList(getKernelFunction(1)),
12 changes: 3 additions & 9 deletions native-sql-engine/cpp/compile.sh
@@ -26,14 +26,8 @@ cd ${CURRENT_DIR}
 if [ -d build ]; then
   rm -r build
 fi
-mkdir build
+mkdir -p build
 cd build
 cmake .. -DTESTS=${TESTS} -DBUILD_ARROW=${BUILD_ARROW} -DSTATIC_ARROW=${STATIC_ARROW} -DBUILD_PROTOBUF=${BUILD_PROTOBUF} -DARROW_ROOT=${ARROW_ROOT} -DARROW_BFS_INSTALL_DIR=${ARROW_BFS_INSTALL_DIR} -DBUILD_JEMALLOC=${BUILD_JEMALLOC}
-make -j2
-
-set +eu
-
-make -j2
-
-set +eu
-
+cores=$(grep -c ^processor /proc/cpuinfo 2>/dev/null)
+make -j$cores
@@ -623,6 +623,7 @@ class ConditionedProbeKernel::Impl {
         for (auto payload_arr : payloads) {
           payload_arr->Append(i, &unsafe_key_row);
         }
+        if (!hash_relation_->maybeContains(unsafe_key_row)) continue;
         int index = hash_relation_->Get(typed_key_array->GetView(i), unsafe_key_row);
         if (index == -1) {
           continue;
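Note (not part of the patch): the hunk above is the probe-side fast path. A negative answer from the Bloom filter is definitive, so the row can be skipped without touching the hash table; a positive answer may be a false positive and still has to be confirmed by the real lookup. A minimal standalone sketch of the same pattern, assuming bf.hpp is on the include path and using std::unordered_multimap as a stand-in for HashRelation (all names besides qp::Bloomfilter are hypothetical):

#include <string>
#include <unordered_map>
#include "bf.hpp"  // the qp::Bloomfilter added in this PR (path is an assumption)

int main() {
  qp::Bloomfilter<std::string, 1 << 20, 6> filter;      // 2^20 bits, 6 probes
  std::unordered_multimap<std::string, int> relation;   // stand-in for HashRelation

  // Build side: every inserted key is also added to the filter.
  relation.emplace("key_a", 0);
  filter.Add("key_a");

  // Probe side: cheap reject first, authoritative lookup second.
  for (const std::string probe : {"key_a", "key_b"}) {
    if (!filter.MaybeContains(probe)) continue;  // definitive miss
    auto it = relation.find(probe);              // authoritative check
    if (it == relation.end()) continue;          // was a false positive
    // ... emit joined row(s) ...
  }
}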
@@ -38,6 +38,7 @@
 #include "codegen/common/hash_relation_number.h"
 #include "codegen/common/hash_relation_string.h"
 #include "utils/macros.h"
+#include "third_party/hyperloglog.hpp"

 namespace sparkcolumnarplugin {
 namespace codegen {
@@ -156,13 +157,21 @@ class HashRelationKernel::Impl {
           key_projector_->Evaluate(*hash_in_batch, ctx_->memory_pool(), &hash_outputs));
       key_array = hash_outputs[0];
       key_hash_cached_.push_back(key_array);
+      // note: cardinality is estimated on the key hash, not the raw key
+      auto typed_key_arr = std::make_shared<precompile::Int32Array>(key_array);
+      for (int64_t i = 0; i < key_array->length(); i++) {
+        estimator.add(typed_key_arr->GetView(i));
+      }

     return arrow::Status::OK();
   }

   arrow::Status FinishInternal() {
     if (builder_type_ == 2) return arrow::Status::OK();
     // Decide init hashmap size
+    estimated_cardinality_ = estimator.estimate();
+    std::cout << "cardinality: " << estimated_cardinality_ << std::endl;
+    std::cout << "number cached: " << num_total_cached_ << std::endl;
     if (builder_type_ == 1) {
       int init_key_capacity = 128;
       int init_bytes_map_capacity = init_key_capacity * 256;
@@ -174,13 +183,16 @@ class HashRelationKernel::Impl {
       if (key_size_ != -1) {
         tmp_capacity *= 12;
       } else {
+        // TODO: fully calculate the space needed for multiple keys
         tmp_capacity *= 128;
       }
       if (tmp_capacity > INT_MAX) {
         init_bytes_map_capacity = INT_MAX;
       } else {
         init_bytes_map_capacity = tmp_capacity;
       }
+      std::cout << "init_key_capacity: " << init_key_capacity << std::endl;
+      std::cout << "init_bytes_capacity: " << init_bytes_map_capacity << std::endl;
       RETURN_NOT_OK(
           hash_relation_->InitHashTable(init_key_capacity, init_bytes_map_capacity));
     }
@@ -265,6 +277,8 @@ class HashRelationKernel::Impl {
   std::vector<arrow::ArrayVector> keys_cached_;
   std::vector<std::shared_ptr<arrow::Array>> key_hash_cached_;
   uint64_t num_total_cached_ = 0;
+  hll::HyperLogLog estimator;
+  uint64_t estimated_cardinality_ = 0;
   int builder_type_ = 0;
   bool semi_ = false;
   int key_size_ = -1;  // If key_size_ != 0, key will be stored directly in key_map
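Note (not part of the patch): the lines that derive init_key_capacity from estimated_cardinality_ are collapsed in this view, so the growth policy below is an assumption; only the x12 / x128 byte factors and the INT_MAX clamp are taken from the hunk. A hypothetical helper mirroring that arithmetic:

#include <climits>
#include <cstdint>

struct MapCapacities {
  int key_capacity;
  int bytes_map_capacity;
};

// Grow the default key capacity (128, per the hunk) until it covers the
// HyperLogLog estimate -- the power-of-two round-up is assumed, not shown --
// then size the byte map: 12 bytes/entry for fixed-size keys, 128 otherwise,
// clamped to INT_MAX, exactly as the visible lines do.
MapCapacities ComputeInitCapacities(uint64_t estimated_cardinality, int key_size) {
  int64_t key_capacity = 128;
  while ((uint64_t)key_capacity < estimated_cardinality && key_capacity < INT_MAX / 2) {
    key_capacity *= 2;  // assumed round-up policy
  }
  int64_t tmp_capacity = key_capacity;
  tmp_capacity *= (key_size != -1) ? 12 : 128;  // factors shown in the diff
  return {(int)key_capacity,
          (int)(tmp_capacity > INT_MAX ? INT_MAX : tmp_capacity)};
}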
7 changes: 7 additions & 0 deletions native-sql-engine/cpp/src/codegen/common/hash_relation.h
@@ -27,6 +27,7 @@
 #include "precompile/unsafe_array.h"
 #include "third_party/murmurhash/murmurhash32.h"
 #include "third_party/row_wise_memory/hashMap.h"
+#include "third_party/bf.hpp"
 #include "utils/macros.h"

 using sparkcolumnarplugin::codegen::arrowcompute::extra::ArrayItemIndex;
@@ -299,6 +300,10 @@ class HashRelation {
     return 0;
   }

+  bool maybeContains(std::shared_ptr<UnsafeRow> payload) {
+    return bf.MaybeContains(std::string(payload->data, payload->cursor));
+  }
+
   template <typename CType,
             typename std::enable_if_t<is_number_or_decimal_type<CType>::value>* = nullptr>
   int IfExists(int32_t v, CType payload) {
@@ -450,6 +455,7 @@ class HashRelation {
   uint64_t num_arrays_ = 0;
   std::vector<std::shared_ptr<HashRelationColumn>> hash_relation_column_list_;
   unsafeHashMap* hash_table_ = nullptr;
+  qp::Bloomfilter<std::string, 1 << 30, 6> bf;
   using ArrayType = sparkcolumnarplugin::precompile::Int32Array;
   bool null_index_set_ = false;
   std::vector<ArrayItemIndex> null_index_list_;
@@ -462,6 +468,7 @@ class HashRelation {
   arrow::Status Insert(int32_t v, std::shared_ptr<UnsafeRow> payload, uint32_t array_id,
                        uint32_t id) {
     assert(hash_table_ != nullptr);
+    bf.Add(std::string(payload->data, payload->cursor));
     auto index = ArrayItemIndex(array_id, id);
     if (!append(hash_table_, payload.get(), v, (char*)&index, sizeof(ArrayItemIndex))) {
       return arrow::Status::CapacityError("Insert to HashMap failed.");
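Note (not part of the patch): qp::Bloomfilter<std::string, 1 << 30, 6> embeds a std::bitset<1 << 30> directly in every HashRelation, and a bitset stores one bit per position, so each instance carries roughly 128 MiB of filter state regardless of build-side size. A standalone check of that footprint:

#include <bitset>
#include <cstdio>

int main() {
  // std::bitset<N> occupies at least N/8 bytes; for N = 2^30 that is 128 MiB,
  // allocated inline wherever the enclosing object lives.
  printf("bits: %d, bytes: %zu (~%zu MiB)\n", 1 << 30,
         sizeof(std::bitset<1 << 30>), sizeof(std::bitset<1 << 30>) >> 20);
  return 0;
}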
60 changes: 60 additions & 0 deletions native-sql-engine/cpp/src/third_party/bf.hpp
@@ -0,0 +1,60 @@
#ifndef BLOOMFILTER_H
#define BLOOMFILTER_H

#include <bitset>
#include <random>

namespace qp {

namespace {

// A utility class which provides uniformly distributed random numbers seeded
// with the hash function on a given input. Useful for generating multiple
// Bloom filter bit indexes for a key.
template <typename T, int Size, typename Hash = std::hash<T>>
struct Mixer {
  std::minstd_rand rng_;
  Mixer(const T& val) : rng_(Hash()(val)) {}
  std::size_t operator()() { return rng_() % Size; }
};

}  // namespace

// A probabilistic, space-efficient data structure used for testing membership
// in a set.
// https://en.wikipedia.org/wiki/Bloom_filter
template <typename Key, int Size, int NumHashes, typename Hash = std::hash<Key>>
class Bloomfilter {
 public:
  Bloomfilter() = default;

  Bloomfilter(const std::initializer_list<Key>& init) {
    for (const auto& key : init) {
      Add(key);
    }
  }

  constexpr int size() const { return Size; }

  void Add(const Key& key) {
    Mixer<Key, Size, Hash> mixer(key);
    for (int i = 0; i < NumHashes; ++i) {
      bits_.set(mixer());
    }
  }

  bool MaybeContains(const Key& key) const {
    Mixer<Key, Size, Hash> mixer(key);
    for (int i = 0; i < NumHashes; ++i) {
      if (!bits_[mixer()]) return false;
    }
    return true;
  }

 private:
  std::bitset<Size> bits_;
};

}  // namespace qp

#endif /* BLOOMFILTER_H */
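Note (not part of the patch): with the parameters instantiated in hash_relation.h (m = 2^30 bits, k = 6 hashes), the expected false-positive rate for n inserted keys follows the standard Bloom filter formula (1 - e^(-k*n/m))^k. A small calculator for a few build-side sizes:

#include <cmath>
#include <cstdio>

// Standard Bloom filter false-positive estimate; not taken from the patch.
double ExpectedFpr(double m, double k, double n) {
  return std::pow(1.0 - std::exp(-k * n / m), k);
}

int main() {
  const double m = 1 << 30;  // bits, as instantiated in hash_relation.h
  const double k = 6;        // NumHashes, as instantiated in hash_relation.h
  for (double n : {1e6, 1e7, 1e8}) {
    printf("n=%.0e -> fpr=%.3g\n", n, ExpectedFpr(m, k, n));
  }
  return 0;
}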