Skip to content

Commit

Permalink
Put table scan in suspensions state when get output (facebookincubato…
Browse files Browse the repository at this point in the history
…r#9477)

Summary:
When the memory arbitrator reclaims memory from a query, the query might get stuck in a slow table scan
operation because remote storage access can be very slow. This can cause a long tail in memory arbitration.
To optimize this, we can avoid waiting for the table scan operator to stop during memory arbitration,
as we can't reclaim memory from a table scan operator anyway.
This PR adds a suspended section at the start of get output processing, and lets the table scan
continue to process the next split if the task has been paused, instead of blocking the driver thread in an on-thread
sleep loop. To support this, we add recursive suspension request support, as the table scan operator
itself might trigger memory arbitration and we need to make sure the driver suspension state and
the task's running threads are handled properly. We should decrement the number of running driver
threads in the task on the first arbitration request and, correspondingly, we shall increment it or get the
driver out of the suspension state on the last suspension request.
This PR also refactors the yield logic a bit in table scan get output to ease maintenance.
It also adds a per-driver yield check in the split processing loop. A unit test is added to verify.


Reviewed By: Yuhta

Differential Revision: D56092234

Pulled By: xiaoxmeng
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Apr 16, 2024
1 parent 1bad4cf commit 22b998b
Show file tree
Hide file tree
Showing 25 changed files with 398 additions and 116 deletions.
2 changes: 1 addition & 1 deletion velox/common/file/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ target_link_libraries(
PUBLIC velox_exception Folly::folly
PRIVATE velox_common_base fmt::fmt glog::glog)

if(${VELOX_BUILD_TESTING})
if(${VELOX_BUILD_TESTING} OR ${VELOX_BUILD_TEST_UTILS})
add_subdirectory(tests)
endif()
13 changes: 13 additions & 0 deletions velox/common/file/tests/FaultyFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@

namespace facebook::velox::tests::utils {

/// Returns the display name of the given fault file operation 'type':
/// "READV" for Type::kReadv and "READ" for Type::kRead. Any other value is
/// reported through VELOX_UNSUPPORTED together with its integer value.
std::string FaultFileOperation::typeString(Type type) {
  if (type == Type::kReadv) {
    return "READV";
  }
  if (type == Type::kRead) {
    return "READ";
  }
  // Not one of the operation types handled above.
  VELOX_UNSUPPORTED(
      "Unknown file operation type: {}", static_cast<int>(type));
}

FaultyReadFile::FaultyReadFile(
const std::string& path,
std::shared_ptr<ReadFile> delegatedFile,
Expand Down
7 changes: 7 additions & 0 deletions velox/common/file/tests/FaultyFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ struct FaultFileOperation {
/// TODO: add to support fault injections for the other file operation
/// types.
};
static std::string typeString(Type type);

const Type type;

Expand All @@ -45,6 +46,12 @@ struct FaultFileOperation {
: type(_type), path(_path) {}
};

/// Streams the string form of a fault file operation type, as produced by
/// FaultFileOperation::typeString(), into 'o' and returns 'o' for chaining.
FOLLY_ALWAYS_INLINE std::ostream& operator<<(
    std::ostream& o,
    const FaultFileOperation::Type& type) {
  o << FaultFileOperation::typeString(type);
  return o;
}

/// Fault injection parameters for file read API.
struct FaultFileReadOperation : FaultFileOperation {
const uint64_t offset;
Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class FakeMemoryOperator : public Operator {
override {
VELOX_CHECK(canReclaim());
auto* driver = operatorCtx_->driver();
VELOX_CHECK(!driver->state().isOnThread() || driver->state().isSuspended);
VELOX_CHECK(!driver->state().isOnThread() || driver->state().suspended());
VELOX_CHECK(driver->task()->pauseRequested());
VELOX_CHECK_GT(targetBytes, 0);

Expand Down
18 changes: 10 additions & 8 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ const std::string& getColumnName(const common::Subfield& subfield) {
VELOX_CHECK_GT(subfield.path().size(), 0);
auto* field = dynamic_cast<const common::Subfield::NestedField*>(
subfield.path()[0].get());
VELOX_CHECK(field);
VELOX_CHECK_NOT_NULL(field);
return field->name();
}

Expand All @@ -273,7 +273,7 @@ void checkColumnNameLowerCase(const std::shared_ptr<const Type>& type) {

} break;
case TypeKind::ROW: {
for (auto& outputName : type->asRow().names()) {
for (const auto& outputName : type->asRow().names()) {
VELOX_CHECK(
!std::any_of(outputName.begin(), outputName.end(), isupper));
}
Expand All @@ -290,15 +290,15 @@ void checkColumnNameLowerCase(
const SubfieldFilters& filters,
const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
infoColumns) {
for (auto& pair : filters) {
if (auto name = pair.first.toString();
isSynthesizedColumn(name, infoColumns)) {
for (const auto& filterIt : filters) {
const auto name = filterIt.first.toString();
if (isSynthesizedColumn(name, infoColumns)) {
continue;
}
auto& path = pair.first.path();
const auto& path = filterIt.first.path();

for (int i = 0; i < path.size(); ++i) {
auto nestedField =
auto* nestedField =
dynamic_cast<const common::Subfield::NestedField*>(path[i].get());
if (nestedField == nullptr) {
continue;
Expand Down Expand Up @@ -727,7 +727,7 @@ core::TypedExprPtr extractFiltersFromRemainingFilter(
SubfieldFilters& filters,
double& sampleRate) {
auto* call = dynamic_cast<const core::CallTypedExpr*>(expr.get());
if (!call) {
if (call == nullptr) {
return expr;
}
common::Filter* oldFilter = nullptr;
Expand All @@ -749,11 +749,13 @@ core::TypedExprPtr extractFiltersFromRemainingFilter(
LOG(WARNING) << "Merging with " << oldFilter->toString();
}
}

if (isNotExpr(expr, call, evaluator)) {
auto inner = extractFiltersFromRemainingFilter(
call->inputs()[0], evaluator, !negated, filters, sampleRate);
return inner ? replaceInputs(call, {inner}) : nullptr;
}

if ((call->name() == "and" && !negated) ||
(call->name() == "or" && negated)) {
auto lhs = extractFiltersFromRemainingFilter(
Expand Down
13 changes: 6 additions & 7 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ HiveDataSource::HiveDataSource(
// Column handled keyed on the column alias, the name used in the query.
for (const auto& [canonicalizedName, columnHandle] : columnHandles) {
auto handle = std::dynamic_pointer_cast<HiveColumnHandle>(columnHandle);
VELOX_CHECK(
handle != nullptr,
VELOX_CHECK_NOT_NULL(
handle,
"ColumnHandle must be an instance of HiveColumnHandle for {}",
canonicalizedName);

Expand All @@ -67,7 +67,7 @@ HiveDataSource::HiveDataSource(
auto readerRowTypes = outputType_->children();
folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
subfields;
for (auto& outputName : outputType_->names()) {
for (const auto& outputName : outputType_->names()) {
auto it = columnHandles.find(outputName);
VELOX_CHECK(
it != columnHandles.end(),
Expand All @@ -86,9 +86,8 @@ HiveDataSource::HiveDataSource(
}

hiveTableHandle_ = std::dynamic_pointer_cast<HiveTableHandle>(tableHandle);
VELOX_CHECK(
hiveTableHandle_ != nullptr,
"TableHandle must be an instance of HiveTableHandle");
VELOX_CHECK_NOT_NULL(
hiveTableHandle_, "TableHandle must be an instance of HiveTableHandle");
if (hiveConfig_->isFileColumnNamesReadAsLowerCase(
connectorQueryCtx->sessionProperties())) {
checkColumnNameLowerCase(outputType_);
Expand All @@ -97,7 +96,7 @@ HiveDataSource::HiveDataSource(
}

SubfieldFilters filters;
for (auto& [k, v] : hiveTableHandle_->subfieldFilters()) {
for (const auto& [k, v] : hiveTableHandle_->subfieldFilters()) {
filters.emplace(k.clone(), v->clone());
}
double sampleRate = 1;
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/HiveDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class HiveDataSource : public DataSource {

// The row type for the data source output, not including filter-only columns
const RowTypePtr outputType_;
core::ExpressionEvaluator* const expressionEvaluator_;

// Column handles for the Split info columns keyed on their column names.
std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>
Expand All @@ -149,7 +150,6 @@ class HiveDataSource : public DataSource {
RowVectorPtr emptyOutput_;
dwio::common::RuntimeStatistics runtimeStats_;
std::atomic<uint64_t> totalRemainingFilterTime_{0};
core::ExpressionEvaluator* expressionEvaluator_;
uint64_t completedRows_ = 0;

// Reusable memory for remaining filter evaluation.
Expand Down
4 changes: 2 additions & 2 deletions velox/connectors/hive/TableHandle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ std::string HiveColumnHandle::toString() const {
columnTypeName(columnType_),
dataType_->toString());
out << " requiredSubfields: [";
for (const auto& s : requiredSubfields_) {
out << " " << s.toString();
for (const auto& subfield : requiredSubfields_) {
out << " " << subfield.toString();
}
out << " ]]";
return out.str();
Expand Down
28 changes: 14 additions & 14 deletions velox/connectors/hive/TableHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,20 @@ class HiveColumnHandle : public ColumnHandle {
return hiveType_;
}

// Applies to columns of complex types: arrays, maps and structs. When a
// query uses only some of the subfields, the engine provides the complete
// list of required subfields and the connector is free to prune the rest.
//
// Examples:
// - SELECT a[1], b['x'], x.y FROM t
// - SELECT a FROM t WHERE b['y'] > 10
//
// Pruning a struct means populating some of the members with null values.
//
// Pruning a map means dropping keys not listed in the required subfields.
//
// Pruning arrays means dropping values with indices larger than maximum
// required index.
/// Applies to columns of complex types: arrays, maps and structs. When a
/// query uses only some of the subfields, the engine provides the complete
/// list of required subfields and the connector is free to prune the rest.
///
/// Examples:
/// - SELECT a[1], b['x'], x.y FROM t
/// - SELECT a FROM t WHERE b['y'] > 10
///
/// Pruning a struct means populating some of the members with null values.
///
/// Pruning a map means dropping keys not listed in the required subfields.
///
/// Pruning arrays means dropping values with indices larger than maximum
/// required index.
const std::vector<common::Subfield>& requiredSubfields() const {
return requiredSubfields_;
}
Expand Down
7 changes: 5 additions & 2 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ void BlockingState::setResume(std::shared_ptr<BlockingState> state) {
state->operator_->recordBlockingTime(
state->sinceMicros_, state->reason_);
}
VELOX_CHECK(!driver->state().isSuspended);
VELOX_CHECK(!driver->state().suspended());
VELOX_CHECK(driver->state().hasBlockingFuture);
driver->state().hasBlockingFuture = false;
if (task->pauseRequested()) {
Expand Down Expand Up @@ -1007,7 +1007,10 @@ SuspendedSection::SuspendedSection(Driver* driver) : driver_(driver) {

SuspendedSection::~SuspendedSection() {
if (driver_->task()->leaveSuspended(driver_->state()) != StopReason::kNone) {
VELOX_FAIL("Terminate detected when leaving suspended section");
LOG(WARNING)
<< "Terminate detected when leaving suspended section for driver "
<< driver_->driverCtx()->driverId << " from task "
<< driver_->task()->taskId();
}
}

Expand Down
15 changes: 10 additions & 5 deletions velox/exec/Driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,11 @@ struct ThreadState {
/// True if there is a future outstanding that will schedule this on an
/// executor thread when some promise is realized.
bool hasBlockingFuture{false};
/// True if on thread but in a section waiting for RPC or memory strategy
/// decision. The thread is not supposed to access its memory, which a third
/// party can revoke while the thread is in this state.
bool isSuspended{false};
/// The number of suspension requests on an on-thread driver. If > 0, this
/// driver thread is in a (recursive) section waiting for RPC or memory
/// strategy decision. The thread is not supposed to access its memory, which
/// a third party can revoke while the thread is in this state.
uint32_t numSuspensions{0};
/// The start execution time on thread in milliseconds. It is reset when the
/// driver goes off thread. This is used to track the time that a driver has
/// continuously run on a thread for per-driver cpu time slice enforcement.
Expand Down Expand Up @@ -161,14 +162,18 @@ struct ThreadState {
return getCurrentTimeMs() - startExecTimeMs;
}

/// Returns true if there is at least one outstanding suspension request,
/// i.e. 'numSuspensions' is non-zero.
bool suspended() const {
  return numSuspensions != 0;
}

folly::dynamic toJson() const {
folly::dynamic obj = folly::dynamic::object;
obj["onThread"] = std::to_string(isOnThread());
obj["tid"] = tid.load();
obj["isTerminated"] = isTerminated.load();
obj["isEnqueued"] = isEnqueued.load();
obj["hasBlockingFuture"] = hasBlockingFuture;
obj["isSuspended"] = isSuspended;
obj["isSuspended"] = suspended();
obj["startExecTime"] = startExecTimeMs;
return obj;
}
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/MemoryReclaimer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void memoryArbitrationStateCheck(memory::MemoryPool& pool) {
const auto* driverThreadCtx = driverThreadContext();
if (driverThreadCtx != nullptr) {
Driver* driver = driverThreadCtx->driverCtx.driver;
if (!driver->state().isSuspended) {
if (!driver->state().suspended()) {
VELOX_FAIL(
"Driver thread is not suspended under memory arbitration processing: {}, request memory pool: {}",
driver->toString(),
Expand Down
8 changes: 4 additions & 4 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,11 +624,11 @@ uint64_t Operator::MemoryReclaimer::reclaim(
}
VELOX_CHECK_EQ(pool->name(), op_->pool()->name());
VELOX_CHECK(
!driver->state().isOnThread() || driver->state().isSuspended ||
!driver->state().isOnThread() || driver->state().suspended() ||
driver->state().isTerminated,
"driverOnThread {}, driverSuspended {} driverTerminated {} {}",
driver->state().isOnThread(),
driver->state().isSuspended,
driver->state().suspended(),
driver->state().isTerminated,
pool->name());
VELOX_CHECK(driver->task()->pauseRequested());
Expand Down Expand Up @@ -670,10 +670,10 @@ void Operator::MemoryReclaimer::abort(
}
VELOX_CHECK_EQ(pool->name(), op_->pool()->name());
VELOX_CHECK(
!driver->state().isOnThread() || driver->state().isSuspended ||
!driver->state().isOnThread() || driver->state().suspended() ||
driver->state().isTerminated);
VELOX_CHECK(driver->task()->isCancelled());
if (driver->state().isOnThread() && driver->state().isSuspended) {
if (driver->state().isOnThread() && driver->state().suspended()) {
// We can't abort an operator if it is running on a driver thread and
// suspended for memory arbitration. Otherwise, it might cause random crash
// when the driver thread throws after detects the aborted query.
Expand Down
31 changes: 26 additions & 5 deletions velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,28 @@ folly::dynamic TableScan::toJson() const {
return ret;
}

bool TableScan::shouldYield(StopReason taskStopReason, size_t startTimeMs)
    const {
  // A yield is wanted if any of the following holds, checked in this order:
  //  1. the task-level stop reason is a yield,
  //  2. the driver itself signals a yield,
  //  3. the get output time limit is set and has been used up.
  bool yieldWanted = (taskStopReason == StopReason::kYield);
  if (!yieldWanted) {
    yieldWanted = driverCtx_->driver->shouldYield();
  }
  if (!yieldWanted && (getOutputTimeLimitMs_ != 0)) {
    yieldWanted =
        (getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_;
  }
  if (!yieldWanted) {
    return false;
  }

  // NOTE: if the task is being paused, then we shall continue execution as we
  // won't yield the driver thread but would simply spin (with on-thread time
  // sleep) until the task has been resumed.
  return !driverCtx_->task->pauseRequested();
}

bool TableScan::shouldStop(StopReason taskStopReason) const {
  // Neither "no stop reason" nor a yield request counts as a stop; every
  // other stop reason does.
  switch (taskStopReason) {
    case StopReason::kNone:
    case StopReason::kYield:
      return false;
    default:
      return true;
  }
}

RowVectorPtr TableScan::getOutput() {
SuspendedSection suspendedSection(driverCtx_->driver);
auto exitCurStatusGuard = folly::makeGuard([this]() { curStatus_ = ""; });

if (noMoreSplits_) {
Expand All @@ -72,9 +93,9 @@ RowVectorPtr TableScan::getOutput() {
// w/o producing a result. In this case we return with the Yield blocking
// reason and an already fulfilled future.
curStatus_ = "getOutput: task->shouldStop";
if ((driverCtx_->task->shouldStop() != StopReason::kNone) ||
((getOutputTimeLimitMs_ != 0) &&
(getCurrentTimeMs() - startTimeMs) >= getOutputTimeLimitMs_)) {
const StopReason taskStopReason = driverCtx_->task->shouldStop();
if (shouldStop(taskStopReason) ||
shouldYield(taskStopReason, startTimeMs)) {
blockingReason_ = BlockingReason::kYield;
blockingFuture_ = ContinueFuture{folly::Unit{}};
// A point for test code injection.
Expand Down Expand Up @@ -137,7 +158,7 @@ RowVectorPtr TableScan::getOutput() {
connectorSplit->connectorId,
"Got splits with different connector IDs");

if (!dataSource_) {
if (dataSource_ == nullptr) {
curStatus_ = "getOutput: creating dataSource_";
connectorQueryCtx_ = operatorCtx_->createConnectorQueryCtx(
connectorSplit->connectorId, planNodeId(), connectorPool_);
Expand All @@ -162,7 +183,7 @@ RowVectorPtr TableScan::getOutput() {
},
&debugString_});

if (connectorSplit->dataSource) {
if (connectorSplit->dataSource != nullptr) {
curStatus_ = "getOutput: preloaded split";
++numPreloadedSplits_;
// The AsyncSource returns a unique_ptr to a shared_ptr. The unique_ptr
Expand Down
8 changes: 8 additions & 0 deletions velox/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ class TableScan : public SourceOperator {
}

private:
// Checks if this table scan operator needs to yield before processing the
// next split.
bool shouldYield(StopReason taskStopReason, size_t startTimeMs) const;

// Checks if this table scan operator needs to stop because the task has been
// terminated.
bool shouldStop(StopReason taskStopReason) const;

// Sets 'maxPreloadSplits' and 'splitPreloader' if prefetching splits is
// appropriate. The preloader will be applied to the 'first 'maxPreloadSplits'
// of the Task's split queue for 'this' when getting splits.
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/TableWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ uint64_t TableWriter::ConnectorReclaimer::reclaim(
return 0;
}
VELOX_CHECK(
!driver->state().isOnThread() || driver->state().isSuspended ||
!driver->state().isOnThread() || driver->state().suspended() ||
driver->state().isTerminated);
VELOX_CHECK(driver->task()->pauseRequested());

Expand Down
Loading

0 comments on commit 22b998b

Please sign in to comment.