Skip to content

Commit

Permalink
[profile](pipeline) Add key metrics for pipeline initialization (#35073) (#44738)
Browse files Browse the repository at this point in the history

pick #35073
  • Loading branch information
Gabriel39 authored Dec 2, 2024
1 parent 8843b17 commit 7278a86
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 72 deletions.
155 changes: 85 additions & 70 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
_runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext");
_prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
SCOPED_TIMER(_prepare_timer);
_build_pipelines_timer = ADD_TIMER(_runtime_profile, "BuildPipelinesTime");
_init_context_timer = ADD_TIMER(_runtime_profile, "InitContextTime");
_plan_local_shuffle_timer = ADD_TIMER(_runtime_profile, "PlanLocalShuffleTime");
_build_tasks_timer = ADD_TIMER(_runtime_profile, "BuildTasksTime");
_prepare_all_pipelines_timer = ADD_TIMER(_runtime_profile, "PrepareAllPipelinesTime");

auto* fragment_context = this;

Expand All @@ -209,89 +214,99 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
.tag("fragment_id", _fragment_id)
.tag("pthread_id", (uintptr_t)pthread_self());

if (request.query_options.__isset.is_report_success) {
fragment_context->set_is_report_success(request.query_options.is_report_success);
}

// 1. Set up the global runtime state.
_runtime_state = RuntimeState::create_unique(request.query_id, request.fragment_id,
request.query_options, _query_ctx->query_globals,
_exec_env, _query_ctx.get());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
if (request.__isset.backend_id) {
_runtime_state->set_backend_id(request.backend_id);
}
if (request.__isset.import_label) {
_runtime_state->set_import_label(request.import_label);
}
if (request.__isset.db_name) {
_runtime_state->set_db_name(request.db_name);
}
if (request.__isset.load_job_id) {
_runtime_state->set_load_job_id(request.load_job_id);
}

if (request.is_simplified_param) {
_desc_tbl = _query_ctx->desc_tbl;
} else {
DCHECK(request.__isset.desc_tbl);
RETURN_IF_ERROR(
DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl));
}
_runtime_state->set_desc_tbl(_desc_tbl);
_runtime_state->set_num_per_fragment_instances(request.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
_runtime_state->set_num_local_sink(request.num_local_sink);

const auto& local_params = request.local_params[0];
if (local_params.__isset.runtime_filter_params) {
_query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
local_params.runtime_filter_params);
}
if (local_params.__isset.topn_filter_source_node_ids) {
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
} else {
_query_ctx->init_runtime_predicates({0});
}
{
SCOPED_TIMER(_init_context_timer);
if (request.query_options.__isset.is_report_success) {
fragment_context->set_is_report_success(request.query_options.is_report_success);
}

_need_local_merge = request.__isset.parallel_instances;
// 1. Set up the global runtime state.
_runtime_state = RuntimeState::create_unique(
request.query_id, request.fragment_id, request.query_options,
_query_ctx->query_globals, _exec_env, _query_ctx.get());
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker());
if (request.__isset.backend_id) {
_runtime_state->set_backend_id(request.backend_id);
}
if (request.__isset.import_label) {
_runtime_state->set_import_label(request.import_label);
}
if (request.__isset.db_name) {
_runtime_state->set_db_name(request.db_name);
}
if (request.__isset.load_job_id) {
_runtime_state->set_load_job_id(request.load_job_id);
}

// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(
_runtime_state->obj_pool(), request, *_query_ctx->desc_tbl, &_root_op, root_pipeline));
if (request.is_simplified_param) {
_desc_tbl = _query_ctx->desc_tbl;
} else {
DCHECK(request.__isset.desc_tbl);
RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl,
&_desc_tbl));
}
_runtime_state->set_desc_tbl(_desc_tbl);
_runtime_state->set_num_per_fragment_instances(request.num_senders);
_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
_runtime_state->set_total_load_streams(request.total_load_streams);
_runtime_state->set_num_local_sink(request.num_local_sink);

const auto& local_params = request.local_params[0];
if (local_params.__isset.runtime_filter_params) {
_query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
local_params.runtime_filter_params);
}
if (local_params.__isset.topn_filter_source_node_ids) {
_query_ctx->init_runtime_predicates(local_params.topn_filter_source_node_ids);
} else {
_query_ctx->init_runtime_predicates({0});
}

// 3. Create sink operator
if (!request.fragment.__isset.output_sink) {
return Status::InternalError("No output sink in this fragment!");
_need_local_merge = request.__isset.parallel_instances;
}
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink, request.fragment.output_exprs,
request, root_pipeline->output_row_desc(), _runtime_state.get(), *_desc_tbl,
root_pipeline->id()));
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
RETURN_IF_ERROR(root_pipeline->set_sink(_sink));

