diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/common/memory/SharedArbitrator.cpp index 886680680979..7925aa04398c 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/common/memory/SharedArbitrator.cpp @@ -566,7 +566,8 @@ uint64_t SharedArbitrator::reclaim( << "Reclaimed from memory pool " << pool->name() << " with target of " << succinctBytes(targetBytes) << ", actually reclaimed " << succinctBytes(freedBytes) << " free memory and " - << succinctBytes(reclaimedBytes - freedBytes) << " used memory"; + << succinctBytes(reclaimedBytes - freedBytes) << " used memory, spent " + << succinctMicros(reclaimDurationUs); return reclaimedBytes; } diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index 9c8899093dbd..10e2cd81aff5 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -828,10 +828,14 @@ void Driver::closeOperators() { void Driver::updateStats() { DriverStats stats; - stats.runtimeStats[DriverStats::kTotalPauseTime] = RuntimeMetric( - 1'000'000 * state_.totalPauseTimeMs, RuntimeCounter::Unit::kNanos); - stats.runtimeStats[DriverStats::kTotalOffThreadTime] = RuntimeMetric( - 1'000'000 * state_.totalOffThreadTimeMs, RuntimeCounter::Unit::kNanos); + if (state_.totalPauseTimeMs > 0) { + stats.runtimeStats[DriverStats::kTotalPauseTime] = RuntimeMetric( + 1'000'000 * state_.totalPauseTimeMs, RuntimeCounter::Unit::kNanos); + } + if (state_.totalOffThreadTimeMs > 0) { + stats.runtimeStats[DriverStats::kTotalOffThreadTime] = RuntimeMetric( + 1'000'000 * state_.totalOffThreadTimeMs, RuntimeCounter::Unit::kNanos); + } task()->addDriverStats(ctx_->pipelineId, std::move(stats)); } diff --git a/velox/exec/TaskStats.h b/velox/exec/TaskStats.h index 9603909e3ea1..acf248f6f30b 100644 --- a/velox/exec/TaskStats.h +++ b/velox/exec/TaskStats.h @@ -29,18 +29,18 @@ struct OperatorStats; /// Stores execution stats per pipeline. struct PipelineStats { - // Cumulative OperatorStats for finished Drivers. The subscript is the - // operator id, which is the initial ordinal position of the - // operator in the DriverFactory. + /// Cumulative OperatorStats for finished Drivers. The subscript is the + /// operator id, which is the initial ordinal position of the operator in the + /// DriverFactory. std::vector operatorStats; - // Runtime statistics per driver. + /// Runtime statistics per driver. std::vector driverStats; - // True if contains the source node for the task. + /// True if contains the source node for the task. bool inputPipeline; - // True if contains the sync node for the task. + /// True if contains the sync node for the task. bool outputPipeline; PipelineStats(bool _inputPipeline, bool _outputPipeline) diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 8da0552ff8cb..e2fb897b6b2d 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -18,6 +18,7 @@ #include "folly/experimental/EventCount.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/future/VeloxPromise.h" +#include "velox/common/memory/MemoryArbitrator.h" #include "velox/common/memory/SharedArbitrator.h" #include "velox/common/testutil/TestValue.h" #include "velox/connectors/hive/HiveConnectorSplit.h" @@ -29,6 +30,7 @@ #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/QueryAssertions.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" +#include "velox/vector/fuzzer/VectorFuzzer.h" using namespace facebook::velox; using namespace facebook::velox::common::testutil; @@ -1665,8 +1667,65 @@ DEBUG_ONLY_TEST_F(TaskTest, taskReclaimStats) { // Fail the task to finish test. task->requestAbort(); ASSERT_TRUE(waitForTaskAborted(task.get())); + task.reset(); + waitForAllTasksToBeDeleted(); +} + +DEBUG_ONLY_TEST_F(TaskTest, taskPauseTime) { + auto rowType = ROW({"c0", "c1"}, {INTEGER(), DOUBLE()}); + VectorFuzzer::Options opts; + opts.vectorSize = 32; + VectorFuzzer fuzzer(opts, pool_.get()); + std::vector valueInputs; + for (int32_t i = 0; i < 4; ++i) { + valueInputs.push_back(fuzzer.fuzzRow(rowType)); + } + + const auto plan = + PlanBuilder() + .values(valueInputs) + .partitionedOutput({}, 1, std::vector{"c0"}) + .planFragment(); - taskStats = task->taskStats(); + std::atomic_bool taskPauseWaitFlag{true}; + folly::EventCount taskPauseWait; + SCOPED_TESTVALUE_SET( + "facebook::velox::exec::Values::getOutput", + std::function([&](const exec::Values* values) { + if (taskPauseWaitFlag.exchange(false)) { + taskPauseWait.notifyAll(); + } + // Inject some delay for task pause stats verification. + std::this_thread::sleep_for(std::chrono::milliseconds(10)); // NOLINT + })); + + auto queryPool = memory::memoryManager()->addRootPool( + "taskPauseTime", 1UL << 30, exec::MemoryReclaimer::create()); + auto queryCtx = std::make_shared( + driverExecutor_.get(), + core::QueryConfig{{}}, + std::unordered_map>{}, + nullptr, + std::move(queryPool), + nullptr); + auto task = Task::create("task", std::move(plan), 0, std::move(queryCtx)); + task->start(4, 1); + + // Wait for the task driver starts to run. + taskPauseWait.await([&]() { return !taskPauseWaitFlag.load(); }); + // Pause the task + task->requestPause().wait(); + // Inject some delay for task pause stats verification. + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT + // Resume the task. + Task::resume(task); + // Inject some delay for task resume to run for a while. + std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT + // Fail the task to finish test. + task->requestAbort(); + ASSERT_TRUE(waitForTaskAborted(task.get())); + + auto taskStats = task->taskStats(); ASSERT_EQ(taskStats.pipelineStats.size(), 1); ASSERT_EQ(taskStats.pipelineStats[0].driverStats.size(), 1); const auto& driverStats = taskStats.pipelineStats[0].driverStats[0]; @@ -1702,16 +1761,15 @@ TEST_F(TaskTest, updateStatsWhileCloseOffThreadDriver) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); task->testingVisitDrivers( [](Driver* driver) { VELOX_CHECK(!driver->isOnThread()); }); + // Sleep a bit to make sure off thread time is not zero. + std::this_thread::sleep_for(std::chrono::milliseconds{2}); task->requestAbort(); ASSERT_TRUE(waitForTaskAborted(task.get())); auto taskStats = task->taskStats(); ASSERT_EQ(taskStats.pipelineStats.size(), 1); ASSERT_EQ(taskStats.pipelineStats[0].driverStats.size(), 4); const auto& driverStats = taskStats.pipelineStats[0].driverStats[0]; - const auto& totalPauseTime = - driverStats.runtimeStats.at(DriverStats::kTotalPauseTime); - ASSERT_EQ(totalPauseTime.count, 1); - ASSERT_GE(totalPauseTime.sum, 0); + ASSERT_EQ(driverStats.runtimeStats.count(DriverStats::kTotalPauseTime), 0); const auto& totalOffThreadTime = driverStats.runtimeStats.at(DriverStats::kTotalOffThreadTime); ASSERT_EQ(totalOffThreadTime.count, 1);