Skip to content

Commit

Permalink
Put table scan in suspended state when getting output
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxmeng committed Apr 14, 2024
1 parent 2b2ef52 commit efa5d67
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 7 deletions.
7 changes: 5 additions & 2 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ RowVectorPtr Driver::next(std::shared_ptr<BlockingState>& blockingState) {
// error.
VELOX_CHECK(
stop == StopReason::kBlock || stop == StopReason::kAtEnd ||
stop == StopReason::kAlreadyTerminated || stop == StopReason::kTerminate);
stop == StopReason::kAlreadyTerminated);

return result;
}
Expand Down Expand Up @@ -1007,7 +1007,10 @@ SuspendedSection::SuspendedSection(Driver* driver) : driver_(driver) {

// Leaves the suspended section when this object is destroyed by calling the
// task's leaveSuspended() with the driver's state. Any result other than
// StopReason::kNone is reported as a terminate detected while leaving.
SuspendedSection::~SuspendedSection() {
if (driver_->task()->leaveSuspended(driver_->state()) != StopReason::kNone) {
// NOTE(review): this span looks like a rendered diff that shows the removed
// VELOX_FAIL line together with the added LOG(WARNING) lines. Presumably only
// the warning is meant to remain, since VELOX_FAIL would raise from inside a
// destructor -- confirm against the actual file.
VELOX_FAIL("Terminate detected when leaving suspended section");
// The warning identifies the driver (by driver id) and the task (by task id).
LOG(WARNING)
<< "Terminate detected when leaving suspended section for driver "
<< driver_->driverCtx()->driverId << " from task "
<< driver_->task()->taskId();
}
}

Expand Down
29 changes: 24 additions & 5 deletions velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,28 @@ folly::dynamic TableScan::toJson() const {
return ret;
}

bool TableScan::shouldYield(size_t startTimeMs) const {
  // A yield is wanted when any of the following holds, evaluated in this
  // order and stopping at the first hit: the task-level yield signal, the
  // driver-level yield signal, or the table scan output processing time limit
  // (only enforced when the limit is non-zero).
  bool yieldWanted = driverCtx_->task->shouldStop() == StopReason::kYield;
  if (!yieldWanted) {
    yieldWanted = driverCtx_->driver->shouldYield();
  }
  if (!yieldWanted && getOutputTimeLimitMs_ != 0) {
    yieldWanted = (getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_;
  }
  if (!yieldWanted) {
    return false;
  }

  // NOTE: if the task is being paused, we keep executing instead of yielding:
  // the driver thread would not be given up anyway, it would just spin (with
  // on-thread sleeps) until the task has been resumed.
  return !driverCtx_->task->pauseRequested();
}

bool TableScan::shouldStop() const {
  // Only stop reasons other than "none" and "yield" count as a stop request
  // here; yielding is handled separately by shouldYield().
  switch (driverCtx_->task->shouldStop()) {
    case StopReason::kNone:
    case StopReason::kYield:
      return false;
    default:
      return true;
  }
}

RowVectorPtr TableScan::getOutput() {
SuspendedSection suspendedSection(driverCtx_->driver);
auto exitCurStatusGuard = folly::makeGuard([this]() { curStatus_ = ""; });

if (noMoreSplits_) {
Expand All @@ -72,9 +93,7 @@ RowVectorPtr TableScan::getOutput() {
// w/o producing a result. In this case we return with the Yield blocking
// reason and an already fulfilled future.
curStatus_ = "getOutput: task->shouldStop";
if ((driverCtx_->task->shouldStop() != StopReason::kNone) ||
((getOutputTimeLimitMs_ != 0) &&
(getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_)) {
if (shouldYield(startTimeMs) || shouldStop()) {
blockingReason_ = BlockingReason::kYield;
blockingFuture_ = ContinueFuture{folly::Unit{}};
// A point for test code injection.
Expand Down Expand Up @@ -137,7 +156,7 @@ RowVectorPtr TableScan::getOutput() {
connectorSplit->connectorId,
"Got splits with different connector IDs");

if (!dataSource_) {
if (dataSource_ == nullptr) {
curStatus_ = "getOutput: creating dataSource_";
connectorQueryCtx_ = operatorCtx_->createConnectorQueryCtx(
connectorSplit->connectorId, planNodeId(), connectorPool_);
Expand All @@ -162,7 +181,7 @@ RowVectorPtr TableScan::getOutput() {
},
&debugString_});

if (connectorSplit->dataSource) {
if (connectorSplit->dataSource != nullptr) {
curStatus_ = "getOutput: preloaded split";
++numPreloadedSplits_;
// The AsyncSource returns a unique_ptr to a shared_ptr. The unique_ptr
Expand Down
8 changes: 8 additions & 0 deletions velox/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ class TableScan : public SourceOperator {
}

private:
// Returns true if this table scan operator needs to yield before processing
// the next split: the task or the driver signals a yield, or a non-zero
// 'getOutputTimeLimitMs_' has elapsed since 'startTimeMs' (in milliseconds).
// Never returns true while a task pause is requested.
bool shouldYield(size_t startTimeMs) const;

// Returns true if this table scan operator needs to stop, i.e. the task
// reports a stop reason other than StopReason::kNone and StopReason::kYield
// (for example because the task has been terminated).
bool shouldStop() const;

// Sets 'maxPreloadSplits' and 'splitPreloader' if prefetching splits is
// appropriate. The preloader will be applied to the 'first 'maxPreloadSplits'
// of the Task's split queue for 'this' when getting splits.
Expand Down

0 comments on commit efa5d67

Please sign in to comment.