[Feature] Support dynamic partition pruning (backport #30319) #49454

Merged 6 commits on Aug 7, 2024
6 changes: 4 additions & 2 deletions be/src/connector/connector.h
@@ -19,6 +19,7 @@
 #include <unordered_map>
 
 #include "exec/pipeline/scan/morsel.h"
+#include "exprs/runtime_filter_bank.h"
 #include "gen_cpp/InternalService_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/runtime_state.h"
@@ -69,7 +70,7 @@ class DataSource {
         _runtime_profile->add_info_string("DataSourceType", name());
     }
     void set_predicates(const std::vector<ExprContext*>& predicates) { _conjunct_ctxs = predicates; }
-    void set_runtime_filters(const RuntimeFilterProbeCollector* runtime_filters) { _runtime_filters = runtime_filters; }
+    void set_runtime_filters(RuntimeFilterProbeCollector* runtime_filters) { _runtime_filters = runtime_filters; }
     void set_read_limit(const uint64_t limit) { _read_limit = limit; }
     void set_split_context(pipeline::ScanSplitContext* split_context) { _split_context = split_context; }
     Status parse_runtime_filters(RuntimeState* state);
@@ -87,7 +88,8 @@ class DataSource {
     int64_t _read_limit = -1; // no limit
     bool _has_any_predicate = false;
     std::vector<ExprContext*> _conjunct_ctxs;
-    const RuntimeFilterProbeCollector* _runtime_filters = nullptr;
+    RuntimeFilterProbeCollector* _runtime_filters = nullptr;
+    RuntimeBloomFilterEvalContext runtime_bloom_filter_eval_context;
     RuntimeProfile* _runtime_profile = nullptr;
     TupleDescriptor* _tuple_desc = nullptr;
     pipeline::ScanSplitContext* _split_context = nullptr;
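
Dropping `const` from the collector is deliberate: probing a runtime filter updates counters and selection state in the eval context, so the scan side needs a mutable pointer plus its own `RuntimeBloomFilterEvalContext`. A minimal sketch of how a connector-side source is meant to use the two new members (`MyDataSource` and `prune_with_runtime_filters` are hypothetical names, not part of this diff):

```cpp
// Sketch only. MyDataSource / prune_with_runtime_filters are illustrative;
// _runtime_filters, runtime_bloom_filter_eval_context and
// evaluate_partial_chunk() are the pieces this PR introduces.
class MyDataSource : public DataSource {
    void prune_with_runtime_filters(const ChunkPtr& partition_chunk) {
        if (_runtime_filters == nullptr) return;
        // Probing mutates eval-context state, hence the non-const collector.
        _runtime_filters->evaluate_partial_chunk(partition_chunk.get(),
                                                 runtime_bloom_filter_eval_context);
    }
};
```
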
85 changes: 54 additions & 31 deletions be/src/connector/hive_connector.cpp
@@ -112,6 +112,9 @@ Status HiveDataSource::open(RuntimeState* state) {
     if (state->query_options().__isset.enable_connector_split_io_tasks) {
         _enable_split_tasks = state->query_options().enable_connector_split_io_tasks;
     }
+    if (state->query_options().__isset.enable_dynamic_prune_scan_range) {
+        _enable_dynamic_prune_scan_range = state->query_options().enable_dynamic_prune_scan_range;
+    }
 
     RETURN_IF_ERROR(_init_conjunct_ctxs(state));
     _init_tuples_and_slots(state);
@@ -171,44 +174,46 @@ Status HiveDataSource::_init_partition_values() {
     }
 
     const auto& partition_values = partition_desc->partition_key_value_evals();
-    _partition_values = partition_values;
-
-    if (_has_partition_conjuncts || _has_scan_range_indicate_const_column) {
-        ChunkPtr partition_chunk = ChunkHelper::new_chunk(_partition_slots, 1);
-        // append partition data
-        for (int i = 0; i < _partition_slots.size(); i++) {
-            SlotId slot_id = _partition_slots[i]->id();
-            int partition_col_idx = _partition_index_in_hdfs_partition_columns[i];
-            ASSIGN_OR_RETURN(auto partition_value_col, partition_values[partition_col_idx]->evaluate(nullptr));
-            assert(partition_value_col->is_constant());
-            auto* const_column = ColumnHelper::as_raw_column<ConstColumn>(partition_value_col);
-            const ColumnPtr& data_column = const_column->data_column();
-            ColumnPtr& chunk_part_column = partition_chunk->get_column_by_slot_id(slot_id);
-            if (data_column->is_nullable()) {
-                chunk_part_column->append_default();
-            } else {
-                chunk_part_column->append(*data_column, 0, 1);
-            }
-        }
-
-        // eval conjuncts and skip if no rows.
-        if (_has_scan_range_indicate_const_column) {
-            std::vector<ExprContext*> ctxs;
-            for (SlotId slotId : _scan_range.identity_partition_slot_ids) {
-                if (_conjunct_ctxs_by_slot.find(slotId) != _conjunct_ctxs_by_slot.end()) {
-                    ctxs.insert(ctxs.end(), _conjunct_ctxs_by_slot.at(slotId).begin(),
-                                _conjunct_ctxs_by_slot.at(slotId).end());
-                }
-            }
-            RETURN_IF_ERROR(ExecNode::eval_conjuncts(ctxs, partition_chunk.get()));
-        } else {
-            RETURN_IF_ERROR(ExecNode::eval_conjuncts(_partition_conjunct_ctxs, partition_chunk.get()));
-        }
-        if (!partition_chunk->has_rows()) {
-            _filter_by_eval_partition_conjuncts = true;
-        }
-    }
+    _partition_values = partition_desc->partition_key_value_evals();
+
+    // init partition chunk
+    auto partition_chunk = std::make_shared<Chunk>();
+    for (int i = 0; i < _partition_slots.size(); i++) {
+        SlotId slot_id = _partition_slots[i]->id();
+        int partition_col_idx = _partition_index_in_hdfs_partition_columns[i];
+        ASSIGN_OR_RETURN(auto partition_value_col, partition_values[partition_col_idx]->evaluate(nullptr));
+        DCHECK(partition_value_col->is_constant());
+        partition_chunk->append_column(partition_value_col, slot_id);
+    }
+
+    // eval conjuncts and skip if no rows.
+    if (_has_scan_range_indicate_const_column) {
+        std::vector<ExprContext*> ctxs;
+        for (SlotId slotId : _scan_range.identity_partition_slot_ids) {
+            if (_conjunct_ctxs_by_slot.find(slotId) != _conjunct_ctxs_by_slot.end()) {
+                ctxs.insert(ctxs.end(), _conjunct_ctxs_by_slot.at(slotId).begin(),
+                            _conjunct_ctxs_by_slot.at(slotId).end());
+            }
+        }
+        RETURN_IF_ERROR(ExecNode::eval_conjuncts(ctxs, partition_chunk.get()));
+    } else if (_has_partition_conjuncts) {
+        RETURN_IF_ERROR(ExecNode::eval_conjuncts(_partition_conjunct_ctxs, partition_chunk.get()));
+    }
+
+    if (!partition_chunk->has_rows()) {
+        _filter_by_eval_partition_conjuncts = true;
+        return Status::OK();
+    }
+
+    if (_enable_dynamic_prune_scan_range && _runtime_filters) {
+        _init_rf_counters();
+        _runtime_filters->evaluate_partial_chunk(partition_chunk.get(), runtime_bloom_filter_eval_context);
+        if (!partition_chunk->has_rows()) {
+            _filter_by_eval_partition_conjuncts = true;
+            return Status::OK();
+        }
+    }
 
     return Status::OK();
 }

@@ -453,6 +458,24 @@ void HiveDataSource::_init_counter(RuntimeState* state) {
     }
 }
 
+void HiveDataSource::_init_rf_counters() {
+    auto* root = _runtime_profile;
+    if (runtime_bloom_filter_eval_context.join_runtime_filter_timer == nullptr) {
+        static const char* prefix = "DynamicPruneScanRange";
+        ADD_COUNTER(root, prefix, TUnit::NONE);
+        runtime_bloom_filter_eval_context.join_runtime_filter_timer =
+                ADD_CHILD_TIMER(root, "JoinRuntimeFilterTime", prefix);
+        runtime_bloom_filter_eval_context.join_runtime_filter_hash_timer =
+                ADD_CHILD_TIMER(root, "JoinRuntimeFilterHashTime", prefix);
+        runtime_bloom_filter_eval_context.join_runtime_filter_input_counter =
+                ADD_CHILD_COUNTER(root, "JoinRuntimeFilterInputScanRanges", TUnit::UNIT, prefix);
+        runtime_bloom_filter_eval_context.join_runtime_filter_output_counter =
+                ADD_CHILD_COUNTER(root, "JoinRuntimeFilterOutputScanRanges", TUnit::UNIT, prefix);
+        runtime_bloom_filter_eval_context.join_runtime_filter_eval_counter =
+                ADD_CHILD_COUNTER(root, "JoinRuntimeFilterEvaluate", TUnit::UNIT, prefix);
+    }
+}
+
 Status HiveDataSource::_init_scanner(RuntimeState* state) {
     SCOPED_TIMER(_profile.open_file_timer);
 
@@ -603,7 +626,7 @@ void HiveDataSource::close(RuntimeState* state) {
         if (!_scanner->has_split_tasks()) {
             COUNTER_UPDATE(_profile.scan_ranges_counter, 1);
         }
-        _scanner->close(state);
+        _scanner->close();
     }
     Expr::close(_min_max_conjunct_ctxs, state);
     Expr::close(_partition_conjunct_ctxs, state);
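
In short, `_init_partition_values` now always materializes the scan range's partition values as a one-row chunk of const columns, evaluates the partition conjuncts against it, and, when `enable_dynamic_prune_scan_range` is set and runtime filters have arrived, probes those filters too; if the single row is filtered out, the entire scan range is skipped. A condensed sketch of that flow, with `build_partition_chunk()` as a hypothetical stand-in for the per-slot `append_column` loop:

```cpp
// Condensed sketch of the flow above; build_partition_chunk() is not a real
// StarRocks API, it stands in for the loop in _init_partition_values().
Status prune_one_scan_range(RuntimeFilterProbeCollector* filters,
                            RuntimeBloomFilterEvalContext& ctx, bool* pruned) {
    ChunkPtr partition_chunk = build_partition_chunk(); // one row of const partition values
    *pruned = false;
    if (filters != nullptr) {
        filters->evaluate_partial_chunk(partition_chunk.get(), ctx);
        // If the runtime filters reject the partition value, the whole range is dead.
        *pruned = !partition_chunk->has_rows();
    }
    return Status::OK();
}
```
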
2 changes: 2 additions & 0 deletions be/src/connector/hive_connector.h
@@ -90,6 +90,7 @@ class HiveDataSource final : public DataSource {
     Status _decompose_conjunct_ctxs(RuntimeState* state);
     void _init_tuples_and_slots(RuntimeState* state);
     void _init_counter(RuntimeState* state);
+    void _init_rf_counters();
 
     Status _init_partition_values();
     Status _init_scanner(RuntimeState* state);
@@ -111,6 +112,7 @@ class HiveDataSource final : public DataSource {
     int32_t _datacache_evict_probability = 0;
     bool _use_file_metacache = false;
     bool _enable_split_tasks = false;
+    bool _enable_dynamic_prune_scan_range = true;
 
     // ============ conjuncts =================
     std::vector<ExprContext*> _min_max_conjunct_ctxs;
14 changes: 6 additions & 8 deletions be/src/exec/hdfs_scanner.cpp
@@ -193,26 +193,24 @@ Status HdfsScanner::open(RuntimeState* runtime_state) {
     return Status::OK();
 }
 
-void HdfsScanner::close(RuntimeState* runtime_state) noexcept {
+void HdfsScanner::close() noexcept {
     VLOG_FILE << "close file success: " << _scanner_params.path << ", scan range = ["
               << _scanner_params.scan_range->offset << ","
               << (_scanner_params.scan_range->length + _scanner_params.scan_range->offset)
               << "], rows = " << _app_stats.rows_read;
 
+    if (!_runtime_state) {
+        return;
+    }
+
     bool expect = false;
     if (!_closed.compare_exchange_strong(expect, true)) return;
     update_counter();
-    do_close(runtime_state);
+    do_close(_runtime_state);
     _file.reset(nullptr);
     _mor_processor->close(_runtime_state);
 }
 
-void HdfsScanner::finalize() {
-    if (_runtime_state != nullptr) {
-        close(_runtime_state);
-    }
-}
-
 Status HdfsScanner::open_random_access_file() {
     CHECK(_file == nullptr) << "File has already been opened";
     ASSIGN_OR_RETURN(std::unique_ptr<RandomAccessFile> raw_file,
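
With `finalize()` gone, teardown responsibility moves into `close()`: it uses the `RuntimeState` captured at `init()` time, returns early if the scanner was never initialized, and keeps the atomic flag so a second call (for example from a destructor, as `JniScanner` now does) is a no-op. The guard pattern in isolation, as a self-contained sketch:

```cpp
#include <atomic>

// Self-contained sketch of the idempotent-close pattern used by HdfsScanner:
// only the first close() runs teardown; later calls (e.g. from ~Closable) do nothing.
class Closable {
public:
    void close() noexcept {
        bool expect = false;
        if (!_closed.compare_exchange_strong(expect, true)) return; // already closed
        // ... release files, counters, processors exactly once ...
    }
    ~Closable() { close(); }

private:
    std::atomic<bool> _closed{false};
};
```
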
6 changes: 3 additions & 3 deletions be/src/exec/hdfs_scanner.h
@@ -20,6 +20,7 @@
 #include "exec/mor_processor.h"
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
+#include "exprs/runtime_filter_bank.h"
 #include "fs/fs.h"
 #include "io/cache_input_stream.h"
 #include "io/shared_buffered_input_stream.h"
@@ -330,11 +331,10 @@ class HdfsScanner {
     HdfsScanner() = default;
     virtual ~HdfsScanner() = default;
 
-    Status init(RuntimeState* runtime_state, const HdfsScannerParams& scanner_params);
     Status open(RuntimeState* runtime_state);
-    void close(RuntimeState* runtime_state) noexcept;
     Status get_next(RuntimeState* runtime_state, ChunkPtr* chunk);
-    void finalize();
+    Status init(RuntimeState* runtime_state, const HdfsScannerParams& scanner_params);
+    void close() noexcept;
 
     int64_t num_bytes_read() const { return _app_stats.bytes_read; }
     int64_t raw_rows_read() const { return _app_stats.raw_rows_read; }
2 changes: 1 addition & 1 deletion be/src/exec/jni_scanner.h
@@ -34,7 +34,7 @@ class JniScanner : public HdfsScanner {
     JniScanner(std::string factory_class, std::map<std::string, std::string> params)
             : _jni_scanner_params(std::move(params)), _jni_scanner_factory_class(std::move(factory_class)) {}
 
-    ~JniScanner() override { finalize(); }
+    ~JniScanner() override { close(); }
 
     [[nodiscard]] Status do_open(RuntimeState* runtime_state) override;
     void do_update_counter(HdfsScanProfile* profile) override;
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/connector_scan_operator.h
@@ -155,7 +155,7 @@ class ConnectorChunkSource : public ChunkSource {
 
     const int64_t _limit; // -1: no limit
     const std::vector<ExprContext*>& _runtime_in_filters;
-    const RuntimeFilterProbeCollector* _runtime_bloom_filters;
+    RuntimeFilterProbeCollector* _runtime_bloom_filters;
 
     // copied from scan node and merge predicates from runtime filter.
     std::vector<ExprContext*> _conjunct_ctxs;
83 changes: 83 additions & 0 deletions be/src/exprs/runtime_filter_bank.cpp
@@ -421,6 +421,72 @@ void RuntimeFilterProbeCollector::do_evaluate(Chunk* chunk, RuntimeBloomFilterEvalContext& eval_context) {
         }
     }
 }
+
+void RuntimeFilterProbeCollector::do_evaluate_partial_chunk(Chunk* partial_chunk,
+                                                            RuntimeBloomFilterEvalContext& eval_context) {
+    auto& selection = eval_context.running_context.selection;
+    eval_context.running_context.use_merged_selection = false;
+    eval_context.running_context.compatibility =
+            _runtime_state->func_version() <= 3 || !_runtime_state->enable_pipeline_engine();
+
+    // since partial chunk is currently very lightweight (a bunch of const columns), use every runtime filter if possible
+    // without computing each rf's selectivity
+    for (auto kv : _descriptors) {
+        RuntimeFilterProbeDescriptor* rf_desc = kv.second;
+        const JoinRuntimeFilter* filter = rf_desc->runtime_filter();
+        if (filter == nullptr || filter->always_true()) {
+            continue;
+        }
+
+        auto only_reference_existent_slots = [&](ExprContext* expr) {
+            std::vector<SlotId> slot_ids;
+            int n = expr->root()->get_slot_ids(&slot_ids);
+            DCHECK(slot_ids.size() == n);
+
+            // do not allow struct subfield
+            if (expr->root()->get_subfields(nullptr) > 0) {
+                return false;
+            }
+
+            for (auto slot_id : slot_ids) {
+                if (!partial_chunk->is_slot_exist(slot_id)) {
+                    return false;
+                }
+            }
+
+            return true;
+        };
+
+        auto* probe_expr = rf_desc->probe_expr_ctx();
+        auto* partition_by_exprs = rf_desc->partition_by_expr_contexts();
+
+        bool can_use_rf_on_partial_chunk = only_reference_existent_slots(probe_expr);
+        for (auto* part_by_expr : *partition_by_exprs) {
+            can_use_rf_on_partial_chunk &= only_reference_existent_slots(part_by_expr);
+        }
+
+        // skip runtime filter that references a non-existent column for the partial chunk
+        if (!can_use_rf_on_partial_chunk) {
+            continue;
+        }
+
+        ColumnPtr column = EVALUATE_NULL_IF_ERROR(probe_expr, probe_expr->root(), partial_chunk);
+        // for colocate grf
+        compute_hash_values(partial_chunk, column.get(), rf_desc, eval_context);
+        filter->evaluate(column.get(), &eval_context.running_context);
+
+        auto true_count = SIMD::count_nonzero(selection);
+        eval_context.run_filter_nums += 1;
+
+        if (true_count == 0) {
+            partial_chunk->set_num_rows(0);
+            return;
+        } else {
+            partial_chunk->filter(selection);
+        }
+    }
+}
+
 void RuntimeFilterProbeCollector::init_counter() {
     _eval_context.join_runtime_filter_timer = ADD_TIMER(_runtime_profile, "JoinRuntimeFilterTime");
     _eval_context.join_runtime_filter_hash_timer = ADD_TIMER(_runtime_profile, "JoinRuntimeFilterHashTime");
@@ -456,6 +522,23 @@ void RuntimeFilterProbeCollector::evaluate(Chunk* chunk, RuntimeBloomFilterEvalContext& eval_context) {
     }
 }
 
+void RuntimeFilterProbeCollector::evaluate_partial_chunk(Chunk* partial_chunk,
+                                                         RuntimeBloomFilterEvalContext& eval_context) {
+    if (_descriptors.empty()) return;
+    size_t before = partial_chunk->num_rows();
+    if (before == 0) return;
+
+    {
+        SCOPED_TIMER(eval_context.join_runtime_filter_timer);
+        eval_context.join_runtime_filter_input_counter->update(before);
+        eval_context.run_filter_nums = 0;
+        do_evaluate_partial_chunk(partial_chunk, eval_context);
+        size_t after = partial_chunk->num_rows();
+        eval_context.join_runtime_filter_output_counter->update(after);
+        eval_context.join_runtime_filter_eval_counter->update(eval_context.run_filter_nums);
+    }
+}
+
 void RuntimeFilterProbeCollector::compute_hash_values(Chunk* chunk, Column* column,
                                                       RuntimeFilterProbeDescriptor* rf_desc,
                                                       RuntimeBloomFilterEvalContext& eval_context) {
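
The key difference from `do_evaluate` is the guard: the partition chunk is partial (it carries only partition slots), so a filter may run only if every slot referenced by its probe and partition-by expressions exists in the chunk, and struct subfield references are rejected outright. Restated as a standalone predicate with simplified types (`can_probe_partial_chunk` is an illustrative name):

```cpp
// Simplified restatement of the only_reference_existent_slots lambda above.
bool can_probe_partial_chunk(ExprContext* expr_ctx, const Chunk& partial_chunk) {
    if (expr_ctx->root()->get_subfields(nullptr) > 0) return false; // no struct subfields
    std::vector<SlotId> slot_ids;
    expr_ctx->root()->get_slot_ids(&slot_ids);
    for (SlotId id : slot_ids) {
        if (!partial_chunk.is_slot_exist(id)) return false; // slot missing from partial chunk
    }
    return true;
}
```
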
3 changes: 3 additions & 0 deletions be/src/exprs/runtime_filter_bank.h
@@ -216,6 +216,8 @@ class RuntimeFilterProbeCollector {
                   RuntimeBloomFilterEvalContext& eval_context);
     void evaluate(Chunk* chunk);
     void evaluate(Chunk* chunk, RuntimeBloomFilterEvalContext& eval_context);
+    // evaluate partial chunk that may not contain slots referenced by runtime filter
+    void evaluate_partial_chunk(Chunk* partial_chunk, RuntimeBloomFilterEvalContext& eval_context);
     void add_descriptor(RuntimeFilterProbeDescriptor* desc);
     // accept RuntimeFilterCollector from parent node
     // which means parent node to push down runtime filter.
@@ -247,6 +249,7 @@ class RuntimeFilterProbeCollector {
     // TODO: return a funcion call status
     void do_evaluate(Chunk* chunk);
     void do_evaluate(Chunk* chunk, RuntimeBloomFilterEvalContext& eval_context);
+    void do_evaluate_partial_chunk(Chunk* partial_chunk, RuntimeBloomFilterEvalContext& eval_context);
     // mapping from filter id to runtime filter descriptor.
     std::map<int32_t, RuntimeFilterProbeDescriptor*> _descriptors;
     int _wait_timeout_ms = 0;
6 changes: 4 additions & 2 deletions be/src/exprs/subfield_expr.cpp
@@ -83,7 +83,9 @@ class SubfieldExpr final : public Expr {
     Expr* clone(ObjectPool* pool) const override { return pool->add(new SubfieldExpr(*this)); }
 
     int get_subfields(std::vector<std::vector<std::string>>* subfields) const override {
-        subfields->push_back(_used_subfield_names);
+        if (subfields != nullptr) {
+            subfields->push_back(_used_subfield_names);
+        }
         return 1;
     }
 
@@ -97,4 +99,4 @@ Expr* SubfieldExprFactory::from_thrift(const TExprNode& node) {
     return new SubfieldExpr(node);
 }
 
-} // namespace starrocks
\ No newline at end of file
+} // namespace starrocks
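
This nullptr guard is what lets `do_evaluate_partial_chunk` above call `get_subfields(nullptr)` purely as a "does this expression touch a struct subfield?" test, without collecting the names. For illustration (a hypothetical helper, not part of the PR):

```cpp
// Hypothetical helper made safe by the guard: count subfield references
// without materializing their name lists.
bool references_subfields(const Expr* root) {
    return root->get_subfields(nullptr) > 0;
}
```
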
16 changes: 0 additions & 16 deletions be/src/formats/parquet/file_reader.cpp
@@ -404,22 +404,6 @@ StatusOr<bool> FileReader::_filter_group(const tparquet::RowGroup& row_group) {
             if (discard) {
                 return true;
             }
-
-            // skip topn runtime filter, because it has taken effect on min/max filtering
-            // if row-group contains exactly one value(i.e. min_value = max_value), use bloom filter to test
-            if (!filter->always_true() && min_chunk->columns()[0]->equals(0, *max_chunk->columns()[0], 0)) {
-                ColumnPtr& chunk_part_column = min_chunk->columns()[0];
-                JoinRuntimeFilter::RunningContext ctx;
-                ctx.use_merged_selection = false;
-                ctx.compatibility = false;
-                auto& selection = ctx.selection;
-                selection.assign(chunk_part_column->size(), 1);
-                filter->compute_hash({chunk_part_column.get()}, &ctx);
-                filter->evaluate(chunk_part_column.get(), &ctx);
-                if (selection[0] == 0) {
-                    return true;
-                }
-            }
         }
     }
 