From e4c6c9c0109b67d48af2d65c94a12a08a8614c05 Mon Sep 17 00:00:00 2001 From: meiyi Date: Wed, 18 Dec 2024 18:47:06 +0800 Subject: [PATCH] [fix](cluster key) support segcompaction --- be/src/olap/rowset/beta_rowset_writer.cpp | 1 - be/src/olap/rowset/segcompaction.cpp | 7 +- be/src/olap/rowset/segcompaction.h | 1 + .../test_segcompaction_unique_keys_mow_ck.out | 6 + .../test_segcompaction_unique_keys_mow.groovy | 14 ++- ...st_segcompaction_unique_keys_mow_ck.groovy | 116 ++++++++++++++++++ 6 files changed, 141 insertions(+), 4 deletions(-) create mode 100644 regression-test/data/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.out create mode 100644 regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.groovy diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index dc155efe0165bc..dca20b13fe726c 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -583,7 +583,6 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() { Status status = Status::OK(); // if not doing segcompaction, just check segment number if (!config::enable_segcompaction || !_context.enable_segcompaction || - !_context.tablet_schema->cluster_key_uids().empty() || _context.tablet_schema->num_variant_columns() > 0) { return _check_segment_number_limit(_num_segment); } diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index 427236a6119673..6d8c32c4ae5269 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -87,6 +87,7 @@ Status SegcompactionWorker::_get_segcompaction_reader( std::shared_ptr schema, OlapReaderStatistics* stat, vectorized::RowSourcesBuffer& row_sources_buf, bool is_key, std::vector& return_columns, + std::vector& key_group_cluster_key_idxes, std::unique_ptr* reader) { const auto& ctx = _writer->_context; bool record_rowids = need_convert_delete_bitmap() && is_key; @@ -123,6 +124,7 @@ Status SegcompactionWorker::_get_segcompaction_reader( reader_params.is_key_column_group = is_key; reader_params.use_page_cache = false; reader_params.record_rowids = record_rowids; + reader_params.key_group_cluster_key_idxes = key_group_cluster_key_idxes; return (*reader)->init(reader_params, nullptr); } @@ -281,8 +283,9 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt auto schema = std::make_shared(ctx.tablet_schema->columns(), column_ids); OlapReaderStatistics reader_stats; std::unique_ptr reader; - auto s = _get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf, - is_key, column_ids, &reader); + auto s = + _get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf, + is_key, column_ids, key_group_cluster_key_idxes, &reader); if (UNLIKELY(reader == nullptr || !s.ok())) { return Status::Error( "failed to get segcompaction reader. err: {}", s.to_string()); diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h index 5ec74c0e660963..ac9ba66f7fb017 100644 --- a/be/src/olap/rowset/segcompaction.h +++ b/be/src/olap/rowset/segcompaction.h @@ -87,6 +87,7 @@ class SegcompactionWorker { OlapReaderStatistics* stat, vectorized::RowSourcesBuffer& row_sources_buf, bool is_key, std::vector& return_columns, + std::vector& key_group_cluster_key_idxes, std::unique_ptr* reader); std::unique_ptr _create_segcompaction_writer(uint32_t begin, uint32_t end); diff --git a/regression-test/data/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.out b/regression-test/data/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.out new file mode 100644 index 00000000000000..1d49e6a589aabb --- /dev/null +++ b/regression-test/data/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.out @@ -0,0 +1,6 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_default -- +47 Lychee Lychee Plum Banana Lychee Lychee Cherry Pineapple Banana Watermelon Mango Apple Apple Peach Raspberry Grapes Raspberry Raspberry Kiwi Orange Apple Plum Blueberry Strawberry Orange Raspberry Strawberry Lemon Orange Blueberry Apple Peach Banana Kiwi Orange Banana Strawberry Lemon Mango Orange Peach Avocado Pineapple Kiwi Lemon Grapes Strawberry Grapes Lychee + +-- !select_default -- +47 Lychee Lychee Plum Banana Lychee Lychee Cherry Pineapple Banana Watermelon Mango Apple Apple Peach Raspberry Grapes Raspberry Raspberry Kiwi Orange Apple Plum Blueberry Strawberry Orange Raspberry Strawberry Lemon Orange Blueberry Apple Peach Banana Kiwi Orange Banana Strawberry Lemon Mango Orange Peach Avocado Pineapple Kiwi Lemon Grapes Strawberry Grapes Lychee diff --git a/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow.groovy b/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow.groovy index cc7a40c20aca91..4b6544306fb755 100644 --- a/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow.groovy +++ b/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow.groovy @@ -92,8 +92,20 @@ suite("test_segcompaction_unique_keys_mow") { qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47; """ - String[][] tablets = sql """ show tablets from ${tableName}; """ + def row_count = sql """ SELECT count(*) FROM ${tableName}; """ + logger.info("row_count: " + row_count) + assertEquals(4999989, row_count[0][0]) + def result = sql """ select col_0, count(*) a from ${tableName} group by col_0 having a > 1; """ + logger.info("duplicated keys: " + result) + assertTrue(result.size() == 0, "There are duplicate keys in the table") + + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + for (def tablet in tablets) { + def (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablet status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + } } finally { try_sql("DROP TABLE IF EXISTS ${tableName}") } diff --git a/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.groovy b/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.groovy new file mode 100644 index 00000000000000..6fa8f6bbcb4d94 --- /dev/null +++ b/regression-test/suites/segcompaction_p2/test_segcompaction_unique_keys_mow_ck.groovy @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("test_segcompaction_unique_keys_mow_ck") { + for (int i = 0; i < 2; i++) { + def tableName = "segcompaction_unique_keys_regression_test_mow_ck_" + i + String ak = getS3AK() + String sk = getS3SK() + String endpoint = getS3Endpoint() + String region = getS3Region() + String bucket = getS3BucketName() + + + try { + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20), + `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20), + `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20), + `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20), + `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20), + `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20), + `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20), + `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20), + `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20), + `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20) + ) + UNIQUE KEY(`col_0`) cluster by(`col_1`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1 + PROPERTIES ( + """ + (i == 1 ? "\"function_column.sequence_col\"='col_0', " : "") + + """ + "replication_num" = "1" + ); + """ + // "enable_unique_key_merge_on_write" = "true" + + + def uuid = UUID.randomUUID().toString().replace("-", "0") + def path = "oss://$bucket/regression/segcompaction_test/segcompaction_test.orc" + + def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49" + String columns_str = ("$columns" != "") ? "($columns)" : ""; + + sql """ + LOAD LABEL $uuid ( + DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc") + INTO TABLE $tableName + FORMAT AS "ORC" + $columns_str + ) + WITH S3 ( + "AWS_ACCESS_KEY" = "$ak", + "AWS_SECRET_KEY" = "$sk", + "AWS_ENDPOINT" = "$endpoint", + "AWS_REGION" = "$region", + "provider" = "${getS3Provider()}" + ) + """ + + def max_try_milli_secs = 3600000 + while (max_try_milli_secs > 0) { + String[][] result = sql """ show load where label="$uuid" order by createtime desc limit 1; """ + if (result[0][2].equals("FINISHED")) { + logger.info("Load FINISHED " + " $uuid") + break; + } + if (result[0][2].equals("CANCELLED")) { + logger.info("Load CANCELLED " + " $uuid") + break; + } + Thread.sleep(1000) + max_try_milli_secs -= 1000 + if(max_try_milli_secs <= 0) { + assertTrue(1 == 2, "load Timeout: $uuid") + } + } + + qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47; """ + + def row_count = sql """ SELECT count(*) FROM ${tableName}; """ + logger.info("row_count: " + row_count) + assertEquals(4999989, row_count[0][0]) + + def result = sql """ select col_0, count(*) a from ${tableName} group by col_0 having a > 1; """ + logger.info("duplicated keys: " + result) + assertTrue(result.size() == 0, "There are duplicate keys in the table") + + def tablets = sql_return_maparray """ show tablets from ${tableName}; """ + for (def tablet in tablets) { + def (code, out, err) = curl("GET", tablet.CompactionStatus) + logger.info("Show tablet status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + } + } finally { + // try_sql("DROP TABLE IF EXISTS ${tableName}") + } + } +}