Skip to content

Commit

Permalink
[Improvement]downgrade resource tag when there is not queryable repli…
Browse files Browse the repository at this point in the history
…ca (#44255)
  • Loading branch information
wangbo authored Dec 10, 2024
1 parent 54fbbef commit 71f96e6
Show file tree
Hide file tree
Showing 15 changed files with 269 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public void init() throws UserException {

public void init(List<String> preLocations) throws UserException {
Set<Tag> tags = Sets.newHashSet();
boolean allowResourceTagDowngrade = false;
if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) {
String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
// Some request from stream load(eg, mysql load) may not set user info in ConnectContext
Expand All @@ -164,6 +165,7 @@ public void init(List<String> preLocations) throws UserException {
if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
throw new UserException("No valid resource tag for user: " + qualifiedUser);
}
allowResourceTagDowngrade = Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(qualifiedUser);
}
} else {
if (LOG.isDebugEnabled()) {
Expand All @@ -176,6 +178,7 @@ public void init(List<String> preLocations) throws UserException {
.needQueryAvailable()
.needLoadAvailable()
.addTags(tags)
.setAllowResourceTagDowngrade(allowResourceTagDowngrade)
.preferComputeNode(Config.prefer_compute_node_for_external_table)
.assignExpectBeNum(Config.min_backend_num_for_external_table)
.addPreLocations(preLocations)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,10 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ
BeSelectionPolicy policy = null;
String qualifiedUser = ConnectContext.get().getQualifiedUser();
Set<Tag> userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
boolean allowResourceTagDowngrade = Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(qualifiedUser);
policy = new BeSelectionPolicy.Builder()
.addTags(userTags)
.setAllowResourceTagDowngrade(allowResourceTagDowngrade)
.setEnableRoundRobin(true)
.needLoadAvailable().build();
policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ public static boolean negotiate(ConnectContext context) throws IOException {
}

// set resource tag if has
context.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser));
context.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser),
Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(qualifiedUser));
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,15 @@ public Set<Tag> getResourceTags(String qualifiedUser) {
}
}

/**
 * Reports whether resource-tag downgrade is allowed for the given user.
 *
 * <p>The lookup is delegated to the property manager between {@code readLock()} and
 * {@code readUnlock()}; the lock is released even if the lookup throws.
 *
 * @param qualifiedUser qualified name of the user to look up
 * @return the value reported by the property manager for this user
 */
public boolean isAllowResourceTagDowngrade(String qualifiedUser) {
    final boolean downgradeAllowed;
    readLock();
    try {
        downgradeAllowed = propertyMgr.isAllowResourceTagDowngrade(qualifiedUser);
    } finally {
        readUnlock();
    }
    return downgradeAllowed;
}

