Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Chore](runtime-filter) add rpc error msg to RuntimeFilterContext #43517

Merged
merged 2 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 21 additions & 22 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <stddef.h>

#include <algorithm>
// IWYU pragma: no_include <bits/chrono.h>
Expand Down Expand Up @@ -1054,30 +1053,33 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
// context, it not the memory is not released. And rpc is in another thread, it will hold rf context
// after query context because the rpc is not returned.
std::weak_ptr<RuntimeFilterContext> _rf_context;
std::string _rf_debug_info;
using Base =
AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>;
ENABLE_FACTORY_CREATOR(SyncSizeClosure);

void _process_if_rpc_failed() override {
((pipeline::CountedFinishDependency*)_dependency.get())->sub();
LOG(WARNING) << "sync filter size meet rpc error, filter=" << _rf_debug_info;
Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }};
auto ctx = _rf_context.lock();
if (!ctx) {
return;
}

ctx->err_msg = cntl_->ErrorText();
Base::_process_if_rpc_failed();
}

void _process_if_meet_error_status(const Status& status) override {
((pipeline::CountedFinishDependency*)_dependency.get())->sub();
Defer defer {[&]() { ((pipeline::CountedFinishDependency*)_dependency.get())->sub(); }};
auto ctx = _rf_context.lock();
if (!ctx) {
return;
}

if (status.is<ErrorCode::END_OF_FILE>()) {
// rf merger backend may finished before rf's send_filter_size, we just ignore filter in this case.
auto ctx = _rf_context.lock();
if (ctx) {
ctx->ignored = true;
} else {
LOG(WARNING) << "sync filter size returned but context is released, filter="
<< _rf_debug_info;
}
ctx->ignored = true;
} else {
LOG(WARNING) << "sync filter size meet error status, filter=" << _rf_debug_info;
ctx->err_msg = status.to_string();
Base::_process_if_meet_error_status(status);
}
}
Expand All @@ -1086,11 +1088,8 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
std::shared_ptr<pipeline::Dependency> dependency,
RuntimeFilterContextSPtr rf_context, std::string_view rf_debug_info)
: Base(req, callback),
_dependency(std::move(dependency)),
_rf_context(rf_context),
_rf_debug_info(rf_debug_info) {}
RuntimeFilterContextSPtr rf_context)
: Base(req, callback), _dependency(std::move(dependency)), _rf_context(rf_context) {}
};

Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) {
Expand Down Expand Up @@ -1134,8 +1133,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
// IRuntimeFilter maybe deconstructed before the rpc finished, so that could not use
// a raw pointer in closure. Has to use the context's shared ptr.
auto closure = SyncSizeClosure::create_unique(request, callback, _dependency,
_wrapper->_context, this->debug_string());
auto closure =
SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper->_context);
auto* pquery_id = request->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
Expand Down Expand Up @@ -1511,9 +1510,9 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile() {
std::string IRuntimeFilter::debug_string() const {
return fmt::format(
"RuntimeFilter: (id = {}, type = {}, need_local_merge: {}, is_broadcast: {}, "
"build_bf_cardinality: {}",
"build_bf_cardinality: {}, error_msg: {}",
_filter_id, to_string(_runtime_filter_type), _need_local_merge, _is_broadcast_join,
_wrapper->get_build_bf_cardinality());
_wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg);
}

Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/runtime/shared_hash_table_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct RuntimeFilterContext {
std::shared_ptr<BloomFilterFuncBase> bloom_filter_func;
std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func;
bool ignored = false;
std::string err_msg;
};

using RuntimeFilterContextSPtr = std::shared_ptr<RuntimeFilterContext>;
Expand Down
Loading