Hash join changes for probe side spilling support
xiaoxmeng committed Mar 1, 2024
1 parent b557ab6 commit d096565
Showing 6 changed files with 129 additions and 235 deletions.
15 changes: 8 additions & 7 deletions velox/exec/HashBuild.cpp
@@ -845,8 +845,9 @@ bool HashBuild::finishHashBuild() {
isInputFromSpill() ? spillConfig()->startPartitionBit
: BaseHashTable::kNoSpillInputStartPartitionBit);
addRuntimeStats();
if (joinBridge_->setHashTable(
std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) {
joinBridge_->setHashTable(
std::move(table_), std::move(spillPartitions), joinHasNullKeys_);
if (spillEnabled()) {
intermediateStateCleared_ = true;
spillGroup_->restart();
}
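With probe-side spilling, the bridge can no longer tell the last build operator at this point whether more spill data will exist, because the probe side may spill the hash table later. The hunk above therefore drops the bool returned by setHashTable() and re-arms the spill group whenever spilling is enabled at all. A minimal sketch of how the tail of HashBuild::finishHashBuild() reads after this change (paraphrased from the hunk, not the verbatim Velox source):

  addRuntimeStats();
  // setHashTable() is void now: whether more spill partitions exist is only
  // known once the probe side finishes (and possibly spills the table).
  joinBridge_->setHashTable(
      std::move(table_), std::move(spillPartitions), joinHasNullKeys_);
  if (spillEnabled()) {
    // Stay ready to restore either build-side or probe-side spill partitions.
    intermediateStateCleared_ = true;
    spillGroup_->restart();
  }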
@@ -1222,11 +1223,11 @@ void HashBuild::reclaim(
}

bool HashBuild::nonReclaimableState() const {
// Apart from being in the nonReclaimable section,
// its also not reclaimable if:
// 1) the hash table has been built by the last build thread (inidicated
// by state_)
// 2) the last build operator has transferred ownership of 'this' operator's
// Apart from being in the nonReclaimable section, it's also not reclaimable
// if:
// 1) the hash table has been built by the last build thread (indicated by
// state_)
// 2) the last build operator has transferred ownership of 'this operator's
// intermediate state (table_ and spiller_) to itself
// 3) it has completed spilling before reaching either of the previous
// two states.
8 changes: 4 additions & 4 deletions velox/exec/HashBuild.h
@@ -264,8 +264,8 @@ class HashBuild final : public Operator {
// The row type used for hash table build and disk spilling.
RowTypePtr tableType_;

// Used to serialize access to intermediate state variables (like 'table_' and
// 'spiller_'). This is only required when variables are accessed
// Used to serialize access to internal state including 'table_' and
// 'spiller_'. This is only required when variables are accessed
// concurrently, that is, when a thread tries to close the operator while
// another thread is building the hash table. Refer to 'close()' and
// finishHashBuild()' for more details.
@@ -316,8 +316,8 @@
uint64_t numSpillBytes_{0};

// This can be nullptr if either spilling is not allowed or it has been
// trsnaferred to the last hash build operator while in kWaitForBuild state or
// it has been cleared to setup a new one for recursive spilling.
// transferred to the last hash build operator while in kWaitForBuild state or
// it has been cleared to set up a new one for recursive spilling.
std::unique_ptr<Spiller> spiller_;

// Used to read input from previously spilled data for restoring.
75 changes: 53 additions & 22 deletions velox/exec/HashJoinBridge.cpp
@@ -29,15 +29,14 @@ void HashJoinBridge::addBuilder() {
++numBuilders_;
}

bool HashJoinBridge::setHashTable(
void HashJoinBridge::setHashTable(
std::unique_ptr<BaseHashTable> table,
SpillPartitionSet spillPartitionSet,
bool hasNullKeys) {
VELOX_CHECK_NOT_NULL(table, "setHashTable called with null table");

auto spillPartitionIdSet = toSpillPartitionIdSet(spillPartitionSet);

bool hasSpillData;
std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
@@ -64,12 +63,30 @@ bool HashJoinBridge::setHashTable(
std::move(spillPartitionIdSet),
hasNullKeys);
restoringSpillPartitionId_.reset();

hasSpillData = !spillPartitionSets_.empty();
promises = std::move(promises_);
}
notify(std::move(promises));
return hasSpillData;
}

void HashJoinBridge::setSpilledHashTable(SpillPartitionSet spillPartitionSet) {
VELOX_CHECK(
!spillPartitionSet.empty(), "Spilled table partitions can't be empty");
std::shared_ptr<BaseHashTable> tableToFree;
{
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(started_);
VELOX_CHECK(buildResult_.has_value());
VELOX_CHECK(restoringSpillShards_.empty());
VELOX_CHECK(!restoringSpillPartitionId_.has_value());

for (auto& partitionEntry : spillPartitionSet) {
const auto id = partitionEntry.first;
VELOX_CHECK_EQ(spillPartitionSets_.count(id), 0);
spillPartitionSets_.emplace(id, std::move(partitionEntry.second));
}
// Free up the memory resource of the spilled hash table.
tableToFree = std::move(buildResult_->table);
}
}

void HashJoinBridge::setAntiJoinHasNullKeys() {
@@ -131,10 +148,8 @@ bool HashJoinBridge::probeFinished() {
spillPartitionSets_.begin()->second->split(numBuilders_);
VELOX_CHECK_EQ(restoringSpillShards_.size(), numBuilders_);
spillPartitionSets_.erase(spillPartitionSets_.begin());
promises = std::move(promises_);
} else {
VELOX_CHECK(promises_.empty());
}
promises = std::move(promises_);
}
notify(std::move(promises));
return hasSpillInput;
@@ -148,14 +163,16 @@ std::optional<HashJoinBridge::SpillInput> HashJoinBridge::spillInputOrFuture(
VELOX_DCHECK(
!restoringSpillPartitionId_.has_value() || !buildResult_.has_value());

if (buildResult_.has_value()) {
VELOX_CHECK(!restoringSpillPartitionId_.has_value());
promises_.emplace_back("HashJoinBridge::spillInputOrFuture");
*future = promises_.back().getSemiFuture();
return std::nullopt;
}
if (!restoringSpillPartitionId_.has_value()) {
if (spillPartitionSets_.empty()) {
return HashJoinBridge::SpillInput{};
} else {
promises_.emplace_back("HashJoinBridge::spillInputOrFuture");
*future = promises_.back().getSemiFuture();
return std::nullopt;
}
VELOX_CHECK(spillPartitionSets_.empty());
VELOX_CHECK(restoringSpillShards_.empty());
return HashJoinBridge::SpillInput{};
}
VELOX_CHECK(!restoringSpillShards_.empty());
auto spillShard = std::move(restoringSpillShards_.back());
@@ -175,22 +192,36 @@ uint64_t HashJoinMemoryReclaimer::reclaim(
uint64_t targetBytes,
uint64_t maxWaitMs,
memory::MemoryReclaimer::Stats& stats) {
bool hasReclaimedFromBuild{false};
bool hasReclaimedFromProbe{false};
uint64_t reclaimedBytes{0};
pool->visitChildren([&](memory::MemoryPool* child) {
VELOX_CHECK_EQ(child->kind(), memory::MemoryPool::Kind::kLeaf);
// The hash probe operator do not support memory reclaim.
if (!isHashBuildMemoryPool(*child)) {
return true;
const bool isBuild = isHashBuildMemoryPool(*child);
if (isBuild) {
if (!hasReclaimedFromBuild) {
hasReclaimedFromBuild = true;
reclaimedBytes = child->reclaim(targetBytes, maxWaitMs, stats);
}
// We only need to reclaim from any one of the hash build operators
// which will reclaim from all the peer hash build operators.
return !hasReclaimedFromProbe;
}

if (!hasReclaimedFromProbe) {
hasReclaimedFromProbe = true;
reclaimedBytes = child->reclaim(targetBytes, maxWaitMs, stats);
}
// We only need to reclaim from any one of the hash build operators
// which will reclaim from all the peer hash build operators.
reclaimedBytes = child->reclaim(targetBytes, maxWaitMs, stats);
return false;
return !hasReclaimedFromBuild;
});
return reclaimedBytes;
}

bool isHashBuildMemoryPool(const memory::MemoryPool& pool) {
return folly::StringPiece(pool.name()).endsWith("HashBuild");
}

bool isHashProbeMemoryPool(const memory::MemoryPool& pool) {
return folly::StringPiece(pool.name()).endsWith("HashProbe");
}
} // namespace facebook::velox::exec
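Reading the HashJoinMemoryReclaimer::reclaim() hunk above: the reclaimer now visits the operator pools and reclaims from at most one hash build child and at most one hash probe child, relying on each operator to fan out to its peers, and it stops iterating once both sides have been visited. A minimal sketch under that reading (hypothetical helper name, byte accounting simplified to a running sum, not the verbatim Velox source):

uint64_t reclaimFromHashJoinSketch(
    memory::MemoryPool* pool,
    uint64_t targetBytes,
    uint64_t maxWaitMs,
    memory::MemoryReclaimer::Stats& stats) {
  bool hasReclaimedFromBuild{false};
  bool hasReclaimedFromProbe{false};
  uint64_t reclaimedBytes{0};
  pool->visitChildren([&](memory::MemoryPool* child) {
    if (isHashBuildMemoryPool(*child)) {
      if (!hasReclaimedFromBuild) {
        hasReclaimedFromBuild = true;
        // One build operator reclaims on behalf of all its build peers.
        reclaimedBytes += child->reclaim(targetBytes, maxWaitMs, stats);
      }
      // Keep iterating only if the probe side still needs a visit.
      return !hasReclaimedFromProbe;
    }
    if (!hasReclaimedFromProbe) {
      hasReclaimedFromProbe = true;
      // Probe pools are now reclaimable too, which is what enables
      // probe-side spilling.
      reclaimedBytes += child->reclaim(targetBytes, maxWaitMs, stats);
    }
    // Keep iterating only if the build side still needs a visit.
    return !hasReclaimedFromBuild;
  });
  return reclaimedBytes;
}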
24 changes: 16 additions & 8 deletions velox/exec/HashJoinBridge.h
@@ -35,15 +35,21 @@ class HashJoinBridge : public JoinBridge {
/// HashBuild operators to parallelize the restoring operation.
void addBuilder();

/// Invoked by the build operator to set the built hash table.
/// 'spillPartitionSet' contains the spilled partitions while building
/// 'table'. The function returns true if there is spill data to restore
/// after HashProbe operators process 'table', otherwise false. This only
/// applies if the disk spilling is enabled.
bool setHashTable(
/// 'table' which only applies if the disk spilling is enabled.
void setHashTable(
std::unique_ptr<BaseHashTable> table,
SpillPartitionSet spillPartitionSet,
bool hasNullKeys);

/// Invoked by the probe operator to set the spilled hash table during
/// probing. The function pushes the spilled table partitions onto the
/// 'spillPartitionSets_' stack and clears the table set in 'buildResult_' to
/// free up the memory held by the spilled table. This only applies if disk
/// spilling is enabled.
void setSpilledHashTable(SpillPartitionSet spillPartitionSet);

void setAntiJoinHasNullKeys();

/// Represents the result of HashBuild operators: a hash table, an optional
@@ -75,8 +81,7 @@ class HashJoinBridge : public JoinBridge {
/// HashBuild operators. If HashProbe operator calls this early, 'future' will
/// be set to wait asynchronously, otherwise the built table along with
/// optional spilling related information will be returned in HashBuildResult.
std::optional<HashBuildResult> tableOrFuture(
ContinueFuture* FOLLY_NONNULL future);
std::optional<HashBuildResult> tableOrFuture(ContinueFuture* future);

/// Invoked by HashProbe operator after finishes probing the built table to
/// set one of the previously spilled partition to restore. The HashBuild
@@ -102,8 +107,7 @@
/// If HashBuild operator calls this early, 'future' will be set to wait
/// asynchronously. If there is no more spill data to restore, then
/// 'spillPartition' will be set to null in the returned SpillInput.
std::optional<SpillInput> spillInputOrFuture(
ContinueFuture* FOLLY_NONNULL future);
std::optional<SpillInput> spillInputOrFuture(ContinueFuture* future);

private:
uint32_t numBuilders_{0};
@@ -156,4 +160,8 @@ class HashJoinMemoryReclaimer final : public MemoryReclaimer {
/// Returns true if 'pool' is a hash build operator's memory pool. The check is
/// currently based on the pool name.
bool isHashBuildMemoryPool(const memory::MemoryPool& pool);

/// Returns true if 'pool' is a hash probe operator's memory pool. The check is
/// currently based on the pool name.
bool isHashProbeMemoryPool(const memory::MemoryPool& pool);
} // namespace facebook::velox::exec
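Taken together, the header comments above describe the new hand-off between the build and probe sides when the probe side spills the table. The walkthrough below is a hypothetical, single-threaded illustration drawn from those comments (the function name, the empty-future handling, and the strictly sequential ordering are assumptions; error handling and multiple drivers are omitted), not real Velox code:

void probeSideSpillWalkthroughSketch(
    const std::shared_ptr<HashJoinBridge>& bridge,
    std::unique_ptr<BaseHashTable> table,
    SpillPartitionSet buildSpill,
    SpillPartitionSet probeSpill) {
  // 1. The last HashBuild operator publishes the table. setHashTable() no
  //    longer reports whether spill data exists, since the probe side may add
  //    more partitions later by spilling the table itself.
  bridge->setHashTable(
      std::move(table), std::move(buildSpill), /*hasNullKeys=*/false);

  // 2. A HashProbe operator picks up the table.
  auto future = ContinueFuture::makeEmpty();
  auto buildResult = bridge->tableOrFuture(&future);

  // 3. Under memory pressure the probe side may spill the table. The spilled
  //    partitions go back to the bridge, which drops its reference to the
  //    in-memory table (the table is released outside the bridge mutex).
  if (!probeSpill.empty()) {
    bridge->setSpilledHashTable(std::move(probeSpill));
  }

  // 4. The last HashProbe operator signals completion. Only now are HashBuild
  //    operators waiting in spillInputOrFuture() released to restore the next
  //    spill partition, or to finish if nothing was spilled.
  bridge->probeFinished();
  auto spillInput = bridge->spillInputOrFuture(&future);
}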
66 changes: 40 additions & 26 deletions velox/exec/tests/HashJoinBridgeTest.cpp
@@ -150,15 +150,17 @@ TEST_P(HashJoinBridgeTest, withoutSpill) {
auto joinBridge = createJoinBridge();
// Can't call any other APIs except addBuilder() before start a join bridge
// first.
ASSERT_ANY_THROW(
joinBridge->setHashTable(createFakeHashTable(), {}, false));
ASSERT_ANY_THROW(joinBridge->setAntiJoinHasNullKeys());
ASSERT_ANY_THROW(joinBridge->probeFinished());
ASSERT_ANY_THROW(joinBridge->tableOrFuture(&futures[0]));
ASSERT_ANY_THROW(joinBridge->spillInputOrFuture(&futures[0]));
VELOX_ASSERT_THROW(
joinBridge->setHashTable(createFakeHashTable(), {}, false), "");
VELOX_ASSERT_THROW(joinBridge->setAntiJoinHasNullKeys(), "");
VELOX_ASSERT_THROW(joinBridge->probeFinished(), "");
VELOX_ASSERT_THROW(joinBridge->tableOrFuture(&futures[0]), "");
VELOX_ASSERT_THROW(joinBridge->spillInputOrFuture(&futures[0]), "");
VELOX_ASSERT_THROW(
joinBridge->setSpilledHashTable(makeFakeSpillPartitionSet(0)), "");

// Can't start a bridge without any builders.
ASSERT_ANY_THROW(joinBridge->start());
VELOX_ASSERT_THROW(joinBridge->start(), "");

joinBridge = createJoinBridge();

@@ -176,13 +178,13 @@
BaseHashTable* rawTable = nullptr;
if (hasNullKeys) {
joinBridge->setAntiJoinHasNullKeys();
ASSERT_ANY_THROW(joinBridge->setAntiJoinHasNullKeys());
VELOX_ASSERT_THROW(joinBridge->setAntiJoinHasNullKeys(), "");
} else {
auto table = createFakeHashTable();
rawTable = table.get();
joinBridge->setHashTable(std::move(table), {}, false);
ASSERT_ANY_THROW(
joinBridge->setHashTable(createFakeHashTable(), {}, false));
VELOX_ASSERT_THROW(
joinBridge->setHashTable(createFakeHashTable(), {}, false), "");
}

for (int32_t i = 0; i < numProbers_; ++i) {
@@ -209,15 +211,24 @@
}
}

// Verify builder will see no spill input.
// Verify the builder waits for the probe-side finish signal even if there is
// no spill input.
auto inputOr = joinBridge->spillInputOrFuture(&futures[0]);
ASSERT_FALSE(inputOr.has_value());
ASSERT_TRUE(futures[0].valid());

// Probe side completion.
ASSERT_FALSE(joinBridge->probeFinished());

futures[0].wait();

futures = createEmptyFutures(1);
inputOr = joinBridge->spillInputOrFuture(&futures[0]);
ASSERT_TRUE(inputOr.has_value());
ASSERT_FALSE(futures[0].valid());
ASSERT_TRUE(inputOr.value().spillPartition == nullptr);

// Probe side completion.
ASSERT_FALSE(joinBridge->probeFinished());
ASSERT_ANY_THROW(joinBridge->probeFinished());
VELOX_ASSERT_THROW(joinBridge->probeFinished(), "");
}
}

@@ -276,7 +287,7 @@ TEST_P(HashJoinBridgeTest, withSpill) {
} else {
numSpilledPartitions += spillPartitionSet.size();
spillPartitionIdSet = toSpillPartitionIdSet(spillPartitionSet);
hasMoreSpill = joinBridge->setHashTable(
joinBridge->setHashTable(
createFakeHashTable(), std::move(spillPartitionSet), false);
}

@@ -483,29 +494,32 @@ TEST_P(HashJoinBridgeTest, multiThreading) {
}
}

TEST_P(HashJoinBridgeTest, isHashBuildMemoryPool) {
TEST_P(HashJoinBridgeTest, isHashJoinMemoryPools) {
auto root = memory::memoryManager()->addRootPool("isHashBuildMemoryPool");
struct {
std::string poolName;
bool expectedHashBuildPool;
bool isHashBuildPool;
bool isHashProbePool;

std::string debugString() const {
return fmt::format(
"poolName: {}, expectedHashBuildPool: {}",
"poolName: {}, isHashBuildPool: {}, isHashProbePool: {}",
poolName,
expectedHashBuildPool);
isHashBuildPool,
isHashProbePool);
}
} testSettings[] = {
{"HashBuild", true},
{"HashBuildd", false},
{"hHashBuild", true},
{"hHashProbe", false},
{"HashProbe", false},
{"HashProbeh", false}};
{"HashBuild", true, false},
{"HashBuildd", false, false},
{"hHashBuild", false, false},
{"hHashProbe", false, false},
{"HashProbe", false, true},
{"HashProbeh", false, false}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
const auto pool = root->addLeafChild(testData.poolName);
ASSERT_EQ(isHashBuildMemoryPool(*pool), testData.expectedHashBuildPool);
ASSERT_EQ(isHashBuildMemoryPool(*pool), testData.isHashBuildPool);
ASSERT_EQ(isHashProbeMemoryPool(*pool), testData.isHashProbePool);
}
}

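The tests above replace ASSERT_ANY_THROW with VELOX_ASSERT_THROW(expr, ""), which, in addition to expecting a Velox exception, checks that its message contains the given substring (an empty pattern accepts any message). That allows the new checks to be tightened later if desired; a hypothetical example using the message from the VELOX_CHECK in HashJoinBridge::setSpilledHashTable() shown earlier:

  // Passing an empty partition set should trip the explicit precondition check.
  VELOX_ASSERT_THROW(
      joinBridge->setSpilledHashTable({}),
      "Spilled table partitions can't be empty");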