Skip to content

Commit

Permalink
[improvement](segment) reduce memory usage when opening segments
Browse files Browse the repository at this point in the history
  • Loading branch information
jacktengg committed Jan 7, 2025
1 parent d7b28f5 commit 6921b73
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 70 deletions.
17 changes: 9 additions & 8 deletions be/src/olap/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
continue;
}

int segment_start = 0;
int64_t segment_start = 0;
auto split = RowSetSplits(reader->clone());

for (size_t i = 0; i != segments_rows.size(); ++i) {
Expand Down Expand Up @@ -179,14 +179,15 @@ Status ParallelScannerBuilder::_load() {
auto rowset = rs_split.rs_reader->rowset();
RETURN_IF_ERROR(rowset->load());
const auto rowset_id = rowset->rowset_id();
SegmentCacheHandle segment_cache_handle;

RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
std::dynamic_pointer_cast<BetaRowset>(rowset), &segment_cache_handle,
enable_segment_cache, false));

for (const auto& segment : segment_cache_handle.get_segments()) {
_all_segments_rows[rowset_id].emplace_back(segment->num_rows());
auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
auto segment_count = rowset->num_segments();
for (int64_t i = 0; i != segment_count; i++) {
SegmentCacheHandle segment_cache_handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
beta_rowset, i, &segment_cache_handle, enable_segment_cache, false));
const auto& segments = segment_cache_handle.get_segments();
_all_segments_rows[rowset_id].emplace_back(segments[0]->num_rows());
}
_total_rows += rowset->num_rows();
}
Expand Down
55 changes: 33 additions & 22 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,32 +226,43 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
// When reader type is for query, session variable `enable_segment_cache` should be respected.
bool should_use_cache = use_cache || (_read_context->reader_type == ReaderType::READER_QUERY &&
enable_segment_cache);
SegmentCacheHandle segment_cache_handle;
{
SCOPED_RAW_TIMER(&_stats->rowset_reader_load_segments_timer_ns);
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
_rowset, &segment_cache_handle, should_use_cache,
/*need_load_pk_index_and_bf*/ false));
}

// create iterator for each segment
auto& segments = segment_cache_handle.get_segments();
_segments_rows.resize(segments.size());
for (size_t i = 0; i < segments.size(); i++) {
_segments_rows[i] = segments[i]->num_rows();
auto segment_count = _rowset->num_segments();
std::vector<segment_v2::SegmentSharedPtr> segments(segment_count);
auto [seg_start, seg_end] = _segment_offsets;
if (seg_start == seg_end) {
seg_start = 0;
seg_end = segment_count;
}
if (_read_context->record_rowids) {
_segments_rows.resize(segment_count);
{
SCOPED_RAW_TIMER(&_stats->rowset_reader_load_segments_timer_ns);
for (int64_t i = 0; i != segment_count; ++i) {
SegmentCacheHandle segment_cache_handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
_rowset, i, &segment_cache_handle, should_use_cache,
/*need_load_pk_index_and_bf*/ false));
const auto& tmp_segments = segment_cache_handle.get_segments();
_segments_rows[i] = tmp_segments[0]->num_rows();
if (i >= seg_start && i < seg_end) {
segments[i] = tmp_segments[0];
}
}
}

// init segment rowid map for rowid conversion
std::vector<uint32_t> segment_num_rows;
RETURN_IF_ERROR(get_segment_num_rows(&segment_num_rows));
RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(),
segment_num_rows));
}

auto [seg_start, seg_end] = _segment_offsets;
if (seg_start == seg_end) {
seg_start = 0;
seg_end = segments.size();
_segments_rows));
} else {
SCOPED_RAW_TIMER(&_stats->rowset_reader_load_segments_timer_ns);
for (int64_t i = seg_start; i != seg_end; ++i) {
SegmentCacheHandle segment_cache_handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
_rowset, i, &segment_cache_handle, should_use_cache,
/*need_load_pk_index_and_bf*/ false));
const auto& tmp_segments = segment_cache_handle.get_segments();
segments[i] = tmp_segments[0];
}
}

const bool is_merge_iterator = _is_merge_iterator();
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/rowset/beta_rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class BetaRowsetReader : public RowsetReader {
return _iterator->current_block_row_locations(locations);
}

// Currently only used in UT
Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) override;

bool update_profile(RuntimeProfile* profile) override;
Expand All @@ -97,7 +98,7 @@ class BetaRowsetReader : public RowsetReader {
_rowset->rowset_meta()->is_segments_overlapping() && _get_segment_num() > 1;
}

