diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 738087a702f070..d707349132036c 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -44,6 +44,7 @@
 #include "io/fs/file_system.h"
 #include "io/fs/file_writer.h"
 #include "io/fs/remote_file_system.h"
+#include "io/io_common.h"
 #include "olap/cumulative_compaction_policy.h"
 #include "olap/cumulative_compaction_time_series_policy.h"
 #include "olap/data_dir.h"
@@ -345,8 +346,9 @@ bool CompactionMixin::handle_ordered_data_compaction() {
     if (!config::enable_ordered_data_compaction) {
         return false;
     }
-    if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION) {
-        // The remote file system does not support to link files.
+    if (compaction_type() == ReaderType::READER_COLD_DATA_COMPACTION ||
+        compaction_type() == ReaderType::READER_FULL_COMPACTION) {
+        // Neither the remote file system nor full compaction supports linking files.
         return false;
     }
     if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 9d675f731924c1..529efa2e069faa 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -59,6 +59,9 @@ Status FullCompaction::prepare_compact() {
     std::unique_lock cumu_lock(tablet()->get_cumulative_compaction_lock());
     tablet()->set_is_full_compaction_running(true);
 
+    DBUG_EXECUTE_IF("FullCompaction.prepare_compact.set_cumu_point",
+                    { tablet()->set_cumulative_layer_point(tablet()->max_version_unlocked() + 1); })
+
     // 1. pick rowsets to compact
     RETURN_IF_ERROR(pick_rowsets_to_compact());
 
diff --git a/regression-test/suites/fault_injection_p0/test_full_compaction_with_ordered_data.groovy b/regression-test/suites/fault_injection_p0/test_full_compaction_with_ordered_data.groovy
new file mode 100644
index 00000000000000..c6dfa6b885cf6c
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_full_compaction_with_ordered_data.groovy
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_full_compaction_with_ordered_data","nonConcurrent") {
+    if (isCloudMode()) {
+        return
+    }
+    def tableName = "test_full_compaction_with_ordered_data"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    String backend_id;
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    backend_id = backendId_to_backendIP.keySet()[0]
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k` int,
+            `v` int
+        ) engine=olap
+        DUPLICATE KEY(k)
+        DISTRIBUTED BY HASH(k)
+        BUCKETS 3
+        properties(
+            "replication_num" = "1",
+            "disable_auto_compaction" = "true")
+        """
+    sql """ INSERT INTO ${tableName} VALUES (0,0),(1,1),(2,2)"""
+    sql """ delete from ${tableName} where k=0"""
+    sql """ delete from ${tableName} where k=1"""
+    sql """ delete from ${tableName} where k=2"""
+
+    def exception = false;
+    try {
+        def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+        def replicaNum = get_table_replica_num(tableName)
+        logger.info("get table replica num: " + replicaNum)
+        // before full compaction, there are 5 rowsets in each tablet.
+        int rowsetCount = 0
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            rowsetCount += ((List) tabletJson.rowsets).size()
+        }
+        assert (rowsetCount == 5 * replicaNum * 3)
+
+        // trigger full compactions for all tablets in ${tableName}
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            backend_id = tablet.BackendId
+            times = 1
+
+            do {
+                (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+                ++times
+                sleep(2000)
+            } while (parseJson(out.trim()).status.toLowerCase() != "success" && times <= 10)
+
+        }
+
+        // wait for full compaction done
+        for (def tablet in tablets) {
+            boolean running = true
+            do {
+                Thread.sleep(1000)
+                String tablet_id = tablet.TabletId
+                backend_id = tablet.BackendId
+                (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            } while (running)
+        }
+
+        // after full compaction, there is only 1 rowset left in each tablet.
+
+        rowsetCount = 0
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            rowsetCount += ((List) tabletJson.rowsets).size()
+        }
+        assert (rowsetCount == 1 * replicaNum * 3)
+    } catch (Exception e) {
+        logger.info(e.getMessage())
+        exception = true;
+    } finally {
+        assertFalse(exception)
+    }
+
+    sql """ delete from ${tableName} where k=0"""
+    sql """ delete from ${tableName} where k=1"""
+    sql """ delete from ${tableName} where k=2"""
+    sql """ delete from ${tableName} where k=3"""
+    sql """ delete from ${tableName} where k=4"""
+    sql """ delete from ${tableName} where k=5"""
+    sql """ delete from ${tableName} where k=6"""
+    sql """ delete from ${tableName} where k=7"""
+    sql """ delete from ${tableName} where k=8"""
+    sql """ delete from ${tableName} where k=9"""
+    sql """ INSERT INTO ${tableName} VALUES (10,10)"""
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    exception = false;
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("FullCompaction.prepare_compact.set_cumu_point")
+        def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+        def replicaNum = get_table_replica_num(tableName)
+        logger.info("get table replica num: " + replicaNum)
+        // before full compaction, there are 12 rowsets in each tablet.
+        int rowsetCount = 0
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            rowsetCount += ((List) tabletJson.rowsets).size()
+        }
+        assert (rowsetCount == 12 * replicaNum * 3)
+
+        // trigger full compactions for all tablets in ${tableName}
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            backend_id = tablet.BackendId
+            times = 1
+
+            do {
+                (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+                ++times
+                sleep(2000)
+            } while (parseJson(out.trim()).status.toLowerCase() != "success" && times <= 10)
+
+        }
+
+        // wait for full compaction done
+        for (def tablet in tablets) {
+            boolean running = true
+            do {
+                Thread.sleep(1000)
+                String tablet_id = tablet.TabletId
+                backend_id = tablet.BackendId
+                (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
+                assertEquals(code, 0)
+                def compactionStatus = parseJson(out.trim())
+                assertEquals("success", compactionStatus.status.toLowerCase())
+                running = compactionStatus.run_status
+            } while (running)
+        }
+
+        // after full compaction, there is only 1 rowset left in each tablet.
+
+        rowsetCount = 0
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            (code, out, err) = curl("GET", tablet.CompactionStatus)
+            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def tabletJson = parseJson(out.trim())
+            assert tabletJson.rowsets instanceof List
+            rowsetCount += ((List) tabletJson.rowsets).size()
+        }
+        assert (rowsetCount == 1 * replicaNum * 3)
+    } catch (Exception e) {
+        logger.info(e.getMessage())
+        exception = true;
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("FullCompaction.prepare_compact.set_cumu_point")
+        assertFalse(exception)
+    }
+}