Skip to content

Commit

Permalink
Fix the race between task termination and join table build
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxmeng committed Aug 4, 2023
1 parent 0d328e5 commit 4263e9f
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 12 deletions.
4 changes: 4 additions & 0 deletions velox/exec/HashAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,4 +457,8 @@ void HashAggregation::close() {
output_ = nullptr;
groupingSet_.reset();
}

// Abort hook for HashAggregation: forwards to close(), so an abort performs
// the same cleanup as a regular close of this operator.
void HashAggregation::abort() {
close();
}
} // namespace facebook::velox::exec
2 changes: 2 additions & 0 deletions velox/exec/HashAggregation.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class HashAggregation : public Operator {

void close() override;

void abort() override;

private:
void updateRuntimeStats();

Expand Down
8 changes: 5 additions & 3 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,8 @@ bool HashBuild::finishHashBuild() {
return false;
}

TestValue::adjust("facebook::velox::exec::HashBuild::finishHashBuild", this);

auto promisesGuard = folly::makeGuard([&]() {
// Realize the promises so that the other Drivers (which were not
// the last to finish) can continue from the barrier and finish.
Expand All @@ -736,7 +738,7 @@ bool HashBuild::finishHashBuild() {
for (auto& peer : peers) {
auto op = peer->findOperator(planNodeId());
HashBuild* build = dynamic_cast<HashBuild*>(op);
VELOX_CHECK(build);
VELOX_CHECK_NOT_NULL(build);
if (build->joinHasNullKeys_) {
joinHasNullKeys_ = true;
if (isAntiJoin(joinType_) && nullAware_ && !joinNode_->filter()) {
Expand Down Expand Up @@ -1055,8 +1057,8 @@ void HashBuild::reclaim(uint64_t /*unused*/) {
}
}

void HashBuild::close() {
Operator::close();
void HashBuild::abort() {
Operator::abort();

// Free up major memory usage.
joinBridge_.reset();
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class HashBuild final : public Operator {

void reclaim(uint64_t targetBytes) override;

void close() override;
void abort() override;

private:
void setState(State state);
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1408,8 +1408,8 @@ void HashProbe::setRunning() {
setState(ProbeOperatorState::kRunning);
}

void HashProbe::close() {
Operator::close();
void HashProbe::abort() {
Operator::abort();

// Free up major memory usage.
joinBridge_.reset();
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class HashProbe : public Operator {
return false;
}

void close() override;
void abort() override;

void clearDynamicFilters() override;

Expand Down
4 changes: 2 additions & 2 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ void Operator::MemoryReclaimer::abort(memory::MemoryPool* pool) {
driver->state().isTerminated);
VELOX_CHECK(driver->task()->isCancelled());

// Calls operator close to free up major memory usage.
op_->close();
// Calls operator abort to free up major memory usage.
op_->abort();
}
} // namespace facebook::velox::exec
6 changes: 6 additions & 0 deletions velox/exec/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,12 @@ class Operator : public BaseRuntimeStatWriter {
operatorCtx_->pool()->release();
}

// Invoked by memory arbitrator to free up operator's resource on memory
// abort. The query will stop running after this.
//
// The default implementation simply forwards to close(). The method is
// virtual, so an operator can override it to release its own state on abort.
virtual void abort() {
close();
}

// Returns true if 'this' never has more output rows than input rows.
virtual bool isFilter() const {
return false;
Expand Down
4 changes: 2 additions & 2 deletions velox/exec/OrderBy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,8 +410,8 @@ void OrderBy::prepareOutput() {
}
}

void OrderBy::close() {
Operator::close();
void OrderBy::abort() {
Operator::abort();

output_ = nullptr;
spiller_.reset();
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/OrderBy.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class OrderBy : public Operator {

void reclaim(uint64_t targetBytes) override;

void close() override;
void abort() override;

private:
// Checks if input will fit in the existing memory and increases
Expand Down
26 changes: 26 additions & 0 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,32 @@ DEBUG_ONLY_TEST_P(MultiThreadedHashJoinTest, parallelJoinBuildCheck) {
ASSERT_EQ(numDrivers_ == 1, !isParallelBuild);
}

// Exercises a task abort that is requested while a hash build operator is in
// finishHashBuild(): the injected test-value callback asks the operator's task
// to abort at that point, and the join run is then expected to fail with
// "Aborted for external error".
DEBUG_ONLY_TEST_P(
    MultiThreadedHashJoinTest,
    raceBetweenTaskTerminateAndTableBuild) {
  // Only run with more than one driver.
  // NOTE(review): presumably the race needs peer build drivers to exist;
  // confirm against HashBuild::finishHashBuild().
  if (numDrivers_ == 1) {
    return;
  }

  // Request a task abort from inside the hash build's injection point.
  SCOPED_TESTVALUE_SET(
      "facebook::velox::exec::HashBuild::finishHashBuild",
      std::function<void(Operator*)>([&](Operator* op) {
        auto task = op->testingOperatorCtx()->task();
        task->requestAbort();
      }));

  VELOX_ASSERT_THROW(
      HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
          .numDrivers(numDrivers_)
          .keyTypes({BIGINT(), VARCHAR()})
          .probeVectors(1600, 5)
          .buildVectors(1500, 5)
          .referenceQuery(
              "SELECT t_k0, t_k1, t_data, u_k0, u_k1, u_data FROM t, u WHERE t_k0 = u_k0 AND t_k1 = u_k1")
          .injectSpill(false)
          .run(),
      "Aborted for external error");
}

TEST_P(MultiThreadedHashJoinTest, allTypes) {
HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
.keyTypes(
Expand Down

0 comments on commit 4263e9f

Please sign in to comment.