Skip to content

Commit

Permalink
[Chore](runtime-filter) add rpc error msg to RuntimeFilterContext (#4…
Browse files Browse the repository at this point in the history
…3517)

add rpc error msg to RuntimeFilterContext
  • Loading branch information
BiteTheDDDDt authored Nov 11, 2024
1 parent dae8f60 commit a71d4ec
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 22 deletions.
43 changes: 21 additions & 22 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <stddef.h>

#include <algorithm>
// IWYU pragma: no_include <bits/chrono.h>
Expand Down Expand Up @@ -1054,30 +1053,33 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
// context, it not the memory is not released. And rpc is in another thread, it will hold rf context
// after query context because the rpc is not returned.
std::weak_ptr<RuntimeFilterContext> _rf_context;
std::string _rf_debug_info;
using Base =
AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>;
ENABLE_FACTORY_CREATOR(SyncSizeClosure);

void _process_if_rpc_failed() override {
((pipeline::CountedFinishDependency*)_dependency.get())->sub();
LOG(WARNING) << "sync filter size meet rpc error, filter=" << _rf_debug_info;
Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }};
auto ctx = _rf_context.lock();
if (!ctx) {
return;
}

ctx->err_msg = cntl_->ErrorText();
Base::_process_if_rpc_failed();
}

void _process_if_meet_error_status(const Status& status) override {
((pipeline::CountedFinishDependency*)_dependency.get())->sub();
Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }};
auto ctx = _rf_context.lock();
if (!ctx) {
return;
}

if (status.is<ErrorCode::END_OF_FILE>()) {
// rf merger backend may finished before rf's send_filter_size, we just ignore filter in this case.
auto ctx = _rf_context.lock();
if (ctx) {
ctx->ignored = true;
} else {
LOG(WARNING) << "sync filter size returned but context is released, filter="
<< _rf_debug_info;
}
ctx->ignored = true;
} else {
LOG(WARNING) << "sync filter size meet error status, filter=" << _rf_debug_info;
ctx->err_msg = status.to_string();
Base::_process_if_meet_error_status(status);
}
}
Expand All @@ -1086,11 +1088,8 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
std::shared_ptr<pipeline::Dependency> dependency,
RuntimeFilterContextSPtr rf_context, std::string_view rf_debug_info)
: Base(req, callback),
_dependency(std::move(dependency)),
_rf_context(rf_context),
_rf_debug_info(rf_debug_info) {}
RuntimeFilterContextSPtr rf_context)
: Base(req, callback), _dependency(std::move(dependency)), _rf_context(rf_context) {}
};

Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) {
Expand Down Expand Up @@ -1134,8 +1133,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
// IRuntimeFilter maybe deconstructed before the rpc finished, so that could not use
// a raw pointer in closure. Has to use the context's shared ptr.
auto closure = SyncSizeClosure::create_unique(request, callback, _dependency,
_wrapper->_context, this->debug_string());
auto closure =
SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper->_context);
auto* pquery_id = request->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
Expand Down Expand Up @@ -1511,9 +1510,9 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() {
std::string IRuntimeFilter::debug_string() const {
return fmt::format(
"RuntimeFilter: (id = {}, type = {}, need_local_merge: {}, is_broadcast: {}, "
"build_bf_cardinality: {}",
"build_bf_cardinality: {}, error_msg: {}",
_filter_id, to_string(_runtime_filter_type), _need_local_merge, _is_broadcast_join,
_wrapper->get_build_bf_cardinality());
_wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg);
}

Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/runtime/shared_hash_table_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct RuntimeFilterContext {
std::shared_ptr<BloomFilterFuncBase> bloom_filter_func;
std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func;
bool ignored = false;
std::string err_msg;
};

using RuntimeFilterContextSPtr = std::shared_ptr<RuntimeFilterContext>;
Expand Down

0 comments on commit a71d4ec

Please sign in to comment.