for (PipelinePtr& pipeline : _pipelines) {
DCHECK(pipeline->sink_x() != nullptr) << pipeline->operator_xs().size();
RETURN_IF_ERROR(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
{
SCOPED_TIMER(_build_pipelines_timer);
// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(_runtime_state->obj_pool(), request,
*_query_ctx->desc_tbl, &_root_op,
root_pipeline));

// 3. Create sink operator
if (!request.fragment.__isset.output_sink) {
return Status::InternalError("No output sink in this fragment!");
}
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs, request, root_pipeline->output_row_desc(),
_runtime_state.get(), *_desc_tbl, root_pipeline->id()));
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
RETURN_IF_ERROR(root_pipeline->set_sink(_sink));

for (PipelinePtr& pipeline : _pipelines) {
DCHECK(pipeline->sink_x() != nullptr) << pipeline->operator_xs().size();
RETURN_IF_ERROR(pipeline->sink_x()->set_child(pipeline->operator_xs().back()));
}
}
if (_enable_local_shuffle()) {
SCOPED_TIMER(_plan_local_shuffle_timer);
RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
request.bucket_seq_to_instance_idx,
request.shuffle_idx_to_instance_idx));
}

// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
SCOPED_TIMER(_prepare_all_pipelines_timer);
pipeline->children().clear();
RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get()));
}

// 5. Build pipeline tasks and initialize local state.
RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool));

{
// 5. Build pipeline tasks and initialize local state.
SCOPED_TIMER(_build_tasks_timer);
RETURN_IF_ERROR(_build_pipeline_x_tasks(request, thread_pool));
}
_init_next_report_time();

_prepared = true;
Expand Down Expand Up @@ -1568,10 +1583,10 @@ Status PipelineXFragmentContext::send_report(bool done) {
}
}
return _report_status_cb(
{true, exec_status, runtime_states, nullptr, _runtime_state->load_channel_profile(),
done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id,
TUniqueId(), _backend_num, _runtime_state.get(),
[this](Status st) { return update_status(st); },
{true, exec_status, runtime_states, _runtime_profile.get(),
_runtime_state->load_channel_profile(), done || !exec_status.ok(),
_query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), _backend_num,
_runtime_state.get(), [this](Status st) { return update_status(st); },
[this](const PPlanFragmentCancelReason& reason, const std::string& msg) {
cancel(reason, msg);
}},
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,12 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
int _total_instances = -1;

bool _require_bucket_distribution = false;

RuntimeProfile::Counter* _init_context_timer = nullptr;
RuntimeProfile::Counter* _build_pipelines_timer = nullptr;
RuntimeProfile::Counter* _plan_local_shuffle_timer = nullptr;
RuntimeProfile::Counter* _prepare_all_pipelines_timer = nullptr;
RuntimeProfile::Counter* _build_tasks_timer = nullptr;
};

} // namespace pipeline
Expand Down
8 changes: 8 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,14 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
}

if (enable_profile) {
DCHECK(req.profile != nullptr);
TDetailedReportParams detailed_param;
detailed_param.__isset.fragment_instance_id = false;
detailed_param.__isset.profile = true;
detailed_param.__isset.loadChannelProfile = false;
detailed_param.__set_is_fragment_level(true);
req.profile->to_thrift(&detailed_param.profile);
params.detailed_report.push_back(detailed_param);
for (auto& pipeline_profile : req.runtime_state->pipeline_id_to_profile()) {
TDetailedReportParams detailed_param;
detailed_param.__isset.fragment_instance_id = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,10 @@ public void updateProfile(TReportExecStatusParams params) {
if (isPipelineXProfile) {
int pipelineIdx = 0;
List<RuntimeProfile> taskProfile = Lists.newArrayList();
String suffix = " (host=" + backend.getHeartbeatAddress() + ")";
for (TDetailedReportParams param : params.detailed_report) {
String name = "Pipeline :" + pipelineIdx + " "
+ " (host=" + backend.getHeartbeatAddress() + ")";
String name = param.isSetIsFragmentLevel() && param.is_fragment_level ? "Fragment Level Profile: "
+ suffix : "Pipeline :" + pipelineIdx + " " + suffix;
RuntimeProfile profile = new RuntimeProfile(name);
taskProfile.add(profile);
if (param.isSetProfile()) {
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ struct TDetailedReportParams {
1: optional Types.TUniqueId fragment_instance_id
2: optional RuntimeProfile.TRuntimeProfileTree profile
3: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile
4: optional bool is_fragment_level
}


Expand Down

0 comments on commit 7278a86

Please sign in to comment.