From 95a0a8fdc650446513e85c2a0e63f754037e1b19 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 28 Oct 2024 10:48:03 +0800 Subject: [PATCH] [fix](flink) Fix scanner be cancelled early on pipelineX (#42421) 1. A query_id is randomly generated to replace t_query_plan_info.query_id. external query does not need to report anything to FE, so the query_id can be changed. Otherwise, multiple independent concurrent open tablet scanners have the same query_id. when one of the scanners ends, the other scanners will be canceled through FragmentMgr.cancel(query_id). 2. (query_id, fragment_id) is executed only on one BE, locks _pipeline_map. 3. External query (flink/spark read tablets) not need to report to FE. --- be/src/runtime/fragment_mgr.cpp | 44 ++++++++++++++++++------------ be/src/runtime/fragment_mgr.h | 1 + be/src/service/backend_service.cpp | 14 ++++++++-- 3 files changed, 39 insertions(+), 20 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 96d49be4c126df..a12d24c76084b0 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -299,6 +299,10 @@ Status FragmentMgr::trigger_pipeline_context_report( // including the final status when execution finishes. void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { DCHECK(req.status.ok() || req.done); // if !status.ok() => done + if (req.coord_addr.hostname == "external") { + // External query (flink/spark read tablets) not need to report to FE. + return; + } Status exec_status = req.status; Status coord_status; FrontendServiceConnection coord(_exec_env->frontend_client_cache(), req.coord_addr, @@ -798,31 +802,33 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, query_ctx->set_merge_controller_handler(handler); } - for (const auto& local_param : params.local_params) { - const TUniqueId& fragment_instance_id = local_param.fragment_instance_id; + { + // (query_id, fragment_id) is executed only on one BE, locks _pipeline_map. std::lock_guard lock(_lock); - auto iter = _pipeline_map.find({params.query_id, params.fragment_id}); - if (iter != _pipeline_map.end()) { - return Status::InternalError("exec_plan_fragment input duplicated fragment_id({})", - params.fragment_id); + for (const auto& local_param : params.local_params) { + const TUniqueId& fragment_instance_id = local_param.fragment_instance_id; + auto iter = _pipeline_map.find({params.query_id, params.fragment_id}); + if (iter != _pipeline_map.end()) { + return Status::InternalError( + "exec_plan_fragment query_id({}) input duplicated fragment_id({})", + print_id(params.query_id), params.fragment_id); + } + query_ctx->fragment_instance_ids.push_back(fragment_instance_id); } - query_ctx->fragment_instance_ids.push_back(fragment_instance_id); - } - if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) { - query_ctx->set_ready_to_execute_only(); - } - - int64 now = duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - { + int64 now = duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); g_fragment_executing_count << 1; g_fragment_last_active_time.set_value(now); - std::lock_guard lock(_lock); // TODO: simplify this mapping _pipeline_map.insert({{params.query_id, params.fragment_id}, context}); } + + if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) { + query_ctx->set_ready_to_execute_only(); + } + query_ctx->set_pipeline_context(params.fragment_id, context); RETURN_IF_ERROR(context->submit()); @@ -1032,6 +1038,7 @@ void FragmentMgr::debug(std::stringstream& ss) {} */ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, const TQueryPlanInfo& t_query_plan_info, + const TUniqueId& query_id, const TUniqueId& fragment_instance_id, std::vector* selected_columns) { // set up desc tbl @@ -1072,8 +1079,9 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, // assign the param used for executing of PlanFragment-self TPipelineInstanceParams fragment_exec_params; - exec_fragment_params.query_id = t_query_plan_info.query_id; + exec_fragment_params.query_id = query_id; fragment_exec_params.fragment_instance_id = fragment_instance_id; + exec_fragment_params.coord.hostname = "external"; std::map<::doris::TPlanNodeId, std::vector> per_node_scan_ranges; std::vector scan_ranges; std::vector tablet_ids = params.tablet_ids; diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 41b63db0b23ad9..20b2fd8cdc2063 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -112,6 +112,7 @@ class FragmentMgr : public RestMonitorIface { // execute external query, all query info are packed in TScanOpenParams Status exec_external_plan_fragment(const TScanOpenParams& params, const TQueryPlanInfo& t_query_plan_info, + const TUniqueId& query_id, const TUniqueId& fragment_instance_id, std::vector* selected_columns); diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index d56aa49b19b1cf..e6fdfaa87657f8 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -802,6 +802,11 @@ void BaseBackendService::submit_routine_load_task(TStatus& t_status, void BaseBackendService::open_scanner(TScanOpenResult& result_, const TScanOpenParams& params) { TStatus t_status; TUniqueId fragment_instance_id = generate_uuid(); + // A query_id is randomly generated to replace t_query_plan_info.query_id. + // external query does not need to report anything to FE, so the query_id can be changed. + // Otherwise, multiple independent concurrent open tablet scanners have the same query_id. + // when one of the scanners ends, the other scanners will be canceled through FragmentMgr.cancel(query_id). + TUniqueId query_id = generate_uuid(); std::shared_ptr p_context; static_cast(_exec_env->external_scan_context_mgr()->create_scan_context(&p_context)); p_context->fragment_instance_id = fragment_instance_id; @@ -838,13 +843,18 @@ void BaseBackendService::open_scanner(TScanOpenResult& result_, const TScanOpenP << " deserialize error, should not be modified after returned Doris FE processed"; exec_st = Status::InvalidArgument(msg.str()); } - p_context->query_id = t_query_plan_info.query_id; + p_context->query_id = query_id; } std::vector selected_columns; if (exec_st.ok()) { // start the scan procedure + LOG(INFO) << fmt::format( + "exec external scanner, old_query_id = {}, new_query_id = {}, fragment_instance_id " + "= {}", + print_id(t_query_plan_info.query_id), print_id(query_id), + print_id(fragment_instance_id)); exec_st = _exec_env->fragment_mgr()->exec_external_plan_fragment( - params, t_query_plan_info, fragment_instance_id, &selected_columns); + params, t_query_plan_info, query_id, fragment_instance_id, &selected_columns); } exec_st.to_thrift(&t_status); //return status