Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: load RSResult only once #9738

Merged
merged 12 commits into from
Dec 25, 2024
20 changes: 5 additions & 15 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileBig.h>
#include <Storages/DeltaMerge/DMContext.h>
Expand All @@ -38,22 +37,13 @@ ColumnFileBig::ColumnFileBig(const DMContext & dm_context, const DMFilePtr & fil

void ColumnFileBig::calculateStat(const DMContext & dm_context)
{
auto index_cache = dm_context.global_context.getMinMaxIndexCache();

auto pack_filter = DMFilePackFilter::loadFrom(
auto m = DMFilePackFilter::loadValidRowsAndBytes(
dm_context,
file,
index_cache,
/*set_cache_if_miss*/ false,
{segment_range},
EMPTY_RS_OPERATOR,
{},
dm_context.global_context.getFileProvider(),
dm_context.getReadLimiter(),
dm_context.scan_context,
/*tracing_id*/ dm_context.tracing_id,
ReadTag::Internal);

std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
{segment_range});
valid_rows = m.match_rows;
valid_bytes = m.match_bytes;
}

void ColumnFileBig::removeData(WriteBatches & wbs) const
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <IO/FileProvider/ChecksumReadBufferBuilder.h>
#include <Storages/DeltaMerge/File/ColumnStream.h>
#include <Storages/DeltaMerge/File/DMFileReader.h>
#include <Storages/Page/PageUtil.h>
Expand Down Expand Up @@ -157,7 +158,7 @@ std::unique_ptr<CompressedSeekableReaderBuffer> ColumnReadStream::buildColDataRe

// Try to get the largest buffer size of reading continuous packs
size_t buffer_size = 0;
const auto & pack_res = reader.pack_filter.getPackResConst();
const auto & pack_res = reader.pack_filter->getPackRes();
for (size_t i = 0; i < n_packs; /*empty*/)
{
if (!pack_res[i].isUse())
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ size_t DMFile::colIndexSize(ColId id) const
}
else
{
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Index of {} not exist", id);
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Index is not exist, col_id={}", id);
}
}
else
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ class DMFile : private boost::noncopyable

UInt32 metaVersion() const { return meta->metaVersion(); }

bool isColIndexExist(const ColId & col_id) const;

