Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: move ann_query_info to PushDownExecutor #9741

Merged
merged 20 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(
rf_max_wait_time_ms,
context.getTimezoneInfo());
auto [before_where, filter_column_name, project_after_where]
= analyzer->buildPushDownFilter(filter_conditions->conditions);
= analyzer->buildPushDownExecutor(filter_conditions->conditions);
BlockInputStreams ins = storage->read(
column_names,
query_info,
Expand Down Expand Up @@ -273,7 +273,7 @@ void MockStorage::buildExecFromDeltaMerge(
rf_max_wait_time_ms,
context.getTimezoneInfo());
// Not using `auto [before_where, filter_column_name, project_after_where]` just to make the compiler happy.
auto build_ret = analyzer->buildPushDownFilter(filter_conditions->conditions);
auto build_ret = analyzer->buildPushDownExecutor(filter_conditions->conditions);
storage->read(
exec_context_,
group_builder,
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ String DAGExpressionAnalyzer::buildFilterColumn(
return filter_column_name;
}

std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> DAGExpressionAnalyzer::buildPushDownFilter(
std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> DAGExpressionAnalyzer::buildPushDownExecutor(
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions,
bool null_as_false)
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions,
bool null_as_false = false);

std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownFilter(
std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> buildPushDownExecutor(
const google::protobuf::RepeatedPtrField<tipb::Expr> & conditions,
bool null_as_false = false);

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ void executePushedDownFilter(
DAGPipeline & pipeline)
{
auto [before_where, filter_column_name, project_after_where]
= analyzer.buildPushDownFilter(filter_conditions.conditions, true);
= analyzer.buildPushDownExecutor(filter_conditions.conditions, true);

for (auto & stream : pipeline.streams)
{
Expand All @@ -464,7 +464,7 @@ void executePushedDownFilter(
LoggerPtr log)
{
auto [before_where, filter_column_name, project_after_where]
= analyzer.buildPushDownFilter(filter_conditions.conditions, true);
= analyzer.buildPushDownExecutor(filter_conditions.conditions, true);

auto input_header = group_builder.getCurrentHeader();
for (size_t i = 0; i < group_builder.concurrency(); ++i)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/tests/gtest_filter_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ try
}
CATCH

TEST_F(FilterExecutorTestRunner, convert_bool)
TEST_F(FilterExecutorTestRunner, convertBool)
try
{
{
Expand Down Expand Up @@ -282,7 +282,7 @@ try
}
CATCH

TEST_F(FilterExecutorTestRunner, PushDownFilter)
TEST_F(FilterExecutorTestRunner, PushDownExecutor)
try
{
context.mockStorage()->setUseDeltaMerge(true);
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Operators/DMSegmentThreadSourceOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp(
const DM::SegmentReadTaskPoolPtr & task_pool_,
DM::AfterSegmentRead after_segment_read_,
const DM::ColumnDefines & columns_to_read_,
const DM::PushDownFilterPtr & filter_,
const DM::PushDownExecutorPtr & executor_,
UInt64 start_ts_,
size_t expected_block_size_,
DM::ReadMode read_mode_,
Expand All @@ -40,7 +40,7 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp(
, task_pool(task_pool_)
, after_segment_read(after_segment_read_)
, columns_to_read(columns_to_read_)
, filter(filter_)
, executor(executor_)
, start_ts(start_ts_)
, expected_block_size(expected_block_size_)
, read_mode(read_mode_)
Expand Down Expand Up @@ -100,7 +100,7 @@ OperatorStatus DMSegmentThreadSourceOp::executeIOImpl()
columns_to_read,
task->read_snapshot,
task->ranges,
filter,
executor,
start_ts,
block_size);
LOG_TRACE(log, "Start to read segment, segment={}", cur_segment->simpleInfo());
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Operators/DMSegmentThreadSourceOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class DMSegmentThreadSourceOp : public SourceOp
const DM::SegmentReadTaskPoolPtr & task_pool_,
DM::AfterSegmentRead after_segment_read_,
const DM::ColumnDefines & columns_to_read_,
const DM::PushDownFilterPtr & filter_,
const DM::PushDownExecutorPtr & executor_,
UInt64 start_ts_,
size_t expected_block_size_,
DM::ReadMode read_mode_,
Expand All @@ -56,7 +56,7 @@ class DMSegmentThreadSourceOp : public SourceOp
DM::SegmentReadTaskPoolPtr task_pool;
DM::AfterSegmentRead after_segment_read;
DM::ColumnDefines columns_to_read;
DM::PushDownFilterPtr filter;
DM::PushDownExecutorPtr executor;
const UInt64 start_ts;
const size_t expected_block_size;
const DM::ReadMode read_mode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include <Columns/countBytesInFilter.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSetWithVectorIndexInputStream.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/Filter/WithANNQueryInfo.h>


namespace DB::DM
Expand Down
22 changes: 12 additions & 10 deletions dbms/src/Storages/DeltaMerge/ConcatSkippableBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,12 @@ Block ConcatVectorIndexBlockInputStream::read()
return block;
}

SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build(
std::tuple<SkippableBlockInputStreamPtr, bool> ConcatVectorIndexBlockInputStream::build(
const BitmapFilterPtr & bitmap_filter,
std::shared_ptr<ConcatSkippableBlockInputStream<false>> stream,
const ANNQueryInfoPtr & ann_query_info)
{
if (!ann_query_info)
return stream;
assert(ann_query_info != nullptr);
bool has_vector_index_stream = false;
std::vector<VectorIndexBlockInputStream *> index_streams;
index_streams.reserve(stream->children.size());
Expand All @@ -287,13 +286,16 @@ SkippableBlockInputStreamPtr ConcatVectorIndexBlockInputStream::build(
index_streams.push_back(nullptr);
}
if (!has_vector_index_stream)
return stream;

return std::make_shared<ConcatVectorIndexBlockInputStream>(
bitmap_filter,
stream,
std::move(index_streams),
ann_query_info->top_k());
return {stream, false};

return {
std::make_shared<ConcatVectorIndexBlockInputStream>(
bitmap_filter,
stream,
std::move(index_streams),
ann_query_info->top_k()),
true,
};
}

} // namespace DB::DM
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class ConcatVectorIndexBlockInputStream : public SkippableBlockInputStream
, bitmap_filter(bitmap_filter_)
{}

static SkippableBlockInputStreamPtr build(
static std::tuple<SkippableBlockInputStreamPtr, bool> build(
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
const BitmapFilterPtr & bitmap_filter,
std::shared_ptr<ConcatSkippableBlockInputStream<false>> stream,
const ANNQueryInfoPtr & ann_query_info);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
const SegmentReadTaskPoolPtr & task_pool_,
AfterSegmentRead after_segment_read_,
const ColumnDefines & columns_to_read_,
const PushDownFilterPtr & filter_,
const PushDownExecutorPtr & filter_,
UInt64 start_ts_,
size_t expected_block_size_,
ReadMode read_mode_,
Expand Down Expand Up @@ -127,7 +127,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
SegmentReadTaskPoolPtr task_pool;
AfterSegmentRead after_segment_read;
ColumnDefines columns_to_read;
PushDownFilterPtr filter;
PushDownExecutorPtr filter;
Block header;
const UInt64 start_ts;
const size_t expected_block_size;
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Filter/PushDownExecutor.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
Expand Down Expand Up @@ -1228,12 +1228,12 @@ ReadMode DeltaMergeStore::getReadMode(
const Context & db_context,
bool is_fast_scan,
bool keep_order,
const PushDownFilterPtr & filter)
const PushDownExecutorPtr & filter)
{
auto read_mode = getReadModeImpl(db_context, is_fast_scan, keep_order);
RUNTIME_CHECK_MSG(
!filter || !filter->before_where || read_mode == ReadMode::Bitmap,
"Push down filters needs bitmap, push down filters is empty: {}, read mode: {}",
"Push down executor needs bitmap, push down executor is empty: {}, read mode: {}",
filter == nullptr || filter->before_where == nullptr,
magic_enum::enum_name(read_mode));
return read_mode;
Expand All @@ -1246,7 +1246,7 @@ BlockInputStreams DeltaMergeStore::read(
const RowKeyRanges & sorted_ranges,
size_t num_streams,
UInt64 start_ts,
const PushDownFilterPtr & filter,
const PushDownExecutorPtr & filter,
const RuntimeFilteList & runtime_filter_list,
int rf_max_wait_time_ms,
const String & tracing_id,
Expand Down Expand Up @@ -1332,7 +1332,7 @@ BlockInputStreams DeltaMergeStore::read(
LOG_INFO(
tracing_logger,
"Read create stream done, keep_order={} dt_enable_read_thread={} enable_read_thread={} "
"is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} "
"is_fast_scan={} is_push_down_executor_empty={} pool_id={} num_streams={} columns_to_read={} "
"final_columns_to_read={}",
keep_order,
db_context.getSettingsRef().dt_enable_read_thread,
Expand All @@ -1356,7 +1356,7 @@ void DeltaMergeStore::read(
const RowKeyRanges & sorted_ranges,
size_t num_streams,
UInt64 start_ts,
const PushDownFilterPtr & filter,
const PushDownExecutorPtr & filter,
const RuntimeFilteList & runtime_filter_list,
int rf_max_wait_time_ms,
const String & tracing_id,
Expand Down Expand Up @@ -1452,7 +1452,7 @@ void DeltaMergeStore::read(
LOG_INFO(
tracing_logger,
"Read create PipelineExec done, keep_order={} dt_enable_read_thread={} enable_read_thread={} "
"is_fast_scan={} is_push_down_filter_empty={} pool_id={} num_streams={} columns_to_read={} "
"is_fast_scan={} is_push_down_executor_empty={} pool_id={} num_streams={} columns_to_read={} "
"final_columns_to_read={}",
keep_order,
db_context.getSettingsRef().dt_enable_read_thread,
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/DeltaMergeInterfaces.h>
#include <Storages/DeltaMerge/File/DMFile_fwd.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Filter/PushDownExecutor.h>
#include <Storages/DeltaMerge/Filter/RSOperator_fwd.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot_fwd.h>
Expand Down Expand Up @@ -460,7 +460,7 @@ class DeltaMergeStore
const RowKeyRanges & sorted_ranges,
size_t num_streams,
UInt64 start_ts,
const PushDownFilterPtr & filter,
const PushDownExecutorPtr & filter,
const RuntimeFilteList & runtime_filter_list,
int rf_max_wait_time_ms,
const String & tracing_id,
Expand All @@ -485,7 +485,7 @@ class DeltaMergeStore
const RowKeyRanges & sorted_ranges,
size_t num_streams,
UInt64 start_ts,
const PushDownFilterPtr & filter,
const PushDownExecutorPtr & filter,
const RuntimeFilteList & runtime_filter_list,
int rf_max_wait_time_ms,
const String & tracing_id,
Expand Down Expand Up @@ -588,7 +588,7 @@ class DeltaMergeStore
const Context & db_context,
bool is_fast_scan,
bool keep_order,
const PushDownFilterPtr & filter);
const PushDownExecutorPtr & filter);

// Get a snap of local_index_infos for checking.
// Note that this is just a shallow copy of `local_index_infos`, do not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,38 @@
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Filter/PushDownExecutor.h>
#include <Storages/SelectQueryInfo.h>
#include <TiDB/Decode/TypeMapping.h>

namespace DB::DM
{
PushDownFilterPtr PushDownFilter::build(
PushDownExecutorPtr PushDownExecutor::build(
const RSOperatorPtr & rs_operator,
const ANNQueryInfoPtr & ann_query_info,
const TiDB::ColumnInfos & table_scan_column_info,
const google::protobuf::RepeatedPtrField<tipb::Expr> & pushed_down_filters,
const ColumnDefines & columns_to_read,
const Context & context,
const LoggerPtr & tracing_logger)
{
// check if the ann_query_info is valid
auto valid_ann_query_info = ann_query_info;
if (ann_query_info)
{
bool is_valid_ann_query = ann_query_info->top_k() != std::numeric_limits<UInt32>::max();
bool is_matching_ann_query = std::any_of(
columns_to_read.begin(),
columns_to_read.end(),
[cid = ann_query_info->column_id()](const ColumnDefine & cd) -> bool { return cd.id == cid; });
if (!is_valid_ann_query || !is_matching_ann_query)
valid_ann_query_info = nullptr;
}

if (pushed_down_filters.empty())
{
LOG_DEBUG(tracing_logger, "Push down filter is empty");
return std::make_shared<PushDownFilter>(rs_operator);
return std::make_shared<PushDownExecutor>(rs_operator, valid_ann_query_info);
}
std::unordered_map<ColumnID, ColumnDefine> columns_to_read_map;
for (const auto & column : columns_to_read)
Expand Down Expand Up @@ -120,7 +134,7 @@ PushDownFilterPtr PushDownFilter::build(
}

// build filter expression actions
auto [before_where, filter_column_name, project_after_where] = analyzer->buildPushDownFilter(pushed_down_filters);
auto [before_where, filter_column_name, project_after_where] = analyzer->buildPushDownExecutor(pushed_down_filters);
LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions());

// record current column defines
Expand All @@ -145,8 +159,9 @@ PushDownFilterPtr PushDownFilter::build(
}
}

return std::make_shared<PushDownFilter>(
return std::make_shared<PushDownExecutor>(
rs_operator,
valid_ann_query_info,
before_where,
project_after_where,
filter_columns,
Expand All @@ -155,7 +170,7 @@ PushDownFilterPtr PushDownFilter::build(
columns_after_cast);
}

PushDownFilterPtr PushDownFilter::build(
PushDownExecutorPtr PushDownExecutor::build(
const SelectQueryInfo & query_info,
const ColumnDefines & columns_to_read,
const ColumnDefines & table_column_defines,
Expand All @@ -174,6 +189,10 @@ PushDownFilterPtr PushDownFilter::build(
table_column_defines,
context.getSettingsRef().dt_enable_rough_set_filter,
tracing_logger);
// build ann_query_info
ANNQueryInfoPtr ann_query_info = nullptr;
if (dag_query->ann_query_info.query_type() != tipb::ANNQueryType::InvalidQueryType)
ann_query_info = std::make_shared<tipb::ANNQueryInfo>(dag_query->ann_query_info);
// build push down filter
const auto & pushed_down_filters = dag_query->pushed_down_filters;
if (unlikely(context.getSettingsRef().force_push_down_all_filters_to_scan) && !dag_query->filters.empty())
Expand All @@ -182,16 +201,18 @@ PushDownFilterPtr PushDownFilter::build(
pushed_down_filters.begin(),
pushed_down_filters.end()};
merged_filters.MergeFrom(dag_query->filters);
return PushDownFilter::build(
return PushDownExecutor::build(
rs_operator,
ann_query_info,
columns_to_read_info,
merged_filters,
columns_to_read,
context,
tracing_logger);
}
return PushDownFilter::build(
return PushDownExecutor::build(
rs_operator,
ann_query_info,
columns_to_read_info,
pushed_down_filters,
columns_to_read,
Expand Down
Loading