From 65ac74564d7b730df9fc963d94bc68ce86cce6e9 Mon Sep 17 00:00:00 2001 From: morrySnow Date: Thu, 21 Nov 2024 21:14:21 +0800 Subject: [PATCH 1/4] [chore](Nereids) remove useless exception (#44356) - DialectTransformException - DoNotFallbackException - UnsupportedDialectException - TransformException - MetaNotFoundException --- .../catalog/constraint/TableIdentifier.java | 8 +- .../exceptions/DialectTransformException.java | 28 ------- .../exceptions/DoNotFallbackException.java | 27 ------- .../exceptions/MetaNotFoundException.java | 74 ------------------- .../exceptions/TransformException.java | 28 ------- .../UnsupportedDialectException.java | 35 --------- .../doris/nereids/rules/AppliedAwareRule.java | 3 +- .../org/apache/doris/nereids/rules/Rule.java | 3 +- 8 files changed, 6 insertions(+), 200 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DialectTransformException.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DoNotFallbackException.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/MetaNotFoundException.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/TransformException.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/UnsupportedDialectException.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/TableIdentifier.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/TableIdentifier.java index 8e510ec7a93ff5..ccf688663d2bd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/TableIdentifier.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/constraint/TableIdentifier.java @@ -21,7 +21,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.nereids.exceptions.MetaNotFoundException; +import 
org.apache.doris.nereids.exceptions.AnalysisException; import com.google.common.base.Preconditions; import com.google.gson.annotations.SerializedName; @@ -48,15 +48,15 @@ public TableIdentifier(TableIf tableIf) { public TableIf toTableIf() { CatalogIf catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId); if (catalogIf == null) { - throw new MetaNotFoundException(String.format("Can not find catalog %s in constraint", catalogId)); + throw new AnalysisException(String.format("Can not find catalog %s in constraint", catalogId)); } DatabaseIf databaseIf = catalogIf.getDbNullable(databaseId); if (databaseIf == null) { - throw new MetaNotFoundException(String.format("Can not find database %s in constraint", databaseId)); + throw new AnalysisException(String.format("Can not find database %s in constraint", databaseId)); } TableIf tableIf = databaseIf.getTableNullable(tableId); if (tableIf == null) { - throw new MetaNotFoundException(String.format("Can not find table %s in constraint", databaseId)); + throw new AnalysisException(String.format("Can not find table %s in constraint", databaseId)); } return tableIf; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DialectTransformException.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DialectTransformException.java deleted file mode 100644 index 3d96e6dd039898..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DialectTransformException.java +++ /dev/null @@ -1,28 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.exceptions; - -/** - * DialectTransformException when have not supported transforming for dialect converters. - */ -public class DialectTransformException extends UnsupportedOperationException { - - public DialectTransformException(String msg) { - super(String.format("Unsupported dialect transformation is %s", msg)); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DoNotFallbackException.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DoNotFallbackException.java deleted file mode 100644 index b6253f52c6b5df..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/DoNotFallbackException.java +++ /dev/null @@ -1,27 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.nereids.exceptions; - -/** - * Exception for can not fall back error in Nereids. - */ -public class DoNotFallbackException extends RuntimeException { - public DoNotFallbackException(String msg) { - super(msg); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/MetaNotFoundException.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/MetaNotFoundException.java deleted file mode 100644 index f7d19c3f844ddd..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/MetaNotFoundException.java +++ /dev/null @@ -1,74 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.exceptions; - -import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; - -import java.util.Optional; - -/** Nereids's AnalysisException. 
*/ -public class MetaNotFoundException extends RuntimeException { - private final String message; - private final Optional line; - private final Optional startPosition; - private final Optional plan; - - public MetaNotFoundException(String message, Throwable cause, Optional line, - Optional startPosition, Optional plan) { - super(message, cause); - this.message = message; - this.line = line; - this.startPosition = startPosition; - this.plan = plan; - } - - public MetaNotFoundException(String message, Optional line, - Optional startPosition, Optional plan) { - super(message); - this.message = message; - this.line = line; - this.startPosition = startPosition; - this.plan = plan; - } - - public MetaNotFoundException(String message, Throwable cause) { - this(message, cause, Optional.empty(), Optional.empty(), Optional.empty()); - } - - public MetaNotFoundException(String message) { - this(message, Optional.empty(), Optional.empty(), Optional.empty()); - } - - @Override - public String getMessage() { - String planAnnotation = plan.map(p -> ";\n" + p.treeString()).orElse(""); - return getSimpleMessage() + planAnnotation; - } - - private String getSimpleMessage() { - if (line.isPresent() || startPosition.isPresent()) { - String lineAnnotation = line.map(l -> "line " + l).orElse(""); - String positionAnnotation = startPosition.map(s -> " pos " + s).orElse(""); - return message + ";" + lineAnnotation + positionAnnotation; - } else { - return message; - } - } - - // TODO: support ErrorCode -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/TransformException.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/TransformException.java deleted file mode 100644 index 401fdd56bab94e..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/TransformException.java +++ /dev/null @@ -1,28 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.exceptions; - -/** - * All exceptions thrown by transform action in {@link org.apache.doris.nereids.rules.Rule} - * should be a subclass of this class. - */ -public class TransformException extends RuntimeException { - public TransformException(String msg) { - super(String.format("Transform error: %s", msg)); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/UnsupportedDialectException.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/UnsupportedDialectException.java deleted file mode 100644 index cdf7944c61c158..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/exceptions/UnsupportedDialectException.java +++ /dev/null @@ -1,35 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.exceptions; - -import org.apache.doris.nereids.parser.Dialect; - -/** - * UnsupportedDialectException when not match any in - * {@link Dialect}. - */ -public class UnsupportedDialectException extends UnsupportedOperationException { - - public UnsupportedDialectException(Dialect dialect) { - super(String.format("Unsupported dialect name is %s", dialect.getDialectName())); - } - - public UnsupportedDialectException(String type, String msg) { - super(String.format("Unsupported dialect type is %s, msg is %s", type, msg)); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/AppliedAwareRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/AppliedAwareRule.java index 8f7ea106236b5d..5f4822ead04be3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/AppliedAwareRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/AppliedAwareRule.java @@ -18,7 +18,6 @@ package org.apache.doris.nereids.rules; import org.apache.doris.nereids.CascadesContext; -import org.apache.doris.nereids.exceptions.TransformException; import org.apache.doris.nereids.pattern.Pattern; import org.apache.doris.nereids.pattern.ProxyPattern; import org.apache.doris.nereids.trees.plans.Plan; @@ -52,7 +51,7 @@ private AppliedAwareRule(Rule rule, BiPredicate matchRootPredicate) } @Override - public List transform(Plan plan, CascadesContext context) throws TransformException { + public List transform(Plan plan, CascadesContext context) { return rule.transform(plan, context); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/Rule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/Rule.java index 7d5b4001d9ae8c..40b6225e98f433 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/Rule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/Rule.java @@ -18,7 +18,6 @@ package org.apache.doris.nereids.rules; import org.apache.doris.nereids.CascadesContext; -import org.apache.doris.nereids.exceptions.TransformException; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.pattern.Pattern; import org.apache.doris.nereids.rules.RuleType.RuleTypeClass; @@ -73,7 +72,7 @@ public String toString() { return getRuleType().toString(); } - public abstract List transform(Plan node, CascadesContext context) throws TransformException; + public abstract List transform(Plan node, CascadesContext context); /** callback this function when the traverse framework accept a new plan which produce by this rule */ public void acceptPlan(Plan plan) { From fc549eca51d5dc16dc254e373a438c0971af14f5 Mon Sep 17 00:00:00 2001 From: airborne12 Date: Thu, 21 Nov 2024 21:34:44 +0800 Subject: [PATCH 2/4] [opt](primary key bf) enhance primary key bloomfilter by fixed slice type (#44397) Problem Summary: Currently, the primary key Bloom filter index can be created with any data type. However, when adding values, it only supports slice values. This inconsistency may lead to potential misuse or future issues. 
--- be/src/olap/primary_key_index.cpp | 4 +- .../segment_v2/bloom_filter_index_writer.cpp | 17 +++ .../segment_v2/bloom_filter_index_writer.h | 2 + .../bloom_filter_index_reader_writer_test.cpp | 107 +++++++++++++----- 4 files changed, 99 insertions(+), 31 deletions(-) diff --git a/be/src/olap/primary_key_index.cpp b/be/src/olap/primary_key_index.cpp index e416639cfb06cd..5f7bedb01fc8de 100644 --- a/be/src/olap/primary_key_index.cpp +++ b/be/src/olap/primary_key_index.cpp @@ -50,8 +50,8 @@ Status PrimaryKeyIndexBuilder::init() { auto opt = segment_v2::BloomFilterOptions(); opt.fpp = 0.01; - _bloom_filter_index_builder.reset( - new segment_v2::PrimaryKeyBloomFilterIndexWriterImpl(opt, type_info)); + RETURN_IF_ERROR(segment_v2::PrimaryKeyBloomFilterIndexWriterImpl::create( + opt, type_info, &_bloom_filter_index_builder)); return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp index 98669ccb141ae7..edc6102703f492 100644 --- a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.cpp @@ -348,5 +348,22 @@ Status NGramBloomFilterIndexWriterImpl::create(const BloomFilterOptions& bf_opti return Status::OK(); } +Status PrimaryKeyBloomFilterIndexWriterImpl::create(const BloomFilterOptions& bf_options, + const TypeInfo* typeinfo, + std::unique_ptr* res) { + FieldType type = typeinfo->type(); + switch (type) { + case FieldType::OLAP_FIELD_TYPE_CHAR: + case FieldType::OLAP_FIELD_TYPE_VARCHAR: + case FieldType::OLAP_FIELD_TYPE_STRING: + *res = std::make_unique(bf_options, typeinfo); + break; + default: + return Status::NotSupported("unsupported type for primary key bloom filter index:{}", + std::to_string(int(type))); + } + return Status::OK(); +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h 
b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h index 2cdf7171e3e276..a94982438f651a 100644 --- a/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h +++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_writer.h @@ -85,6 +85,8 @@ class PrimaryKeyBloomFilterIndexWriterImpl : public BloomFilterIndexWriter { } }; + static Status create(const BloomFilterOptions& bf_options, const TypeInfo* typeinfo, + std::unique_ptr* res); // This method may allocate large memory for bf, will return error // when memory is exhaused to prevent oom. Status add_values(const void* values, size_t count) override; diff --git a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp index 258dd9a5ff8b51..69cb343f04bf91 100644 --- a/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp +++ b/be/test/olap/rowset/segment_v2/bloom_filter_index_reader_writer_test.cpp @@ -59,40 +59,46 @@ class BloomFilterIndexReaderWriterTest : public testing::Test { }; template -void write_bloom_filter_index_file(const std::string& file_name, const void* values, - size_t value_count, size_t null_count, - ColumnIndexMetaPB* index_meta) { +Status write_bloom_filter_index_file(const std::string& file_name, const void* values, + size_t value_count, size_t null_count, + ColumnIndexMetaPB* index_meta, + bool use_primary_key_bloom_filter = false) { const auto* type_info = get_scalar_type_info(); using CppType = typename CppTypeTraits::CppType; std::string fname = dname + "/" + file_name; auto fs = io::global_local_filesystem(); { io::FileWriterPtr file_writer; - Status st = fs->create_file(fname, &file_writer); - EXPECT_TRUE(st.ok()) << st.to_string(); + RETURN_IF_ERROR(fs->create_file(fname, &file_writer)); std::unique_ptr bloom_filter_index_writer; BloomFilterOptions bf_options; - static_cast( - BloomFilterIndexWriter::create(bf_options, type_info, &bloom_filter_index_writer)); + + if 
(use_primary_key_bloom_filter) { + RETURN_IF_ERROR(PrimaryKeyBloomFilterIndexWriterImpl::create( + bf_options, type_info, &bloom_filter_index_writer)); + } else { + RETURN_IF_ERROR(BloomFilterIndexWriter::create(bf_options, type_info, + &bloom_filter_index_writer)); + } + const CppType* vals = (const CppType*)values; for (int i = 0; i < value_count;) { size_t num = std::min(1024, (int)value_count - i); - static_cast(bloom_filter_index_writer->add_values(vals + i, num)); + RETURN_IF_ERROR(bloom_filter_index_writer->add_values(vals + i, num)); if (i == 2048) { // second page bloom_filter_index_writer->add_nulls(null_count); } - st = bloom_filter_index_writer->flush(); - EXPECT_TRUE(st.ok()); + RETURN_IF_ERROR(bloom_filter_index_writer->flush()); i += 1024; } - st = bloom_filter_index_writer->finish(file_writer.get(), index_meta); - EXPECT_TRUE(st.ok()) << "writer finish status:" << st.to_string(); + RETURN_IF_ERROR(bloom_filter_index_writer->finish(file_writer.get(), index_meta)); EXPECT_TRUE(file_writer->close().ok()); EXPECT_EQ(BLOOM_FILTER_INDEX, index_meta->type()); EXPECT_EQ(bf_options.strategy, index_meta->bloom_filter_index().hash_strategy()); } + return Status::OK(); } void get_bloom_filter_reader_iter(const std::string& file_name, const ColumnIndexMetaPB& meta, @@ -110,13 +116,14 @@ void get_bloom_filter_reader_iter(const std::string& file_name, const ColumnInde } template -void test_bloom_filter_index_reader_writer_template( +Status test_bloom_filter_index_reader_writer_template( const std::string file_name, typename TypeTraits::CppType* val, size_t num, size_t null_num, typename TypeTraits::CppType* not_exist_value, - bool is_slice_type = false) { + bool is_slice_type = false, bool use_primary_key_bloom_filter = false) { using CppType = typename TypeTraits::CppType; ColumnIndexMetaPB meta; - write_bloom_filter_index_file(file_name, val, num, null_num, &meta); + RETURN_IF_ERROR(write_bloom_filter_index_file(file_name, val, num, null_num, &meta, + 
use_primary_key_bloom_filter)); { BloomFilterIndexReader* reader = nullptr; std::unique_ptr iter; @@ -124,8 +131,7 @@ void test_bloom_filter_index_reader_writer_template( // page 0 std::unique_ptr bf; - auto st = iter->read_bloom_filter(0, &bf); - EXPECT_TRUE(st.ok()); + RETURN_IF_ERROR(iter->read_bloom_filter(0, &bf)); for (int i = 0; i < 1024; ++i) { if (is_slice_type) { Slice* value = (Slice*)(val + i); @@ -136,8 +142,7 @@ void test_bloom_filter_index_reader_writer_template( } // page 1 - st = iter->read_bloom_filter(1, &bf); - EXPECT_TRUE(st.ok()); + RETURN_IF_ERROR(iter->read_bloom_filter(1, &bf)); for (int i = 1024; i < 2048; ++i) { if (is_slice_type) { Slice* value = (Slice*)(val + i); @@ -148,8 +153,7 @@ void test_bloom_filter_index_reader_writer_template( } // page 2 - st = iter->read_bloom_filter(2, &bf); - EXPECT_TRUE(st.ok()); + RETURN_IF_ERROR(iter->read_bloom_filter(2, &bf)); for (int i = 2048; i < 3071; ++i) { if (is_slice_type) { Slice* value = (Slice*)(val + i); @@ -163,6 +167,7 @@ void test_bloom_filter_index_reader_writer_template( delete reader; } + return Status::OK(); } TEST_F(BloomFilterIndexReaderWriterTest, test_int) { @@ -175,8 +180,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_int) { std::string file_name = "bloom_filter_int"; int not_exist_value = 18888; - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, val, num, 1, ¬_exist_value); + EXPECT_TRUE(st.ok()); delete[] val; } @@ -190,8 +196,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_bigint) { std::string file_name = "bloom_filter_bigint"; int64_t not_exist_value = 18888; - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, val, num, 1, ¬_exist_value); + EXPECT_TRUE(st.ok()); delete[] val; } @@ -205,8 +212,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_largeint) { std::string file_name = "bloom_filter_largeint"; int128_t not_exist_value = 
18888; - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, val, num, 1, ¬_exist_value); + EXPECT_TRUE(st.ok()); delete[] val; } @@ -224,8 +232,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_varchar_type) { } std::string file_name = "bloom_filter_varchar"; Slice not_exist_value("value_not_exist"); - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, slices, num, 1, ¬_exist_value, true); + EXPECT_TRUE(st.ok()); delete[] val; delete[] slices; } @@ -244,8 +253,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_char) { } std::string file_name = "bloom_filter_char"; Slice not_exist_value("char_value_not_exist"); - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, slices, num, 1, ¬_exist_value, true); + EXPECT_TRUE(st.ok()); delete[] val; delete[] slices; } @@ -260,8 +270,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_date) { std::string file_name = "bloom_filter_date"; uint24_t not_exist_value = 18888; - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, val, num, 1, ¬_exist_value); + EXPECT_TRUE(st.ok()); delete[] val; } @@ -275,8 +286,9 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_datetime) { std::string file_name = "bloom_filter_datetime"; int64_t not_exist_value = 18888; - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, val, num, 1, ¬_exist_value); + EXPECT_TRUE(st.ok()); delete[] val; } @@ -290,8 +302,45 @@ TEST_F(BloomFilterIndexReaderWriterTest, test_decimal) { std::string file_name = "bloom_filter_decimal"; decimal12_t not_exist_value = {666, 666}; - test_bloom_filter_index_reader_writer_template( + auto st = test_bloom_filter_index_reader_writer_template( file_name, val, num, 1, ¬_exist_value); + 
EXPECT_TRUE(st.ok()); + delete[] val; +} + +TEST_F(BloomFilterIndexReaderWriterTest, test_primary_key_bloom_filter_index) { + size_t num = 1024 * 3 - 1; + std::vector val_strings(num); + for (size_t i = 0; i < num; ++i) { + val_strings[i] = "primary_key_" + std::to_string(i); + } + std::vector slices(num); + for (size_t i = 0; i < num; ++i) { + slices[i] = Slice(val_strings[i]); + } + + std::string file_name = "primary_key_bloom_filter_index"; + Slice not_exist_value("primary_key_not_exist"); + + auto st = test_bloom_filter_index_reader_writer_template( + file_name, slices.data(), num, 0, ¬_exist_value, true, true); + EXPECT_TRUE(st.ok()); +} + +TEST_F(BloomFilterIndexReaderWriterTest, test_primary_key_bloom_filter_index_int) { + size_t num = 1024 * 3 - 1; + int* val = new int[num]; + for (int i = 0; i < num; ++i) { + // there will be 3 bloom filter pages + val[i] = 10000 + i + 1; + } + + std::string file_name = "primary_key_bloom_filter_index_int"; + int not_exist_value = 18888; + auto st = test_bloom_filter_index_reader_writer_template( + file_name, val, num, 1, ¬_exist_value, false, true); + EXPECT_FALSE(st.ok()); + EXPECT_EQ(st.code(), TStatusCode::NOT_IMPLEMENTED_ERROR); delete[] val; } From 867bd153ad1fb1def3f25be0114bce8b3d11c616 Mon Sep 17 00:00:00 2001 From: huanghaibin Date: Thu, 21 Nov 2024 23:16:08 +0800 Subject: [PATCH 3/4] [fix](cloud-mow) Fix the issue of missing and removing some old version delete bitmap (#44300) ### What problem does this PR solve? Related PR: #40204 Problem Summary: PR #40204 added support for removing old-version delete bitmaps; however, it doesn't handle the range boundary correctly, so some delete bitmaps that should be removed on the BE are missed. This only affects the local delete bitmap; the delete bitmap stored in FDB is correct. Before this PR, the missed delete bitmaps could only be removed by the next base compaction.
--- be/src/cloud/cloud_cumulative_compaction.cpp | 5 +- be/src/cloud/cloud_delete_bitmap_action.cpp | 61 ++++++- be/src/cloud/cloud_delete_bitmap_action.h | 5 +- be/src/cloud/cloud_meta_mgr.cpp | 15 +- be/src/cloud/cloud_meta_mgr.h | 5 +- be/src/olap/tablet_meta.cpp | 6 +- be/src/service/http_service.cpp | 13 +- ...ction_remove_old_version_delete_bitmap.out | 67 ++++++-- ...on_remove_old_version_delete_bitmap.groovy | 151 +++++++++++++----- 9 files changed, 253 insertions(+), 75 deletions(-) diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 6b74e70ee1b4b8..2f08082f51b5f3 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -393,12 +393,9 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() { rowset->rowset_id().to_string(); DeleteBitmap::BitmapKey start {rowset->rowset_id(), seg_id, 0}; DeleteBitmap::BitmapKey end {rowset->rowset_id(), seg_id, pre_max_version}; - DeleteBitmap::BitmapKey before_end {rowset->rowset_id(), seg_id, - pre_max_version - 1}; auto d = _tablet->tablet_meta()->delete_bitmap().get_agg( {rowset->rowset_id(), seg_id, pre_max_version}); - to_remove_vec.emplace_back( - std::make_tuple(_tablet->tablet_id(), start, before_end)); + to_remove_vec.emplace_back(std::make_tuple(_tablet->tablet_id(), start, end)); if (d->isEmpty()) { continue; } diff --git a/be/src/cloud/cloud_delete_bitmap_action.cpp b/be/src/cloud/cloud_delete_bitmap_action.cpp index 60db5896dfab8a..86cc535e1bc88e 100644 --- a/be/src/cloud/cloud_delete_bitmap_action.cpp +++ b/be/src/cloud/cloud_delete_bitmap_action.cpp @@ -33,6 +33,7 @@ #include #include +#include "cloud/cloud_meta_mgr.h" #include "cloud/cloud_tablet.h" #include "cloud/cloud_tablet_mgr.h" #include "common/logging.h" @@ -78,8 +79,8 @@ static Status _check_param(HttpRequest* req, uint64_t* tablet_id) { return Status::OK(); } -Status 
CloudDeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* req, - std::string* json_result) { +Status CloudDeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest* req, + std::string* json_result) { uint64_t tablet_id = 0; // check & retrieve tablet_id from req if it contains RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed"); @@ -95,6 +96,50 @@ Status CloudDeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* re auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality(); auto size = tablet->tablet_meta()->delete_bitmap().get_size(); + LOG(INFO) << "show_local_delete_bitmap_count,tablet_id=" << tablet_id << ",count=" << count + << ",cardinality=" << cardinality << ",size=" << size; + + rapidjson::Document root; + root.SetObject(); + root.AddMember("delete_bitmap_count", count, root.GetAllocator()); + root.AddMember("cardinality", cardinality, root.GetAllocator()); + root.AddMember("size", size, root.GetAllocator()); + + // to json string + rapidjson::StringBuffer strbuf; + rapidjson::PrettyWriter writer(strbuf); + root.Accept(writer); + *json_result = std::string(strbuf.GetString()); + + return Status::OK(); +} + +Status CloudDeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req, + std::string* json_result) { + uint64_t tablet_id = 0; + // check & retrieve tablet_id from req if it contains + RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param failed"); + if (tablet_id == 0) { + return Status::InternalError("check param failed: missing tablet_id"); + } + TabletMetaSharedPtr tablet_meta; + auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta); + if (!st.ok()) { + LOG(WARNING) << "failed to get_tablet_meta tablet=" << tablet_id + << ", st=" << st.to_string(); + return st; + } + auto tablet = std::make_shared(_engine, std::move(tablet_meta)); + st = 
_engine.meta_mgr().sync_tablet_rowsets(tablet.get(), false, true, true); + if (!st.ok()) { + LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st; + return st; + } + auto count = tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); + auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality(); + auto size = tablet->tablet_meta()->delete_bitmap().get_size(); + LOG(INFO) << "show_ms_delete_bitmap_count,tablet_id=" << tablet_id << ",count=" << count + << ",cardinality=" << cardinality << ",size=" << size; rapidjson::Document root; root.SetObject(); @@ -113,9 +158,17 @@ Status CloudDeleteBitmapAction::_handle_show_delete_bitmap_count(HttpRequest* re void CloudDeleteBitmapAction::handle(HttpRequest* req) { req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data()); - if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_INFO) { + if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_LOCAL) { + std::string json_result; + Status st = _handle_show_local_delete_bitmap_count(req, &json_result); + if (!st.ok()) { + HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); + } else { + HttpChannel::send_reply(req, HttpStatus::OK, json_result); + } + } else if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_MS) { std::string json_result; - Status st = _handle_show_delete_bitmap_count(req, &json_result); + Status st = _handle_show_ms_delete_bitmap_count(req, &json_result); if (!st.ok()) { HttpChannel::send_reply(req, HttpStatus::OK, st.to_json()); } else { diff --git a/be/src/cloud/cloud_delete_bitmap_action.h b/be/src/cloud/cloud_delete_bitmap_action.h index 9321661374c195..35739a7373efc8 100644 --- a/be/src/cloud/cloud_delete_bitmap_action.h +++ b/be/src/cloud/cloud_delete_bitmap_action.h @@ -31,7 +31,7 @@ class HttpRequest; class ExecEnv; -enum class DeleteBitmapActionType { COUNT_INFO = 1 }; +enum class DeleteBitmapActionType { COUNT_LOCAL = 1, COUNT_MS = 2 }; /// This action is used for 
viewing the delete bitmap status class CloudDeleteBitmapAction : public HttpHandlerWithAuth { @@ -45,7 +45,8 @@ class CloudDeleteBitmapAction : public HttpHandlerWithAuth { void handle(HttpRequest* req) override; private: - Status _handle_show_delete_bitmap_count(HttpRequest* req, std::string* json_result); + Status _handle_show_local_delete_bitmap_count(HttpRequest* req, std::string* json_result); + Status _handle_show_ms_delete_bitmap_count(HttpRequest* req, std::string* json_result); private: CloudStorageEngine& _engine; diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 5c699ae0159050..05341d0d4bab82 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -385,7 +385,7 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tab } Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data, - bool sync_delete_bitmap) { + bool sync_delete_bitmap, bool full_sync) { using namespace std::chrono; TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", Status::OK(), tablet); @@ -411,7 +411,11 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ idx->set_partition_id(tablet->partition_id()); { std::shared_lock rlock(tablet->get_header_lock()); - req.set_start_version(tablet->max_version_unlocked() + 1); + if (full_sync) { + req.set_start_version(0); + } else { + req.set_start_version(tablet->max_version_unlocked() + 1); + } req.set_base_compaction_cnt(tablet->base_compaction_cnt()); req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt()); req.set_cumulative_point(tablet->cumulative_layer_point()); @@ -471,7 +475,7 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_ DeleteBitmap delete_bitmap(tablet_id); int64_t old_max_version = req.start_version() - 1; auto st = sync_tablet_delete_bitmap(tablet, old_max_version, resp.rowset_meta(), - resp.stats(), req.idx(), 
&delete_bitmap); + resp.stats(), req.idx(), &delete_bitmap, full_sync); if (st.is() && tried++ < retry_times) { LOG_WARNING("rowset meta is expired, need to retry") .tag("tablet", tablet->tablet_id()) @@ -617,12 +621,13 @@ bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64 Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version, std::ranges::range auto&& rs_metas, const TabletStatsPB& stats, const TabletIndexPB& idx, - DeleteBitmap* delete_bitmap) { + DeleteBitmap* delete_bitmap, bool full_sync) { if (rs_metas.empty()) { return Status::OK(); } - if (sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) { + if (!full_sync && + sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) { return Status::OK(); } else { LOG(WARNING) << "failed to sync delete bitmap by txn info. tablet_id=" diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index a657c0fdd8e350..c49b036ad90c15 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -58,7 +58,7 @@ class CloudMetaMgr { Status get_tablet_meta(int64_t tablet_id, std::shared_ptr* tablet_meta); Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data = false, - bool sync_delete_bitmap = true); + bool sync_delete_bitmap = true, bool full_sync = false); Status prepare_rowset(const RowsetMeta& rs_meta, std::shared_ptr* existed_rs_meta = nullptr); @@ -116,7 +116,8 @@ class CloudMetaMgr { Status sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version, std::ranges::range auto&& rs_metas, const TabletStatsPB& stats, - const TabletIndexPB& idx, DeleteBitmap* delete_bitmap); + const TabletIndexPB& idx, DeleteBitmap* delete_bitmap, + bool full_sync = false); void check_table_size_correctness(const RowsetMeta& rs_meta); int64_t get_segment_file_size(const RowsetMeta& rs_meta); int64_t get_inverted_index_file_szie(const RowsetMeta& 
rs_meta); diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 4005c818bc5023..9a27b95dbcd446 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -1209,9 +1209,13 @@ void DeleteBitmap::remove_stale_delete_bitmap_from_queue(const std::vector(delete_bitmap_tuple); auto end_bmk = std::get<2>(delete_bitmap_tuple); + // the key range of to be removed is [start_bmk,end_bmk), + // due to the different definitions of the right boundary, + // so use end_bmk as right boundary when removing local delete bitmap, + // use (end_bmk - 1) as right boundary when removing ms delete bitmap remove(start_bmk, end_bmk); to_delete.emplace_back(std::make_tuple(std::get<0>(start_bmk).to_string(), 0, - std::get<2>(end_bmk))); + std::get<2>(end_bmk) - 1)); } _stale_delete_bitmap.erase(version_str); } diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index e7b920796a1b98..57600d1f56aae9 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -425,11 +425,16 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) { TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction/run_status", run_status_compaction_action); - CloudDeleteBitmapAction* count_delete_bitmap_action = - _pool.add(new CloudDeleteBitmapAction(DeleteBitmapActionType::COUNT_INFO, _env, engine, + CloudDeleteBitmapAction* count_local_delete_bitmap_action = + _pool.add(new CloudDeleteBitmapAction(DeleteBitmapActionType::COUNT_LOCAL, _env, engine, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); - _ev_http_server->register_handler(HttpMethod::GET, "/api/delete_bitmap/count", - count_delete_bitmap_action); + _ev_http_server->register_handler(HttpMethod::GET, "/api/delete_bitmap/count_local", + count_local_delete_bitmap_action); + CloudDeleteBitmapAction* count_ms_delete_bitmap_action = + _pool.add(new 
CloudDeleteBitmapAction(DeleteBitmapActionType::COUNT_MS, _env, engine, + TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); + _ev_http_server->register_handler(HttpMethod::GET, "/api/delete_bitmap/count_ms", + count_ms_delete_bitmap_action); #ifdef ENABLE_INJECTION_POINT InjectionPointAction* injection_point_action = _pool.add(new InjectionPointAction); _ev_http_server->register_handler(HttpMethod::GET, "/api/injection_point/{op}", diff --git a/regression-test/data/compaction/test_cu_compaction_remove_old_version_delete_bitmap.out b/regression-test/data/compaction/test_cu_compaction_remove_old_version_delete_bitmap.out index 1c3611fe0b7506..37dfa3b93a5878 100644 --- a/regression-test/data/compaction/test_cu_compaction_remove_old_version_delete_bitmap.out +++ b/regression-test/data/compaction/test_cu_compaction_remove_old_version_delete_bitmap.out @@ -1,29 +1,78 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !sql -- -0 0 0 -1 8 8 +0 0 8 +1 1 1 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 -- !sql -- -0 0 0 -1 8 8 +0 0 8 +1 1 1 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 -- !sql -- -0 0 0 +0 0 13 1 13 13 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 -- !sql -- -0 0 0 +0 0 13 1 13 13 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 -- !sql -- -0 0 0 +0 0 18 1 23 23 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 -- !sql -- -0 0 0 +0 0 18 1 23 23 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 -- !sql -- -0 0 0 +0 5 5 1 28 28 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 diff --git a/regression-test/suites/compaction/test_cu_compaction_remove_old_version_delete_bitmap.groovy b/regression-test/suites/compaction/test_cu_compaction_remove_old_version_delete_bitmap.groovy index 2219cc175b534b..a36cb4579ca487 100644 --- a/regression-test/suites/compaction/test_cu_compaction_remove_old_version_delete_bitmap.groovy +++ 
b/regression-test/suites/compaction/test_cu_compaction_remove_old_version_delete_bitmap.groovy @@ -123,11 +123,11 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { } while (running) } - def getDeleteBitmapStatus = { be_host, be_http_port, tablet_id -> + def getLocalDeleteBitmapStatus = { be_host, be_http_port, tablet_id -> boolean running = true StringBuilder sb = new StringBuilder(); sb.append("curl -X GET http://${be_host}:${be_http_port}") - sb.append("/api/delete_bitmap/count?tablet_id=") + sb.append("/api/delete_bitmap/count_local?tablet_id=") sb.append(tablet_id) String command = sb.toString() @@ -135,7 +135,25 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { process = command.execute() code = process.waitFor() out = process.getText() - logger.info("Get delete bitmap count status: =" + code + ", out=" + out) + logger.info("Get local delete bitmap count status: =" + code + ", out=" + out) + assertEquals(code, 0) + def deleteBitmapStatus = parseJson(out.trim()) + return deleteBitmapStatus + } + + def getMSDeleteBitmapStatus = { be_host, be_http_port, tablet_id -> + boolean running = true + StringBuilder sb = new StringBuilder(); + sb.append("curl -X GET http://${be_host}:${be_http_port}") + sb.append("/api/delete_bitmap/count_ms?tablet_id=") + sb.append(tablet_id) + + String command = sb.toString() + logger.info(command) + process = command.execute() + code = process.waitFor() + out = process.getText() + logger.info("Get ms delete bitmap count status: =" + code + ", out=" + out) assertEquals(code, 0) def deleteBitmapStatus = parseJson(out.trim()) return deleteBitmapStatus @@ -174,21 +192,24 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { GetDebugPoint().enableDebugPointForAllBEs("CloudCumulativeCompaction.modify_rowsets.delete_expired_stale_rowsets") // 1. 
test normal sql "sync" - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,1,'1'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,2,'2'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,3,'3'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,4,'4'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,5,'5'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,6,'6'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,7,'7'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,8,'8'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'1'),(1,1,'1'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'2'),(2,2,'2'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'3'),(3,3,'3'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'4'),(4,4,'4'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'5'),(5,5,'5'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'6'),(6,6,'6'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'7'),(7,7,'7'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'8'),(8,8,'8'); """ qt_sql "select * from ${testTable} order by plan_id" // trigger compaction to generate base rowset def tablets = sql_return_maparray """ show tablets from ${testTable}; """ logger.info("tablets: " + tablets) - def delete_bitmap_count = 0 + def local_delete_bitmap_count = 0 + def ms_delete_bitmap_count = 0 + def local_delete_bitmap_cardinality = 0; + def ms_delete_bitmap_cardinality = 0; for (def tablet in tablets) { String tablet_id = tablet.TabletId def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ @@ -197,9 +218,20 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); // before compaction, delete_bitmap_count is (rowsets num - 1) - delete_bitmap_count = getDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], 
backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count - assertTrue(delete_bitmap_count == 7) - logger.info("delete_bitmap_count:" + delete_bitmap_count) + local_delete_bitmap_count = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + ms_delete_bitmap_count = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + logger.info("local_delete_bitmap_count:" + local_delete_bitmap_count) + logger.info("ms_delete_bitmap_count:" + ms_delete_bitmap_count) + assertTrue(local_delete_bitmap_count == 7) + assertTrue(local_delete_bitmap_count == ms_delete_bitmap_count) + + local_delete_bitmap_cardinality = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + ms_delete_bitmap_cardinality = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + logger.info("local_delete_bitmap_cardinality:" + local_delete_bitmap_cardinality) + logger.info("ms_delete_bitmap_cardinality:" + ms_delete_bitmap_cardinality) + assertTrue(local_delete_bitmap_cardinality == 7) + assertTrue(local_delete_bitmap_cardinality == ms_delete_bitmap_cardinality) + assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], "cumulative", tablet_id).contains("Success")); @@ -211,11 +243,11 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { def now = System.currentTimeMillis() - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,9,'9'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,10,'10'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,11,'11'); """ - sql """ INSERT INTO ${testTable} VALUES 
(0,0,'0'),(1,12,'12'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,13,'13'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'9'),(1,9,'9'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'10'),(1,10,'10'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'11'),(1,11,'11'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'12'),(1,12,'12'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'13'),(1,13,'13'); """ def time_diff = System.currentTimeMillis() - now logger.info("time_diff:" + time_diff) @@ -230,11 +262,21 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """ logger.info("tablet: " + tablet_info) - // before compaction, delete_bitmap_count is (rowsets num - 1) + // before compaction, local delete_bitmap_count is (total rowsets num - 1), ms delete_bitmap_count is new rowset num String trigger_backend_id = tablet.BackendId - delete_bitmap_count = getDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count - logger.info("delete_bitmap_count:" + delete_bitmap_count) - assertTrue(delete_bitmap_count == 12) + local_delete_bitmap_count = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + ms_delete_bitmap_count = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + logger.info("local_delete_bitmap_count:" + local_delete_bitmap_count) + logger.info("ms_delete_bitmap_count:" + ms_delete_bitmap_count) + assertTrue(local_delete_bitmap_count == 12) + assertTrue(ms_delete_bitmap_count == 5) + + local_delete_bitmap_cardinality = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], 
tablet_id).cardinality + ms_delete_bitmap_cardinality = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + logger.info("local_delete_bitmap_cardinality:" + local_delete_bitmap_cardinality) + logger.info("ms_delete_bitmap_cardinality:" + ms_delete_bitmap_cardinality) + assertTrue(local_delete_bitmap_cardinality == 17) + assertTrue(ms_delete_bitmap_cardinality == 10) getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], @@ -244,9 +286,19 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { Thread.sleep(1000) // after compaction, delete_bitmap_count is 1 - delete_bitmap_count = getDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count - logger.info("delete_bitmap_count:" + delete_bitmap_count) - assertTrue(delete_bitmap_count == 1) + local_delete_bitmap_count = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + ms_delete_bitmap_count = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + logger.info("local_delete_bitmap_count:" + local_delete_bitmap_count) + logger.info("ms_delete_bitmap_count:" + ms_delete_bitmap_count) + assertTrue(local_delete_bitmap_count == 1) + assertTrue(local_delete_bitmap_count == ms_delete_bitmap_count) + + local_delete_bitmap_cardinality = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + ms_delete_bitmap_cardinality = 
getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + logger.info("local_delete_bitmap_cardinality:" + local_delete_bitmap_cardinality) + logger.info("ms_delete_bitmap_cardinality:" + ms_delete_bitmap_cardinality) + assertTrue(local_delete_bitmap_cardinality == 2) + assertTrue(ms_delete_bitmap_cardinality == 2) } qt_sql "select * from ${testTable} order by plan_id" @@ -255,11 +307,11 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { now = System.currentTimeMillis() - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,19,'19'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,20,'20'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,21,'21'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,22,'22'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,23,'23'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'14'),(1,19,'19'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'15'),(1,20,'20'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'16'),(1,21,'21'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'17'),(1,22,'22'); """ + sql """ INSERT INTO ${testTable} VALUES (0,0,'18'),(1,23,'23'); """ time_diff = System.currentTimeMillis() - now logger.info("time_diff:" + time_diff) @@ -273,9 +325,19 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { logger.info("tablet: " + tablet_info) String trigger_backend_id = tablet.BackendId - delete_bitmap_count = getDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count - assertTrue(delete_bitmap_count == 6) - logger.info("delete_bitmap_count:" + delete_bitmap_count) + local_delete_bitmap_count = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], 
tablet_id).delete_bitmap_count + ms_delete_bitmap_count = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + logger.info("local_delete_bitmap_count:" + local_delete_bitmap_count) + logger.info("ms_delete_bitmap_count:" + ms_delete_bitmap_count) + assertTrue(local_delete_bitmap_count == 6) + assertTrue(local_delete_bitmap_count == ms_delete_bitmap_count) + + local_delete_bitmap_cardinality = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + ms_delete_bitmap_cardinality = getMSDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).cardinality + logger.info("local_delete_bitmap_cardinality:" + local_delete_bitmap_cardinality) + logger.info("ms_delete_bitmap_cardinality:" + ms_delete_bitmap_cardinality) + assertTrue(local_delete_bitmap_cardinality == 12) + assertTrue(ms_delete_bitmap_cardinality == 12) getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], @@ -283,28 +345,29 @@ suite("test_cu_compaction_remove_old_version_delete_bitmap", "nonConcurrent") { waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id) getTabletStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id); - // update fail, delete_bitmap_count will not change + // update fail, local delete_bitmap_count will not change Thread.sleep(1000) - delete_bitmap_count = getDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count - 
assertTrue(delete_bitmap_count == 6) - logger.info("delete_bitmap_count:" + delete_bitmap_count) + local_delete_bitmap_count = getLocalDeleteBitmapStatus(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id).delete_bitmap_count + logger.info("local_delete_bitmap_count:" + local_delete_bitmap_count) + assertTrue(local_delete_bitmap_count == 6) } qt_sql "select * from ${testTable} order by plan_id" now = System.currentTimeMillis() - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,24,'24'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,25,'25'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,26,'26'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,27,'27'); """ - sql """ INSERT INTO ${testTable} VALUES (0,0,'0'),(1,28,'28'); """ + sql """ INSERT INTO ${testTable} VALUES (0,1,'1'),(1,24,'24'); """ + sql """ INSERT INTO ${testTable} VALUES (0,3,'2'),(1,25,'25'); """ + sql """ INSERT INTO ${testTable} VALUES (0,3,'3'),(1,26,'26'); """ + sql """ INSERT INTO ${testTable} VALUES (0,4,'4'),(1,27,'27'); """ + sql """ INSERT INTO ${testTable} VALUES (0,5,'5'),(1,28,'28'); """ time_diff = System.currentTimeMillis() - now logger.info("time_diff:" + time_diff) assertTrue(time_diff <= timeout, "wait_for_insert_into_values timeout") qt_sql "select * from ${testTable} order by plan_id" + GetDebugPoint().disableDebugPointForAllBEs("CloudCumulativeCompaction.modify_rowsets.update_delete_bitmap_failed") } finally { reset_be_param("compaction_promotion_version_count") From ca579c10a5ed8da4cedf2064a3c2e10056c24316 Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Thu, 21 Nov 2024 23:23:42 +0800 Subject: [PATCH 4/4] [Fix](full compaction) Full compaction should not do ordered data compaction (#44359) Problem: For a duplicate table with the following distribution, if it has already completed cumulative compaction and then undergoes full compaction, it will cause a BE core issue. 
Check failed: new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point Unexpected cumulative point: 1087, origin: 2801. "rowsets": [ "[0-386] 0 DATA NONOVERLAPPING 02000000000198aabe4290f2b0f5f35610c08a233a061892 0", "[387-387] 0 DELETE OVERLAP_UNKNOWN 0200000000541310ac4d76e7580a708a2823a4d7a4f06090 0", "[388-388] 0 DELETE OVERLAP_UNKNOWN 0200000000541d76ac4d76e7580a708a2823a4d7a4f06090 0", "[389-389] 0 DELETE OVERLAP_UNKNOWN 0200000000543b4dac4d76e7580a708a2823a4d7a4f06090 0", "[390-390] 0 DELETE OVERLAP_UNKNOWN 02000000005453aeac4d76e7580a708a2823a4d7a4f06090 0", "[391-391] 0 DELETE OVERLAP_UNKNOWN 0200000000546a44ac4d76e7580a708a2823a4d7a4f06090 0", "[392-392] 0 DELETE OVERLAP_UNKNOWN 02000000005480dbac4d76e7580a708a2823a4d7a4f06090 0", "[393-393] 0 DELETE OVERLAP_UNKNOWN 0200000000548cb3ac4d76e7580a708a2823a4d7a4f06090 0", "[394-394] 0 DELETE OVERLAP_UNKNOWN 0200000000549a25ac4d76e7580a708a2823a4d7a4f06090 0", "[395-395] 0 DELETE OVERLAP_UNKNOWN 020000000054b359ac4d76e7580a708a2823a4d7a4f06090 0", "[396-396] 0 DELETE OVERLAP_UNKNOWN 020000000054c19dac4d76e7580a708a2823a4d7a4f06090 0", "[397-397] 0 DELETE OVERLAP_UNKNOWN 020000000054d757ac4d76e7580a708a2823a4d7a4f06090 0", ... "[1085-1085] 0 DELETE OVERLAP_UNKNOWN 02000000002a0b20bd4798638f237008ff42fbca276b52a2 0", "[1087-1506] 1 DATA NONOVERLAPPING 020000000000047e3b452de14ceaad2e78a87526026d2290 326.10 KB", "[1087-1506] 1 DATA NONOVERLAPPING 020000000000047e3b452de14ceaad2e78a87526026d2290 326.10 KB", ... "[2800-2800] 0 DELETE OVERLAP_UNKNOWN 02000000002f12d6bd4798638f237008ff42fbca276b52a2 0" Reason: The duplicate table will go through ordered data compaction. Due to the special distribution of the table, the input rowset will be cut by the ordered data compaction, resulting in the full compaction only being performed on a part of the rowsets. Solution: For full compaction, prohibit ordered data compaction. 
--- be/src/olap/compaction.cpp | 6 +- be/src/olap/full_compaction.cpp | 3 + ...t_full_compaction_with_ordered_data.groovy | 208 ++++++++++++++++++ 3 files changed, 215 insertions(+), 2 deletions(-) create mode 100644 regression-test/suites/fault_injection_p0/test_full_compaction_with_ordered_data.groovy diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 738087a702f070..d707349132036c 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -44,6 +44,7 @@ #include "io/fs/file_system.h" #include "io/fs/file_writer.h" #include "io/fs/remote_file_system.h" +#include "io/io_common.h" #include "olap/cumulative_compaction_policy.h" #include "olap/cumulative_compaction_time_series_policy.h" #include "olap/data_dir.h" @@ -345,8 +346,9 @@ bool CompactionMixin::handle_ordered_data_compaction() { if (!config::enable_ordered_data_compaction) { return false; } - if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) { - // The remote file system does not support to link files. + if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION || + compaction_type() == ReaderType::READER_FULL_COMPACTION) { + // The remote file system and full compaction does not support to link files. return false; } if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 9d675f731924c1..529efa2e069faa 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -59,6 +59,9 @@ Status FullCompaction::prepare_compact() { std::unique_lock cumu_lock(tablet()->get_cumulative_compaction_lock()); tablet()->set_is_full_compaction_running(true); + DBUG_EXECUTE_IF("FullCompaction.prepare_compact.set_cumu_point", + { tablet()->set_cumulative_layer_point(tablet()->max_version_unlocked() + 1); }) + // 1. 
pick rowsets to compact RETURN_IF_ERROR(pick_rowsets_to_compact()); diff --git a/regression-test/suites/fault_injection_p0/test_full_compaction_with_ordered_data.groovy b/regression-test/suites/fault_injection_p0/test_full_compaction_with_ordered_data.groovy new file mode 100644 index 00000000000000..c6dfa6b885cf6c --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_full_compaction_with_ordered_data.groovy @@ -0,0 +1,208 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// Regression test: full compaction on a DUPLICATE KEY table must merge every
+// rowset of a tablet into a single rowset, both in the normal case and when the
+// "FullCompaction.prepare_compact.set_cumu_point" debug point is enabled on the BEs.
+// Not applicable to cloud mode (the suite returns immediately there).
+suite("test_full_compaction_with_ordered_data","nonConcurrent") {
+    if (isCloudMode()) {
+        return
+    }
+    def tableName = "test_full_compaction_with_ordered_data"
+    // Must stay in sync with the "BUCKETS 3" clause of the CREATE TABLE below.
+    def bucketNum = 3
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    // Sums the number of rowsets reported by the compaction-status URL of every
+    // tablet row returned by `show tablets`.
+    def countRowsets = { tabletList ->
+        int rowsetCount = 0
+        for (def tabletInfo in tabletList) {
+            def (code, out, err) = curl("GET", tabletInfo.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            rowsetCount += ((List) tabletJson.rowsets).size()
+        }
+        return rowsetCount
+    }
+
+    // Asks the owning BE to run a full compaction on one tablet, retrying up to
+    // 10 times (2s apart) until the BE accepts the request.
+    def triggerFullCompaction = { tabletInfo ->
+        String tabletId = tabletInfo.TabletId
+        String backendId = tabletInfo.BackendId
+        boolean triggered = false
+        for (int attempt = 1; attempt <= 10 && !triggered; attempt++) {
+            def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backendId), backendId_to_backendHttpPort.get(backendId), tabletId)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+            triggered = parseJson(out.trim()).status.toLowerCase() == "success"
+            sleep(2000)
+        }
+        // Fail here with a clear message instead of surfacing later as a rowset-count mismatch.
+        assertTrue(triggered, "failed to trigger full compaction for tablet " + tabletId)
+    }
+
+    // Polls the BE once per second until it reports that no compaction is running on the tablet.
+    def waitForFullCompaction = { tabletInfo ->
+        String tabletId = tabletInfo.TabletId
+        String backendId = tabletInfo.BackendId
+        boolean running = true
+        while (running) {
+            Thread.sleep(1000)
+            def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backendId), backendId_to_backendHttpPort.get(backendId), tabletId)
+            logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        }
+    }
+
+    // Checks the rowset count before compaction, runs full compaction on every
+    // tablet, waits for completion and checks that one rowset per tablet remains.
+    // Any exception or assertion failure propagates unchanged so the framework
+    // reports the real cause and stack trace.
+    def runFullCompactionAndVerify = { int rowsetsPerTabletBefore ->
+        def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+        def replicaNum = get_table_replica_num(tableName)
+        logger.info("get table replica num: " + replicaNum)
+
+        assert (countRowsets(tablets) == rowsetsPerTabletBefore * replicaNum * bucketNum)
+
+        // trigger full compactions for all tablets in ${tableName}
+        for (def tablet in tablets) {
+            triggerFullCompaction(tablet)
+        }
+
+        // wait for full compaction done
+        for (def tablet in tablets) {
+            waitForFullCompaction(tablet)
+        }
+
+        // after full compaction, there is only 1 rowset per tablet.
+        assert (countRowsets(tablets) == 1 * replicaNum * bucketNum)
+    }
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k` int ,
+            `v` int ,
+        ) engine=olap
+        DUPLICATE KEY(k)
+        DISTRIBUTED BY HASH(k)
+        BUCKETS 3
+        properties(
+            "replication_num" = "1",
+            "disable_auto_compaction" = "true")
+        """
+    sql """ INSERT INTO ${tableName} VALUES (0,0),(1,1),(2,2)"""
+    sql """ delete from ${tableName} where k=0"""
+    sql """ delete from ${tableName} where k=1"""
+    sql """ delete from ${tableName} where k=2"""
+
+    // Phase 1: plain full compaction.
+    // before full compaction, there are 5 rowsets per tablet
+    // (presumably the initial rowset plus one per statement above).
+    runFullCompactionAndVerify(5)
+
+    sql """ delete from ${tableName} where k=0"""
+    sql """ delete from ${tableName} where k=1"""
+    sql """ delete from ${tableName} where k=2"""
+    sql """ delete from ${tableName} where k=3"""
+    sql """ delete from ${tableName} where k=4"""
+    sql """ delete from ${tableName} where k=5"""
+    sql """ delete from ${tableName} where k=6"""
+    sql """ delete from ${tableName} where k=7"""
+    sql """ delete from ${tableName} where k=8"""
+    sql """ delete from ${tableName} where k=9"""
+    sql """ INSERT INTO ${tableName} VALUES (10,10)"""
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    // Phase 2: same scenario with the debug point enabled.
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("FullCompaction.prepare_compact.set_cumu_point")
+        // before full compaction, there are 12 rowsets per tablet
+        // (presumably the compacted rowset plus one per statement above).
+        runFullCompactionAndVerify(12)
+    } finally {
+        // Always restore BE state, whether or not the checks passed.
+        GetDebugPoint().disableDebugPointForAllBEs("FullCompaction.prepare_compact.set_cumu_point")
+    }
+}