private:
DMFile(
UInt64 file_id_,
Expand Down Expand Up @@ -293,8 +295,6 @@ class DMFile : private boost::noncopyable
String colIndexCacheKey(const FileNameBase & file_name_base) const;
String colMarkCacheKey(const FileNameBase & file_name_base) const;

bool isColIndexExist(const ColId & col_id) const;

String encryptionBasePath() const;
EncryptionPath encryptionDataPath(const FileNameBase & file_name_base) const;
EncryptionPath encryptionIndexPath(const FileNameBase & file_name_base) const;
Expand Down
73 changes: 36 additions & 37 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h>
#include <Storages/DeltaMerge/Filter/WithANNQueryInfo.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/ScanContext.h>


namespace DB::DM
{

Expand Down Expand Up @@ -58,19 +56,6 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(

bool is_common_handle = !rowkey_ranges.empty() && rowkey_ranges[0].is_common_handle;

DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom(
dmfile,
index_cache,
/*set_cache_if_miss*/ true,
rowkey_ranges,
rs_filter,
read_packs,
file_provider,
read_limiter,
scan_context,
tracing_id,
read_tag);

bool enable_read_thread = SegmentReaderPoolManager::instance().isSegmentReader();

if (!enable_read_thread || max_sharing_column_bytes_for_all <= 0)
Expand All @@ -79,6 +64,22 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(
max_sharing_column_bytes_for_all = 0;
}

// If pack_filter is not set, load from EMPTY_RS_OPERATOR.
if (!pack_filter)
{
pack_filter = DMFilePackFilter::loadFrom(
index_cache,
file_provider,
read_limiter,
scan_context,
dmfile,
true,
rowkey_ranges,
EMPTY_RS_OPERATOR,
read_packs,
tracing_id);
}

DMFileReader reader(
dmfile,
read_columns,
Expand All @@ -87,7 +88,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(
enable_del_clean_read,
is_fast_scan,
max_data_version,
std::move(pack_filter),
pack_filter,
mark_cache,
enable_column_cache,
column_cache,
Expand Down Expand Up @@ -140,18 +141,13 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
return build(dmfile, read_columns, rowkey_ranges, scan_context);
};

if (!rs_filter)
return fallback();

auto filter_with_ann = std::dynamic_pointer_cast<WithANNQueryInfo>(rs_filter);
if (!filter_with_ann)
if (!ann_query_info)
return fallback();

if (!bitmap_filter.has_value())
return fallback();

Block header_layout = toEmptyBlock(read_columns);
auto ann_query_info = filter_with_ann->ann_query_info;

// Copy out the vector column for later use. Copy is intentionally performed after the
// fast check so that in fallback conditions we don't need unnecessary copies.
Expand Down Expand Up @@ -181,22 +177,25 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn

// All check passed. Let's read via vector index.

DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom(
dmfile,
index_cache,
/*set_cache_if_miss*/ true,
rowkey_ranges,
rs_filter,
read_packs,
file_provider,
read_limiter,
scan_context,
tracing_id,
ReadTag::Query);

bool enable_read_thread = SegmentReaderPoolManager::instance().isSegmentReader();
bool is_common_handle = !rowkey_ranges.empty() && rowkey_ranges[0].is_common_handle;

// If pack_filter is not set, load from EMPTY_RS_OPERATOR.
if (!pack_filter)
{
pack_filter = DMFilePackFilter::loadFrom(
index_cache,
file_provider,
read_limiter,
scan_context,
dmfile,
true,
rowkey_ranges,
EMPTY_RS_OPERATOR,
read_packs,
tracing_id);
}

DMFileReader rest_columns_reader(
dmfile,
rest_columns,
Expand All @@ -205,7 +204,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
enable_del_clean_read,
is_fast_scan,
max_data_version,
std::move(pack_filter),
pack_filter,
mark_cache,
enable_column_cache,
column_cache,
Expand All @@ -217,7 +216,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
tracing_id,
enable_read_thread,
scan_context,
ReadTag::Query);
read_tag);

if (column_cache_long_term && pk_col_id)
// ColumnCacheLongTerm is only filled in Vector Search.
Expand Down
17 changes: 13 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ class DMFileBlockInputStreamBuilder
return *this;
}

DMFileBlockInputStreamBuilder & setRSOperator(const RSOperatorPtr & filter_)
DMFileBlockInputStreamBuilder & setAnnQureyInfo(const ANNQueryInfoPtr & ann_query_info_)
{
rs_filter = filter_;
ann_query_info = ann_query_info_;
return *this;
}

Expand All @@ -162,6 +162,7 @@ class DMFileBlockInputStreamBuilder
read_one_pack_every_time = true;
return *this;
}

DMFileBlockInputStreamBuilder & setRowsThreshold(size_t rows_threshold_per_read_)
{
rows_threshold_per_read = rows_threshold_per_read_;
Expand All @@ -180,6 +181,12 @@ class DMFileBlockInputStreamBuilder
return *this;
}

DMFileBlockInputStreamBuilder & setDMFilePackFilterResult(const DMFilePackFilterResultPtr & pack_filter_)
{
pack_filter = pack_filter_;
return *this;
}

/**
* @note To really enable the long term cache, you also need to ensure
* ColumnCacheLongTerm is initialized in the global context.
Expand Down Expand Up @@ -217,8 +224,6 @@ class DMFileBlockInputStreamBuilder
bool is_fast_scan = false;
bool enable_del_clean_read = false;
UInt64 max_data_version = std::numeric_limits<UInt64>::max();
// Rough set filter
RSOperatorPtr rs_filter;
// packs filter (filter by pack index)
IdSetPtr read_packs;
MarkCachePtr mark_cache;
Expand All @@ -234,6 +239,10 @@ class DMFileBlockInputStreamBuilder
String tracing_id;
ReadTag read_tag = ReadTag::Internal;

DMFilePackFilterResultPtr pack_filter;

ANNQueryInfoPtr ann_query_info = nullptr;

VectorIndexCachePtr vector_index_cache;
// Note: Currently thie field is assigned only for Stable streams, not available for ColumnFileBig
std::optional<BitmapFilterView> bitmap_filter;
Expand Down
Loading