[Feature] Support dynamic partition pruning (backport #30319) (#49454)
Signed-off-by: Letian Jiang <[email protected]>
letian-jiang authored Aug 7, 2024
1 parent 8e6873b commit 1669bf7
Showing 16 changed files with 326 additions and 121 deletions.
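
Editor's note, before the per-file diffs: dynamic partition pruning lets the scan side drop whole partitions (scan ranges) whose partition-key values cannot pass a runtime filter built from the join's build side, before any file I/O happens. The sketch below is editorial, not code from this commit; RuntimeFilter, ScanRange, and prune_scan_ranges are hypothetical stand-ins for the StarRocks types touched in the diffs (a real runtime filter is a bloom filter; exact membership is used here only to keep the sketch short).

#include <cstdint>
#include <cstdio>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for a join runtime filter built from the build side.
struct RuntimeFilter {
    std::vector<uint64_t> build_side_hashes;
    bool might_contain(uint64_t h) const {
        for (uint64_t x : build_side_hashes) {
            if (x == h) return true;
        }
        return false; // a real bloom filter answers this probabilistically
    }
};

struct ScanRange {
    std::string path;
    int64_t partition_key; // constant for every row in this range
};

// Keep only scan ranges whose partition key might match the build side;
// pruned ranges are skipped before any file I/O happens.
std::vector<ScanRange> prune_scan_ranges(const std::vector<ScanRange>& ranges,
                                         const RuntimeFilter& rf) {
    std::vector<ScanRange> kept;
    for (const auto& r : ranges) {
        if (rf.might_contain(std::hash<int64_t>{}(r.partition_key))) {
            kept.push_back(r);
        }
    }
    return kept;
}

int main() {
    RuntimeFilter rf{{std::hash<int64_t>{}(20240807)}};
    std::vector<ScanRange> ranges{{"dt=20240806/a.parquet", 20240806},
                                  {"dt=20240807/b.parquet", 20240807}};
    std::printf("kept %zu of %zu ranges\n", prune_scan_ranges(ranges, rf).size(), ranges.size());
}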
6 changes: 4 additions & 2 deletions be/src/connector/connector.h
@@ -19,6 +19,7 @@
#include <unordered_map>

#include "exec/pipeline/scan/morsel.h"
#include "exprs/runtime_filter_bank.h"
#include "gen_cpp/InternalService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/runtime_state.h"
@@ -69,7 +70,7 @@ class DataSource {
_runtime_profile->add_info_string("DataSourceType", name());
}
void set_predicates(const std::vector<ExprContext*>& predicates) { _conjunct_ctxs = predicates; }
-void set_runtime_filters(const RuntimeFilterProbeCollector* runtime_filters) { _runtime_filters = runtime_filters; }
+void set_runtime_filters(RuntimeFilterProbeCollector* runtime_filters) { _runtime_filters = runtime_filters; }
void set_read_limit(const uint64_t limit) { _read_limit = limit; }
void set_split_context(pipeline::ScanSplitContext* split_context) { _split_context = split_context; }
Status parse_runtime_filters(RuntimeState* state);
@@ -87,7 +88,8 @@ class DataSource {
int64_t _read_limit = -1; // no limit
bool _has_any_predicate = false;
std::vector<ExprContext*> _conjunct_ctxs;
-const RuntimeFilterProbeCollector* _runtime_filters = nullptr;
+RuntimeFilterProbeCollector* _runtime_filters = nullptr;
+RuntimeBloomFilterEvalContext runtime_bloom_filter_eval_context;
RuntimeProfile* _runtime_profile = nullptr;
TupleDescriptor* _tuple_desc = nullptr;
pipeline::ScanSplitContext* _split_context = nullptr;
85 changes: 54 additions & 31 deletions be/src/connector/hive_connector.cpp
@@ -112,6 +112,9 @@ Status HiveDataSource::open(RuntimeState* state) {
if (state->query_options().__isset.enable_connector_split_io_tasks) {
_enable_split_tasks = state->query_options().enable_connector_split_io_tasks;
}
+if (state->query_options().__isset.enable_dynamic_prune_scan_range) {
+    _enable_dynamic_prune_scan_range = state->query_options().enable_dynamic_prune_scan_range;
+}

RETURN_IF_ERROR(_init_conjunct_ctxs(state));
_init_tuples_and_slots(state);
@@ -171,44 +174,46 @@ Status HiveDataSource::_init_partition_values() {
    }

    const auto& partition_values = partition_desc->partition_key_value_evals();
-    _partition_values = partition_values;
-
-    if (_has_partition_conjuncts || _has_scan_range_indicate_const_column) {
-        ChunkPtr partition_chunk = ChunkHelper::new_chunk(_partition_slots, 1);
-        // append partition data
-        for (int i = 0; i < _partition_slots.size(); i++) {
-            SlotId slot_id = _partition_slots[i]->id();
-            int partition_col_idx = _partition_index_in_hdfs_partition_columns[i];
-            ASSIGN_OR_RETURN(auto partition_value_col, partition_values[partition_col_idx]->evaluate(nullptr));
-            assert(partition_value_col->is_constant());
-            auto* const_column = ColumnHelper::as_raw_column<ConstColumn>(partition_value_col);
-            const ColumnPtr& data_column = const_column->data_column();
-            ColumnPtr& chunk_part_column = partition_chunk->get_column_by_slot_id(slot_id);
-            if (data_column->is_nullable()) {
-                chunk_part_column->append_default();
-            } else {
-                chunk_part_column->append(*data_column, 0, 1);
-            }
-        }
+    _partition_values = partition_desc->partition_key_value_evals();
+
+    // init partition chunk
+    auto partition_chunk = std::make_shared<Chunk>();
+    for (int i = 0; i < _partition_slots.size(); i++) {
+        SlotId slot_id = _partition_slots[i]->id();
+        int partition_col_idx = _partition_index_in_hdfs_partition_columns[i];
+        ASSIGN_OR_RETURN(auto partition_value_col, partition_values[partition_col_idx]->evaluate(nullptr));
+        DCHECK(partition_value_col->is_constant());
+        partition_chunk->append_column(partition_value_col, slot_id);
+    }

-        // eval conjuncts and skip if no rows.
-        if (_has_scan_range_indicate_const_column) {
-            std::vector<ExprContext*> ctxs;
-            for (SlotId slotId : _scan_range.identity_partition_slot_ids) {
-                if (_conjunct_ctxs_by_slot.find(slotId) != _conjunct_ctxs_by_slot.end()) {
-                    ctxs.insert(ctxs.end(), _conjunct_ctxs_by_slot.at(slotId).begin(),
-                                _conjunct_ctxs_by_slot.at(slotId).end());
-                }
-            }
-            RETURN_IF_ERROR(ExecNode::eval_conjuncts(ctxs, partition_chunk.get()));
-        } else {
-            RETURN_IF_ERROR(ExecNode::eval_conjuncts(_partition_conjunct_ctxs, partition_chunk.get()));
-        }
+    // eval conjuncts and skip if no rows.
+    if (_has_scan_range_indicate_const_column) {
+        std::vector<ExprContext*> ctxs;
+        for (SlotId slotId : _scan_range.identity_partition_slot_ids) {
+            if (_conjunct_ctxs_by_slot.find(slotId) != _conjunct_ctxs_by_slot.end()) {
+                ctxs.insert(ctxs.end(), _conjunct_ctxs_by_slot.at(slotId).begin(),
+                            _conjunct_ctxs_by_slot.at(slotId).end());
+            }
+        }
+        RETURN_IF_ERROR(ExecNode::eval_conjuncts(ctxs, partition_chunk.get()));
+    } else if (_has_partition_conjuncts) {
+        RETURN_IF_ERROR(ExecNode::eval_conjuncts(_partition_conjunct_ctxs, partition_chunk.get()));
+    }

    if (!partition_chunk->has_rows()) {
        _filter_by_eval_partition_conjuncts = true;
        return Status::OK();
    }

+    if (_enable_dynamic_prune_scan_range && _runtime_filters) {
+        _init_rf_counters();
+        _runtime_filters->evaluate_partial_chunk(partition_chunk.get(), runtime_bloom_filter_eval_context);
+        if (!partition_chunk->has_rows()) {
+            _filter_by_eval_partition_conjuncts = true;
+            return Status::OK();
+        }
+    }

    return Status::OK();
}

@@ -453,6 +458,24 @@ void HiveDataSource::_init_counter(RuntimeState* state) {
}
}

+void HiveDataSource::_init_rf_counters() {
+    auto* root = _runtime_profile;
+    if (runtime_bloom_filter_eval_context.join_runtime_filter_timer == nullptr) {
+        static const char* prefix = "DynamicPruneScanRange";
+        ADD_COUNTER(root, prefix, TUnit::NONE);
+        runtime_bloom_filter_eval_context.join_runtime_filter_timer =
+                ADD_CHILD_TIMER(root, "JoinRuntimeFilterTime", prefix);
+        runtime_bloom_filter_eval_context.join_runtime_filter_hash_timer =
+                ADD_CHILD_TIMER(root, "JoinRuntimeFilterHashTime", prefix);
+        runtime_bloom_filter_eval_context.join_runtime_filter_input_counter =
+                ADD_CHILD_COUNTER(root, "JoinRuntimeFilterInputScanRanges", TUnit::UNIT, prefix);
+        runtime_bloom_filter_eval_context.join_runtime_filter_output_counter =
+                ADD_CHILD_COUNTER(root, "JoinRuntimeFilterOutputScanRanges", TUnit::UNIT, prefix);
+        runtime_bloom_filter_eval_context.join_runtime_filter_eval_counter =
+                ADD_CHILD_COUNTER(root, "JoinRuntimeFilterEvaluate", TUnit::UNIT, prefix);
+    }
+}

Status HiveDataSource::_init_scanner(RuntimeState* state) {
SCOPED_TIMER(_profile.open_file_timer);

@@ -603,7 +626,7 @@ void HiveDataSource::close(RuntimeState* state) {
if (!_scanner->has_split_tasks()) {
COUNTER_UPDATE(_profile.scan_ranges_counter, 1);
}
-_scanner->close(state);
+_scanner->close();
}
Expr::close(_min_max_conjunct_ctxs, state);
Expr::close(_partition_conjunct_ctxs, state);
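Taken together, the new _init_partition_values flow above is: build a one-row chunk holding the const partition-key columns, evaluate the static partition conjuncts on it, then, when enable_dynamic_prune_scan_range is set and runtime filters have arrived, evaluate those filters on the same chunk; if either step leaves no rows, the whole scan range is skipped. A condensed sketch of that control flow follows — editorial, not commit code; Chunk and Filter here are hypothetical stand-ins for starrocks::Chunk and ExprContext.

#include <cstdio>
#include <functional>
#include <vector>

// Hypothetical stand-ins for the real StarRocks types.
struct Chunk {
    size_t num_rows = 1; // one row holding the const partition-key values
    bool has_rows() const { return num_rows > 0; }
};
using Filter = std::function<void(Chunk*)>; // removes non-matching rows in place

// Returns true if the whole scan range can be skipped.
bool should_skip_scan_range(const std::vector<Filter>& partition_conjuncts,
                            const std::vector<Filter>& runtime_filters,
                            bool enable_dynamic_prune) {
    Chunk partition_chunk;
    for (const auto& f : partition_conjuncts) f(&partition_chunk);
    if (!partition_chunk.has_rows()) return true; // static pruning
    if (enable_dynamic_prune) {
        for (const auto& f : runtime_filters) f(&partition_chunk);
        if (!partition_chunk.has_rows()) return true; // dynamic pruning
    }
    return false;
}

int main() {
    Filter reject_all = [](Chunk* c) { c->num_rows = 0; };
    std::printf("%d\n", should_skip_scan_range({}, {reject_all}, /*enable_dynamic_prune=*/true));
}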
2 changes: 2 additions & 0 deletions be/src/connector/hive_connector.h
@@ -90,6 +90,7 @@ class HiveDataSource final : public DataSource {
Status _decompose_conjunct_ctxs(RuntimeState* state);
void _init_tuples_and_slots(RuntimeState* state);
void _init_counter(RuntimeState* state);
+void _init_rf_counters();

Status _init_partition_values();
Status _init_scanner(RuntimeState* state);
@@ -111,6 +112,7 @@
int32_t _datacache_evict_probability = 0;
bool _use_file_metacache = false;
bool _enable_split_tasks = false;
+bool _enable_dynamic_prune_scan_range = true;

// ============ conjuncts =================
std::vector<ExprContext*> _min_max_conjunct_ctxs;
14 changes: 6 additions & 8 deletions be/src/exec/hdfs_scanner.cpp
@@ -193,26 +193,24 @@ Status HdfsScanner::open(RuntimeState* runtime_state) {
return Status::OK();
}

-void HdfsScanner::close(RuntimeState* runtime_state) noexcept {
+void HdfsScanner::close() noexcept {
VLOG_FILE << "close file success: " << _scanner_params.path << ", scan range = ["
<< _scanner_params.scan_range->offset << ","
<< (_scanner_params.scan_range->length + _scanner_params.scan_range->offset)
<< "], rows = " << _app_stats.rows_read;

+if (!_runtime_state) {
+    return;
+}

bool expect = false;
if (!_closed.compare_exchange_strong(expect, true)) return;
update_counter();
-do_close(runtime_state);
+do_close(_runtime_state);
_file.reset(nullptr);
-_mor_processor->close(runtime_state);
+_mor_processor->close(_runtime_state);
}

-void HdfsScanner::finalize() {
-    if (_runtime_state != nullptr) {
-        close(_runtime_state);
-    }
-}

Status HdfsScanner::open_random_access_file() {
CHECK(_file == nullptr) << "File has already been opened";
ASSIGN_OR_RETURN(std::unique_ptr<RandomAccessFile> raw_file,
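Note that the close path now reads the cached _runtime_state member instead of a caller-supplied argument, and the compare_exchange_strong guard keeps a second call (for example, from a destructor after an explicit close) harmless. A minimal sketch of that idempotent-close pattern, with a hypothetical Scanner type standing in for HdfsScanner:

#include <atomic>
#include <cstdio>

class Scanner {
public:
    ~Scanner() { close(); } // destructor may run after an explicit close()
    void close() noexcept {
        bool expect = false;
        // Only the first caller flips the flag; later calls return immediately.
        if (!_closed.compare_exchange_strong(expect, true)) return;
        std::puts("resources released exactly once");
    }
private:
    std::atomic<bool> _closed{false};
};

int main() {
    Scanner s;
    s.close(); // explicit close
    s.close(); // no-op: already closed
}   // destructor close(): also a no-op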
6 changes: 3 additions & 3 deletions be/src/exec/hdfs_scanner.h
@@ -20,6 +20,7 @@
#include "exec/mor_processor.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "exprs/runtime_filter_bank.h"
#include "fs/fs.h"
#include "io/cache_input_stream.h"
#include "io/shared_buffered_input_stream.h"
@@ -330,11 +331,10 @@ class HdfsScanner {
HdfsScanner() = default;
virtual ~HdfsScanner() = default;

-    Status init(RuntimeState* runtime_state, const HdfsScannerParams& scanner_params);
     Status open(RuntimeState* runtime_state);
-    void close(RuntimeState* runtime_state) noexcept;
     Status get_next(RuntimeState* runtime_state, ChunkPtr* chunk);
-    void finalize();
+    Status init(RuntimeState* runtime_state, const HdfsScannerParams& scanner_params);
+    void close() noexcept;

int64_t num_bytes_read() const { return _app_stats.bytes_read; }
int64_t raw_rows_read() const { return _app_stats.raw_rows_read; }
2 changes: 1 addition & 1 deletion be/src/exec/jni_scanner.h
@@ -34,7 +34,7 @@ class JniScanner : public HdfsScanner {
JniScanner(std::string factory_class, std::map<std::string, std::string> params)
: _jni_scanner_params(std::move(params)), _jni_scanner_factory_class(std::move(factory_class)) {}

-~JniScanner() override { finalize(); }
+~JniScanner() override { close(); }

[[nodiscard]] Status do_open(RuntimeState* runtime_state) override;
void do_update_counter(HdfsScanProfile* profile) override;
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/scan/connector_scan_operator.h
@@ -155,7 +155,7 @@ class ConnectorChunkSource : public ChunkSource {

const int64_t _limit; // -1: no limit
const std::vector<ExprContext*>& _runtime_in_filters;
-const RuntimeFilterProbeCollector* _runtime_bloom_filters;
+RuntimeFilterProbeCollector* _runtime_bloom_filters;

// copied from scan node and merge predicates from runtime filter.
std::vector<ExprContext*> _conjunct_ctxs;
83 changes: 83 additions & 0 deletions be/src/exprs/runtime_filter_bank.cpp
@@ -421,6 +421,72 @@ void RuntimeFilterProbeCollector::do_evaluate(Chunk* chunk, RuntimeBloomFilterEv
}
}
}

+void RuntimeFilterProbeCollector::do_evaluate_partial_chunk(Chunk* partial_chunk,
+                                                            RuntimeBloomFilterEvalContext& eval_context) {
+    auto& selection = eval_context.running_context.selection;
+    eval_context.running_context.use_merged_selection = false;
+    eval_context.running_context.compatibility =
+            _runtime_state->func_version() <= 3 || !_runtime_state->enable_pipeline_engine();
+
+    // since partial chunk is currently very lightweight (a bunch of const columns), use every runtime filter if possible
+    // without computing each rf's selectivity
+    for (auto kv : _descriptors) {
+        RuntimeFilterProbeDescriptor* rf_desc = kv.second;
+        const JoinRuntimeFilter* filter = rf_desc->runtime_filter();
+        if (filter == nullptr || filter->always_true()) {
+            continue;
+        }
+
+        auto only_reference_existent_slots = [&](ExprContext* expr) {
+            std::vector<SlotId> slot_ids;
+            int n = expr->root()->get_slot_ids(&slot_ids);
+            DCHECK(slot_ids.size() == n);
+
+            // do not allow struct subfield
+            if (expr->root()->get_subfields(nullptr) > 0) {
+                return false;
+            }
+
+            for (auto slot_id : slot_ids) {
+                if (!partial_chunk->is_slot_exist(slot_id)) {
+                    return false;
+                }
+            }
+
+            return true;
+        };
+
+        auto* probe_expr = rf_desc->probe_expr_ctx();
+        auto* partition_by_exprs = rf_desc->partition_by_expr_contexts();
+
+        bool can_use_rf_on_partial_chunk = only_reference_existent_slots(probe_expr);
+        for (auto* part_by_expr : *partition_by_exprs) {
+            can_use_rf_on_partial_chunk &= only_reference_existent_slots(part_by_expr);
+        }
+
+        // skip runtime filter that references a non-existent column for the partial chunk
+        if (!can_use_rf_on_partial_chunk) {
+            continue;
+        }
+
+        ColumnPtr column = EVALUATE_NULL_IF_ERROR(probe_expr, probe_expr->root(), partial_chunk);
+        // for colocate grf
+        compute_hash_values(partial_chunk, column.get(), rf_desc, eval_context);
+        filter->evaluate(column.get(), &eval_context.running_context);
+
+        auto true_count = SIMD::count_nonzero(selection);
+        eval_context.run_filter_nums += 1;
+
+        if (true_count == 0) {
+            partial_chunk->set_num_rows(0);
+            return;
+        } else {
+            partial_chunk->filter(selection);
+        }
+    }
+}

void RuntimeFilterProbeCollector::init_counter() {
_eval_context.join_runtime_filter_timer = ADD_TIMER(_runtime_profile, "JoinRuntimeFilterTime");
_eval_context.join_runtime_filter_hash_timer = ADD_TIMER(_runtime_profile, "JoinRuntimeFilterHashTime");
@@ -456,6 +522,23 @@ void RuntimeFilterProbeCollector::evaluate(Chunk* chunk, RuntimeBloomFilterEvalC
}
}

+void RuntimeFilterProbeCollector::evaluate_partial_chunk(Chunk* partial_chunk,
+                                                         RuntimeBloomFilterEvalContext& eval_context) {
+    if (_descriptors.empty()) return;
+    size_t before = partial_chunk->num_rows();
+    if (before == 0) return;
+
+    {
+        SCOPED_TIMER(eval_context.join_runtime_filter_timer);
+        eval_context.join_runtime_filter_input_counter->update(before);
+        eval_context.run_filter_nums = 0;
+        do_evaluate_partial_chunk(partial_chunk, eval_context);
+        size_t after = partial_chunk->num_rows();
+        eval_context.join_runtime_filter_output_counter->update(after);
+        eval_context.join_runtime_filter_eval_counter->update(eval_context.run_filter_nums);
+    }
+}

void RuntimeFilterProbeCollector::compute_hash_values(Chunk* chunk, Column* column,
RuntimeFilterProbeDescriptor* rf_desc,
RuntimeBloomFilterEvalContext& eval_context) {
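The subtlety in do_evaluate_partial_chunk above is that a partial chunk carries only a subset of slots (here, just the partition columns), so a runtime filter is applied only when every slot referenced by its probe expression and partition-by expressions actually exists in the chunk, and struct-subfield references are rejected outright. A stripped-down sketch of that guard — ProbeExpr and PartialChunk are hypothetical stand-ins for ExprContext and Chunk:

#include <algorithm>
#include <unordered_set>
#include <vector>

using SlotId = int;

// Hypothetical stand-ins for ExprContext and Chunk.
struct ProbeExpr {
    std::vector<SlotId> referenced_slots;
    int subfield_refs = 0; // > 0 when the expr touches a struct subfield
};
struct PartialChunk {
    std::unordered_set<SlotId> present_slots;
    bool is_slot_exist(SlotId id) const { return present_slots.count(id) > 0; }
};

// A runtime filter may run on a partial chunk only if every slot its probe
// expression references is actually present, and no struct subfield is used.
bool usable_on_partial_chunk(const ProbeExpr& expr, const PartialChunk& chunk) {
    if (expr.subfield_refs > 0) return false;
    return std::all_of(expr.referenced_slots.begin(), expr.referenced_slots.end(),
                       [&](SlotId id) { return chunk.is_slot_exist(id); });
}

int main() {
    PartialChunk chunk{{1, 2}}; // only partition slots 1 and 2 are materialized
    return usable_on_partial_chunk({{1}, 0}, chunk) ? 0 : 1;
}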
3 changes: 3 additions & 0 deletions be/src/exprs/runtime_filter_bank.h
@@ -216,6 +216,8 @@ class RuntimeFilterProbeCollector {
RuntimeBloomFilterEvalContext& eval_context);
void evaluate(Chunk* chunk);
void evaluate(Chunk* chunk, RuntimeBloomFilterEvalContext& eval_context);
+// evaluate partial chunk that may not contain slots referenced by runtime filter
+void evaluate_partial_chunk(Chunk* partial_chunk, RuntimeBloomFilterEvalContext& eval_context);
void add_descriptor(RuntimeFilterProbeDescriptor* desc);
// accept RuntimeFilterCollector from parent node
// which means parent node to push down runtime filter.
@@ -247,6 +249,7 @@
// TODO: return a funcion call status
void do_evaluate(Chunk* chunk);
void do_evaluate(Chunk* chunk, RuntimeBloomFilterEvalContext& eval_context);
+void do_evaluate_partial_chunk(Chunk* partial_chunk, RuntimeBloomFilterEvalContext& eval_context);
// mapping from filter id to runtime filter descriptor.
std::map<int32_t, RuntimeFilterProbeDescriptor*> _descriptors;
int _wait_timeout_ms = 0;
6 changes: 4 additions & 2 deletions be/src/exprs/subfield_expr.cpp
@@ -83,7 +83,9 @@ Expr* clone(ObjectPool* pool) const override { return pool->add(new SubfieldExpr(*this)); }
Expr* clone(ObjectPool* pool) const override { return pool->add(new SubfieldExpr(*this)); }

int get_subfields(std::vector<std::vector<std::string>>* subfields) const override {
-subfields->push_back(_used_subfield_names);
+if (subfields != nullptr) {
+    subfields->push_back(_used_subfield_names);
+}
return 1;
}

@@ -97,4 +99,4 @@ Expr* SubfieldExprFactory::from_thrift(const TExprNode& node) {
return new SubfieldExpr(node);
}

-} // namespace starrocks
\ No newline at end of file
+} // namespace starrocks
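The nullptr guard matters because the new partial-chunk path calls get_subfields(nullptr) purely to ask whether an expression touches a struct subfield, without collecting the names. A small hypothetical mirror of that contract (SubfieldProbe is a stand-in, not the real SubfieldExpr):

#include <string>
#include <vector>

// Minimal hypothetical mirror of the contract: report how many subfield
// paths the expression uses; collect them only when a destination is given.
struct SubfieldProbe {
    std::vector<std::string> used_subfield_names{"s", "f1"};
    int get_subfields(std::vector<std::vector<std::string>>* subfields) const {
        if (subfields != nullptr) {            // the guard added by this commit
            subfields->push_back(used_subfield_names);
        }
        return 1;
    }
};

int main() {
    SubfieldProbe e;
    // Count-only query, as do_evaluate_partial_chunk does: no output vector.
    bool touches_subfield = e.get_subfields(nullptr) > 0;
    return touches_subfield ? 0 : 1;
}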
16 changes: 0 additions & 16 deletions be/src/formats/parquet/file_reader.cpp
@@ -404,22 +404,6 @@ StatusOr<bool> FileReader::_filter_group(const tparquet::RowGroup& row_group) {
if (discard) {
return true;
}

-            // skip topn runtime filter, because it has taken effect on min/max filtering
-            // if row-group contains exactly one value(i.e. min_value = max_value), use bloom filter to test
-            if (!filter->always_true() && min_chunk->columns()[0]->equals(0, *max_chunk->columns()[0], 0)) {
-                ColumnPtr& chunk_part_column = min_chunk->columns()[0];
-                JoinRuntimeFilter::RunningContext ctx;
-                ctx.use_merged_selection = false;
-                ctx.compatibility = false;
-                auto& selection = ctx.selection;
-                selection.assign(chunk_part_column->size(), 1);
-                filter->compute_hash({chunk_part_column.get()}, &ctx);
-                filter->evaluate(chunk_part_column.get(), &ctx);
-                if (selection[0] == 0) {
-                    return true;
-                }
-            }
}
}

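For context on this deletion: when a row group's min statistic equals its max, every row holds that single value, so one membership test against the join runtime filter could discard the whole group. The commit drops that in-reader heuristic, presumably superseded by the connector-level pruning added above. A hypothetical sketch of the removed idea (RowGroupStats and JoinFilter are stand-ins):

#include <cstdint>
#include <functional>

// Hypothetical stand-ins for the reader-side structures.
struct RowGroupStats {
    int64_t min_value;
    int64_t max_value;
};
struct JoinFilter {
    std::function<bool(int64_t)> might_contain; // bloom-filter membership test
};

// If min == max, every row in the group holds that single value, so one
// membership test against the join runtime filter can reject the whole group.
bool can_skip_row_group(const RowGroupStats& s, const JoinFilter& f) {
    if (s.min_value != s.max_value) return false; // multiple values: undecidable here
    return !f.might_contain(s.min_value);
}

int main() {
    JoinFilter f{[](int64_t v) { return v == 42; }};
    return can_skip_row_group({7, 7}, f) ? 0 : 1; // group of all-7s: skipped
}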