From 4f9f5bc342b8b703413f72b1232ca1ac48ac5c7e Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Thu, 26 Sep 2024 14:44:12 -0700 Subject: [PATCH] Add to track running tasks for velox runtime (#11102) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11102 Track running tasks for velox runtime for runtime task stats collection. Followup can add to periodic print out or report task stats from velox runtime stats reporter per query system needs. Reviewed By: oerling Differential Revision: D63444008 --- velox/exec/Task.cpp | 48 +++++++++++++- velox/exec/Task.h | 76 +++++++++------------- velox/exec/tests/TableScanTest.cpp | 7 +- velox/exec/tests/TaskTest.cpp | 68 ++++++++++++++++--- velox/exec/tests/utils/QueryAssertions.cpp | 46 ++++--------- velox/exec/tests/utils/QueryAssertions.h | 6 -- 6 files changed, 150 insertions(+), 101 deletions(-) diff --git a/velox/exec/Task.cpp b/velox/exec/Task.cpp index 1debe9540150..61b44d9789be 100644 --- a/velox/exec/Task.cpp +++ b/velox/exec/Task.cpp @@ -209,9 +209,6 @@ std::string taskStateString(TaskState state) { } } -std::atomic Task::numCreatedTasks_ = 0; -std::atomic Task::numDeletedTasks_ = 0; - bool registerTaskListener(std::shared_ptr listener) { return listeners().withWLock([&](auto& listeners) { for (const auto& existingListener : listeners) { @@ -277,6 +274,7 @@ std::shared_ptr Task::create( std::move(consumerSupplier), std::move(onError))); task->initTaskPool(); + task->addToTaskList(); return task; } @@ -310,6 +308,8 @@ Task::Task( } Task::~Task() { + removeFromTaskList(); + // TODO(spershin): Temporary code designed to reveal what causes SIGABRT in // jemalloc when destroying some Tasks. std::string clearStage; @@ -355,6 +355,48 @@ Task::~Task() { } } +Task::TaskList& Task::taskList() { + static TaskList taskList; + return taskList; +} + +folly::SharedMutex& Task::taskListLock() { + static folly::SharedMutex lock; + return lock; +} + +size_t Task::numRunningTasks() { + std::shared_lock guard{taskListLock()}; + return taskList().size(); +} + +std::vector> Task::getRunningTasks() { + std::vector> tasks; + std::shared_lock guard(taskListLock()); + tasks.reserve(taskList().size()); + for (auto taskEntry : taskList()) { + if (auto task = taskEntry.taskPtr.lock()) { + tasks.push_back(std::move(task)); + } + } + return tasks; +} + +void Task::addToTaskList() { + VELOX_CHECK(!taskListEntry_.listHook.is_linked()); + taskListEntry_.taskPtr = shared_from_this(); + + std::unique_lock guard{taskListLock()}; + taskList().push_back(taskListEntry_); +} + +void Task::removeFromTaskList() { + std::unique_lock guard{taskListLock()}; + if (taskListEntry_.listHook.is_linked()) { + taskListEntry_.listHook.unlink(); + } +} + uint64_t Task::timeSinceStartMsLocked() const { if (taskStats_.executionStartTimeMs == 0UL) { return 0UL; diff --git a/velox/exec/Task.h b/velox/exec/Task.h index 2f63fa4bf3d6..9d66c1e1548c 100644 --- a/velox/exec/Task.h +++ b/velox/exec/Task.h @@ -640,16 +640,6 @@ class Task : public std::enable_shared_from_this { return driverFactories_[driver->driverCtx()->pipelineId]->numDrivers; } - /// Returns the number of created and deleted tasks since the velox engine - /// starts running so far. - static uint64_t numCreatedTasks() { - return numCreatedTasks_; - } - - static uint64_t numDeletedTasks() { - return numDeletedTasks_; - } - const std::string& spillDirectory() const { return spillDirectory_; } @@ -677,6 +667,12 @@ class Task : public std::enable_shared_from_this { ++numThreads_; } + /// Returns the number of running tasks from velox runtime. + static size_t numRunningTasks(); + + /// Returns the list of running tasks from velox runtime. + static std::vector> getRunningTasks(); + /// Invoked to run provided 'callback' on each alive driver of the task. void testingVisitDrivers(const std::function& callback); @@ -686,6 +682,20 @@ class Task : public std::enable_shared_from_this { } private: + // Hook of system-wide running task list. + struct TaskListEntry { + std::weak_ptr taskPtr; + folly::IntrusiveListHook listHook; + }; + using TaskList = + folly::IntrusiveList; + + // Returns the system-wide running task list. + FOLLY_EXPORT static TaskList& taskList(); + + // Returns the lock that protects the system-wide running task list. + FOLLY_EXPORT static folly::SharedMutex& taskListLock(); + Task( const std::string& taskId, core::PlanFragment planFragment, @@ -695,6 +705,13 @@ class Task : public std::enable_shared_from_this { ConsumerSupplier consumerSupplier, std::function onError = nullptr); + // Invoked to add this to the system-wide running task list on task creation. + void addToTaskList(); + + // Invoked to remove this from the system-wide running task list on task + // destruction. + void removeFromTaskList(); + // Consistency check of the task execution to make sure the execution mode // stays the same. void checkExecutionMode(ExecutionMode mode); @@ -816,22 +833,6 @@ class Task : public std::enable_shared_from_this { std::weak_ptr task_; }; - // Counts the number of created tasks which is incremented on each task - // creation. - static std::atomic numCreatedTasks_; - - // Counts the number of deleted tasks which is incremented on each task - // destruction. - static std::atomic numDeletedTasks_; - - static void taskCreated() { - ++numCreatedTasks_; - } - - static void taskDeleted() { - ++numDeletedTasks_; - } - /// Returns true if state is 'running'. bool isRunningLocked() const; @@ -984,26 +985,6 @@ class Task : public std::enable_shared_from_this { // trace enabled. void maybeInitQueryTrace(); - // The helper class used to maintain 'numCreatedTasks_' and 'numDeletedTasks_' - // on task construction and destruction. - class TaskCounter { - public: - TaskCounter() { - Task::taskCreated(); - } - ~TaskCounter() { - Task::taskDeleted(); - } - }; - friend class Task::TaskCounter; - - // NOTE: keep 'taskCount_' the first member so that it will be the first - // constructed member and the last destructed one. The purpose is to make - // 'numCreatedTasks_' and 'numDeletedTasks_' counting more robust to the - // timing race condition when used in scenarios such as waiting for all the - // tasks to be destructed in test. - const TaskCounter taskCounter_; - // Universally unique identifier of the task. Used to identify the task when // calling TaskListener. const std::string uuid_; @@ -1020,6 +1001,9 @@ class Task : public std::enable_shared_from_this { // executed in a single mode throughout its lifetime const ExecutionMode mode_; + // Hook in the system wide task list. + TaskListEntry taskListEntry_; + // Root MemoryPool for this Task. All member variables that hold references // to pool_ must be defined after pool_, childPools_. std::shared_ptr pool_; diff --git a/velox/exec/tests/TableScanTest.cpp b/velox/exec/tests/TableScanTest.cpp index fd982240c2c6..db96a5089460 100644 --- a/velox/exec/tests/TableScanTest.cpp +++ b/velox/exec/tests/TableScanTest.cpp @@ -1539,7 +1539,7 @@ TEST_F(TableScanTest, preloadingSplitClose) { latch.count_down(); }); } - ASSERT_EQ(Task::numCreatedTasks(), Task::numDeletedTasks()); + ASSERT_EQ(Task::numRunningTasks(), 0); auto task = assertQuery(tableScanNode(), filePaths, "SELECT * FROM tmp", 2); auto stats = getTableScanRuntimeStats(task); @@ -1547,9 +1547,8 @@ TEST_F(TableScanTest, preloadingSplitClose) { ASSERT_GT(stats.at("preloadedSplits").sum, 1); task.reset(); - // Once all task references are cleared, the count of deleted tasks should - // promptly match the count of created tasks. - ASSERT_EQ(Task::numCreatedTasks(), Task::numDeletedTasks()); + // Once all task references are cleared, all the tasks should be destroyed. + ASSERT_EQ(Task::numRunningTasks(), 0); // Clean blocking items in the IO thread pool. for (auto& baton : batons) { baton.post(); diff --git a/velox/exec/tests/TaskTest.cpp b/velox/exec/tests/TaskTest.cpp index 307d168a3f1a..8b8e2efe7bd8 100644 --- a/velox/exec/tests/TaskTest.cpp +++ b/velox/exec/tests/TaskTest.cpp @@ -466,8 +466,9 @@ class TaskTest : public HiveConnectorTestBase { core::PlanFragment plan, const std::unordered_map>& filePaths = {}) { + static std::atomic_uint64_t taskId{0}; auto task = Task::create( - "single.execution.task.0", + fmt::format("single.execution.task.{}", taskId++), plan, 0, core::QueryCtx::create(), @@ -743,15 +744,11 @@ TEST_F(TaskTest, serialExecution) { makeFlatVector(100, [](auto row) { return row + 5; }), }); - uint64_t numCreatedTasks = Task::numCreatedTasks(); - uint64_t numDeletedTasks = Task::numDeletedTasks(); { auto [task, results] = executeSerial(plan); assertEqualResults( std::vector{expectedResult, expectedResult}, results); } - ASSERT_EQ(numCreatedTasks + 1, Task::numCreatedTasks()); - ASSERT_EQ(numDeletedTasks + 1, Task::numDeletedTasks()); // Project + Aggregation. plan = PlanBuilder() @@ -776,14 +773,10 @@ TEST_F(TaskTest, serialExecution) { 995 / 2.0 + 4}), }); - ++numCreatedTasks; - ++numDeletedTasks; { auto [task, results] = executeSerial(plan); assertEqualResults({expectedResult}, results); } - ASSERT_EQ(numCreatedTasks + 1, Task::numCreatedTasks()); - ASSERT_EQ(numDeletedTasks + 1, Task::numDeletedTasks()); // Project + Aggregation over TableScan. auto filePath = TempFilePath::create(); @@ -808,6 +801,63 @@ TEST_F(TaskTest, serialExecution) { VELOX_ASSERT_THROW(executeSerial(plan), "division by zero"); } +TEST_F(TaskTest, runningTaskList) { + const auto data = makeRowVector({ + makeFlatVector(1'000, [](auto row) { return row; }), + }); + + ASSERT_EQ(Task::numRunningTasks(), 0); + ASSERT_TRUE(Task::getRunningTasks().empty()); + + // Filter + Project. + const auto plan = PlanBuilder() + .values({data, data}) + .filter("c0 < 100") + .project({"c0 + 5"}) + .planFragment(); + + std::vector> expectedRunningTasks; + expectedRunningTasks.push_back(executeSerial(plan).first); + ASSERT_EQ(Task::numRunningTasks(), 1); + ASSERT_EQ(Task::getRunningTasks().size(), 1); + + expectedRunningTasks.push_back(executeSerial(plan).first); + ASSERT_EQ(Task::numRunningTasks(), 2); + ASSERT_EQ(Task::getRunningTasks().size(), 2); + + expectedRunningTasks.push_back(executeSerial(plan).first); + ASSERT_EQ(Task::numRunningTasks(), 3); + ASSERT_EQ(Task::getRunningTasks().size(), 3); + + std::set expectedTaskIdSet; + for (const auto& task : expectedRunningTasks) { + expectedTaskIdSet.insert(task->taskId()); + } + ASSERT_EQ(expectedTaskIdSet.size(), 3); + std::vector> runningTasks = Task::getRunningTasks(); + ASSERT_EQ(runningTasks.size(), 3); + for (const auto& task : runningTasks) { + ASSERT_EQ(expectedTaskIdSet.count(task->taskId()), 1); + } + + expectedTaskIdSet.erase(expectedRunningTasks.back()->taskId()); + expectedRunningTasks.pop_back(); + ASSERT_EQ(expectedTaskIdSet.size(), 2); + + runningTasks.clear(); + runningTasks = Task::getRunningTasks(); + ASSERT_EQ(runningTasks.size(), 2); + for (const auto& task : runningTasks) { + ASSERT_EQ(expectedTaskIdSet.count(task->taskId()), 1); + } + + runningTasks.clear(); + expectedRunningTasks.clear(); + + ASSERT_EQ(Task::numRunningTasks(), 0); + ASSERT_TRUE(Task::getRunningTasks().empty()); +} + TEST_F(TaskTest, serialHashJoin) { auto left = makeRowVector( {"t_c0", "t_c1"}, diff --git a/velox/exec/tests/utils/QueryAssertions.cpp b/velox/exec/tests/utils/QueryAssertions.cpp index 50e7a668a94e..f7944e6a9dea 100644 --- a/velox/exec/tests/utils/QueryAssertions.cpp +++ b/velox/exec/tests/utils/QueryAssertions.cpp @@ -1474,48 +1474,28 @@ bool waitForTaskStateChange( } void waitForAllTasksToBeDeleted(uint64_t maxWaitUs) { - const uint64_t numCreatedTasks = Task::numCreatedTasks(); - uint64_t numDeletedTasks = Task::numDeletedTasks(); uint64_t waitUs = 0; - while (numCreatedTasks > numDeletedTasks) { + while (Task::numRunningTasks() != 0) { constexpr uint64_t kWaitInternalUs = 1'000; std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs)); waitUs += kWaitInternalUs; - numDeletedTasks = Task::numDeletedTasks(); if (waitUs >= maxWaitUs) { break; } } - VELOX_CHECK_EQ( - numDeletedTasks, - numCreatedTasks, - "{} tasks have been created while only {} have been deleted after waiting for {} us", - numCreatedTasks, - numDeletedTasks, - waitUs); -} - -void waitForAllTasksToBeDeleted( - uint64_t expectedDeletedTasks, - uint64_t maxWaitUs) { - uint64_t numDeletedTasks = Task::numDeletedTasks(); - uint64_t waitUs = 0; - while (expectedDeletedTasks > numDeletedTasks) { - constexpr uint64_t kWaitInternalUs = 1'000; - std::this_thread::sleep_for(std::chrono::microseconds(kWaitInternalUs)); - waitUs += kWaitInternalUs; - numDeletedTasks = Task::numDeletedTasks(); - if (waitUs >= maxWaitUs) { - break; - } + std::vector> pendingTasks = Task::getRunningTasks(); + if (pendingTasks.empty()) { + return; } - VELOX_CHECK_EQ( - numDeletedTasks, - expectedDeletedTasks, - "expected {} tasks to be deleted but only {} have been deleted after waiting for {} us", - expectedDeletedTasks, - numDeletedTasks, - waitUs); + std::vector pendingTaskStats; + pendingTaskStats.reserve(pendingTasks.size()); + for (const auto& task : pendingTasks) { + pendingTaskStats.push_back(task->toString()); + } + VELOX_FAIL( + "{} pending tasks\n{}", + pendingTasks.size(), + folly::join("\n", pendingTaskStats)); } std::shared_ptr assertQuery( diff --git a/velox/exec/tests/utils/QueryAssertions.h b/velox/exec/tests/utils/QueryAssertions.h index 217d12351e32..d1bbe253a2ff 100644 --- a/velox/exec/tests/utils/QueryAssertions.h +++ b/velox/exec/tests/utils/QueryAssertions.h @@ -213,12 +213,6 @@ bool waitForTaskStateChange( /// during this wait call. This is for testing purpose for now. void waitForAllTasksToBeDeleted(uint64_t maxWaitUs = 3'000'000); -/// Similar to above test utility except waiting for a specific number of -/// tasks to be deleted. -void waitForAllTasksToBeDeleted( - uint64_t expectedDeletedTasks, - uint64_t maxWaitUs); - std::shared_ptr assertQuery( const core::PlanNodePtr& plan, const std::string& duckDbSql,