Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: load RSResult only once #9738

Merged
merged 12 commits into from
Dec 25, 2024
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ void ColumnFileBig::calculateStat(const DMContext & dm_context)
{segment_range},
EMPTY_RS_OPERATOR,
{});
std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
std::tie(valid_rows, valid_bytes) = pack_filter->validRowsAndBytes();
}

void ColumnFileBig::removeData(WriteBatches & wbs) const
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ std::unique_ptr<CompressedSeekableReaderBuffer> ColumnReadStream::buildColDataRe

// Try to get the largest buffer size of reading continuous packs
size_t buffer_size = 0;
const auto & pack_res = reader.pack_filter.getPackResConst();
const auto & pack_res = reader.pack_filter->getPackResConst();
for (size_t i = 0; i < n_packs; /*empty*/)
{
if (!pack_res[i].isUse())
Expand Down
20 changes: 17 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(
max_sharing_column_bytes_for_all = 0;
}

// If pack_filter is not set, we will create a default one.
if (!pack_filter)
{
pack_filter
= std::make_shared<DMFilePackFilterResult>(index_cache, file_provider, read_limiter, scan_context, dmfile);
}

DMFileReader reader(
dmfile,
read_columns,
Expand All @@ -73,7 +80,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(
enable_del_clean_read,
is_fast_scan,
max_data_version,
*pack_filter,
pack_filter,
mark_cache,
enable_column_cache,
column_cache,
Expand Down Expand Up @@ -165,6 +172,13 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
bool enable_read_thread = SegmentReaderPoolManager::instance().isSegmentReader();
bool is_common_handle = !rowkey_ranges.empty() && rowkey_ranges[0].is_common_handle;

// If pack_filter is not set, we will create a default one.
if (!pack_filter)
{
pack_filter
= std::make_shared<DMFilePackFilterResult>(index_cache, file_provider, read_limiter, scan_context, dmfile);
}

DMFileReader rest_columns_reader(
dmfile,
rest_columns,
Expand All @@ -173,7 +187,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
enable_del_clean_read,
is_fast_scan,
max_data_version,
*pack_filter,
pack_filter,
mark_cache,
enable_column_cache,
column_cache,
Expand All @@ -185,7 +199,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
tracing_id,
enable_read_thread,
scan_context,
ReadTag::Query);
read_tag);

if (column_cache_long_term && pk_col_id)
// ColumnCacheLongTerm is only filled in Vector Search.
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class DMFileBlockInputStreamBuilder
return *this;
}

DMFileBlockInputStreamBuilder setAnnQureyInfo(const ANNQueryInfoPtr & ann_query_info_)
DMFileBlockInputStreamBuilder & setAnnQureyInfo(const ANNQueryInfoPtr & ann_query_info_)
{
ann_query_info = ann_query_info_;
return *this;
Expand All @@ -162,6 +162,7 @@ class DMFileBlockInputStreamBuilder
read_one_pack_every_time = true;
return *this;
}

DMFileBlockInputStreamBuilder & setRowsThreshold(size_t rows_threshold_per_read_)
{
rows_threshold_per_read = rows_threshold_per_read_;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
namespace DB::DM
{

DMFilePackFilterResult DMFilePackFilter::load(const DMContext & dm_context)
DMFilePackFilterResultPtr DMFilePackFilter::load(const DMContext & dm_context)
{
Stopwatch watch;
SCOPE_EXIT({ scan_context->total_rs_pack_filter_check_time_ns += watch.elapsed(); });
size_t pack_count = dmfile->getPacks();
DMFilePackFilterResult result(dm_context, dmfile, pack_count);
DMFilePackFilterResult result(dm_context, dmfile);
auto read_all_packs = (rowkey_ranges.size() == 1 && rowkey_ranges[0].all()) || rowkey_ranges.empty();
if (!read_all_packs)
{
Expand Down Expand Up @@ -153,7 +153,7 @@ DMFilePackFilterResult DMFilePackFilter::load(const DMContext & dm_context)
some_count,
all_count,
all_null_count);
return result;
return std::make_shared<DMFilePackFilterResult>(std::move(result));
}

void DMFilePackFilter::loadIndex(
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class DMFilePackFilter

public:
// Empty `rowkey_ranges` means do not filter by rowkey_ranges
static DMFilePackFilterResult loadFrom(
static DMFilePackFilterResultPtr loadFrom(
const DMContext & dm_context,
const DMFilePtr & dmfile,
bool set_cache_if_miss,
Expand Down Expand Up @@ -91,7 +91,7 @@ class DMFilePackFilter
, read_limiter(read_limiter_)
{}

DMFilePackFilterResult load(const DMContext & dm_context);
DMFilePackFilterResultPtr load(const DMContext & dm_context);

static void loadIndex(
ColumnIndexes & indexes,
Expand Down
9 changes: 4 additions & 5 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilterResult.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/File/DMFilePackFilter.h>
#include <Storages/DeltaMerge/File/DMFilePackFilterResult.h>

Expand Down Expand Up @@ -72,12 +71,12 @@ void DMFilePackFilterResult::tryLoadIndex(ColId col_id) const
DMFilePackFilter::loadIndex(
param.indexes,
dmfile,
dm_context.global_context.getFileProvider(),
dm_context.global_context.getMinMaxIndexCache(),
file_provider,
index_cache,
true,
col_id,
dm_context.global_context.getReadLimiter(),
dm_context.scan_context);
read_limiter,
scan_context);
}

} // namespace DB::DM
45 changes: 31 additions & 14 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilterResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once

#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/File/DMFile.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>
Expand All @@ -31,15 +32,33 @@ class DMFilePackFilterResult
friend class DMFilePackFilter;

public:
DMFilePackFilterResult(const DMContext & dm_context_, const DMFilePtr & dmfile_, size_t pack_count_)
: dm_context(dm_context_)
DMFilePackFilterResult(const DMContext & dm_context_, const DMFilePtr & dmfile_)
: index_cache(dm_context_.global_context.getMinMaxIndexCache())
, file_provider(dm_context_.global_context.getFileProvider())
, read_limiter(dm_context_.global_context.getReadLimiter())
, scan_context(dm_context_.scan_context)
, dmfile(dmfile_)
, handle_res(pack_count_, RSResult::All)
, handle_res(dmfile->getPacks(), RSResult::All)
, pack_res(dmfile->getPacks(), RSResult::Some)
{}

DMFilePackFilterResult(
const MinMaxIndexCachePtr & index_cache_,
const FileProviderPtr & file_provider_,
const ReadLimiterPtr & read_limiter_,
const ScanContextPtr & scan_context,
const DMFilePtr & dmfile_)
: index_cache(index_cache_)
, file_provider(file_provider_)
, read_limiter(read_limiter_)
, scan_context(scan_context)
, dmfile(dmfile_)
, handle_res(dmfile->getPacks(), RSResult::All)
, pack_res(dmfile->getPacks(), RSResult::Some)
{}

const RSResults & getHandleRes() const { return handle_res; }
const RSResults & getPackResConst() const { return pack_res; }
RSResults & getPackRes() { return pack_res; }
UInt64 countUsePack() const;

Handle getMinHandle(size_t pack_id) const
Expand All @@ -66,19 +85,13 @@ class DMFilePackFilterResult
return minmax_index->getUInt64MinMax(pack_id).second;
}

static DMFilePackFilterResultPtr emptyResult(const DMContext & dm_context, const DMFilePtr & dmfile)
{
return std::make_shared<DMFilePackFilterResult>(dm_context, dmfile, 0);
}

static DMFilePackFilterResults emptyResults(const DMContext & dm_context, const DMFiles & files)
// Only for test
static DMFilePackFilterResults defaultResults(const DMContext & dm_context, const DMFiles & files)
{
DMFilePackFilterResults results;
results.reserve(files.size());
for (const auto & file : files)
{
results.push_back(emptyResult(dm_context, file));
}
results.push_back(std::make_shared<DMFilePackFilterResult>(dm_context, file));
return results;
}

Expand All @@ -92,7 +105,11 @@ class DMFilePackFilterResult
void tryLoadIndex(ColId col_id) const;

private:
const DMContext & dm_context;
MinMaxIndexCachePtr index_cache;
FileProviderPtr file_provider;
ReadLimiterPtr read_limiter;

const ScanContextPtr scan_context;

DMFilePtr dmfile;
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
mutable RSCheckParam param;
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ DMFileReader::DMFileReader(
bool is_fast_scan_,
UInt64 max_read_version_,
// filters
const DMFilePackFilterResult & pack_filter_,
const DMFilePackFilterResultPtr & pack_filter_,
// caches
const MarkCachePtr & mark_cache_,
bool enable_column_cache_,
Expand Down Expand Up @@ -260,7 +260,7 @@ Block DMFileReader::readImpl(const ReadBlockInfo & read_info)
});
const auto & pack_stats = dmfile->getPackStats();
const auto & pack_properties = dmfile->getPackProperties();
const auto & handle_res = pack_filter.getHandleRes(); // alias of handle_res in pack_filter
const auto & handle_res = pack_filter->getHandleRes(); // alias of handle_res in pack_filter
std::vector<size_t> handle_column_clean_read_packs;
std::vector<size_t> del_column_clean_read_packs;
std::vector<size_t> version_column_clean_read_packs;
Expand Down Expand Up @@ -311,7 +311,7 @@ Block DMFileReader::readImpl(const ReadBlockInfo & read_info)
// If all handle in a pack are in the given range, no not_clean rows, and max version <= max_read_version,
// we do not need to read handle column.
if (handle_res[i] == RSResult::All && pack_stats[i].not_clean == 0
&& pack_filter.getMaxVersion(i) <= max_read_version)
&& pack_filter->getMaxVersion(i) <= max_read_version)
{
handle_column_clean_read_packs.push_back(i);
version_column_clean_read_packs.push_back(i);
Expand Down Expand Up @@ -374,12 +374,12 @@ ColumnPtr DMFileReader::cleanRead(
{
if (is_common_handle)
{
StringRef min_handle = pack_filter.getMinStringHandle(range.first);
StringRef min_handle = pack_filter->getMinStringHandle(range.first);
return cd.type->createColumnConst(rows_count, Field(min_handle.data, min_handle.size));
}
else
{
Handle min_handle = pack_filter.getMinHandle(range.first);
Handle min_handle = pack_filter->getMinHandle(range.first);
return cd.type->createColumnConst(rows_count, Field(min_handle));
}
}
Expand Down Expand Up @@ -706,7 +706,7 @@ void DMFileReader::addSkippedRows(UInt64 rows)

void DMFileReader::initReadBlockInfos()
{
const auto & pack_res = pack_filter.getPackResConst();
const auto & pack_res = pack_filter->getPackResConst();
const auto & pack_stats = dmfile->getPackStats();

const size_t read_pack_limit = read_one_pack_every_time ? 1 : std::numeric_limits<size_t>::max();
Expand Down Expand Up @@ -756,7 +756,7 @@ std::vector<DMFileReader::ReadBlockInfo> DMFileReader::splitReadBlockInfos(
{
const auto pack_end = read_info.start_pack_id + read_info.pack_count;
const size_t start_row_offset = pack_offset[read_info.start_pack_id];
const auto & pack_res = pack_filter.getPackResConst();
const auto & pack_res = pack_filter->getPackResConst();
const auto & pack_stats = dmfile->getPackStats();
std::vector<ReadBlockInfo> new_read_block_infos;
new_read_block_infos.reserve(pack_end - read_info.start_pack_id);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class DMFileReader
// The MVCC filter version. Used by clean read check.
UInt64 max_read_version_,
// filters
const DMFilePackFilterResult & pack_filter_,
const DMFilePackFilterResultPtr & pack_filter_,
// caches
const MarkCachePtr & mark_cache_,
bool enable_column_cache_,
Expand Down Expand Up @@ -184,7 +184,7 @@ class DMFileReader
const UInt64 max_read_version;

/// Filters
const DMFilePackFilterResult & pack_filter;
const DMFilePackFilterResultPtr pack_filter;

/// Caches
MarkCachePtr mark_cache;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void DMFileWithVectorIndexBlockInputStream::updateReadBlockInfos()

read_block_infos.clear();
const auto & pack_stats = dmfile->getPackStats();
const auto & pack_res = reader.pack_filter.getPackResConst();
const auto & pack_res = reader.pack_filter->getPackResConst();

// Update valid_packs_before_search
for (const auto res : pack_res)
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/Index/RSResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ class RSResult
static ValueResult logicalAnd(ValueResult v0, ValueResult v1) noexcept;
static ValueResult logicalOr(ValueResult v0, ValueResult v1) noexcept;

// Deleting or privating constructors, so that cannot create invalid objects.
// Use the static member variables below.
RSResult() = delete;
RSResult(ValueResult v_, bool has_null_)
: v(v_)
, has_null(has_null_)
Expand All @@ -60,6 +57,10 @@ class RSResult
bool has_null;

public:
// Delete the default constructor so that invalid objects cannot be created.
// Use the static member variables below.
RSResult() = delete;

bool isUse() const noexcept { return v != ValueResult::None; }

bool allMatch() const noexcept { return *this == RSResult::All; }
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -950,16 +950,17 @@ BlockInputStreamPtr Segment::getInputStream(

// load DMFilePackFilterResult for each DMFile
DMFilePackFilterResults pack_filter_results;
pack_filter_results.reserve(segment_snap->stable->getDMFiles().size());
for (const auto & dmfile : segment_snap->stable->getDMFiles())
{
auto result = std::make_shared<DMFilePackFilterResult>(DMFilePackFilter::loadFrom(
auto result = DMFilePackFilter::loadFrom(
dm_context,
dmfile,
/*set_cache_if_miss*/ true,
read_ranges,
filter ? filter->rs_operator : EMPTY_RS_OPERATOR,
/*read_pack*/ {}));
pack_filter_results.emplace_back(std::move(result));
/*read_pack*/ {});
pack_filter_results.push_back(result);
}

switch (read_mode)
Expand Down Expand Up @@ -3525,6 +3526,7 @@ BlockInputStreamPtr Segment::getBitmapFilterInputStream(
read_data_block_rows);
}

std::cout << "getBitmapFilterInputStream" << std::endl;
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved
auto stream = getConcatSkippableBlockInputStream(
bitmap_filter,
segment_snap,
Expand Down
Loading