Skip to content

Commit

Permalink
[BugFix] Capture resource group for scan task (#51121)
Browse files Browse the repository at this point in the history
Signed-off-by: zihe.liu <[email protected]>
(cherry picked from commit 3317f49)
  • Loading branch information
ZiheLiu authored and mergify[bot] committed Sep 20, 2024
1 parent c2b00bc commit b5bdb0d
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 21 deletions.
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/exchange/mem_limited_chunk_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ Status MemLimitedChunkQueue::_submit_flush_task() {
}
};

auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup().get(), std::move(flush_task));
auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup(), std::move(flush_task));
RETURN_IF_ERROR(spill::IOTaskExecutor::submit(std::move(io_task)));
return Status::OK();
}
Expand Down Expand Up @@ -551,7 +551,7 @@ Status MemLimitedChunkQueue::_submit_load_task(Block* block) {
_update_io_task_status(status);
}
};
auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup().get(), std::move(load_task));
auto io_task = workgroup::ScanTask(_state->fragment_ctx()->workgroup(), std::move(load_task));
RETURN_IF_ERROR(spill::IOTaskExecutor::submit(std::move(io_task)));
return Status::OK();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ Status SpillableHashJoinProbeOperator::_load_all_partition_build_side(RuntimeSta
}
};
auto yield_func = [&](workgroup::ScanTask&& task) { spill::IOTaskExecutor::force_submit(std::move(task)); };
auto io_task = workgroup::ScanTask(_join_builder->spiller()->options().wg.get(), std::move(task),
std::move(yield_func));
auto io_task =
workgroup::ScanTask(_join_builder->spiller()->options().wg, std::move(task), std::move(yield_func));
RETURN_IF_ERROR(spill::IOTaskExecutor::submit(std::move(io_task)));
}
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_in
int32_t driver_id = CurrentThread::current().get_driver_id();

workgroup::ScanTask task;
task.workgroup = _workgroup.get();
task.workgroup = _workgroup;
// TODO: consider more factors, such as scan bytes and i/o time.
task.priority = OlapScanNode::compute_priority(_submit_task_counter->value());
task.task_group = down_cast<const ScanOperatorFactory*>(_factory)->scan_task_group();
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/spill/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ struct IOTaskExecutor {
auto io_ctx = std::any_cast<SpillIOTaskContextPtr>(task_ctx.task_context_data);
use_local_io_executor = io_ctx->use_local_io_executor;
}
auto* pool = get_executor(task.workgroup, use_local_io_executor);
auto* pool = get_executor(task.workgroup.get(), use_local_io_executor);
if (pool->submit(std::move(task))) {
return Status::OK();
} else {
Expand All @@ -114,7 +114,7 @@ struct IOTaskExecutor {
static void force_submit(workgroup::ScanTask task) {
const auto& task_ctx = task.get_work_context();
auto io_ctx = std::any_cast<SpillIOTaskContextPtr>(task_ctx.task_context_data);
auto* pool = get_executor(task.workgroup, io_ctx->use_local_io_executor);
auto* pool = get_executor(task.workgroup.get(), io_ctx->use_local_io_executor);
pool->force_submit(std::move(task));
}

Expand Down
7 changes: 3 additions & 4 deletions be/src/exec/spill/spiller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ Status RawSpillerWriter::flush(RuntimeState* state, MemGuard&& guard) {
};

auto yield_func = [&](workgroup::ScanTask&& task) { TaskExecutor::force_submit(std::move(task)); };
auto io_task = workgroup::ScanTask(_spiller->options().wg.get(), std::move(task), std::move(yield_func));
auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(task), std::move(yield_func));
RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task)));
COUNTER_UPDATE(_spiller->metrics().flush_io_task_count, 1);
COUNTER_SET(_spiller->metrics().peak_flush_io_task_count, _running_flush_tasks);
Expand Down Expand Up @@ -255,8 +255,7 @@ Status SpillerReader::trigger_restore(RuntimeState* state, MemGuard&& guard) {
auto ctx = std::any_cast<SpillIOTaskContextPtr>(task.get_work_context().task_context_data);
TaskExecutor::force_submit(std::move(task));
};
auto io_task =
workgroup::ScanTask(_spiller->options().wg.get(), std::move(restore_task), std::move(yield_func));
auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(restore_task), std::move(yield_func));
RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task)));
COUNTER_UPDATE(_spiller->metrics().restore_io_task_count, 1);
COUNTER_SET(_spiller->metrics().peak_restore_io_task_count, _running_restore_tasks);
Expand Down Expand Up @@ -347,7 +346,7 @@ Status PartitionedSpillerWriter::flush(RuntimeState* state, bool is_final_flush,
return Status::OK();
};
auto yield_func = [&](workgroup::ScanTask&& task) { TaskExecutor::force_submit(std::move(task)); };
auto io_task = workgroup::ScanTask(_spiller->options().wg.get(), std::move(task), std::move(yield_func));
auto io_task = workgroup::ScanTask(_spiller->options().wg, std::move(task), std::move(yield_func));
RETURN_IF_ERROR(TaskExecutor::submit(std::move(io_task)));
COUNTER_UPDATE(_spiller->metrics().flush_io_task_count, 1);
COUNTER_SET(_spiller->metrics().peak_flush_io_task_count, _running_flush_tasks);
Expand Down
6 changes: 3 additions & 3 deletions be/src/exec/workgroup/scan_task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ bool WorkGroupScanTaskQueue::try_offer(ScanTask task) {
task.peak_scan_task_queue_size_counter->set(_num_tasks);
}

