Skip to content

Commit

Permalink
Improve logs, runtime stats collection and task pause tests (facebook…
Browse files Browse the repository at this point in the history
…incubator#9377)

Summary:
Only collect task pause time if it is not zero
Include reclaim duration in task reclamation log

Pull Request resolved: facebookincubator#9377

Reviewed By: tanjialiang

Differential Revision: D55786597

Pulled By: xiaoxmeng

fbshipit-source-id: e35aeff84a6abee602e8ccdb5546310e2f1f762f
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Apr 7, 2024
1 parent 84ae6bf commit e9cd7a6
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 16 deletions.
3 changes: 2 additions & 1 deletion velox/common/memory/SharedArbitrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,8 @@ uint64_t SharedArbitrator::reclaim(
<< "Reclaimed from memory pool " << pool->name() << " with target of "
<< succinctBytes(targetBytes) << ", actually reclaimed "
<< succinctBytes(freedBytes) << " free memory and "
<< succinctBytes(reclaimedBytes - freedBytes) << " used memory";
<< succinctBytes(reclaimedBytes - freedBytes) << " used memory, spent "
<< succinctMicros(reclaimDurationUs);
return reclaimedBytes;
}

Expand Down
12 changes: 8 additions & 4 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -828,10 +828,14 @@ void Driver::closeOperators() {

void Driver::updateStats() {
DriverStats stats;
stats.runtimeStats[DriverStats::kTotalPauseTime] = RuntimeMetric(
1'000'000 * state_.totalPauseTimeMs, RuntimeCounter::Unit::kNanos);
stats.runtimeStats[DriverStats::kTotalOffThreadTime] = RuntimeMetric(
1'000'000 * state_.totalOffThreadTimeMs, RuntimeCounter::Unit::kNanos);
if (state_.totalPauseTimeMs > 0) {
stats.runtimeStats[DriverStats::kTotalPauseTime] = RuntimeMetric(
1'000'000 * state_.totalPauseTimeMs, RuntimeCounter::Unit::kNanos);
}
if (state_.totalOffThreadTimeMs > 0) {
stats.runtimeStats[DriverStats::kTotalOffThreadTime] = RuntimeMetric(
1'000'000 * state_.totalOffThreadTimeMs, RuntimeCounter::Unit::kNanos);
}
task()->addDriverStats(ctx_->pipelineId, std::move(stats));
}

Expand Down
12 changes: 6 additions & 6 deletions velox/exec/TaskStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ struct OperatorStats;

/// Stores execution stats per pipeline.
struct PipelineStats {
// Cumulative OperatorStats for finished Drivers. The subscript is the
// operator id, which is the initial ordinal position of the
// operator in the DriverFactory.
/// Cumulative OperatorStats for finished Drivers. The subscript is the
/// operator id, which is the initial ordinal position of the operator in the
/// DriverFactory.
std::vector<OperatorStats> operatorStats;

// Runtime statistics per driver.
/// Runtime statistics per driver.
std::vector<DriverStats> driverStats;

// True if contains the source node for the task.
/// True if contains the source node for the task.
bool inputPipeline;

// True if contains the sync node for the task.
/// True if contains the sync node for the task.
bool outputPipeline;

PipelineStats(bool _inputPipeline, bool _outputPipeline)
Expand Down
68 changes: 63 additions & 5 deletions velox/exec/tests/TaskTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "folly/experimental/EventCount.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/future/VeloxPromise.h"
#include "velox/common/memory/MemoryArbitrator.h"
#include "velox/common/memory/SharedArbitrator.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/hive/HiveConnectorSplit.h"
Expand All @@ -29,6 +30,7 @@
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"

using namespace facebook::velox;
using namespace facebook::velox::common::testutil;
Expand Down Expand Up @@ -1665,8 +1667,65 @@ DEBUG_ONLY_TEST_F(TaskTest, taskReclaimStats) {
// Fail the task to finish test.
task->requestAbort();
ASSERT_TRUE(waitForTaskAborted(task.get()));
task.reset();
waitForAllTasksToBeDeleted();
}

DEBUG_ONLY_TEST_F(TaskTest, taskPauseTime) {
auto rowType = ROW({"c0", "c1"}, {INTEGER(), DOUBLE()});
VectorFuzzer::Options opts;
opts.vectorSize = 32;
VectorFuzzer fuzzer(opts, pool_.get());
std::vector<RowVectorPtr> valueInputs;
for (int32_t i = 0; i < 4; ++i) {
valueInputs.push_back(fuzzer.fuzzRow(rowType));
}

const auto plan =
PlanBuilder()
.values(valueInputs)
.partitionedOutput({}, 1, std::vector<std::string>{"c0"})
.planFragment();

taskStats = task->taskStats();
std::atomic_bool taskPauseWaitFlag{true};
folly::EventCount taskPauseWait;
SCOPED_TESTVALUE_SET(
"facebook::velox::exec::Values::getOutput",
std::function<void(const exec::Values*)>([&](const exec::Values* values) {
if (taskPauseWaitFlag.exchange(false)) {
taskPauseWait.notifyAll();
}
// Inject some delay for task pause stats verification.
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // NOLINT
}));

auto queryPool = memory::memoryManager()->addRootPool(
"taskPauseTime", 1UL << 30, exec::MemoryReclaimer::create());
auto queryCtx = std::make_shared<core::QueryCtx>(
driverExecutor_.get(),
core::QueryConfig{{}},
std::unordered_map<std::string, std::shared_ptr<Config>>{},
nullptr,
std::move(queryPool),
nullptr);
auto task = Task::create("task", std::move(plan), 0, std::move(queryCtx));
task->start(4, 1);

// Wait for the task driver starts to run.
taskPauseWait.await([&]() { return !taskPauseWaitFlag.load(); });
// Pause the task
task->requestPause().wait();
// Inject some delay for task pause stats verification.
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT
// Resume the task.
Task::resume(task);
// Inject some delay for task resume to run for a while.
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // NOLINT
// Fail the task to finish test.
task->requestAbort();
ASSERT_TRUE(waitForTaskAborted(task.get()));

auto taskStats = task->taskStats();
ASSERT_EQ(taskStats.pipelineStats.size(), 1);
ASSERT_EQ(taskStats.pipelineStats[0].driverStats.size(), 1);
const auto& driverStats = taskStats.pipelineStats[0].driverStats[0];
Expand Down Expand Up @@ -1702,16 +1761,15 @@ TEST_F(TaskTest, updateStatsWhileCloseOffThreadDriver) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
task->testingVisitDrivers(
[](Driver* driver) { VELOX_CHECK(!driver->isOnThread()); });
// Sleep a bit to make sure off thread time is not zero.
std::this_thread::sleep_for(std::chrono::milliseconds{2});
task->requestAbort();
ASSERT_TRUE(waitForTaskAborted(task.get()));
auto taskStats = task->taskStats();
ASSERT_EQ(taskStats.pipelineStats.size(), 1);
ASSERT_EQ(taskStats.pipelineStats[0].driverStats.size(), 4);
const auto& driverStats = taskStats.pipelineStats[0].driverStats[0];
const auto& totalPauseTime =
driverStats.runtimeStats.at(DriverStats::kTotalPauseTime);
ASSERT_EQ(totalPauseTime.count, 1);
ASSERT_GE(totalPauseTime.sum, 0);
ASSERT_EQ(driverStats.runtimeStats.count(DriverStats::kTotalPauseTime), 0);
const auto& totalOffThreadTime =
driverStats.runtimeStats.at(DriverStats::kTotalOffThreadTime);
ASSERT_EQ(totalOffThreadTime.count, 1);
Expand Down

0 comments on commit e9cd7a6

Please sign in to comment.