Skip to content

Commit

Permalink
Storages: move ann_query_info to PushDownExecutor (#9741)
Browse files Browse the repository at this point in the history
ref #6233

1. move `ann_query_info` to `PushDownFilter`
2. rename `PushDownFilter` to `PushDownExecutor`

Signed-off-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
Lloyd-Pottiger authored Dec 27, 2024
1 parent f0faa0c commit 9836c27
Show file tree
Hide file tree
Showing 40 changed files with 351 additions and 322 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
rf_max_wait_time_ms,
context.getTimezoneInfo());
auto [before_where, filter_column_name, project_after_where]
= analyzer->buildPushDownFilter(filter_conditions->conditions);
= analyzer->buildPushDownExecutor(filter_conditions->conditions);
BlockInputStreams ins = storage->read(
column_names,
query_info,
Expand Down Expand Up @@ -273,7 +273,7 @@ void MockStorage::buildExecFromDeltaMerge(
rf_max_wait_time_ms,
context.getTimezoneInfo());
// Not using `auto [before_where, filter_column_name, project_after_where]` just to make the compiler happy.
auto build_ret = analyzer->buildPushDownFilter(filter_conditions->conditions);
auto build_ret = analyzer->buildPushDownExecutor(filter_conditions->conditions);
storage->read(
exec_context_,
group_builder,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ String DAGExpressionAnalyzer::buildFilterColumn(
return filter_column_name;
}

std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> DAGExpressionAnalyzer::buildPushDownFilter(
std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> DAGExpressionAnalyzer::buildPushDownExecutor(
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions,
bool null_as_false)
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions,
bool null_as_false = false);

std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilter(
std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownExecutor(
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions,
bool null_as_false = false);

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ void executePushedDownFilter(
DAGPipeline & pipeline)
{
auto [before_where, filter_column_name, project_after_where]
= analyzer.buildPushDownFilter(filter_conditions.conditions, true);
= analyzer.buildPushDownExecutor(filter_conditions.conditions, true);

for (auto & stream : pipeline.streams)
{
Expand All @@ -464,7 +464,7 @@ void executePushedDownFilter(
LoggerPtr log)
{
auto [before_where, filter_column_name, project_after_where]
= analyzer.buildPushDownFilter(filter_conditions.conditions, true);
= analyzer.buildPushDownExecutor(filter_conditions.conditions, true);

auto input_header = group_builder.getCurrentHeader();
for (size_t i = 0; i < group_builder.concurrency(); ++i)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/tests/gtest_filter_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ try
}
CATCH

TEST_F(FilterExecutorTestRunner, convert_bool)
TEST_F(FilterExecutorTestRunner, convertBool)
try
{
{
Expand Down Expand Up @@ -282,7 +282,7 @@ try
}
CATCH

TEST_F(FilterExecutorTestRunner, PushDownFilter)
TEST_F(FilterExecutorTestRunner, PushDownExecutor)
try
{
context.mockStorage()->setUseDeltaMerge(true);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Operators/DMSegmentThreadSourceOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp(
const DM::SegmentReadTaskPoolPtr & task_pool_,
DM::AfterSegmentRead after_segment_read_,
const DM::ColumnDefines & columns_to_read_,
const DM::PushDownFilterPtr & filter_,
const DM::PushDownExecutorPtr & executor_,
UInt64 start_ts_,
size_t expected_block_size_,
DM::ReadMode read_mode_,
Expand All @@ -40,7 +40,7 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp(
, task_pool(task_pool_)
, after_segment_read(after_segment_read_)
, columns_to_read(columns_to_read_)
, filter(filter_)
, executor(executor_)
, start_ts(start_ts_)
, expected_block_size(expected_block_size_)
, read_mode(read_mode_)
Expand Down Expand Up @@ -100,7 +100,7 @@ OperatorStatus DMSegmentThreadSourceOp::executeIOImpl()
columns_to_read,
task->read_snapshot,
task->ranges,
filter,
executor,
start_ts,
block_size);
LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo());
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Operators/DMSegmentThreadSourceOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class DMSegmentThreadSourceOp : public SourceOp
const DM::SegmentReadTaskPoolPtr & task_pool_,
DM::AfterSegmentRead after_segment_read_,
const DM::ColumnDefines & columns_to_read_,
const DM::PushDownFilterPtr & filter_,
const DM::PushDownExecutorPtr & executor_,
UInt64 start_ts_,
size_t expected_block_size_,
DM::ReadMode read_mode_,
Expand All @@ -56,7 +56,7 @@ class DMSegmentThreadSourceOp : public SourceOp
DM::SegmentReadTaskPoolPtr task_pool;
DM::AfterSegmentRead after_segment_read;
DM::ColumnDefines columns_to_read;
DM::PushDownFilterPtr filter;
DM::PushDownExecutorPtr executor;
const UInt64 start_ts;
const size_t expected_block_size;
const DM::ReadMode read_mode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <Columns/countBytesInFilter.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/Filter/WithANNQueryInfo.h>


namespace DB::DM
Expand Down
22 changes: 12 additions & 10 deletions dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,12 @@ Block ConcatVectorIndexBlockInputStream::read()
return block;
}

SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build(
std::tuple<SkippableBlockInputStreamPtr, bool> ConcatVectorIndexBlockInputStream::build(
const BitmapFilterPtr & bitmap_filter,
std::shared_ptr<ConcatSkippableBlockInputStream<false>> stream,
const ANNQueryInfoPtr & ann_query_info)
{
if (!ann_query_info)
return stream;
assert(ann_query_info != nullptr);
bool has_vector_index_stream = false;
std::vector<VectorIndexBlockInputStream *> index_streams;
index_streams.reserve(stream->children.size());
Expand All @@ -287,13 +286,16 @@ SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build(
index_streams.push_back(nullptr);
}
if (!has_vector_index_stream)
return stream;

return std::make_shared<ConcatVectorIndexBlockInputStream>(
bitmap_filter,
stream,
std::move(index_streams),
ann_query_info->top_k());
return {stream, false};

return {
std::make_shared<ConcatVectorIndexBlockInputStream>(
bitmap_filter,
stream,
std::move(index_streams),
ann_query_info->top_k()),
true,
};
}

} // namespace DB::DM
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class ConcatVectorIndexBlockInputStream : public SkippableBlockInputStream
, bitmap_filter(bitmap_filter_)
{}

static SkippableBlockInputStreamPtr build(
// Returns <InputStream, is_vector_stream>
static std::tuple<SkippableBlockInputStreamPtr, bool> build(
const BitmapFilterPtr & bitmap_filter,
std::shared_ptr<ConcatSkippableBlockInputStream<false>> stream,
const ANNQueryInfoPtr & ann_query_info);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
const SegmentReadTaskPoolPtr & task_pool_,
AfterSegmentRead after_segment_read_,
const ColumnDefines & columns_to_read_,
const PushDownFilterPtr & filter_,
const PushDownExecutorPtr & filter_,
UInt64 start_ts_,
size_t expected_block_size_,
ReadMode read_mode_,
Expand Down Expand Up @@ -127,7 +127,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
SegmentReadTaskPoolPtr task_pool;
AfterSegmentRead after_segment_read;
ColumnDefines columns_to_read;
PushDownFilterPtr filter;
PushDownExecutorPtr filter;
Block header;
const UInt64 start_ts;
const size_t expected_block_size;
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Filter/PushDownExecutor.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
Expand Down Expand Up @@ -1228,12 +1228,12 @@ ReadMode DeltaMergeStore::getReadMode(
const Context & db_context,
bool is_fast_scan,
bool keep_order,
const PushDownFilterPtr & filter)
const PushDownExecutorPtr & filter)
{
auto read_mode = getReadModeImpl(db_context, is_fast_scan, keep_order);
RUNTIME_CHECK_MSG(
!filter || !filter->before_where || read_mode == ReadMode::Bitmap,
"Push down filters needs bitmap, push down filters is empty: {}, read mode: {}",
"Push down executor needs bitmap, push down executor is empty: {}, read mode: {}",
filter == nullptr || filter->before_where == nullptr,
magic_enum::enum_name(read_mode));
return read_mode;
Expand All @@ -1246,7 +1246,7 @@ BlockInputStreams DeltaMergeStore::read(
const RowKeyRanges & sorted_ranges,
size_t num_streams,
UInt64 start_ts,
const PushDownFilterPtr & filter,
const PushDownExecutorPtr & filter,
const RuntimeFilteList & runtime_filter_list,
int rf_max_wait_time_ms,
const String & tracing_id,
Expand Down Expand Up @@ -1332,7 +1332,7 @@ BlockInputStreams DeltaMergeStore::read(
LOG_INFO(
tracing_logger,
"Read create stream done, keep_order={} dt_enable_read_thread={} enable_read_thread={} "
"is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} "
"is_fast_scan={} is_push_down_executor_empty={} pool_id={} num_streams={} columns_to_read={} "
"final_columns_to_read={}",
keep_order,
db_context.getSettingsRef().dt_enable_read_thread,
Expand All @@ -1356,7 +1356,7 @@ void DeltaMergeStore::read(
const RowKeyRanges & sorted_ranges,
size_t num_streams,
UInt64 start_ts,
const PushDownFilterPtr & filter,
const PushDownExecutorPtr & filter,
const RuntimeFilteList & runtime_filter_list,
int rf_max_wait_time_ms,
const String & tracing_id,
Expand Down Expand Up @@ -1452,7 +1452,7 @@ void DeltaMergeStore::read(
LOG_INFO(
tracing_logger,
"Read create PipelineExec done, keep_order={} dt_enable_read_thread={} enable_read_thread={} "
"is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} "
"is_fast_scan={} is_push_down_executor_empty={} pool_id={} num_streams={} columns_to_read={} "
"final_columns_to_read={}",
keep_order,
db_context.getSettingsRef().dt_enable_read_thread,
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/DeltaMergeInterfaces.h>
#include <Storages/DeltaMerge/File/DMFile_fwd.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Filter/PushDownExecutor.h>
#include <Storages/DeltaMerge/Filter/RSOperator_fwd.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot_fwd.h>
Expand Down Expand Up @@ -460,7 +460,7 @@ class DeltaMergeStore
const RowKeyRanges & sorted_ranges,
size_t num_streams,
UInt64 start_ts,
const PushDownFilterPtr & filter,
const PushDownExecutorPtr & filter,
const RuntimeFilteList & runtime_filter_list,
int rf_max_wait_time_ms,
const String & tracing_id,
Expand All @@ -485,7 +485,7 @@ class DeltaMergeStore
const RowKeyRanges & sorted_ranges,
size_t num_streams,
UInt64 start_ts,
const PushDownFilterPtr & filter,
const PushDownExecutorPtr & filter,
const RuntimeFilteList & runtime_filter_list,
int rf_max_wait_time_ms,
const String & tracing_id,
Expand Down Expand Up @@ -588,7 +588,7 @@ class DeltaMergeStore
const Context & db_context,
bool is_fast_scan,
bool keep_order,
const PushDownFilterPtr & filter);
const PushDownExecutorPtr & filter);

// Get a snap of local_index_infos for checking.
// Note that this is just a shallow copy of `local_index_infos`, do not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,38 @@
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Filter/PushDownExecutor.h>
#include <Storages/SelectQueryInfo.h>
#include <TiDB/Decode/TypeMapping.h>

namespace DB::DM
{
PushDownFilterPtr PushDownFilter::build(
PushDownExecutorPtr PushDownExecutor::build(
const RSOperatorPtr & rs_operator,
const ANNQueryInfoPtr & ann_query_info,
const TiDB::ColumnInfos & table_scan_column_info,
const google::protobuf::RepeatedPtrField<tipb::Expr> & pushed_down_filters,
const ColumnDefines & columns_to_read,
const Context & context,
const LoggerPtr & tracing_logger)
{
// check if the ann_query_info is valid
auto valid_ann_query_info = ann_query_info;
if (ann_query_info)
{
bool is_valid_ann_query = ann_query_info->top_k() != std::numeric_limits<UInt32>::max();
bool is_matching_ann_query = std::any_of(
columns_to_read.begin(),
columns_to_read.end(),
[cid = ann_query_info->column_id()](const ColumnDefine & cd) -> bool { return cd.id == cid; });
if (!is_valid_ann_query || !is_matching_ann_query)
valid_ann_query_info = nullptr;
}

if (pushed_down_filters.empty())
{
LOG_DEBUG(tracing_logger, "Push down filter is empty");
return std::make_shared<PushDownFilter>(rs_operator);
return std::make_shared<PushDownExecutor>(rs_operator, valid_ann_query_info);
}
std::unordered_map<ColumnID, ColumnDefine> columns_to_read_map;
for (const auto & column : columns_to_read)
Expand Down Expand Up @@ -120,7 +134,7 @@ PushDownFilterPtr PushDownFilter::build(
}

// build filter expression actions
auto [before_where, filter_column_name, project_after_where] = analyzer->buildPushDownFilter(pushed_down_filters);
auto [before_where, filter_column_name, project_after_where] = analyzer->buildPushDownExecutor(pushed_down_filters);
LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions());

// record current column defines
Expand All @@ -145,8 +159,9 @@ PushDownFilterPtr PushDownFilter::build(
}
}

return std::make_shared<PushDownFilter>(
return std::make_shared<PushDownExecutor>(
rs_operator,
valid_ann_query_info,
before_where,
project_after_where,
filter_columns,
Expand All @@ -155,7 +170,7 @@ PushDownFilterPtr PushDownFilter::build(
columns_after_cast);
}

PushDownFilterPtr PushDownFilter::build(
PushDownExecutorPtr PushDownExecutor::build(
const SelectQueryInfo & query_info,
const ColumnDefines & columns_to_read,
const ColumnDefines & table_column_defines,
Expand All @@ -174,6 +189,10 @@ PushDownFilterPtr PushDownFilter::build(
table_column_defines,
context.getSettingsRef().dt_enable_rough_set_filter,
tracing_logger);
// build ann_query_info
ANNQueryInfoPtr ann_query_info = nullptr;
if (dag_query->ann_query_info.query_type() != tipb::ANNQueryType::InvalidQueryType)
ann_query_info = std::make_shared<tipb::ANNQueryInfo>(dag_query->ann_query_info);
// build push down filter
const auto & pushed_down_filters = dag_query->pushed_down_filters;
if (unlikely(context.getSettingsRef().force_push_down_all_filters_to_scan) && !dag_query->filters.empty())
Expand All @@ -182,16 +201,18 @@ PushDownFilterPtr PushDownFilter::build(
pushed_down_filters.begin(),
pushed_down_filters.end()};
merged_filters.MergeFrom(dag_query->filters);
return PushDownFilter::build(
return PushDownExecutor::build(
rs_operator,
ann_query_info,
columns_to_read_info,
merged_filters,
columns_to_read,
context,
tracing_logger);
}
return PushDownFilter::build(
return PushDownExecutor::build(
rs_operator,
ann_query_info,
columns_to_read_info,
pushed_down_filters,
columns_to_read,
Expand Down
Loading

0 comments on commit 9836c27

Please sign in to comment.