From 73236678c2f0dd96fa904ad714db77d3a8ca93c8 Mon Sep 17 00:00:00 2001 From: meiyi Date: Mon, 2 Dec 2024 12:18:51 +0800 Subject: [PATCH] [fix](cluster key) fix cluster key duplicated key --- be/src/cloud/cloud_cumulative_compaction.cpp | 2 - .../cloud_engine_calc_delete_bitmap_task.cpp | 2 +- be/src/olap/compaction.cpp | 31 ++++ be/src/olap/cumulative_compaction.cpp | 2 +- .../org/apache/doris/master/MasterImpl.java | 6 +- .../test_schema_change_and_compaction.out | 8 + .../test_schema_change_and_compaction.groovy | 145 ++++++++++++++++++ 7 files changed, 189 insertions(+), 7 deletions(-) create mode 100644 regression-test/data/unique_with_mow_c_p0/test_schema_change_and_compaction.out create mode 100644 regression-test/suites/unique_with_mow_c_p0/test_schema_change_and_compaction.groovy diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 2f08082f51b5f3..250bf55b849e1b 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -371,11 +371,9 @@ Status CloudCumulativeCompaction::modify_rowsets() { Status CloudCumulativeCompaction::process_old_version_delete_bitmap() { // agg previously rowset old version delete bitmap std::vector pre_rowsets {}; - std::vector pre_rowset_ids {}; for (const auto& it : cloud_tablet()->rowset_map()) { if (it.first.second < _input_rowsets.front()->start_version()) { pre_rowsets.emplace_back(it.second); - pre_rowset_ids.emplace_back(it.second->rowset_id().to_string()); } } std::sort(pre_rowsets.begin(), pre_rowsets.end(), Rowset::comparator); diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index 91611d20c6270b..336117d1012d4d 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -227,7 +227,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { } } auto 
total_update_delete_bitmap_time_us = MonotonicMicros() - t3; - LOG(INFO) << "calculate delete bitmap successfully on tablet" + LOG(INFO) << "finish calculate delete bitmap on tablet" << ", table_id=" << tablet->table_id() << ", transaction_id=" << _transaction_id << ", tablet_id=" << tablet->tablet_id() << ", get_tablet_time_us=" << get_tablet_time_us diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index e71e1862dc8dbb..5901f7c759a10f 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -37,6 +37,7 @@ #include "cloud/cloud_meta_mgr.h" #include "cloud/cloud_storage_engine.h" +#include "cloud/config.h" #include "common/config.h" #include "common/status.h" #include "cpp/sync_point.h" @@ -191,6 +192,36 @@ Status Compaction::merge_input_rowsets() { SCOPED_TIMER(_merge_rowsets_latency_timer); // 1. Merge segment files and write bkd inverted index if (_is_vertical) { + if (!_tablet->tablet_schema()->cluster_key_idxes().empty()) { + // for mow with cluster keys, compaction need delete bitmap + bool is_tablet_notready = false; + { + std::shared_lock meta_rlock(_tablet->get_header_lock()); + is_tablet_notready = _tablet->tablet_state() == TABLET_NOTREADY; + } + if (is_tablet_notready) { + std::vector rowsets; + for (const auto& rowset : _input_rowsets) { + Status st; + if (config::is_cloud_mode()) { + st = _tablet->update_delete_bitmap_without_lock(_tablet, rowset, + &rowsets); + } else { + std::lock_guard rwlock((std::dynamic_pointer_cast(_tablet) + ->get_rowset_update_lock())); + std::shared_lock rlock(_tablet->get_header_lock()); + st = _tablet->update_delete_bitmap_without_lock(_tablet, rowset, + &rowsets); + } + if (!st.ok()) { + LOG(INFO) << "failed update_delete_bitmap_without_lock for tablet_id=" + << _tablet->tablet_id() << ", st=" << st.to_string(); + return st; + } + rowsets.push_back(rowset); + } + } + } res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema, input_rs_readers, 
_output_rs_writer.get(), get_avg_segment_rows(), way_num, &_stats); diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index b961c694ede4d0..2dfd30fb86ed9a 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -145,7 +145,7 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { DCHECK(missing_versions.size() % 2 == 0); LOG(WARNING) << "There are missed versions among rowsets. " << "total missed version size: " << missing_versions.size() / 2 - << " first missed version prev rowset verison=" << missing_versions[0] + << ", first missed version prev rowset verison=" << missing_versions[0] << ", first missed version next rowset version=" << missing_versions[1] << ", tablet=" << _tablet->tablet_id(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 4010a9b564d0a0..09318af34bd381 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -681,9 +681,9 @@ private void finishCalcDeleteBitmap(AgentTask task, TFinishTaskRequest request) CalcDeleteBitmapTask calcDeleteBitmapTask = (CalcDeleteBitmapTask) task; if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) { calcDeleteBitmapTask.countDownToZero(request.getTaskStatus().getStatusCode(), - "backend: " + task.getBackendId() + ", error_tablet_size: " - + request.getErrorTabletIdsSize() + ", err_msg: " - + request.getTaskStatus().getErrorMsgs().toString()); + "backend: " + task.getBackendId() + ", error_tablet_size: " + request.getErrorTabletIdsSize() + + ", error_tablets: " + request.getErrorTabletIds() + + ", err_msg: " + request.getTaskStatus().getErrorMsgs().toString()); } else if (request.isSetRespPartitions() && calcDeleteBitmapTask.isFinishRequestStale(request.getRespPartitions())) { LOG.warn("get staled response from backend: {}, 
report version: {}. calcDeleteBitmapTask's" diff --git a/regression-test/data/unique_with_mow_c_p0/test_schema_change_and_compaction.out b/regression-test/data/unique_with_mow_c_p0/test_schema_change_and_compaction.out new file mode 100644 index 00000000000000..35f26a488beff3 --- /dev/null +++ b/regression-test/data/unique_with_mow_c_p0/test_schema_change_and_compaction.out @@ -0,0 +1,8 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1 -- +10 20 35 40 + +-- !select2 -- +10 20 40 37 +11 20 40 37 + diff --git a/regression-test/suites/unique_with_mow_c_p0/test_schema_change_and_compaction.groovy b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_and_compaction.groovy new file mode 100644 index 00000000000000..dfb7facf5ee4cb --- /dev/null +++ b/regression-test/suites/unique_with_mow_c_p0/test_schema_change_and_compaction.groovy @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// The cases are copied from https://github.com/trinodb/trino/tree/master +// /testing/trino-product-tests/src/main/resources/sql-tests/testcases +// and modified by Doris. 
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+// Runs cumulative compaction on the ALTER tablet of a cluster-key table while its schema change is blocked.
+suite("test_schema_change_and_compaction", "nonConcurrent") {
+    def tableName = "test_schema_change_and_compaction"
+    // Poll every 2s (at most 10 times) until the newest alter job reports job_state; fail the test otherwise.
+    def getAlterTableState = { job_state ->
+        def retry = 0
+        def last_state = ""
+        while (true) {
+            sleep(2000)
+            def state = sql " show alter table column where tablename = '${tableName}' order by CreateTime desc limit 1"
+            logger.info("alter table state: ${state}")
+            last_state = state.size() > 0 ? state[0][9] : ""  // job may not be listed yet: check size before indexing
+            if (last_state == job_state) {
+                return
+            }
+            retry++
+            if (retry >= 10) {
+                break
+            }
+        }
+        assertTrue(false, "alter table job state is ${last_state}, not ${job_state} after retry ${retry} times")
+    }
+    // Enable the "_convert_historical_rowsets.block" debug point on all BEs (cloud or non-cloud job name).
+    def block_convert_historical_rowsets = {
+        if (isCloudMode()) {
+            GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
+        } else {
+            GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block")
+        }
+    }
+    // Disable the debug point enabled by block_convert_historical_rowsets.
+    def unblock = {
+        if (isCloudMode()) {
+            GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block")
+        } else {
+            GetDebugPoint().disableDebugPointForAllBEs("SchemaChangeJob::_convert_historical_rowsets.block")
+        }
+    }
+    // Release the debug point when the suite finishes so it is not left enabled on the BEs.
+    onFinish {
+        unblock()
+    }
+
+    sql """ DROP TABLE IF EXISTS ${tableName} force """
+    sql """
+        CREATE TABLE ${tableName} ( `k1` int(11), `k2` int(11), `v1` int(11), `v2` int(11) ) ENGINE=OLAP
+        unique KEY(`k1`, `k2`) cluster by(v1) DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES ( "replication_num" = "1" );
+    """
+    sql """ insert into ${tableName} values(10, 20, 30, 40); """
+
+    // start the schema change with the debug point enabled and wait until the job is RUNNING
+    block_convert_historical_rowsets()
+    sql """ alter table ${tableName} order by(k1, k2, v2, v1); """
+    getAlterTableState("RUNNING")
+
+    def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+    logger.info("tablets: ${tablets}")
+    assertEquals(2, tablets.size())
+    String alterTabletId = ""
+    String alterTabletBackendId = ""
+    String alterTabletCompactionUrl = ""
+    for (Map tablet : tablets) {
+        if (tablet["State"] == "ALTER") {
+            alterTabletId = tablet["TabletId"].toLong()
+            alterTabletBackendId = tablet["BackendId"]
+            alterTabletCompactionUrl = tablet["CompactionStatus"]
+        }
+    }
+    logger.info("alterTabletId: ${alterTabletId}, alterTabletBackendId: ${alterTabletBackendId}, alterTabletCompactionUrl: ${alterTabletCompactionUrl}")
+    assertTrue(!alterTabletId.isEmpty())
+
+    // write some data while the alter job is still RUNNING (same key, increasing v1)
+    sql """ insert into ${tableName} values(10, 20, 31, 40); """
+    sql """ insert into ${tableName} values(10, 20, 32, 40); """
+    sql """ insert into ${tableName} values(10, 20, 33, 40); """
+    sql """ insert into ${tableName} values(10, 20, 34, 40); """
+    sql """ insert into ${tableName} values(10, 20, 35, 40); """
+    order_qt_select1 """ select * from ${tableName}; """
+
+    // trigger cumulative compaction on the tablet whose state is ALTER
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
+    logger.info("ip: " + backendId_to_backendIP.get(alterTabletBackendId) + ", port: " + backendId_to_backendHttpPort.get(alterTabletBackendId))
+    def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(alterTabletBackendId), backendId_to_backendHttpPort.get(alterTabletBackendId), alterTabletId+"")
+    logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
+
+    // wait for compaction done: poll the tablet status up to 10 times until it lists fewer than 5 rowsets
+    def enable_new_tablet_do_compaction = get_be_param.call("enable_new_tablet_do_compaction")
+    logger.info("enable_new_tablet_do_compaction: " + enable_new_tablet_do_compaction)
+    boolean enable = enable_new_tablet_do_compaction.get(alterTabletBackendId).toBoolean()
+    logger.info("enable: " + enable)
+    for (int i = 0; i < 10; i++) {
+        (code, out, err) = curl("GET", alterTabletCompactionUrl)
+        logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        if (isCloudMode()) {
+            if (enable) {
+                if(tabletJson.rowsets.size() < 5) {
+                    break
+                }
+            } else {
+                // "msg": "invalid tablet state. tablet_id="
+                break
+            }
+        } else {
+            if(tabletJson.rowsets.size() < 5) {
+                break
+            }
+        }
+        sleep(2000)
+    }
+
+    // unblock the schema change, load more rows and wait for the alter job to finish
+    unblock()
+    sql """ insert into ${tableName}(k1, k2, v1, v2) values(10, 20, 36, 40), (11, 20, 36, 40); """
+    sql """ insert into ${tableName}(k1, k2, v1, v2) values(10, 20, 37, 40), (11, 20, 37, 40); """
+    getAlterTableState("FINISHED")
+    order_qt_select2 """ select * from ${tableName}; """
+}