int32_t _get_segment_num() const {
int64_t _get_segment_num() const {
auto [seg_start, seg_end] = _segment_offsets;
if (seg_start == seg_end) {
seg_start = 0;
Expand All @@ -108,7 +109,7 @@ class BetaRowsetReader : public RowsetReader {

DorisCallOnce<Status> _init_iter_once;

std::pair<int, int> _segment_offsets;
std::pair<int64_t, int64_t> _segment_offsets;
std::vector<RowRanges> _segment_row_ranges;

SchemaSPtr _input_schema;
Expand All @@ -120,6 +121,7 @@ class BetaRowsetReader : public RowsetReader {

std::unique_ptr<RowwiseIterator> _iterator;

// Currently only used when _read_context->record_rowids is true
std::vector<uint32_t> _segments_rows;

StorageReadOptions _read_options;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct RowSetSplits {
// if segment_offsets is not empty, means we only scan
// [pair.first, pair.second) segment in rs_reader, only effective in dup key
// and pipeline
std::pair<int, int> segment_offsets;
std::pair<int64_t, int64_t> segment_offsets;

// RowRanges of each segment.
std::vector<RowRanges> segment_row_ranges;
Expand Down
56 changes: 33 additions & 23 deletions be/src/olap/segment_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,37 @@ void SegmentCache::erase(const SegmentCache::CacheKey& key) {
LRUCachePolicy::erase(key.encode());
}

Status SegmentLoader::load_segment(const BetaRowsetSharedPtr& rowset, int64_t segment_id,
                                   SegmentCacheHandle* cache_handle, bool use_cache,
                                   bool need_load_pk_index_and_bf,
                                   OlapReaderStatistics* index_load_stats) {
    // Fast path: the segment may already be resident in the segment cache.
    SegmentCache::CacheKey key(rowset->rowset_id(), segment_id);
    if (_segment_cache->lookup(key, cache_handle)) {
        // A cached segment can still be broken (e.g. a failure occurred while
        // loading its index or creating a column reader), so check its health.
        // Deliberately kept as a separate `if` to keep the logic clear.
        if (cache_handle->pop_unhealthy_segment() == nullptr) {
            return Status::OK();
        }
    }

    // Cache miss, or the cached entry was unhealthy: load a fresh segment from
    // the rowset. A healthy replacement will supersede the bad entry in the
    // SegmentCache on insert.
    segment_v2::SegmentSharedPtr fresh_segment;
    RETURN_IF_ERROR(rowset->load_segment(segment_id, &fresh_segment));
    if (need_load_pk_index_and_bf) {
        RETURN_IF_ERROR(fresh_segment->load_pk_index_and_bf(index_load_stats));
    }

    if (!use_cache || config::disable_segment_cache) {
        // Caller keeps sole ownership via the handle; nothing enters the cache.
        cache_handle->push_segment(std::move(fresh_segment));
        return Status::OK();
    }

    // Ownership of the CacheValue is transferred to the SegmentCache, which
    // releases it on eviction.
    auto* value = new SegmentCache::CacheValue(fresh_segment);
    _cache_mem_usage += fresh_segment->meta_mem_usage();
    _segment_cache->insert(key, *value, cache_handle);
    return Status::OK();
}

Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
SegmentCacheHandle* cache_handle, bool use_cache,
bool need_load_pk_index_and_bf,
Expand All @@ -60,29 +91,8 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
return Status::OK();
}
for (int64_t i = 0; i < rowset->num_segments(); i++) {
SegmentCache::CacheKey cache_key(rowset->rowset_id(), i);
if (_segment_cache->lookup(cache_key, cache_handle)) {
// Has to check the segment status here, because the segment in cache may has something wrong during
// load index or create column reader.
// Not merge this if logic with previous to make the logic more clear.
if (cache_handle->pop_unhealthy_segment() == nullptr) {
continue;
}
}
// If the segment is not healthy, then will create a new segment and will replace the unhealthy one in SegmentCache.
segment_v2::SegmentSharedPtr segment;
RETURN_IF_ERROR(rowset->load_segment(i, &segment));
if (need_load_pk_index_and_bf) {
RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats));
}
if (use_cache && !config::disable_segment_cache) {
// memory of SegmentCache::CacheValue will be handled by SegmentCache
auto* cache_value = new SegmentCache::CacheValue(segment);
_cache_mem_usage += segment->meta_mem_usage();
_segment_cache->insert(cache_key, *cache_value, cache_handle);
} else {
cache_handle->push_segment(std::move(segment));
}
RETURN_IF_ERROR(load_segment(rowset, i, cache_handle, use_cache, need_load_pk_index_and_bf,
index_load_stats));
}
cache_handle->set_inited();
return Status::OK();
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/segment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,13 @@ class SegmentLoader {
bool use_cache = false, bool need_load_pk_index_and_bf = false,
OlapReaderStatistics* index_load_stats = nullptr);

// Load a single segment of "rowset" by segment id, returning the "cache_handle"
// which contains the loaded segment. If use_cache is true, it will be loaded from _cache.
Status load_segment(const BetaRowsetSharedPtr& rowset, int64_t segment_id,
SegmentCacheHandle* cache_handle, bool use_cache = false,
bool need_load_pk_index_and_bf = false,
OlapReaderStatistics* index_load_stats = nullptr);

void erase_segment(const SegmentCache::CacheKey& key);

void erase_segments(const RowsetId& rowset_id, int64_t num_segments);
Expand Down
2 changes: 0 additions & 2 deletions be/test/olap/ordered_data_compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,6 @@ TEST_F(OrderedDataCompactionTest, test_01) {
EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * rows_per_segment);
std::vector<uint32_t> segment_num_rows;
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// check vertical compaction result
for (auto id = 0; id < output_data.size(); id++) {
LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
Expand Down
1 change: 1 addition & 0 deletions be/test/olap/rowid_conversion_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ class TestRowIdConversion : public testing::TestWithParam<std::tuple<KeysType, b

// create output rowset reader
RowsetReaderContext reader_context;
reader_context.record_rowids = true;
reader_context.tablet_schema = tablet_schema;
reader_context.need_ordered_result = false;
std::vector<uint32_t> return_columns = {0, 1};
Expand Down
1 change: 1 addition & 0 deletions be/test/olap/segcompaction_mow_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ class SegCompactionMoWTest : public ::testing::TestWithParam<std::string> {
int expect_total_rows, int rows_mark_deleted,
bool skip_value_check = false) {
RowsetReaderContext reader_context;
reader_context.record_rowids = true;
reader_context.tablet_schema = tablet_schema;
// use this type to avoid cache from other ut
reader_context.reader_type = ReaderType::READER_QUERY;
Expand Down
3 changes: 3 additions & 0 deletions be/test/olap/segcompaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {

{ // read
RowsetReaderContext reader_context;
reader_context.record_rowids = true;
reader_context.tablet_schema = tablet_schema;
// use this type to avoid cache from other ut
reader_context.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION;
Expand Down Expand Up @@ -849,6 +850,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadUniqueTableSmall) {

{ // read
RowsetReaderContext reader_context;
reader_context.record_rowids = true;
reader_context.tablet_schema = tablet_schema;
// use this type to avoid cache from other ut
reader_context.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION;
Expand Down Expand Up @@ -1113,6 +1115,7 @@ TEST_F(SegCompactionTest, SegCompactionThenReadAggTableSmall) {

{ // read
RowsetReaderContext reader_context;
reader_context.record_rowids = true;
reader_context.tablet_schema = tablet_schema;
// use this type to avoid cache from other ut
reader_context.reader_type = ReaderType::READER_CUMULATIVE_COMPACTION;
Expand Down
12 changes: 0 additions & 12 deletions be/test/vec/olap/vertical_compaction_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,6 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * rows_per_segment);
std::vector<uint32_t> segment_num_rows;
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// check vertical compaction result
for (auto id = 0; id < output_data.size(); id++) {
LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
Expand Down Expand Up @@ -628,8 +626,6 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) {
EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(), num_input_rowset * num_segments * rows_per_segment);
std::vector<uint32_t> segment_num_rows;
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// check vertical compaction result
for (auto id = 0; id < output_data.size(); id++) {
LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
Expand Down Expand Up @@ -736,8 +732,6 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) {
EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(), num_segments * rows_per_segment);
std::vector<uint32_t> segment_num_rows;
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// check vertical compaction result
for (auto id = 0; id < output_data.size(); id++) {
LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
Expand Down Expand Up @@ -848,8 +842,6 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) {
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(),
num_input_rowset * num_segments * rows_per_segment - num_input_rowset * 100);
std::vector<uint32_t> segment_num_rows;
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// All keys less than 1000 are deleted by delete handler
for (auto& item : output_data) {
ASSERT_GE(std::get<0>(item), 100);
Expand Down Expand Up @@ -951,8 +943,6 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) {
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(),
num_input_rowset * num_segments * rows_per_segment - num_input_rowset * 100);
std::vector<uint32_t> segment_num_rows;
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// All keys less than 1000 are deleted by delete handler
for (auto& item : output_data) {
ASSERT_GE(std::get<0>(item), 100);
Expand Down Expand Up @@ -1042,8 +1032,6 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(), num_segments * rows_per_segment);
std::vector<uint32_t> segment_num_rows;
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// check vertical compaction result
for (auto id = 0; id < output_data.size(); id++) {
LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
Expand Down

0 comments on commit 6921b73

Please sign in to comment.