Skip to content

Commit

Permalink
random set cluster keys
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Oct 28, 2024
1 parent 7e281cc commit 87ce1a4
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,10 @@ public class Config extends ConfigBase {
"Default timeout for insert load job, in seconds."})
public static int insert_load_default_timeout_second = 14400; // 4 hour

// Test-only switch, off by default. When enabled, table creation assigns a randomly chosen
// set of cluster keys to merge-on-write tables that did not declare any, and the
// "short keys is a part of unique keys" / "Unique keys and cluster keys should be different."
// validation errors are suppressed so that the random choice cannot fail table creation.
@ConfField(mutable = true, masterOnly = true, description = {"对mow表随机设置cluster keys,用于测试",
"random set cluster keys for mow table for test"})
public static boolean random_add_cluster_keys_for_mow = false;

@ConfField(mutable = true, masterOnly = true, description = {
"等内部攒批真正写入完成才返回;insert into和stream load默认开启攒批",
"Wait for the internal batch to be written before returning; "
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -4645,13 +4645,13 @@ public static short calcShortKeyColumnCount(List<Column> columns, Map<String, St

if (clusterColumns.size() > 0 && shortKeyColumnCount < clusterColumns.size()) {
boolean sameKey = true;
for (int i = 0; i < shortKeyColumnCount; i++) {
for (int i = 0; i < shortKeyColumnCount && i < indexColumns.size(); i++) {
if (!clusterColumns.get(i).getName().equals(indexColumns.get(i).getName())) {
sameKey = false;
break;
}
}
if (sameKey) {
if (sameKey && !Config.random_add_cluster_keys_for_mow) {
throw new DdlException(shortKeyColumnCount + " short keys is a part of unique keys");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,17 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
Expand All @@ -98,7 +102,6 @@
* table info in creating table.
*/
public class CreateTableInfo {

public static final String ENGINE_OLAP = "olap";
public static final String ENGINE_JDBC = "jdbc";
public static final String ENGINE_ELASTICSEARCH = "elasticsearch";
Expand All @@ -110,6 +113,8 @@ public class CreateTableInfo {
private static final ImmutableSet<AggregateType> GENERATED_COLUMN_ALLOW_AGG_TYPE =
ImmutableSet.of(AggregateType.REPLACE, AggregateType.REPLACE_IF_NOT_NULL);

private static final Logger LOG = LogManager.getLogger(CreateTableInfo.class);

private final boolean ifNotExists;
private String ctlName;
private String dbName;
Expand Down Expand Up @@ -421,6 +426,30 @@ public void validate(ConnectContext ctx) {
}
}

// Test-only path: when Config.random_add_cluster_keys_for_mow is on and a merge-on-write table
// was declared without cluster keys, pick a random non-empty subset of the eligible columns,
// in random order, and use it as the cluster keys.
if (Config.random_add_cluster_keys_for_mow && isEnableMergeOnWrite && clusterKeysColumnNames.isEmpty()) {
    // exclude columns whose data type can not be cluster key, see {@link ColumnDefinition#validate}
    // Collect into an explicit ArrayList: the list is shuffled in place below, and
    // Collectors.toList() gives no guarantee that the list it returns is mutable.
    List<ColumnDefinition> clusterKeysCandidates = columns.stream().filter(c -> {
        DataType type = c.getType();
        return !(type.isFloatLikeType() || type.isStringType() || type.isArrayType()
                || type.isBitmapType() || type.isHllType() || type.isQuantileStateType()
                || type.isJsonType()
                || type.isVariantType()
                || type.isMapType()
                || type.isStructType());
    }).collect(Collectors.toCollection(ArrayList::new));
    // If no column has an eligible type, leave the table without cluster keys.
    if (!clusterKeysCandidates.isEmpty()) {
        clusterKeysColumnNames = new ArrayList<>();
        Random random = new Random();
        // nextInt(size) is in [0, size), so between 1 and all candidates are selected.
        int randomClusterKeysCount = random.nextInt(clusterKeysCandidates.size()) + 1;
        // Shuffle first, then take the leading entries: random membership and random order.
        Collections.shuffle(clusterKeysCandidates);
        for (int i = 0; i < randomClusterKeysCount; i++) {
            clusterKeysColumnNames.add(clusterKeysCandidates.get(i).getName());
        }
        LOG.info("Randomly add cluster keys for table {}.{}: {}",
                dbName, tableName, clusterKeysColumnNames);
    }
}

validateKeyColumns();
if (!clusterKeysColumnNames.isEmpty()) {
if (!isEnableMergeOnWrite) {
Expand Down Expand Up @@ -820,7 +849,7 @@ private void validateKeyColumns() {
break;
}
}
if (sameKey) {
if (sameKey && !Config.random_add_cluster_keys_for_mow) {
throw new AnalysisException("Unique keys and cluster keys should be different.");
}
// check that cluster key column exists
Expand Down

0 comments on commit 87ce1a4

Please sign in to comment.