From 4516ba0cc4fc3ad4654ed3c2d308e0fadddebb15 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 8 Nov 2024 17:10:18 +0800 Subject: [PATCH 1/2] add rpc error msg to RuntimeFilterContext --- be/src/exprs/runtime_filter.cpp | 43 +++++++++---------- .../runtime/shared_hash_table_controller.h | 1 + 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index fb82450ac4d600..fc0693cb2e938c 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include // IWYU pragma: no_include @@ -1054,30 +1053,33 @@ class SyncSizeClosure : public AutoReleaseClosure _rf_context; - std::string _rf_debug_info; using Base = AutoReleaseClosure>; ENABLE_FACTORY_CREATOR(SyncSizeClosure); + ~SyncSizeClosure() override { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); } + void _process_if_rpc_failed() override { - ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); - LOG(WARNING) << "sync filter size meet rpc error, filter=" << _rf_debug_info; + auto ctx = _rf_context.lock(); + if (!ctx) { + return; + } + + ctx->err_msg = cntl_->ErrorText(); Base::_process_if_rpc_failed(); } void _process_if_meet_error_status(const Status& status) override { - ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); + auto ctx = _rf_context.lock(); + if (!ctx) { + return; + } + if (status.is()) { // rf merger backend may finished before rf's send_filter_size, we just ignore filter in this case. - auto ctx = _rf_context.lock(); - if (ctx) { - ctx->ignored = true; - } else { - LOG(WARNING) << "sync filter size returned but context is released, filter=" - << _rf_debug_info; - } + ctx->ignored = true; } else { - LOG(WARNING) << "sync filter size meet error status, filter=" << _rf_debug_info; + ctx->err_msg = status.to_string(); Base::_process_if_meet_error_status(status); } } @@ -1086,11 +1088,8 @@ class SyncSizeClosure : public AutoReleaseClosure req, std::shared_ptr> callback, std::shared_ptr dependency, - RuntimeFilterContextSPtr rf_context, std::string_view rf_debug_info) - : Base(req, callback), - _dependency(std::move(dependency)), - _rf_context(rf_context), - _rf_debug_info(rf_debug_info) {} + RuntimeFilterContextSPtr rf_context) + : Base(req, callback), _dependency(std::move(dependency)), _rf_context(rf_context) {} }; Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) { @@ -1134,8 +1133,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt auto callback = DummyBrpcCallback::create_shared(); // IRuntimeFilter maybe deconstructed before the rpc finished, so that could not use // a raw pointer in closure. Has to use the context's shared ptr. - auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, - _wrapper->_context, this->debug_string()); + auto closure = + SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper->_context); auto* pquery_id = request->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); pquery_id->set_lo(_state->query_id.lo()); @@ -1511,9 +1510,9 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() { std::string IRuntimeFilter::debug_string() const { return fmt::format( "RuntimeFilter: (id = {}, type = {}, need_local_merge: {}, is_broadcast: {}, " - "build_bf_cardinality: {}", + "build_bf_cardinality: {}, error_msg: {}", _filter_id, to_string(_runtime_filter_type), _need_local_merge, _is_broadcast_join, - _wrapper->get_build_bf_cardinality()); + _wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg); } Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index 7bd032c6b56237..fd3a753be8c0b3 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -47,6 +47,7 @@ struct RuntimeFilterContext { std::shared_ptr bloom_filter_func; std::shared_ptr bitmap_filter_func; bool ignored = false; + std::string err_msg; }; using RuntimeFilterContextSPtr = std::shared_ptr; From d6e89b605e67e69103cf6d9a97801fb81be15b88 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Fri, 8 Nov 2024 18:44:45 +0800 Subject: [PATCH 2/2] fix --- be/src/exprs/runtime_filter.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index fc0693cb2e938c..85f1c535c7038b 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1057,9 +1057,8 @@ class SyncSizeClosure : public AutoReleaseClosure>; ENABLE_FACTORY_CREATOR(SyncSizeClosure); - ~SyncSizeClosure() override { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); } - void _process_if_rpc_failed() override { + Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }}; auto ctx = _rf_context.lock(); if (!ctx) { return; @@ -1070,6 +1069,7 @@ class SyncSizeClosure : public AutoReleaseClosuresub(); }}; auto ctx = _rf_context.lock(); if (!ctx) { return;