diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index b2817e291cadd5..d7bef524f01146 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -22,7 +22,6 @@ #include #include #include -#include #include // IWYU pragma: no_include @@ -1038,23 +1037,34 @@ class SyncSizeClosure : public AutoReleaseClosure _dependency; RuntimeFilterContextSPtr _rf_context; std::string _rf_debug_info; + using Base = AutoReleaseClosure>; ENABLE_FACTORY_CREATOR(SyncSizeClosure); void _process_if_rpc_failed() override { - ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); - LOG(WARNING) << "sync filter size meet rpc error, filter=" << _rf_debug_info; + Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }}; + auto ctx = _rf_context; + if (!ctx) { + return; + } + + ctx->err_msg = cntl_->ErrorText(); Base::_process_if_rpc_failed(); } void _process_if_meet_error_status(const Status& status) override { - ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); + Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }}; + auto ctx = _rf_context; + if (!ctx) { + return; + } + if (status.is()) { // rf merger backend may finished before rf's send_filter_size, we just ignore filter in this case. - _rf_context->ignored = true; + ctx->ignored = true; } else { - LOG(WARNING) << "sync filter size meet error status, filter=" << _rf_debug_info; + ctx->err_msg = status.to_string(); Base::_process_if_meet_error_status(status); } } @@ -1063,11 +1073,8 @@ class SyncSizeClosure : public AutoReleaseClosure req, std::shared_ptr> callback, std::shared_ptr dependency, - RuntimeFilterContextSPtr rf_context, std::string_view rf_debug_info) - : Base(req, callback), - _dependency(std::move(dependency)), - _rf_context(rf_context), - _rf_debug_info(rf_debug_info) {} + RuntimeFilterContextSPtr rf_context) + : Base(req, callback), _dependency(std::move(dependency)), _rf_context(rf_context) {} }; Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) { @@ -1111,8 +1118,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt auto callback = DummyBrpcCallback::create_shared(); // IRuntimeFilter maybe deconstructed before the rpc finished, so that could not use // a raw pointer in closure. Has to use the context's shared ptr. - auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, - _wrapper->_context, this->debug_string()); + auto closure = + SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper->_context); auto* pquery_id = request->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); pquery_id->set_lo(_state->query_id.lo()); @@ -1576,9 +1583,9 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() { std::string IRuntimeFilter::debug_string() const { return fmt::format( "RuntimeFilter: (id = {}, type = {}, need_local_merge: {}, is_broadcast: {}, " - "build_bf_cardinality: {}", + "build_bf_cardinality: {}, error_msg: {}", _filter_id, to_string(_runtime_filter_type), _need_local_merge, _is_broadcast_join, - _wrapper->get_build_bf_cardinality()); + _wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg); } Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index 4581bb762e80e3..aba441f282a815 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -46,6 +46,7 @@ struct RuntimeFilterContext { std::shared_ptr bloom_filter_func; std::shared_ptr bitmap_filter_func; bool ignored = false; + std::string err_msg; }; using RuntimeFilterContextSPtr = std::shared_ptr;