public long getExecMemLimit(String qualifiedUser) {
readLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public class CommonUserProperties implements Writable, GsonPostProcessable {
@SerializedName(value = "wg", alternate = {"workloadGroup"})
private String workloadGroup = WorkloadGroupMgr.DEFAULT_GROUP_NAME;

@SerializedName(value = "ard", alternate = {"AllowResourceTagDowngrade"})
private boolean allowResourceTagDowngrade = false;

private String[] sqlBlockRulesSplit = {};

long getMaxConn() {
Expand Down Expand Up @@ -164,6 +167,14 @@ public void setWorkloadGroup(String workloadGroup) {
this.workloadGroup = workloadGroup;
}

/**
 * Stores the flag that records whether resource-tag downgrade is allowed.
 *
 * @param allowResourceTagDowngrade new value of the flag
 */
public void setAllowResourceTagDowngrade(boolean allowResourceTagDowngrade) {
this.allowResourceTagDowngrade = allowResourceTagDowngrade;
}

/**
 * Returns the stored resource-tag downgrade flag.
 *
 * @return current value of {@code allowResourceTagDowngrade}
 */
public boolean isAllowResourceTagDowngrade() {
return this.allowResourceTagDowngrade;
}

@Deprecated
public static CommonUserProperties read(DataInput in) throws IOException {
String json = Text.readString(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class UserProperty implements Writable {

public static final String PROP_WORKLOAD_GROUP = "default_workload_group";

public static final String PROP_ALLOW_RESOURCE_TAG_DOWNGRADE = "allow_resource_tag_downgrade";

public static final String DEFAULT_CLOUD_CLUSTER = "default_cloud_cluster";
public static final String DEFAULT_COMPUTE_GROUP = "default_compute_group";

Expand Down Expand Up @@ -139,6 +141,8 @@ public class UserProperty implements Writable {
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_EXEC_MEM_LIMIT + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_USER_QUERY_TIMEOUT + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_USER_INSERT_TIMEOUT + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(
Pattern.compile("^" + PROP_ALLOW_RESOURCE_TAG_DOWNGRADE + "$", Pattern.CASE_INSENSITIVE));

COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_QUOTA + ".", Pattern.CASE_INSENSITIVE));
COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_DEFAULT_LOAD_CLUSTER + "$", Pattern.CASE_INSENSITIVE));
Expand Down Expand Up @@ -201,6 +205,10 @@ public Set<Tag> getCopiedResourceTags() {
return Sets.newHashSet(this.commonProperties.getResourceTags());
}

/**
 * Returns the resource-tag downgrade flag held in this user's common properties.
 *
 * @return the value reported by {@code commonProperties.isAllowResourceTagDowngrade()}
 */
public boolean isAllowResourceTagDowngrade() {
return this.commonProperties.isAllowResourceTagDowngrade();
}

/**
 * Returns the exec mem limit held in this user's common properties.
 *
 * @return the value reported by {@code commonProperties.getExecMemLimit()}
 */
public long getExecMemLimit() {
return commonProperties.getExecMemLimit();
}
Expand All @@ -221,6 +229,7 @@ public void update(List<Pair<String, String>> properties, boolean isReplay) thro
int queryTimeout = this.commonProperties.getQueryTimeout();
int insertTimeout = this.commonProperties.getInsertTimeout();
String workloadGroup = this.commonProperties.getWorkloadGroup();
boolean allowResourceTagDowngrade = this.commonProperties.isAllowResourceTagDowngrade();

String newDefaultLoadCluster = defaultLoadCluster;
String newDefaultCloudCluster = defaultCloudCluster;
Expand Down Expand Up @@ -358,6 +367,15 @@ public void update(List<Pair<String, String>> properties, boolean isReplay) thro
throw new DdlException("workload group " + value + " not exists");
}
workloadGroup = value;
} else if (keyArr[0].equalsIgnoreCase(PROP_ALLOW_RESOURCE_TAG_DOWNGRADE)) {
if (keyArr.length != 1) {
throw new DdlException(PROP_ALLOW_RESOURCE_TAG_DOWNGRADE + " format error");
}
if (!"true".equalsIgnoreCase(value) && !"false".equalsIgnoreCase(value)) {
throw new DdlException(
"allow_resource_tag_downgrade's value must be true or false");
}
allowResourceTagDowngrade = Boolean.parseBoolean(value);
} else {
if (isReplay) {
// After using SET PROPERTY to modify the user property, if FE rolls back to a version without
Expand All @@ -381,6 +399,7 @@ public void update(List<Pair<String, String>> properties, boolean isReplay) thro
this.commonProperties.setQueryTimeout(queryTimeout);
this.commonProperties.setInsertTimeout(insertTimeout);
this.commonProperties.setWorkloadGroup(workloadGroup);
this.commonProperties.setAllowResourceTagDowngrade(allowResourceTagDowngrade);
if (newDppConfigs.containsKey(newDefaultLoadCluster)) {
defaultLoadCluster = newDefaultLoadCluster;
} else {
Expand Down Expand Up @@ -546,6 +565,9 @@ public List<List<String>> fetchProperty() {

result.add(Lists.newArrayList(PROP_WORKLOAD_GROUP, String.valueOf(commonProperties.getWorkloadGroup())));

result.add(Lists.newArrayList(PROP_ALLOW_RESOURCE_TAG_DOWNGRADE,
String.valueOf(commonProperties.isAllowResourceTagDowngrade())));

// load cluster
if (defaultLoadCluster != null) {
result.add(Lists.newArrayList(PROP_DEFAULT_LOAD_CLUSTER, defaultLoadCluster));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ public Set<Tag> getResourceTags(String qualifiedUser) {
return tags;
}

/**
 * Reports whether resource-tag downgrade is allowed for the given user.
 *
 * @param qualifiedUser key used to look the user up in {@code propertyMap}
 * @return {@code false} when the user has no entry in {@code propertyMap};
 *         otherwise the flag stored in that user's property
 */
public boolean isAllowResourceTagDowngrade(String qualifiedUser) {
    UserProperty userProperty = propertyMap.get(qualifiedUser);
    return userProperty != null && userProperty.isAllowResourceTagDowngrade();
}

public Pair<String, DppConfig> getLoadClusterInfo(String qualifiedUser, String cluster) throws DdlException {
Pair<String, DppConfig> loadClusterInfo = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,21 @@ private void addScanRangeLocations(Partition partition,

int replicaInTablet = 0;
long oneReplicaBytes = 0;


// When none of the allowed resource tags has an available replica and allowResourceTagDowngrade is true,
// ignore the resource tags for this tablet so that at least one alive replica can still be found.
boolean shouldSkipResourceTag = false;
boolean isAllowRgDowngrade = context.isAllowResourceTagDowngrade();
if (needCheckTags && isAllowRgDowngrade && !checkTagHasAvailReplica(allowedTags, replicas)) {
shouldSkipResourceTag = true;
if (ConnectContext.get() != null && LOG.isDebugEnabled()) {
LOG.debug("query {} skip resource tag for table {}.",
DebugUtil.printId(ConnectContext.get().queryId()),
olapTable != null ? olapTable.getId() : -1);
}
}

for (Replica replica : replicas) {
Backend backend = null;
long backendId = -1;
Expand All @@ -906,14 +921,15 @@ private void addScanRangeLocations(Partition partition,
replica.getId());
}
String err = "replica " + replica.getId() + "'s backend " + backendId
+ " does not exist or not alive";
+ " with tag " + backend.getLocationTag() + " does not exist or not alive";
errs.add(err);
continue;
}
if (!backend.isMixNode()) {
continue;
}
if (needCheckTags && !allowedTags.isEmpty() && !allowedTags.contains(backend.getLocationTag())) {
if (!shouldSkipResourceTag && needCheckTags && !allowedTags.isEmpty() && !allowedTags.contains(
backend.getLocationTag())) {
String err = String.format(
"Replica on backend %d with tag %s," + " which is not in user's resource tags: %s",
backend.getId(), backend.getLocationTag(), allowedTags);
Expand Down Expand Up @@ -954,6 +970,10 @@ private void addScanRangeLocations(Partition partition,
throw new UserException("tablet " + tabletId + " err: " + Joiner.on(", ").join(errs));
}
if (tabletIsNull) {
if (needCheckTags && !isAllowRgDowngrade) {
errs.add("If user specified tag has no queryable replica, "
+ "you can set property 'allow_resource_tag_downgrade'='true' to skip resource tag.");
}
throw new UserException("tablet " + tabletId + " has no queryable replicas. err: "
+ Joiner.on(", ").join(errs));
}
Expand All @@ -974,6 +994,29 @@ private void addScanRangeLocations(Partition partition,
}
}

/**
 * Checks whether at least one replica sits on a backend that exists, is alive,
 * is a mix node and carries one of the allowed resource tags.
 *
 * @param allowedTags resource tags a matching backend must carry
 * @param replicas replicas whose backends are inspected
 * @return {@code true} if such a replica is found, or if the check itself fails with an
 *         exception (the failure is logged); {@code false} otherwise, including when
 *         {@code allowedTags} is empty
 */
private boolean checkTagHasAvailReplica(Set<Tag> allowedTags, List<Replica> replicas) {
    try {
        // No backend tag can match an empty set, so skip the per-replica backend lookups.
        if (allowedTags.isEmpty()) {
            return false;
        }
        for (Replica replica : replicas) {
            Backend backend = Env.getCurrentSystemInfo().getBackend(replica.getBackendId());
            // Only alive mix-node backends count.
            if (backend == null || !backend.isAlive() || !backend.isMixNode()) {
                continue;
            }
            if (allowedTags.contains(backend.getLocationTag())) {
                return true;
            }
        }
        return false;
    } catch (Exception e) {
        // Best effort: a failed check answers "true". Only Exception is caught so that
        // JVM Errors (e.g. OutOfMemoryError) are not swallowed here.
        LOG.warn("error happens when check resource tag has avail replica ", e);
        return true;
    }
}

private boolean isEnableCooldownReplicaAffinity() {
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ public enum ConnectType {
// This property is obtained from UserProperty when the client connection is created.
// Only when the connection is created again, the new resource tags will be retrieved from the UserProperty
private Set<Tag> resourceTags = Sets.newHashSet();
private boolean allowResourceTagDowngrade = false;
// If set to true, the resource tags set in resourceTags will be used to limit the query resources.
// If set to false, the system will not restrict query resources.
private boolean isResourceTagsSet = false;
Expand Down Expand Up @@ -999,9 +1000,14 @@ public Set<Tag> getResourceTags() {
return resourceTags;
}

public void setResourceTags(Set<Tag> resourceTags) {
/**
 * Returns this context's resource-tag downgrade flag.
 *
 * @return current value of {@code allowResourceTagDowngrade}
 */
public boolean isAllowResourceTagDowngrade() {
return allowResourceTagDowngrade;
}

/**
 * Stores the resource tags of this context together with the resource-tag downgrade flag.
 *
 * @param resourceTags tags to store; the given set is kept by reference, not copied
 * @param allowResourceTagDowngrade value to store as the downgrade flag
 */
public void setResourceTags(Set<Tag> resourceTags, boolean allowResourceTagDowngrade) {
    this.resourceTags = resourceTags;
    // Tags only count as "set" when the supplied set is non-empty.
    isResourceTagsSet = !resourceTags.isEmpty();
    this.allowResourceTagDowngrade = allowResourceTagDowngrade;
}

public void setCurrentConnectedFEIp(String ip) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,8 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException
}

// set resource tag
ctx.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(ctx.qualifiedUser));
ctx.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(ctx.qualifiedUser),
Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(ctx.qualifiedUser));

ctx.setThreadLocalInfo();
StmtExecutor executor = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class BeSelectionPolicy {
public boolean needLoadAvailable = false;
// Resource tag. Empty means no need to consider resource tag.
public Set<Tag> resourceTags = Sets.newHashSet();
public boolean allowResourceTagDowngrade = false;
// storage medium. null means no need to consider storage medium.
public TStorageMedium storageMedium = null;
// Check if disk usage reaches limit. false means no need to check.
Expand Down Expand Up @@ -92,6 +93,11 @@ public Builder addTags(Set<Tag> tags) {
return this;
}

/**
 * Sets {@code allowResourceTagDowngrade} on the policy being built.
 *
 * @param allowResourceTagDowngrade value to store on the policy
 * @return this builder, so calls can be chained
 */
public Builder setAllowResourceTagDowngrade(boolean allowResourceTagDowngrade) {
policy.allowResourceTagDowngrade = allowResourceTagDowngrade;
return this;
}

public Builder setStorageMedium(TStorageMedium medium) {
policy.storageMedium = medium;
return this;
Expand Down Expand Up @@ -137,7 +143,7 @@ public BeSelectionPolicy build() {
}
}

private boolean isMatch(Backend backend) {
private boolean isMatch(Backend backend, boolean needResourceTagAvail) {
// Compute node is only used when preferComputeNode is set.
if (!preferComputeNode && backend.isComputeNode()) {
if (LOG.isDebugEnabled()) {
Expand All @@ -146,10 +152,11 @@ private boolean isMatch(Backend backend) {
return false;
}

if (needScheduleAvailable && !backend.isScheduleAvailable() || needQueryAvailable && !backend.isQueryAvailable()
|| needLoadAvailable && !backend.isLoadAvailable() || !resourceTags.isEmpty() && !resourceTags.contains(
backend.getLocationTag()) || storageMedium != null && !backend.hasSpecifiedStorageMedium(
storageMedium)) {
if (needScheduleAvailable && !backend.isScheduleAvailable()
|| needQueryAvailable && !backend.isQueryAvailable()
|| needLoadAvailable && !backend.isLoadAvailable()
|| (needResourceTagAvail && !resourceTags.isEmpty() && !resourceTags.contains(backend.getLocationTag()))
|| storageMedium != null && !backend.hasSpecifiedStorageMedium(storageMedium)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Backend [{}] is not match by Other rules, policy: [{}]", backend.getHost(), this);
}
Expand All @@ -176,7 +183,16 @@ private boolean isMatch(Backend backend) {
}

public List<Backend> getCandidateBackends(Collection<Backend> backends) {
List<Backend> filterBackends = backends.stream().filter(this::isMatch).collect(Collectors.toList());
boolean needResourceTagAvail = !this.allowResourceTagDowngrade || backends.stream()
.filter(backend -> resourceTags.contains(backend.getLocationTag()) && backend.isAlive())
.count() != 0;
List<Backend> filterBackends = new ArrayList<>();
for (Backend be : backends) {
if (this.isMatch(be, needResourceTagAvail)) {
filterBackends.add(be);
}
}

List<Backend> preLocationFilterBackends = filterBackends.stream()
.filter(iterm -> preferredLocations.contains(iterm.getHost())).collect(Collectors.toList());
// If preLocations were chosen, use the preLocation backends. Otherwise we just ignore this filter.
Expand Down Expand Up @@ -221,8 +237,9 @@ public List<Backend> getCandidateBackends(Collection<Backend> backends) {

@Override
public String toString() {
return String.format("computeNode=%s | query=%s | load=%s | schedule=%s | tags=%s | medium=%s",
return String.format("computeNode=%s | query=%s | load=%s | schedule=%s | tags=%s(%s) | medium=%s",
preferComputeNode, needQueryAvailable, needLoadAvailable, needScheduleAvailable,
resourceTags.stream().map(tag -> tag.toString()).collect(Collectors.joining(",")), storageMedium);
resourceTags.stream().map(tag -> tag.toString()).collect(Collectors.joining(",")),
this.allowResourceTagDowngrade, storageMedium);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void test() throws Exception {
Assert.assertEquals(1, userTags.size());

// update connection context and query
connectContext.setResourceTags(userTags);
connectContext.setResourceTags(userTags, false);
String queryStr = "explain select * from test.tbl1";
String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
System.out.println(explainString);
Expand All @@ -221,7 +221,7 @@ public void test() throws Exception {
}

// update connection context and query; it will fail because there is no zone1 backend
connectContext.setResourceTags(userTags);
connectContext.setResourceTags(userTags, false);
Assert.assertTrue(connectContext.isResourceTagsSet());
queryStr = "explain select * from test.tbl1";
String error = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
Expand Down Expand Up @@ -280,7 +280,13 @@ public void test() throws Exception {
Assert.assertEquals(1000000, execMemLimit);

List<List<String>> userProps = Env.getCurrentEnv().getAuth().getUserProperties(Auth.ROOT_USER);
Assert.assertEquals(13, userProps.size());
Assert.assertEquals(14, userProps.size());

// set resource tag downgrade
String setResourceTagDownStr = "set property for 'root' 'allow_resource_tag_downgrade' = 'false';";
ExceptionChecker.expectThrowsNoException(() -> setProperty(setResourceTagDownStr));
boolean tagDowngrade = Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(Auth.ROOT_USER);
Assert.assertTrue(!tagDowngrade);

// now :
// be1 be2 be3 ==>tag1;
Expand Down
Loading

0 comments on commit 71f96e6

Please sign in to comment.