Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](cloud) Support multi default compute group #44502

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3250,6 +3250,15 @@ public static int metaServiceRpcRetryTimes() {
+ "model is set to 300 times, which is approximately 5 minutes by default."})
public static int auto_start_wait_to_resume_times = 300;

@ConfField(mutable = true, description = {"存算分离模式是否启用多default compute group,用户设置多default compute group后,"
+ "若当前正在使用的default compute group中所有be都异常,读写会自动切换到下一个default compute group",
"In cloud mode enabled for multiple default compute groups, "
+ "if user sets multiple default compute groups, "
+ "when all backends are abnormal which default compute group in use, "
+ "doris will switch to the next default compute group automatically"
})
public static boolean enable_multi_default_compute_group = false;

@ConfField(description = {"Get tablet stat task的最大并发数。",
"Maximal concurrent num of get tablet stat job."})
public static int max_get_tablet_stat_task_threads_num = 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,24 @@
package org.apache.doris.analysis;

import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.mysql.privilege.UserProperty;
import org.apache.doris.nereids.trees.plans.commands.info.SetUserPropertyVarOp;
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SetUserPropertyVar extends SetVar {
private static final Logger LOG = LogManager.getLogger(SetUserPropertyVar.class);
public static final String DOT_SEPARATOR = ".";

private final String key;
Expand Down Expand Up @@ -85,19 +88,17 @@ private void checkAccess(Analyzer analyzer, boolean isSelf) throws AnalysisExcep
"GRANT");
}
if (Config.isCloudMode()) {
// check value, clusterName is valid.
if (key.equals(UserProperty.DEFAULT_CLOUD_CLUSTER)
&& !Strings.isNullOrEmpty(value)
&& !((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getCloudClusterNames().contains(value)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLOUD_CLUSTER_ERROR, value);
}

if (key.equals(UserProperty.DEFAULT_COMPUTE_GROUP)
&& !Strings.isNullOrEmpty(value)
&& !((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getCloudClusterNames().contains(value)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLOUD_CLUSTER_ERROR, value);
if (Config.enable_multi_default_compute_group) {
for (String defaultComputeGroup : UserProperty.getDefaultComputeGroups(value)) {
SetUserPropertyVarOp.checkComputeGroupExist(key, defaultComputeGroup);
}
} else {
if (value.contains(",")) {
LOG.warn("set cloud default compute group error,"
+ " not enable multi default compute group, but use ',' split {}", value);
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLOUD_CLUSTER_ERROR, value);
}
SetUserPropertyVarOp.checkComputeGroupExist(key, value);
}
}
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,26 +389,21 @@ public long hashReplicaToBe(String clusterId, boolean isBackGround) throws Compu
// use alive be to exec sql
List<Backend> availableBes = new ArrayList<>();
List<Backend> decommissionAvailBes = new ArrayList<>();
for (Backend be : clusterBes) {
long lastUpdateMs = be.getLastUpdateMs();
long missTimeMs = Math.abs(lastUpdateMs - System.currentTimeMillis());
// be core or restart must in heartbeat_interval_second
if ((be.isAlive() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
&& !be.isSmoothUpgradeSrc()) {
if (be.isDecommissioned()) {
decommissionAvailBes.add(be);
} else {
availableBes.add(be);
}
}
}
CloudSystemInfoService.getBesStatus(clusterBes, decommissionAvailBes, availableBes);
if (availableBes.isEmpty()) {
availableBes = decommissionAvailBes;
}
if (availableBes.isEmpty()) {
if (!isBackGround) {
LOG.warn("failed to get available be, clusterId: {}", clusterId);
}
if (Config.enable_multi_default_compute_group) {
LOG.info("failed to get available be, origin clusterName-clusterId: {}-{},"
+ " so select bes from multi default compute groups", clusterName, clusterId);
ConnectContext.chooseCloudComputeGroupHasAliveBes(decommissionAvailBes, availableBes,
clusterName, true);

}
throw new ComputeGroupException(
String.format("All the Backend nodes in the current compute group %s are in an abnormal state",
clusterName),
Expand Down Expand Up @@ -454,19 +449,7 @@ public List<Long> hashReplicaToBes(String clusterId, boolean isBackGround, int r
// use alive be to exec sql
List<Backend> availableBes = new ArrayList<>();
List<Backend> decommissionAvailBes = new ArrayList<>();
for (Backend be : clusterBes) {
long lastUpdateMs = be.getLastUpdateMs();
long missTimeMs = Math.abs(lastUpdateMs - System.currentTimeMillis());
// be core or restart must in heartbeat_interval_second
if ((be.isAlive() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
&& !be.isSmoothUpgradeSrc()) {
if (be.isDecommissioned()) {
decommissionAvailBes.add(be);
} else {
availableBes.add(be);
}
}
}
CloudSystemInfoService.getBesStatus(clusterBes, decommissionAvailBes, availableBes);
if (availableBes.isEmpty()) {
availableBes = decommissionAvailBes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1158,4 +1158,21 @@ public String getInstanceId(String cloudUniqueId) throws IOException {
throw new IOException("Failed to get instance info");
}
}

public static void getBesStatus(List<Backend> clusterBes,
List<Backend> decommissionAvailBes, List<Backend> availableBes) {
for (Backend be : clusterBes) {
long lastUpdateMs = be.getLastUpdateMs();
long missTimeMs = Math.abs(lastUpdateMs - System.currentTimeMillis());
// be core or restart must in heartbeat_interval_second
if ((be.isAlive() || missTimeMs <= Config.heartbeat_interval_second * 1000L)
&& !be.isSmoothUpgradeSrc()) {
if (be.isDecommissioned()) {
decommissionAvailBes.add(be);
} else {
availableBes.add(be);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -409,19 +410,50 @@ public void update(List<Pair<String, String>> properties, boolean isReplay) thro
defaultCloudCluster = newDefaultCloudCluster;
}

public static String[] getDefaultComputeGroups(String value) {
if (!Config.enable_multi_default_compute_group || Strings.isNullOrEmpty(value)) {
return new String[]{value == null ? "" : value};
}
// single compute group in multi default compute group
if (!value.contains(",")) {
return new String[]{value};
}

return Arrays.stream(value.split(",")).filter(StringUtils::isNotBlank)
.map(StringUtils::trim).distinct().toArray(String[]::new);
}


private String checkCloudDefaultCluster(String[] keyArr, String value, String defaultComputeGroup, boolean isReplay)
throws ComputeGroupException, DdlException {
// isReplay not check auth, not throw exception
if (isReplay) {
return value;
}
// check cluster auth
if (!Strings.isNullOrEmpty(value) && !Env.getCurrentEnv().getAuth().checkCloudPriv(
new UserIdentity(qualifiedUser, "%"), value, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) {
throw new ComputeGroupException(String.format("set default compute group failed, "
+ "user %s has no permission to use compute group '%s', please grant use privilege first ",
qualifiedUser, value),
ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_COMPUTE_GROUP);
String[] defaultComputeGroups = getDefaultComputeGroups(value);
if (Config.enable_multi_default_compute_group) {
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
Arrays.stream(defaultComputeGroups).forEach(s -> {
sb.append(s).append(" ");
});
LOG.debug("get default compute groups: {}", sb.toString());
}
for (String computeGroup : defaultComputeGroups) {
// check auth
checkUserHasComputeGroupAuth(computeGroup);
}
// value = "cluster1", "cluster2"
if (defaultComputeGroups.length > 1) {
value = String.join(",", defaultComputeGroups);
}
} else {
// check value not contain ','
value = defaultComputeGroups[0];
if (value.contains(",")) {
throw new DdlException("not multi default compute groups should not contain ','");
}
checkUserHasComputeGroupAuth(value);
}
// set property "DEFAULT_CLOUD_CLUSTER" = "cluster1"
if (keyArr.length != 1) {
Expand All @@ -430,9 +462,22 @@ private String checkCloudDefaultCluster(String[] keyArr, String value, String de
if (value == null) {
value = "";
}
// in Config.enable_multi_default_compute_group = true, value maybe "cluster1" or "cluster1, cluster2"
// in Config.enable_multi_default_compute_group = false, value just "cluster1"
return value;
}

private void checkUserHasComputeGroupAuth(String value) throws ComputeGroupException {
// check cluster auth
if (!Strings.isNullOrEmpty(value) && !Env.getCurrentEnv().getAuth().checkCloudPriv(
new UserIdentity(qualifiedUser, "%"), value, PrivPredicate.USAGE, ResourceTypeEnum.CLUSTER)) {
throw new ComputeGroupException(String.format("set default compute group failed, "
+ "user %s has no permission to use compute group '%s', please grant use privilege first ",
qualifiedUser, value),
ComputeGroupException.FailedTypeEnum.CURRENT_USER_NO_AUTH_TO_USE_COMPUTE_GROUP);
}
}

private long getLongProperty(String key, String value, String[] keyArr, String propName) throws DdlException {
// eg: set property "load_mem_limit" = "2147483648";
if (keyArr.length != 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,15 @@ public List<String> getCloudClusterUsers(Set<String> users, String clusterName)
if (userProperty == null) {
return;
}
if (clusterName.equals(userProperty.getDefaultCloudCluster())) {
ret.add(ClusterNamespace.getNameFromFullName(u));
if (Config.enable_multi_default_compute_group) {
if (userProperty.getDefaultCloudCluster() != null
&& userProperty.getDefaultCloudCluster().contains(clusterName)) {
ret.add(ClusterNamespace.getNameFromFullName(u));
}
} else {
if (clusterName.equals(userProperty.getDefaultCloudCluster())) {
ret.add(ClusterNamespace.getNameFromFullName(u));
}
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.doris.qe.ConnectContext;

import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -37,6 +39,7 @@
* SetUserPropertyVarOp
*/
public class SetUserPropertyVarOp {
private static final Logger LOG = LogManager.getLogger(SetUserPropertyVarOp.class);
private final String user;
private final String key;
private final String value;
Expand Down Expand Up @@ -85,19 +88,17 @@ public void validate(ConnectContext ctx) throws UserException {
"GRANT");
}
if (Config.isCloudMode()) {
// check value, clusterName is valid.
if (key.equals(UserProperty.DEFAULT_CLOUD_CLUSTER)
&& !Strings.isNullOrEmpty(value)
&& !((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getCloudClusterNames().contains(value)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLOUD_CLUSTER_ERROR, value);
}

if (key.equals(UserProperty.DEFAULT_COMPUTE_GROUP)
&& !Strings.isNullOrEmpty(value)
&& !((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getCloudClusterNames().contains(value)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLOUD_CLUSTER_ERROR, value);
if (Config.enable_multi_default_compute_group) {
for (String defaultComputeGroup : UserProperty.getDefaultComputeGroups(value)) {
checkComputeGroupExist(key, defaultComputeGroup);
}
} else {
if (value.contains(",")) {
LOG.warn("set cloud default compute group error,"
+ " not enable multi default compute group, but use ',' split {}", value);
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLOUD_CLUSTER_ERROR, value);
}
checkComputeGroupExist(key, value);
}
}
return;
Expand All @@ -122,4 +123,15 @@ public String toSql() {
}
return sb.toString();
}

/** check cloud compute group exist **/
public static void checkComputeGroupExist(String key, String computeGroup) throws AnalysisException {
// check value, clusterName is valid.
if (key.equals(UserProperty.DEFAULT_CLOUD_CLUSTER) || key.equals(UserProperty.DEFAULT_COMPUTE_GROUP)
&& !Strings.isNullOrEmpty(computeGroup)
&& !((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getCloudClusterNames().contains(computeGroup)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_CLOUD_CLUSTER_ERROR, computeGroup);
}
}
}
Loading
Loading