diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 6b74e70ee1b4b8..2f08082f51b5f3 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -393,12 +393,9 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() { rowset->rowset_id().to_string(); DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0}; DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version}; - DeleteBitmap::BitmapKey before_end {rowset->rowset_id(), seg_id, - pre_max_version - 1}; auto d = _tablet->tablet_meta()->delete_bitmap().get_agg( {rowset->rowset_id(), seg_id, pre_max_version}); - to_remove_vec.emplace_back( - std::make_tuple(_tablet->tablet_id(), start, before_end)); + to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end)); if (d->isEmpty()) { continue; } diff --git a/be/src/cloud/cloud_delete_bitmap_action.cpp b/be/src/cloud/cloud_delete_bitmap_action.cpp index 60db5896dfab8a..86cc535e1bc88e 100644 --- a/be/src/cloud/cloud_delete_bitmap_action.cpp +++ b/be/src/cloud/cloud_delete_bitmap_action.cpp @@ -33,6 +33,7 @@ #include #include +#include "cloud/cloud_meta_mgr.h" #include "cloud/cloud_tablet.h" #include "cloud/cloud_tablet_mgr.h" #include "common/logging.h" @@ -78,8 +79,8 @@ static Status _check_param(HttpRequest* req, uint64_t* tablet_id) { return Status::OK(); } -Status CloudDeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* req, - std::string* json_result) { +Status CloudDeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* req, + std::string* json_result) { uint64_t tablet_id = 0; // check & retrieve tablet_id from req if it contains RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed"); @@ -95,6 +96,50 @@ Status CloudDeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* re auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality(); auto size = tablet->tablet_meta()->delete_bitmap().get_size(); + LOG(INFO) << "show_local_delete_bitmap_count,tablet_id=" << tablet_id << ",count=" << count + << ",cardinality=" << cardinality << ",size=" << size; + + rapidjson::Document root; + root.SetObject(); + root.AddMember("delete_bitmap_count", count, root.GetAllocator()); + root.AddMember("cardinality", cardinality, root.GetAllocator()); + root.AddMember("size", size, root.GetAllocator()); + + // to json string + rapidjson::StringBuffer strbuf; + rapidjson::PrettyWriter writer(strbuf); + root.Accept(writer); + *json_result = std::string(strbuf.GetString()); + + return Status::OK(); +} + +Status CloudDeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req, + std::string* json_result) { + uint64_t tablet_id = 0; + // check & retrieve tablet_id from req if it contains + RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed"); + if (tablet_id == 0) { + return Status::InternalError("check param failed: missing tablet_id"); + } + TabletMetaSharedPtr tablet_meta; + auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta); + if (!st.ok()) { + LOG(WARNING) << "failed to get_tablet_meta tablet=" << tablet_id + << ", st=" << st.to_string(); + return st; + } + auto tablet = std::make_shared(_engine, std::move(tablet_meta)); + st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), false, true, true); + if (!st.ok()) { + LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st; + return st; + } + auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); + auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality(); + auto size = tablet->tablet_meta()->delete_bitmap().get_size(); + LOG(INFO) << "show_ms_delete_bitmap_count,tablet_id=" << tablet_id << ",count=" << count + << ",cardinality=" << cardinality << ",size=" << size; rapidjson::Document root; root.SetObject(); @@ -113,9 +158,17 @@ Status CloudDeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* re void CloudDeleteBitmapAction::handle(HttpRequest* req) { req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data()); - if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_INFO) { + if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_LOCAL) { + std::string json_result; + Status st = _handle_show_local_delete_bitmap_count(req, &json_result); + if (!st.ok()) { + HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); + } else { + HttpChannel::send_reply(req, HttpStatus::OK, json_result); + } + } else if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_MS) { std::string json_result; - Status st = _handle_show_delete_bitmap_count(req, &json_result); + Status st = _handle_show_ms_delete_bitmap_count(req, &json_result); if (!st.ok()) { HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); } else { diff --git a/be/src/cloud/cloud_delete_bitmap_action.h b/be/src/cloud/cloud_delete_bitmap_action.h index 9321661374c195..35739a7373efc8 100644 --- a/be/src/cloud/cloud_delete_bitmap_action.h +++ b/be/src/cloud/cloud_delete_bitmap_action.h @@ -31,7 +31,7 @@ class HttpRequest; class ExecEnv; -enum class DeleteBitmapActionType { COUNT_INFO = 1 }; +enum class DeleteBitmapActionType { COUNT_LOCAL = 1, COUNT_MS = 2 }; /// This action is used for viewing the delete bitmap status class CloudDeleteBitmapAction : public HttpHandlerWithAuth { @@ -45,7 +45,8 @@ class CloudDeleteBitmapAction : public HttpHandlerWithAuth { void handle(HttpRequest* req) override; private: - Status _handle_show_delete_bitmap_count(HttpRequest* req, std::string* json_result); + Status _handle_show_local_delete_bitmap_count(HttpRequest* req, std::string* json_result); + Status _handle_show_ms_delete_bitmap_count(HttpRequest* req, std::string* json_result); private: CloudStorageEngine& _engine; diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 5c699ae0159050..05341d0d4bab82 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -385,7 +385,7 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tab } Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data, - bool sync_delete_bitmap) { + bool sync_delete_bitmap, bool full_sync) { using namespace std::chrono; TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet); @@ -411,7 +411,11 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ idx->set_partition_id(tablet->partition_id()); { std::shared_lock rlock(tablet->get_header_lock()); - req.set_start_version(tablet->max_version_unlocked() + 1); + if (full_sync) { + req.set_start_version(0); + } else { + req.set_start_version(tablet->max_version_unlocked() + 1); + } req.set_base_compaction_cnt(tablet->base_compaction_cnt()); req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt()); req.set_cumulative_point(tablet->cumulative_layer_point()); @@ -471,7 +475,7 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ DeleteBitmap delete_bitmap(tablet_id); int64_t old_max_version = req.start_version() - 1; auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), - resp.stats(), req.idx(), &delete_bitmap); + resp.stats(), req.idx(), &delete_bitmap, full_sync); if (st.is() && tried++ < retry_times) { LOG_WARNING("rowset meta is expired, need to retry") .tag("tablet", tablet->tablet_id()) @@ -617,12 +621,13 @@ bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64 Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version, std::ranges::range auto&& rs_metas, const TabletStatsPB& stats, const TabletIndexPB& idx, - DeleteBitmap* delete_bitmap) { + DeleteBitmap* delete_bitmap, bool full_sync) { if (rs_metas.empty()) { return Status::OK(); } - if (sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) { + if (!full_sync && + sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) { return Status::OK(); } else { LOG(WARNING) << "failed to sync delete bitmap by txn info. tablet_id=" diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index a657c0fdd8e350..c49b036ad90c15 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -58,7 +58,7 @@ class CloudMetaMgr { Status get_tablet_meta(int64_t tablet_id, std::shared_ptr* tablet_meta); Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data = false, - bool sync_delete_bitmap = true); + bool sync_delete_bitmap = true, bool full_sync = false); Status prepare_rowset(const RowsetMeta& rs_meta, std::shared_ptr* existed_rs_meta = nullptr); @@ -116,7 +116,8 @@ class CloudMetaMgr { Status sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version, std::ranges::range auto&& rs_metas, const TabletStatsPB& stats, - const TabletIndexPB& idx, DeleteBitmap* delete_bitmap); + const TabletIndexPB& idx, DeleteBitmap* delete_bitmap, + bool full_sync = false); void check_table_size_correctness(const RowsetMeta& rs_meta); int64_t get_segment_file_size(const RowsetMeta& rs_meta); int64_t get_inverted_index_file_szie(const RowsetMeta& rs_meta); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 738087a702f070..d707349132036c 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -44,6 +44,7 @@ #include "io/fs/file_system.h" #include "io/fs/file_writer.h" #include "io/fs/remote_file_system.h" +#include "io/io_common.h" #include "olap/cumulative_compaction_policy.h" #include "olap/cumulative_compaction_time_series_policy.h" #include "olap/data_dir.h" @@ -345,8 +346,9 @@ bool CompactionMixin::handle_ordered_data_compaction() { if (!config::enable_ordered_data_compaction) { return false; } - if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) { - // The remote file system does not support to link files. + if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION || + compaction_type() == ReaderType::READER_FULL_COMPACTION) { + // The remote file system and full compaction does not support to link files. return false; } if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 9d675f731924c1..529efa2e069faa 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -59,6 +59,9 @@ Status FullCompaction::prepare_compact() { std::unique_lock cumu_lock(tablet()->get_cumulative_compaction_lock()); tablet()->set_is_full_compaction_running(true); + DBUG_EXECUTE_IF("FullCompaction.prepare_compact.set_cumu_point", + { tablet()->set_cumulative_layer_point(tablet()->max_version_unlocked() + 1); }) + // 1. pick rowsets to compact RETURN_IF_ERROR(pick_rowsets_to_compact()); diff --git a/be/src/olap/primary_key_index.cpp b/be/src/olap/primary_key_index.cpp index e416639cfb06cd..5f7bedb01fc8de 100644 --- a/be/src/olap/primary_key_index.cpp +++ b/be/src/olap/primary_key_index.cpp @@ -50,8 +50,8 @@ Status PrimaryKeyIndexBuilder::init() { auto opt = segment_v2::BloomFilterOptions(); opt.fpp = 0.01; - _bloom_filter_index_builder.reset( - new segment_v2::PrimaryKeyBloomFilterIndexWriterImpl(opt, type_info)); + RETURN_IF_ERROR(segment_v2::PrimaryKeyBloomFilterIndexWriterImpl::create( + opt, type_info, &_bloom_filter_index_builder)); return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp index 98669ccb141ae7..edc6102703f492 100644 --- a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp @@ -348,5 +348,22 @@ Status NGramBloomFilterIndexWriterImpl::create(const BloomFilterOptions& bf_opti return Status::OK(); } +Status PrimaryKeyBloomFilterIndexWriterImpl::create(const BloomFilterOptions& bf_options, + const TypeInfo* typeinfo, + std::unique_ptr* res) { + FieldType type = typeinfo->type(); + switch (type) { + case FieldType::OLAP_FIELD_TYPE_CHAR: + case FieldType::OLAP_FIELD_TYPE_VARCHAR: + case FieldType::OLAP_FIELD_TYPE_STRING: + *res = std::make_unique(bf_options, typeinfo); + break; + default: + return Status::NotSupported("unsupported type for primary key bloom filter index:{}", + std::to_string(int(type))); + } + return Status::OK(); +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h index 2cdf7171e3e276..a94982438f651a 100644 --- a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h +++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h @@ -85,6 +85,8 @@ class PrimaryKeyBloomFilterIndexWriterImpl : public BloomFilterIndexWriter { } }; + static Status create(const BloomFilterOptions& bf_options, const TypeInfo* typeinfo, + std::unique_ptr* res); // This method may allocate large memory for bf, will return error // when memory is exhaused to prevent oom. Status add_values(const void* values, size_t count) override; diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 4005c818bc5023..9a27b95dbcd446 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -1209,9 +1209,13 @@ void DeleteBitmap::remove_stale_delete_bitmap_from_queue(const std::vector(delete_bitmap_tuple); auto end_bmk = std::get<2>(delete_bitmap_tuple); + // the key range of to be removed is [start_bmk,end_bmk), + // due to the different definitions of the right boundary, + // so use end_bmk as right boundary when removing local delete bitmap, + // use (end_bmk - 1) as right boundary when removing ms delete bitmap remove(start_bmk, end_bmk); to_delete.emplace_back(std::make_tuple(std::get<0>(start_bmk).to_string(), 0, - std::get<2>(end_bmk))); + std::get<2>(end_bmk) - 1)); } _stale_delete_bitmap.erase(version_str); } diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index e7b920796a1b98..57600d1f56aae9 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -425,11 +425,16 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) { TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status", run_status_compaction_action); - CloudDeleteBitmapAction* count_delete_bitmap_action = - _pool.add(new CloudDeleteBitmapAction(DeleteBitmapActionType::COUNT_INFO, _env, engine, + CloudDeleteBitmapAction* count_local_delete_bitmap_action = + _pool.add(new CloudDeleteBitmapAction(DeleteBitmapActionType::COUNT_LOCAL, _env, engine, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); - _ev_http_server->register_handler(HttpMethod::GET, "/api/delete_bitmap/count", - count_delete_bitmap_action); + _ev_http_server->register_handler(HttpMethod::GET, "/api/delete_bitmap/count_local", + count_local_delete_bitmap_action); + CloudDeleteBitmapAction* count_ms_delete_bitmap_action = + _pool.add(new CloudDeleteBitmapAction(DeleteBitmapActionType::COUNT_MS, _env, engine, + TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); + _ev_http_server->register_handler(HttpMethod::GET, "/api/delete_bitmap/count_ms", + count_ms_delete_bitmap_action); #ifdef ENABLE_INJECTION_POINT InjectionPointAction* injection_point_action = _pool.add(new InjectionPointAction); _ev_http_server->register_handler(HttpMethod::GET, "/api/injection_point/{op}", diff --git a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp index 258dd9a5ff8b51..69cb343f04bf91 100644 --- a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp +++ b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp @@ -59,40 +59,46 @@ class BloomFilterIndexReaderWriterTest : public testing::Test { }; template -void write_bloom_filter_index_file(const std::string& file_name, const void* values, - size_t value_count, size_t null_count, - ColumnIndexMetaPB* index_meta) { +Status write_bloom_filter_index_file(const std::string& file_name, const void* values, + size_t value_count, size_t null_count, + ColumnIndexMetaPB* index_meta, + bool use_primary_key_bloom_filter = false) { const auto* type_info = get_scalar_type_info(); using CppType = typename CppTypeTraits::CppType; std::string fname = dname + "/" + file_name; auto fs = io::global_local_filesystem(); { io::FileWriterPtr file_writer; - Status st = fs->create_file(fname, &file_writer); - EXPECT_TRUE(st.ok()) << st.to_string(); + RETURN_IF_ERROR(fs->create_file(fname, &file_writer)); std::unique_ptr bloom_filter_index_writer; BloomFilterOptions bf_options; - static_cast( - BloomFilterIndexWriter::create(bf_options, type_info, &bloom_filter_index_writer)); + + if (use_primary_key_bloom_filter) { + RETURN_IF_ERROR(PrimaryKeyBloomFilterIndexWriterImpl::create( + bf_options, type_info, &bloom_filter_index_writer)); + } else { + RETURN_IF_ERROR(BloomFilterIndexWriter::create(bf_options, type_info, + &bloom_filter_index_writer)); + } + const CppType* vals = (const CppType*)values; for (int i = 0; i < value_count;) { size_t num = std::min(1024, (int)value_count - i); - static_cast(bloom_filter_index_writer->add_values(vals + i, num)); + RETURN_IF_ERROR(bloom_filter_index_writer->add_values(vals + i, num)); if (i == 2048) { // second page bloom_filter_index_writer->add_nulls(null_count); } - st = bloom_filter_index_writer->flush(); - EXPECT_TRUE(st.ok()); + RETURN_IF_ERROR(bloom_filter_index_writer->flush()); i += 1024; } - st = bloom_filter_index_writer->finish(file_writer.get(), index_meta); - EXPECT_TRUE(st.ok()) << "writer finish status:" << st.to_string(); + RETURN_IF_ERROR(bloom_filter_index_writer->finish(file_writer.get(), index_meta)); EXPECT_TRUE(file_writer->close().ok()); EXPECT_EQ(BLOOM_FILTER_INDEX, index_meta->type()); EXPECT_EQ(bf_options.strategy, index_meta->bloom_filter_index().hash_strategy()); } + return Status::OK(); } void get_bloom_filter_reader_iter(const std::string& file_name, const ColumnIndexMetaPB& meta, @@ -110,13 +116,14 @@ void get_bloom_filter_reader_iter(const std::string& file_name, const ColumnInde } template -void test_bloom_filter_index_reader_writer_template( +Status test_bloom_filter_index_reader_writer_template( const std::string file_name, typename TypeTraits::CppType* val, size_t num, size_t null_num, typename TypeTraits::CppType* not_exist_value, - bool is_slice_type = false) { + bool is_slice_type = false, bool use_primary_key_bloom_filter = false) { using CppType = typename TypeTraits::CppType; ColumnIndexMetaPB meta; - write_bloom_filter_index_file(file_name, val, num, null_num, &meta); + RETURN_IF_ERROR(write_bloom_filter_index_file(file_name, val, num, null_num, &meta, + use_primary_key_bloom_filter)); { BloomFilterIndexReader* reader = nullptr; std::unique_ptr iter; @@ -124,8 +131,7 @@ void test_bloom_filter_index_reader_writer_template( // page 0 std::unique_ptr bf; - auto st = iter->read_bloom_filter(0, &bf); - EXPECT_TRUE(st.ok()); + RETURN_IF_ERROR(iter->read_bloom_filter(0, &bf)); for (int i = 0; i < 1024; ++i) { if (is_slice_type) { Slice* value = (Slice*)(val + i); @@ -136,8 +142,7 @@ void test_bloom_filter_index_reader_writer_template( } // page 1 - st = iter->read_bloom_filter(1, &bf); - EXPECT_TRUE(st.ok()); + RETURN_IF_ERROR(iter->read_bloom_filter(1, &bf)); for (int i = 1024; i < 2048; ++i) { if (is_slice_type) { Slice* value = (Slice*)(val + i); @@ -148,8 +153,7 @@ void test_bloom_filter_index_reader_writer_template( } // page 2 - st = iter->read_bloom_filter(2, &bf); - EXPECT_TRUE(st.ok()); + RETURN_IF_ERROR(iter->read_bloom_filter(2, &bf)); for (int i = 2048; i < 3071; ++i) { if (is_slice_type) { Slice* value = (Slice*)(val + i); @@ -163,6 +167,7 @@ void test_bloom_filter_index_reader_writer_template( delete reader; } + return Status::OK(); } TEST_F(BloomFilterIndexReaderWriterTest, test_int) { @@ -175,8 +180,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_int) { std::string file_name = "bloom_filter_int"; int not_exist_value = 18888; - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, val, num, 1, ¬_exist_value); + EXPECT_TRUE(st.ok()); delete[] val; } @@ -190,8 +196,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_bigint) { std::string file_name = "bloom_filter_bigint"; int64_t not_exist_value = 18888; - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, val, num, 1, ¬_exist_value); + EXPECT_TRUE(st.ok()); delete[] val; } @@ -205,8 +212,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_largeint) { std::string file_name = "bloom_filter_largeint"; int128_t not_exist_value = 18888; - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, val, num, 1, ¬_exist_value); + EXPECT_TRUE(st.ok()); delete[] val; } @@ -224,8 +232,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_varchar_type) { } std::string file_name = "bloom_filter_varchar"; Slice not_exist_value("value_not_exist"); - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, slices, num, 1, ¬_exist_value, true); + EXPECT_TRUE(st.ok()); delete[] val; delete[] slices; } @@ -244,8 +253,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_char) { } std::string file_name = "bloom_filter_char"; Slice not_exist_value("char_value_not_exist"); - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, slices, num, 1, ¬_exist_value, true); + EXPECT_TRUE(st.ok()); delete[] val; delete[] slices; } @@ -260,8 +270,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_date) { std::string file_name = "bloom_filter_date"; uint24_t not_exist_value = 18888; - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, val, num, 1, ¬_exist_value); + EXPECT_TRUE(st.ok()); delete[] val; } @@ -275,8 +286,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_datetime) { std::string file_name = "bloom_filter_datetime"; int64_t not_exist_value = 18888; - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, val, num, 1, ¬_exist_value); + EXPECT_TRUE(st.ok()); delete[] val; } @@ -290,8 +302,45 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_decimal) { std::string file_name = "bloom_filter_decimal"; decimal12_t not_exist_value = {666, 666}; - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, val, num, 1, ¬_exist_value); + EXPECT_TRUE(st.ok()); + delete[] val; +} + +TEST_F(BloomFilterIndexReaderWriterTest, test_primary_key_bloom_filter_index) { + size_t num = 1024 * 3 - 1; + std::vector val_strings(num); + for (size_t i = 0; i < num; ++i) { + val_strings[i] = "primary_key_" + std::to_string(i); + } + std::vector slices(num); + for (size_t i = 0; i < num; ++i) { + slices[i] = Slice(val_strings[i]); + } + + std::string file_name = "primary_key_bloom_filter_index"; + Slice not_exist_value("primary_key_not_exist"); + + auto st = test_bloom_filter_index_reader_writer_template( + file_name, slices.data(), num, 0, ¬_exist_value, true, true); + EXPECT_TRUE(st.ok()); +} + +TEST_F(BloomFilterIndexReaderWriterTest, test_primary_key_bloom_filter_index_int) { + size_t num = 1024 * 3 - 1; + int* val = new int[num]; + for (int i = 0; i < num; ++i) { + // there will be 3 bloom filter pages + val[i] = 10000 + i + 1; + } + + std::string file_name = "primary_key_bloom_filter_index_int"; + int not_exist_value = 18888; + auto st = test_bloom_filter_index_reader_writer_template( + file_name, val, num, 1, ¬_exist_value, false, true); + EXPECT_FALSE(st.ok()); + EXPECT_EQ(st.code(), TStatusCode::NOT_IMPLEMENTED_ERROR); delete[] val; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/TableIdentifier.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/TableIdentifier.java index 8e510ec7a93ff5..ccf688663d2bd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/TableIdentifier.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/TableIdentifier.java @@ -21,7 +21,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.nereids.exceptions.MetaNotFoundException; +import org.apache.doris.nereids.exceptions.AnalysisException; import com.google.common.base.Preconditions; import com.google.gson.annotations.SerializedName; @@ -48,15 +48,15 @@ public TableIdentifier(TableIf tableIf) { public TableIf toTableIf() { CatalogIf catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); if (catalogIf == null) { - throw new MetaNotFoundException(String.format("Can not find catalog %s in constraint", catalogId)); + throw new AnalysisException(String.format("Can not find catalog %s in constraint", catalogId)); } DatabaseIf databaseIf = catalogIf.getDbNullable(databaseId); if (databaseIf == null) { - throw new MetaNotFoundException(String.format("Can not find database %s in constraint", databaseId)); + throw new AnalysisException(String.format("Can not find database %s in constraint", databaseId)); } TableIf tableIf = databaseIf.getTableNullable(tableId); if (tableIf == null) { - throw new MetaNotFoundException(String.format("Can not find table %s in constraint", databaseId)); + throw new AnalysisException(String.format("Can not find table %s in constraint", databaseId)); } return tableIf; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DialectTransformException.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DialectTransformException.java deleted file mode 100644 index 3d96e6dd039898..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DialectTransformException.java +++ /dev/null @@ -1,28 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.exceptions; - -/** - * DialectTransformException when have not supported transforming for dialect converters. - */ -public class DialectTransformException extends UnsupportedOperationException { - - public DialectTransformException(String msg) { - super(String.format("Unsupported dialect transformation is %s", msg)); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DoNotFallbackException.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DoNotFallbackException.java deleted file mode 100644 index b6253f52c6b5df..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DoNotFallbackException.java +++ /dev/null @@ -1,27 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.exceptions; - -/** - * Exception for can not fall back error in Nereids. - */ -public class DoNotFallbackException extends RuntimeException { - public DoNotFallbackException(String msg) { - super(msg); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/MetaNotFoundException.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/MetaNotFoundException.java deleted file mode 100644 index f7d19c3f844ddd..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/MetaNotFoundException.java +++ /dev/null @@ -1,74 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.exceptions; - -import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; - -import java.util.Optional; - -/** Nereids's AnalysisException. */ -public class MetaNotFoundException extends RuntimeException { - private final String message; - private final Optional line; - private final Optional startPosition; - private final Optional plan; - - public MetaNotFoundException(String message, Throwable cause, Optional line, - Optional startPosition, Optional plan) { - super(message, cause); - this.message = message; - this.line = line; - this.startPosition = startPosition; - this.plan = plan; - } - - public MetaNotFoundException(String message, Optional line, - Optional startPosition, Optional plan) { - super(message); - this.message = message; - this.line = line; - this.startPosition = startPosition; - this.plan = plan; - } - - public MetaNotFoundException(String message, Throwable cause) { - this(message, cause, Optional.empty(), Optional.empty(), Optional.empty()); - } - - public MetaNotFoundException(String message) { - this(message, Optional.empty(), Optional.empty(), Optional.empty()); - } - - @Override - public String getMessage() { - String planAnnotation = plan.map(p -> ";\n" + p.treeString()).orElse(""); - return getSimpleMessage() + planAnnotation; - } - - private String getSimpleMessage() { - if (line.isPresent() || startPosition.isPresent()) { - String lineAnnotation = line.map(l -> "line " + l).orElse(""); - String positionAnnotation = startPosition.map(s -> " pos " + s).orElse(""); - return message + ";" + lineAnnotation + positionAnnotation; - } else { - return message; - } - } - - // TODO: support ErrorCode -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/TransformException.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/TransformException.java deleted file mode 100644 index 401fdd56bab94e..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/TransformException.java +++ /dev/null @@ -1,28 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.exceptions; - -/** - * All exceptions thrown by transform action in {@link org.apache.doris.nereids.rules.Rule} - * should be a subclass of this class. - */ -public class TransformException extends RuntimeException { - public TransformException(String msg) { - super(String.format("Transform error: %s", msg)); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/UnsupportedDialectException.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/UnsupportedDialectException.java deleted file mode 100644 index cdf7944c61c158..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/UnsupportedDialectException.java +++ /dev/null @@ -1,35 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.exceptions; - -import org.apache.doris.nereids.parser.Dialect; - -/** - * UnsupportedDialectException when not match any in - * {@link Dialect}. - */ -public class UnsupportedDialectException extends UnsupportedOperationException { - - public UnsupportedDialectException(Dialect dialect) { - super(String.format("Unsupported dialect name is %s", dialect.getDialectName())); - } - - public UnsupportedDialectException(String type, String msg) { - super(String.format("Unsupported dialect type is %s, msg is %s", type, msg)); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/AppliedAwareRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/AppliedAwareRule.java index 8f7ea106236b5d..5f4822ead04be3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/AppliedAwareRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/AppliedAwareRule.java @@ -18,7 +18,6 @@ package org.apache.doris.nereids.rules; import org.apache.doris.nereids.CascadesContext; -import org.apache.doris.nereids.exceptions.TransformException; import org.apache.doris.nereids.pattern.Pattern; import org.apache.doris.nereids.pattern.ProxyPattern; import org.apache.doris.nereids.trees.plans.Plan; @@ -52,7 +51,7 @@ private AppliedAwareRule(Rule rule, BiPredicate matchRootPredicate) } @Override - public List transform(Plan plan, CascadesContext context) throws TransformException { + public List transform(Plan plan, CascadesContext context) { return rule.transform(plan, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/Rule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/Rule.java index 7d5b4001d9ae8c..40b6225e98f433 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/Rule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/Rule.java @@ -18,7 +18,6 @@ package org.apache.doris.nereids.rules; import org.apache.doris.nereids.CascadesContext; -import org.apache.doris.nereids.exceptions.TransformException; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.pattern.Pattern; import org.apache.doris.nereids.rules.RuleType.RuleTypeClass; @@ -73,7 +72,7 @@ public String toString() { return getRuleType().toString(); } - public abstract List transform(Plan node, CascadesContext context) throws TransformException; + public abstract List transform(Plan node, CascadesContext context); /** callback this function when the traverse framework accept a new plan which produce by this rule */ public void acceptPlan(Plan plan) { diff --git a/regression-test/data/compaction/test_cu_compaction_remove_old_version_delete_bitmap.out b/regression-test/data/compaction/test_cu_compaction_remove_old_version_delete_bitmap.out index 1c3611fe0b7506..37dfa3b93a5878 100644 --- a/regression-test/data/compaction/test_cu_compaction_remove_old_version_delete_bitmap.out +++ b/regression-test/data/compaction/test_cu_compaction_remove_old_version_delete_bitmap.out @@ -1,29 +1,78 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !sql -- -0 0 0 -1 8 8 +0 0 8 +1 1 1 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 -- !sql -- -0 0 0 -1 8 8 +0 0 8 +1 1 1 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 -- !sql -- -0 0 0 +0 0 13 1 13 13 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 -- !sql -- -0 0 0 +0 0 13 1 13 13 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 -- !sql -- -0 0 0 +0 0 18 1 23 23 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 -- !sql -- -0 0 0 +0 0 18 1 23 23 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 -- !sql -- -0 0 0 +0 5 5 1 28 28 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 diff --git a/regression-test/suites/compaction/test_cu_compaction_remove_old_version_delete_bitmap.groovy b/regression-test/suites/compaction/test_cu_compaction_remove_old_version_delete_bitmap.groovy index 2219cc175b534b..a36cb4579ca487 100644 --- a/regression-test/suites/compaction/test_cu_compaction_remove_old_version_delete_bitmap.groovy +++ b/regression-test/suites/compaction/test_cu_compaction_remove_old_version_delete_bitmap.groovy @@ -123,11 +123,11 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { } while (running) } - def getDeleteBitmapStatus = { be_host, be_http_port, tablet_id -> + def getLocalDeleteBitmapStatus = { be_host, be_http_port, tablet_id -> boolean running = true StringBuilder sb = new StringBuilder(); sb.append("curl -X GET http://${be_host}:${be_http_port}") - sb.append("/api/delete_bitmap/count?tablet_id=") + sb.append("/api/delete_bitmap/count_local?tablet_id=") sb.append(tablet_id) String command = sb.toString() @@ -135,7 +135,25 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { process = command.execute() code = process.waitFor() out = process.getText() - logger.info("Get delete bitmap count status: =" + code + ", out=" + out) + logger.info("Get local delete bitmap count status: =" + code + ", out=" + out) + assertEquals(code, 0) + def deleteBitmapStatus = parseJson(out.trim()) + return deleteBitmapStatus + } + + def getMSDeleteBitmapStatus = { be_host, be_http_port, tablet_id -> + boolean running = true + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/delete_bitmap/count_ms?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get ms delete bitmap count status: =" + code + ", out=" + out) assertEquals(code, 0) def deleteBitmapStatus = parseJson(out.trim()) return deleteBitmapStatus @@ -174,21 +192,24 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { GetDebugPoint().enableDebugPointForAllBEs("CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets") // 1. test normal sql "sync" - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,1,'1'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,2,'2'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,3,'3'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,4,'4'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,5,'5'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,6,'6'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,7,'7'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,8,'8'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'1'),(1,1,'1'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'2'),(2,2,'2'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'3'),(3,3,'3'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'4'),(4,4,'4'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'5'),(5,5,'5'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'6'),(6,6,'6'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'7'),(7,7,'7'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'8'),(8,8,'8'); """ qt_sql "select * from ${testTable} order by plan_id" // trigger compaction to generate base rowset def tablets = sql_return_maparray """ show tablets from ${testTable}; """ logger.info("tablets: " + tablets) - def delete_bitmap_count = 0 + def local_delete_bitmap_count = 0 + def ms_delete_bitmap_count = 0 + def local_delete_bitmap_cardinality = 0; + def ms_delete_bitmap_cardinality = 0; for (def tablet in tablets) { String tablet_id = tablet.TabletId def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ @@ -197,9 +218,20 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); // before compaction, delete_bitmap_count is (rowsets num - 1) - delete_bitmap_count = getDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count - assertTrue(delete_bitmap_count == 7) - logger.info("delete_bitmap_count:" + delete_bitmap_count) + local_delete_bitmap_count = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + ms_delete_bitmap_count = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + logger.info("local_delete_bitmap_count:" + local_delete_bitmap_count) + logger.info("ms_delete_bitmap_count:" + ms_delete_bitmap_count) + assertTrue(local_delete_bitmap_count == 7) + assertTrue(local_delete_bitmap_count == ms_delete_bitmap_count) + + local_delete_bitmap_cardinality = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + ms_delete_bitmap_cardinality = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + logger.info("local_delete_bitmap_cardinality:" + local_delete_bitmap_cardinality) + logger.info("ms_delete_bitmap_cardinality:" + ms_delete_bitmap_cardinality) + assertTrue(local_delete_bitmap_cardinality == 7) + assertTrue(local_delete_bitmap_cardinality == ms_delete_bitmap_cardinality) + assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], "cumulative", tablet_id).contains("Success")); @@ -211,11 +243,11 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { def now = System.currentTimeMillis() - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,9,'9'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,10,'10'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,11,'11'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,12,'12'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,13,'13'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'9'),(1,9,'9'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'10'),(1,10,'10'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'11'),(1,11,'11'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'12'),(1,12,'12'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'13'),(1,13,'13'); """ def time_diff = System.currentTimeMillis() - now logger.info("time_diff:" + time_diff) @@ -230,11 +262,21 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ logger.info("tablet: " + tablet_info) - // before compaction, delete_bitmap_count is (rowsets num - 1) + // before compaction, local delete_bitmap_count is (total rowsets num - 1), ms delete_bitmap_count is new rowset num String trigger_backend_id = tablet.BackendId - delete_bitmap_count = getDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count - logger.info("delete_bitmap_count:" + delete_bitmap_count) - assertTrue(delete_bitmap_count == 12) + local_delete_bitmap_count = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + ms_delete_bitmap_count = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + logger.info("local_delete_bitmap_count:" + local_delete_bitmap_count) + logger.info("ms_delete_bitmap_count:" + ms_delete_bitmap_count) + assertTrue(local_delete_bitmap_count == 12) + assertTrue(ms_delete_bitmap_count == 5) + + local_delete_bitmap_cardinality = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + ms_delete_bitmap_cardinality = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + logger.info("local_delete_bitmap_cardinality:" + local_delete_bitmap_cardinality) + logger.info("ms_delete_bitmap_cardinality:" + ms_delete_bitmap_cardinality) + assertTrue(local_delete_bitmap_cardinality == 17) + assertTrue(ms_delete_bitmap_cardinality == 10) getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], @@ -244,9 +286,19 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { Thread.sleep(1000) // after compaction, delete_bitmap_count is 1 - delete_bitmap_count = getDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count - logger.info("delete_bitmap_count:" + delete_bitmap_count) - assertTrue(delete_bitmap_count == 1) + local_delete_bitmap_count = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + ms_delete_bitmap_count = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + logger.info("local_delete_bitmap_count:" + local_delete_bitmap_count) + logger.info("ms_delete_bitmap_count:" + ms_delete_bitmap_count) + assertTrue(local_delete_bitmap_count == 1) + assertTrue(local_delete_bitmap_count == ms_delete_bitmap_count) + + local_delete_bitmap_cardinality = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + ms_delete_bitmap_cardinality = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + logger.info("local_delete_bitmap_cardinality:" + local_delete_bitmap_cardinality) + logger.info("ms_delete_bitmap_cardinality:" + ms_delete_bitmap_cardinality) + assertTrue(local_delete_bitmap_cardinality == 2) + assertTrue(ms_delete_bitmap_cardinality == 2) } qt_sql "select * from ${testTable} order by plan_id" @@ -255,11 +307,11 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { now = System.currentTimeMillis() - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,19,'19'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,20,'20'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,21,'21'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,22,'22'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,23,'23'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'14'),(1,19,'19'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'15'),(1,20,'20'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'16'),(1,21,'21'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'17'),(1,22,'22'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'18'),(1,23,'23'); """ time_diff = System.currentTimeMillis() - now logger.info("time_diff:" + time_diff) @@ -273,9 +325,19 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { logger.info("tablet: " + tablet_info) String trigger_backend_id = tablet.BackendId - delete_bitmap_count = getDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count - assertTrue(delete_bitmap_count == 6) - logger.info("delete_bitmap_count:" + delete_bitmap_count) + local_delete_bitmap_count = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + ms_delete_bitmap_count = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + logger.info("local_delete_bitmap_count:" + local_delete_bitmap_count) + logger.info("ms_delete_bitmap_count:" + ms_delete_bitmap_count) + assertTrue(local_delete_bitmap_count == 6) + assertTrue(local_delete_bitmap_count == ms_delete_bitmap_count) + + local_delete_bitmap_cardinality = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + ms_delete_bitmap_cardinality = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + logger.info("local_delete_bitmap_cardinality:" + local_delete_bitmap_cardinality) + logger.info("ms_delete_bitmap_cardinality:" + ms_delete_bitmap_cardinality) + assertTrue(local_delete_bitmap_cardinality == 12) + assertTrue(ms_delete_bitmap_cardinality == 12) getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], @@ -283,28 +345,29 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); - // update fail, delete_bitmap_count will not change + // update fail, local delete_bitmap_count will not change Thread.sleep(1000) - delete_bitmap_count = getDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count - assertTrue(delete_bitmap_count == 6) - logger.info("delete_bitmap_count:" + delete_bitmap_count) + local_delete_bitmap_count = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + logger.info("local_delete_bitmap_count:" + local_delete_bitmap_count) + assertTrue(local_delete_bitmap_count == 6) } qt_sql "select * from ${testTable} order by plan_id" now = System.currentTimeMillis() - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,24,'24'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,25,'25'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,26,'26'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,27,'27'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,28,'28'); """ + sql """ INSERT INTO ${testTable} VALUES (0,1,'1'),(1,24,'24'); """ + sql """ INSERT INTO ${testTable} VALUES (0,3,'2'),(1,25,'25'); """ + sql """ INSERT INTO ${testTable} VALUES (0,3,'3'),(1,26,'26'); """ + sql """ INSERT INTO ${testTable} VALUES (0,4,'4'),(1,27,'27'); """ + sql """ INSERT INTO ${testTable} VALUES (0,5,'5'),(1,28,'28'); """ time_diff = System.currentTimeMillis() - now logger.info("time_diff:" + time_diff) assertTrue(time_diff <= timeout, "wait_for_insert_into_values timeout") qt_sql "select * from ${testTable} order by plan_id" + GetDebugPoint().disableDebugPointForAllBEs("CloudCumulativeCompaction.modify_rowsets.update_delete_bitmap_failed") } finally { reset_be_param("compaction_promotion_version_count") diff --git a/regression-test/suites/fault_injection_p0/test_full_compaction_with_ordered_data.groovy b/regression-test/suites/fault_injection_p0/test_full_compaction_with_ordered_data.groovy new file mode 100644 index 00000000000000..c6dfa6b885cf6c --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_full_compaction_with_ordered_data.groovy @@ -0,0 +1,208 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_full_compaction_with_ordered_data","nonConcurrent") { + if (isCloudMode()) { + return + } + def tableName = "test_full_compaction_with_ordered_data" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + String backend_id; + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DUPLICATE KEY(k) + DISTRIBUTED BY HASH(k) + BUCKETS 3 + properties( + "replication_num" = "1", + "disable_auto_compaction" = "true") + """ + sql """ INSERT INTO ${tableName} VALUES (0,0),(1,1),(2,2)""" + sql """ delete from ${tableName} where k=0""" + sql """ delete from ${tableName} where k=1""" + sql """ delete from ${tableName} where k=2""" + + def exception = false; + try { + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + + def replicaNum = get_table_replica_num(tableName) + logger.info("get table replica num: " + replicaNum) + // before full compaction, there are 12 rowsets. + int rowsetCount = 0 + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + rowsetCount +=((List) tabletJson.rowsets).size() + } + assert (rowsetCount == 5 * replicaNum * 3) + + // trigger full compactions for all tablets in ${tableName} + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + times = 1 + + do{ + (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + ++times + sleep(2000) + } while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10) + + } + + // wait for full compaction done + for (def tablet in tablets) { + boolean running = true + do { + Thread.sleep(1000) + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + // after full compaction, there is only 1 rowset. + + rowsetCount = 0 + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + rowsetCount +=((List) tabletJson.rowsets).size() + } + assert (rowsetCount == 1 * replicaNum * 3) + } catch (Exception e) { + logger.info(e.getMessage()) + exception = true; + } finally { + assertFalse(exception) + } + + sql """ delete from ${tableName} where k=0""" + sql """ delete from ${tableName} where k=1""" + sql """ delete from ${tableName} where k=2""" + sql """ delete from ${tableName} where k=3""" + sql """ delete from ${tableName} where k=4""" + sql """ delete from ${tableName} where k=5""" + sql """ delete from ${tableName} where k=6""" + sql """ delete from ${tableName} where k=7""" + sql """ delete from ${tableName} where k=8""" + sql """ delete from ${tableName} where k=9""" + sql """ INSERT INTO ${tableName} VALUES (10,10)""" + + GetDebugPoint().clearDebugPointsForAllBEs() + + exception = false; + try { + GetDebugPoint().enableDebugPointForAllBEs("FullCompaction.prepare_compact.set_cumu_point") + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + + def replicaNum = get_table_replica_num(tableName) + logger.info("get table replica num: " + replicaNum) + // before full compaction, there are 12 rowsets. + int rowsetCount = 0 + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + rowsetCount +=((List) tabletJson.rowsets).size() + } + assert (rowsetCount == 12 * replicaNum * 3) + + // trigger full compactions for all tablets in ${tableName} + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + times = 1 + + do{ + (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + ++times + sleep(2000) + } while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10) + + } + + // wait for full compaction done + for (def tablet in tablets) { + boolean running = true + do { + Thread.sleep(1000) + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + } + + // after full compaction, there is only 1 rowset. + + rowsetCount = 0 + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def tabletJson = parseJson(out.trim()) + assert tabletJson.rowsets instanceof List + rowsetCount +=((List) tabletJson.rowsets).size() + } + assert (rowsetCount == 1 * replicaNum * 3) + } catch (Exception e) { + logger.info(e.getMessage()) + exception = true; + } finally { + GetDebugPoint().disableDebugPointForAllBEs("FullCompaction.prepare_compact.set_cumu_point") + assertFalse(exception) + } +}