Skip to content

Commit

Permalink
[fix](cluster key) support segcompaction
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Dec 18, 2024
1 parent fe42ec9 commit e4c6c9c
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 4 deletions.
1 change: 0 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,6 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
Status status = Status::OK();
// if not doing segcompaction, just check segment number
if (!config::enable_segcompaction || !_context.enable_segcompaction ||
!_context.tablet_schema->cluster_key_uids().empty() ||
_context.tablet_schema->num_variant_columns() > 0) {
return _check_segment_number_limit(_num_segment);
}
Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
std::shared_ptr<Schema> schema, OlapReaderStatistics* stat,
vectorized::RowSourcesBuffer& row_sources_buf, bool is_key,
std::vector<uint32_t>& return_columns,
std::vector<uint32_t>& key_group_cluster_key_idxes,
std::unique_ptr<vectorized::VerticalBlockReader>* reader) {
const auto& ctx = _writer->_context;
bool record_rowids = need_convert_delete_bitmap() && is_key;
Expand Down Expand Up @@ -123,6 +124,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
reader_params.is_key_column_group = is_key;
reader_params.use_page_cache = false;
reader_params.record_rowids = record_rowids;
reader_params.key_group_cluster_key_idxes = key_group_cluster_key_idxes;
return (*reader)->init(reader_params, nullptr);
}