auto* wg_entity = _sched_entity(task.workgroup);
auto* wg_entity = _sched_entity(task.workgroup.get());
wg_entity->set_in_queue(this);
RETURN_IF_UNLIKELY(!wg_entity->queue()->try_offer(std::move(task)), false);

Expand All @@ -137,7 +137,7 @@ void WorkGroupScanTaskQueue::force_put(ScanTask task) {
task.peak_scan_task_queue_size_counter->set(_num_tasks);
}

auto* wg_entity = _sched_entity(task.workgroup);
auto* wg_entity = _sched_entity(task.workgroup.get());
wg_entity->set_in_queue(this);
wg_entity->queue()->force_put(std::move(task));

Expand All @@ -151,7 +151,7 @@ void WorkGroupScanTaskQueue::force_put(ScanTask task) {

void WorkGroupScanTaskQueue::update_statistics(ScanTask& task, int64_t runtime_ns) {
std::lock_guard<std::mutex> lock(_global_mutex);
auto* wg = task.workgroup;
auto* wg = task.workgroup.get();
auto* wg_entity = _sched_entity(wg);

// Update sched entity information.
Expand Down
13 changes: 7 additions & 6 deletions be/src/exec/workgroup/scan_task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <queue>
#include <set>
#include <unordered_set>
#include <utility>

#include "common/statusor.h"
#include "exec/workgroup/work_group_fwd.h"
Expand Down Expand Up @@ -69,11 +70,11 @@ struct ScanTask {
using YieldFunction = std::function<void(ScanTask&&)>;

ScanTask() : ScanTask(nullptr, nullptr) {}
explicit ScanTask(WorkFunction work_function) : workgroup(nullptr), work_function(std::move(work_function)) {}
ScanTask(WorkGroup* workgroup, WorkFunction work_function)
: workgroup(workgroup), work_function(std::move(work_function)) {}
ScanTask(WorkGroup* workgroup, WorkFunction work_function, YieldFunction yield_function)
: workgroup(workgroup),
explicit ScanTask(WorkFunction work_function) : ScanTask(nullptr, std::move(work_function)) {}
ScanTask(WorkGroupPtr workgroup, WorkFunction work_function)
: workgroup(std::move(workgroup)), work_function(std::move(work_function)) {}
ScanTask(WorkGroupPtr workgroup, WorkFunction work_function, YieldFunction yield_function)
: workgroup(std::move(workgroup)),
work_function(std::move(work_function)),
yield_function(std::move(yield_function)) {}
~ScanTask() = default;
Expand Down Expand Up @@ -103,7 +104,7 @@ struct ScanTask {
const YieldContext& get_work_context() const { return work_context; }

public:
WorkGroup* workgroup;
WorkGroupPtr workgroup;
YieldContext work_context;
WorkFunction work_function;
YieldFunction yield_function;
Expand Down
2 changes: 1 addition & 1 deletion be/src/udf/java/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ PromiseStatusPtr call_hdfs_scan_function_in_pthread(const std::function<Status()
PromiseStatusPtr ms = std::make_unique<PromiseStatus>();
if (bthread_self()) {
ExecEnv::GetInstance()->connector_scan_executor()->submit(workgroup::ScanTask(
ExecEnv::GetInstance()->workgroup_manager()->get_default_workgroup().get(),
ExecEnv::GetInstance()->workgroup_manager()->get_default_workgroup(),
[promise = ms.get(), func](workgroup::YieldContext&) { promise->set_value(func()); }));
} else {
ms->set_value(func());
Expand Down

0 comments on commit b5bdb0d

Please sign in to comment.