Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Nov 15, 2024
1 parent 279ea62 commit c32cb5e
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 4 deletions.
7 changes: 3 additions & 4 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -787,11 +787,10 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
{ return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });

auto* rf_ctx = RuntimeFilterParamsContext::create(query_ctx.get());
rf_ctx->set_state(context->get_runtime_state());
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
RETURN_IF_ERROR(_runtimefilter_controller.add_entity(params.local_params[0], params.query_id,
params.query_options, &handler, rf_ctx));
RETURN_IF_ERROR(_runtimefilter_controller.add_entity(
params.local_params[0], params.query_id, params.query_options, &handler,
RuntimeFilterParamsContext::create(context->get_runtime_state())));
if (handler) {
query_ctx->set_merge_controller_handler(handler);
}
Expand Down
8 changes: 8 additions & 0 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,14 @@ void RuntimeFilterMergeController::remove_entity(UniqueId query_id) {
_filter_controller_map[shard].erase(query_id);
}

RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* state) {
RuntimeFilterParamsContext* params =
state->get_query_ctx()->obj_pool.add(new RuntimeFilterParamsContext());
params->_query_ctx = state->get_query_ctx();
params->_state = state;
return params;
}

RuntimeFilterMgr* RuntimeFilterParamsContext::global_runtime_filter_mgr() {
return _query_ctx->runtime_filter_mgr();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ class RuntimeFilterMergeController {
// 1. Global runtime filter. Managed by QueryContext's RuntimeFilterMgr which is produced by multiple producers and shared by multiple consumers.
// 2. Local runtime filter. Managed by RuntimeState's RuntimeFilterMgr which is 1-producer-1-consumer mode.
struct RuntimeFilterParamsContext {
static RuntimeFilterParamsContext* create(RuntimeState* state);
static RuntimeFilterParamsContext* create(QueryContext* query_ctx);

QueryContext* get_query_ctx() const { return _query_ctx; }
Expand Down

0 comments on commit c32cb5e

Please sign in to comment.