Expand Down Expand Up @@ -281,8 +283,9 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
auto schema = std::make_shared<Schema>(ctx.tablet_schema->columns(), column_ids);
OlapReaderStatistics reader_stats;
std::unique_ptr<vectorized::VerticalBlockReader> reader;
auto s = _get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf,
is_key, column_ids, &reader);
auto s =
_get_segcompaction_reader(segments, tablet, schema, &reader_stats, row_sources_buf,
is_key, column_ids, key_group_cluster_key_idxes, &reader);
if (UNLIKELY(reader == nullptr || !s.ok())) {
return Status::Error<SEGCOMPACTION_INIT_READER>(
"failed to get segcompaction reader. err: {}", s.to_string());
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/segcompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class SegcompactionWorker {
OlapReaderStatistics* stat,
vectorized::RowSourcesBuffer& row_sources_buf, bool is_key,
std::vector<uint32_t>& return_columns,
std::vector<uint32_t>& key_group_cluster_key_idxes,
std::unique_ptr<vectorized::VerticalBlockReader>* reader);
std::unique_ptr<segment_v2::SegmentWriter> _create_segcompaction_writer(uint32_t begin,
uint32_t end);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --
47 Lychee Lychee Plum Banana Lychee Lychee Cherry Pineapple Banana Watermelon Mango Apple Apple Peach Raspberry Grapes Raspberry Raspberry Kiwi Orange Apple Plum Blueberry Strawberry Orange Raspberry Strawberry Lemon Orange Blueberry Apple Peach Banana Kiwi Orange Banana Strawberry Lemon Mango Orange Peach Avocado Pineapple Kiwi Lemon Grapes Strawberry Grapes Lychee

-- !select_default --
47 Lychee Lychee Plum Banana Lychee Lychee Cherry Pineapple Banana Watermelon Mango Apple Apple Peach Raspberry Grapes Raspberry Raspberry Kiwi Orange Apple Plum Blueberry Strawberry Orange Raspberry Strawberry Lemon Orange Blueberry Apple Peach Banana Kiwi Orange Banana Strawberry Lemon Mango Orange Peach Avocado Pineapple Kiwi Lemon Grapes Strawberry Grapes Lychee
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,20 @@ suite("test_segcompaction_unique_keys_mow") {

qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47; """

String[][] tablets = sql """ show tablets from ${tableName}; """
def row_count = sql """ SELECT count(*) FROM ${tableName}; """
logger.info("row_count: " + row_count)
assertEquals(4999989, row_count[0][0])

def result = sql """ select col_0, count(*) a from ${tableName} group by col_0 having a > 1; """
logger.info("duplicated keys: " + result)
assertTrue(result.size() == 0, "There are duplicate keys in the table")

def tablets = sql_return_maparray """ show tablets from ${tableName}; """
for (def tablet in tablets) {
def (code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablet status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
}
} finally {
try_sql("DROP TABLE IF EXISTS ${tableName}")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_segcompaction_unique_keys_mow_ck") {
    // Regression test: segcompaction on a merge-on-write unique-key table
    // that declares a cluster key. Runs twice:
    //   i == 0 -> plain MoW cluster-key table
    //   i == 1 -> same table with function_column.sequence_col set
    for (int i = 0; i < 2; i++) {
        def tableName = "segcompaction_unique_keys_regression_test_mow_ck_" + i
        String ak = getS3AK()
        String sk = getS3SK()
        String endpoint = getS3Endpoint()
        String region = getS3Region()
        String bucket = getS3BucketName()


        try {

            sql """ DROP TABLE IF EXISTS ${tableName} """
            sql """
                CREATE TABLE IF NOT EXISTS ${tableName} (
                    `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
                    `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
                    `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
                    `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
                    `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
                    `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
                    `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
                    `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
                    `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
                    `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
                )
                UNIQUE KEY(`col_0`) cluster by(`col_1`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
                PROPERTIES (
                """ + (i == 1 ? "\"function_column.sequence_col\"='col_0', " : "") +
                """
                    "replication_num" = "1"
                );
            """
            // NOTE(review): "enable_unique_key_merge_on_write" = "true" is the
            // default for unique-key tables, so it is not set explicitly here.


            // Random-but-unique load label; '-' is not allowed in labels.
            def uuid = UUID.randomUUID().toString().replace("-", "0")

            def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"
            String columns_str = ("$columns" != "") ? "($columns)" : "";

            // Bulk-load ~5M rows from S3 so enough segments are produced per
            // rowset to trigger segcompaction.
            sql """
                LOAD LABEL $uuid (
                    DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc")
                    INTO TABLE $tableName
                    FORMAT AS "ORC"
                    $columns_str
                )
                WITH S3 (
                    "AWS_ACCESS_KEY" = "$ak",
                    "AWS_SECRET_KEY" = "$sk",
                    "AWS_ENDPOINT" = "$endpoint",
                    "AWS_REGION" = "$region",
                    "provider" = "${getS3Provider()}"
                )
            """

            // Poll the load job until it reaches a terminal state or the
            // overall budget (1 hour) is exhausted.
            def max_try_milli_secs = 3600000
            while (max_try_milli_secs > 0) {
                String[][] result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
                if (result[0][2].equals("FINISHED")) {
                    logger.info("Load FINISHED " + " $uuid")
                    break;
                }
                if (result[0][2].equals("CANCELLED")) {
                    logger.info("Load CANCELLED " + " $uuid")
                    break;
                }
                Thread.sleep(1000)
                max_try_milli_secs -= 1000
                if(max_try_milli_secs <= 0) {
                    assertTrue(false, "load Timeout: $uuid")
                }
            }

            qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47; """

            // The source file holds 4999989 distinct keys; MoW must keep
            // exactly one visible row per key after segcompaction.
            def row_count = sql """ SELECT count(*) FROM ${tableName}; """
            logger.info("row_count: " + row_count)
            assertEquals(4999989, row_count[0][0])

            // Cluster-key segcompaction must not resurrect duplicate keys.
            def result = sql """ select col_0, count(*) a from ${tableName} group by col_0 having a > 1; """
            logger.info("duplicated keys: " + result)
            assertTrue(result.size() == 0, "There are duplicate keys in the table")

            // Sanity-check that every tablet's compaction status endpoint is
            // reachable (curl exit code 0).
            def tablets = sql_return_maparray """ show tablets from ${tableName}; """
            for (def tablet in tablets) {
                def (code, out, err) = curl("GET", tablet.CompactionStatus)
                logger.info("Show tablet status: code=" + code + ", out=" + out + ", err=" + err)
                assertEquals(0, code)
            }
        } finally {
            // Always drop the table so repeated runs don't leak test tables.
            try_sql("DROP TABLE IF EXISTS ${tableName}")
        }
    }
}

0 comments on commit e4c6c9c

Please sign in to comment.