From 71f96e6e06fc8ac0fc30545514c1d4d95ef205af Mon Sep 17 00:00:00 2001 From: wangbo Date: Tue, 10 Dec 2024 20:13:27 +0800 Subject: [PATCH] [Improvement]downgrade resource tag when there is not queryable replica (#44255) --- .../datasource/FederationBackendPolicy.java | 3 + .../apache/doris/httpv2/rest/LoadAction.java | 2 + .../org/apache/doris/mysql/MysqlProto.java | 3 +- .../apache/doris/mysql/privilege/Auth.java | 9 ++ .../mysql/privilege/CommonUserProperties.java | 11 +++ .../doris/mysql/privilege/UserProperty.java | 22 +++++ .../mysql/privilege/UserPropertyMgr.java | 8 ++ .../apache/doris/planner/OlapScanNode.java | 47 ++++++++- .../org/apache/doris/qe/ConnectContext.java | 8 +- .../org/apache/doris/qe/ConnectProcessor.java | 3 +- .../doris/system/BeSelectionPolicy.java | 33 +++++-- .../doris/planner/ResourceTagQueryTest.java | 12 ++- .../doris/system/SystemInfoServiceTest.java | 23 +++++ .../skip_rg_test_table.csv | 2 + .../test_resource_tag.groovy | 99 +++++++++++++++++++ 15 files changed, 269 insertions(+), 16 deletions(-) create mode 100644 regression-test/data/workload_manager_p0/skip_rg_test_table.csv create mode 100644 regression-test/suites/workload_manager_p0/test_resource_tag.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java index 1e1787c1f64947..4a24645bf3ee03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java @@ -155,6 +155,7 @@ public void init() throws UserException { public void init(List preLocations) throws UserException { Set tags = Sets.newHashSet(); + boolean allowResourceTagDowngrade = false; if (ConnectContext.get() != null && ConnectContext.get().getCurrentUserIdentity() != null) { String qualifiedUser = ConnectContext.get().getCurrentUserIdentity().getQualifiedUser(); // Some request from stream load(eg, mysql load) may not set user info in ConnectContext @@ -164,6 +165,7 @@ public void init(List preLocations) throws UserException { if (tags == UserProperty.INVALID_RESOURCE_TAGS) { throw new UserException("No valid resource tag for user: " + qualifiedUser); } + allowResourceTagDowngrade = Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(qualifiedUser); } } else { if (LOG.isDebugEnabled()) { @@ -176,6 +178,7 @@ public void init(List preLocations) throws UserException { .needQueryAvailable() .needLoadAvailable() .addTags(tags) + .setAllowResourceTagDowngrade(allowResourceTagDowngrade) .preferComputeNode(Config.prefer_compute_node_for_external_table) .assignExpectBeNum(Config.min_backend_num_for_external_table) .addPreLocations(preLocations) diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index 2f9efc1ed1b1bf..00348b2d83a61b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -425,8 +425,10 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ BeSelectionPolicy policy = null; String qualifiedUser = ConnectContext.get().getQualifiedUser(); Set userTags = Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser); + boolean allowResourceTagDowngrade = Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(qualifiedUser); policy = new BeSelectionPolicy.Builder() .addTags(userTags) + .setAllowResourceTagDowngrade(allowResourceTagDowngrade) .setEnableRoundRobin(true) .needLoadAvailable().build(); policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java index a672a217a33795..c16cec5689afd4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlProto.java @@ -266,7 +266,8 @@ public static boolean negotiate(ConnectContext context) throws IOException { } // set resource tag if has - context.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser)); + context.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser), + Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(qualifiedUser)); return true; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java index a1fb57d01cafa4..9d6f52d5a51d7d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/Auth.java @@ -1234,6 +1234,15 @@ public Set getResourceTags(String qualifiedUser) { } } + public boolean isAllowResourceTagDowngrade(String qualifiedUser) { + readLock(); + try { + return propertyMgr.isAllowResourceTagDowngrade(qualifiedUser); + } finally { + readUnlock(); + } + } + public long getExecMemLimit(String qualifiedUser) { readLock(); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java index db838a91a56614..bd2c3d028231f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/CommonUserProperties.java @@ -69,6 +69,9 @@ public class CommonUserProperties implements Writable, GsonPostProcessable { @SerializedName(value = "wg", alternate = {"workloadGroup"}) private String workloadGroup = WorkloadGroupMgr.DEFAULT_GROUP_NAME; + @SerializedName(value = "ard", alternate = {"AllowResourceTagDowngrade"}) + private boolean allowResourceTagDowngrade = false; + private String[] sqlBlockRulesSplit = {}; long getMaxConn() { @@ -164,6 +167,14 @@ public void setWorkloadGroup(String workloadGroup) { this.workloadGroup = workloadGroup; } + public void setAllowResourceTagDowngrade(boolean allowResourceTagDowngrade) { + this.allowResourceTagDowngrade = allowResourceTagDowngrade; + } + + public boolean isAllowResourceTagDowngrade() { + return this.allowResourceTagDowngrade; + } + @Deprecated public static CommonUserProperties read(DataInput in) throws IOException { String json = Text.readString(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java index e4a76b23820b3b..08f64cc006e3d9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserProperty.java @@ -86,6 +86,8 @@ public class UserProperty implements Writable { public static final String PROP_WORKLOAD_GROUP = "default_workload_group"; + public static final String PROP_ALLOW_RESOURCE_TAG_DOWNGRADE = "allow_resource_tag_downgrade"; + public static final String DEFAULT_CLOUD_CLUSTER = "default_cloud_cluster"; public static final String DEFAULT_COMPUTE_GROUP = "default_compute_group"; @@ -139,6 +141,8 @@ public class UserProperty implements Writable { ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_EXEC_MEM_LIMIT + "$", Pattern.CASE_INSENSITIVE)); ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_USER_QUERY_TIMEOUT + "$", Pattern.CASE_INSENSITIVE)); ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_USER_INSERT_TIMEOUT + "$", Pattern.CASE_INSENSITIVE)); + ADVANCED_PROPERTIES.add( + Pattern.compile("^" + PROP_ALLOW_RESOURCE_TAG_DOWNGRADE + "$", Pattern.CASE_INSENSITIVE)); COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_QUOTA + ".", Pattern.CASE_INSENSITIVE)); COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_DEFAULT_LOAD_CLUSTER + "$", Pattern.CASE_INSENSITIVE)); @@ -201,6 +205,10 @@ public Set getCopiedResourceTags() { return Sets.newHashSet(this.commonProperties.getResourceTags()); } + public boolean isAllowResourceTagDowngrade() { + return this.commonProperties.isAllowResourceTagDowngrade(); + } + public long getExecMemLimit() { return commonProperties.getExecMemLimit(); } @@ -221,6 +229,7 @@ public void update(List> properties, boolean isReplay) thro int queryTimeout = this.commonProperties.getQueryTimeout(); int insertTimeout = this.commonProperties.getInsertTimeout(); String workloadGroup = this.commonProperties.getWorkloadGroup(); + boolean allowResourceTagDowngrade = this.commonProperties.isAllowResourceTagDowngrade(); String newDefaultLoadCluster = defaultLoadCluster; String newDefaultCloudCluster = defaultCloudCluster; @@ -358,6 +367,15 @@ public void update(List> properties, boolean isReplay) thro throw new DdlException("workload group " + value + " not exists"); } workloadGroup = value; + } else if (keyArr[0].equalsIgnoreCase(PROP_ALLOW_RESOURCE_TAG_DOWNGRADE)) { + if (keyArr.length != 1) { + throw new DdlException(PROP_ALLOW_RESOURCE_TAG_DOWNGRADE + " format error"); + } + if (!"true".equalsIgnoreCase(value) && !"false".equalsIgnoreCase(value)) { + throw new DdlException( + "allow_resource_tag_downgrade's value must be true or false"); + } + allowResourceTagDowngrade = Boolean.parseBoolean(value); } else { if (isReplay) { // After using SET PROPERTY to modify the user property, if FE rolls back to a version without @@ -381,6 +399,7 @@ public void update(List> properties, boolean isReplay) thro this.commonProperties.setQueryTimeout(queryTimeout); this.commonProperties.setInsertTimeout(insertTimeout); this.commonProperties.setWorkloadGroup(workloadGroup); + this.commonProperties.setAllowResourceTagDowngrade(allowResourceTagDowngrade); if (newDppConfigs.containsKey(newDefaultLoadCluster)) { defaultLoadCluster = newDefaultLoadCluster; } else { @@ -546,6 +565,9 @@ public List> fetchProperty() { result.add(Lists.newArrayList(PROP_WORKLOAD_GROUP, String.valueOf(commonProperties.getWorkloadGroup()))); + result.add(Lists.newArrayList(PROP_ALLOW_RESOURCE_TAG_DOWNGRADE, + String.valueOf(commonProperties.isAllowResourceTagDowngrade()))); + // load cluster if (defaultLoadCluster != null) { result.add(Lists.newArrayList(PROP_DEFAULT_LOAD_CLUSTER, defaultLoadCluster)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java index 29ae1f438a10f3..b40bb92fbfae7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java @@ -179,6 +179,14 @@ public Set getResourceTags(String qualifiedUser) { return tags; } + public boolean isAllowResourceTagDowngrade(String qualifiedUser) { + UserProperty existProperty = propertyMap.get(qualifiedUser); + if (existProperty == null) { + return false; + } + return existProperty.isAllowResourceTagDowngrade(); + } + public Pair getLoadClusterInfo(String qualifiedUser, String cluster) throws DdlException { Pair loadClusterInfo = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 41c055062e3e9b..92175523f227a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -888,6 +888,21 @@ private void addScanRangeLocations(Partition partition, int replicaInTablet = 0; long oneReplicaBytes = 0; + + + // when resource tag has no alive replica and allowResourceTagDowngrade = true, + // resource tag should be disabled, we should find at least one alive replica + boolean shouldSkipResourceTag = false; + boolean isAllowRgDowngrade = context.isAllowResourceTagDowngrade(); + if (needCheckTags && isAllowRgDowngrade && !checkTagHasAvailReplica(allowedTags, replicas)) { + shouldSkipResourceTag = true; + if (ConnectContext.get() != null && LOG.isDebugEnabled()) { + LOG.debug("query {} skip resource tag for table {}.", + DebugUtil.printId(ConnectContext.get().queryId()), + olapTable != null ? olapTable.getId() : -1); + } + } + for (Replica replica : replicas) { Backend backend = null; long backendId = -1; @@ -906,14 +921,15 @@ private void addScanRangeLocations(Partition partition, replica.getId()); } String err = "replica " + replica.getId() + "'s backend " + backendId - + " does not exist or not alive"; + + " with tag " + backend.getLocationTag() + " does not exist or not alive"; errs.add(err); continue; } if (!backend.isMixNode()) { continue; } - if (needCheckTags && !allowedTags.isEmpty() && !allowedTags.contains(backend.getLocationTag())) { + if (!shouldSkipResourceTag && needCheckTags && !allowedTags.isEmpty() && !allowedTags.contains( + backend.getLocationTag())) { String err = String.format( "Replica on backend %d with tag %s," + " which is not in user's resource tags: %s", backend.getId(), backend.getLocationTag(), allowedTags); @@ -954,6 +970,10 @@ private void addScanRangeLocations(Partition partition, throw new UserException("tablet " + tabletId + " err: " + Joiner.on(", ").join(errs)); } if (tabletIsNull) { + if (needCheckTags && !isAllowRgDowngrade) { + errs.add("If user specified tag has no queryable replica, " + + "you can set property 'allow_resource_tag_downgrade'='true' to skip resource tag."); + } throw new UserException("tablet " + tabletId + " has no queryable replicas. err: " + Joiner.on(", ").join(errs)); } @@ -974,6 +994,29 @@ private void addScanRangeLocations(Partition partition, } } + private boolean checkTagHasAvailReplica(Set allowedTags, List replicas) { + try { + for (Replica replica : replicas) { + long backendId = replica.getBackendId(); + Backend backend = Env.getCurrentSystemInfo().getBackend(backendId); + + if (backend == null || !backend.isAlive()) { + continue; + } + if (!backend.isMixNode()) { + continue; + } + if (!allowedTags.isEmpty() && allowedTags.contains(backend.getLocationTag())) { + return true; + } + } + return false; + } catch (Throwable t) { + LOG.warn("error happens when check resource tag has avail replica ", t); + return true; + } + } + private boolean isEnableCooldownReplicaAffinity() { ConnectContext connectContext = ConnectContext.get(); if (connectContext != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index c81cf4920e1f7d..bfbb15a7a7a783 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -203,6 +203,7 @@ public enum ConnectType { // This property is obtained from UserProperty when the client connection is created. // Only when the connection is created again, the new resource tags will be retrieved from the UserProperty private Set resourceTags = Sets.newHashSet(); + private boolean allowResourceTagDowngrade = false; // If set to true, the resource tags set in resourceTags will be used to limit the query resources. // If set to false, the system will not restrict query resources. private boolean isResourceTagsSet = false; @@ -999,9 +1000,14 @@ public Set getResourceTags() { return resourceTags; } - public void setResourceTags(Set resourceTags) { + public boolean isAllowResourceTagDowngrade() { + return allowResourceTagDowngrade; + } + + public void setResourceTags(Set resourceTags, boolean allowResourceTagDowngrade) { this.resourceTags = resourceTags; this.isResourceTagsSet = !this.resourceTags.isEmpty(); + this.allowResourceTagDowngrade = allowResourceTagDowngrade; } public void setCurrentConnectedFEIp(String ip) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index fcc6c2362cf276..0c633186abf8ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -663,7 +663,8 @@ public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException } // set resource tag - ctx.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(ctx.qualifiedUser)); + ctx.setResourceTags(Env.getCurrentEnv().getAuth().getResourceTags(ctx.qualifiedUser), + Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(ctx.qualifiedUser)); ctx.setThreadLocalInfo(); StmtExecutor executor = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java index 59cb4f19139b46..dbe16853bfde4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java @@ -44,6 +44,7 @@ public class BeSelectionPolicy { public boolean needLoadAvailable = false; // Resource tag. Empty means no need to consider resource tag. public Set resourceTags = Sets.newHashSet(); + public boolean allowResourceTagDowngrade = false; // storage medium. null means no need to consider storage medium. public TStorageMedium storageMedium = null; // Check if disk usage reaches limit. false means no need to check. @@ -92,6 +93,11 @@ public Builder addTags(Set tags) { return this; } + public Builder setAllowResourceTagDowngrade(boolean allowResourceTagDowngrade) { + policy.allowResourceTagDowngrade = allowResourceTagDowngrade; + return this; + } + public Builder setStorageMedium(TStorageMedium medium) { policy.storageMedium = medium; return this; @@ -137,7 +143,7 @@ public BeSelectionPolicy build() { } } - private boolean isMatch(Backend backend) { + private boolean isMatch(Backend backend, boolean needResourceTagAvail) { // Compute node is only used when preferComputeNode is set. if (!preferComputeNode && backend.isComputeNode()) { if (LOG.isDebugEnabled()) { @@ -146,10 +152,11 @@ private boolean isMatch(Backend backend) { return false; } - if (needScheduleAvailable && !backend.isScheduleAvailable() || needQueryAvailable && !backend.isQueryAvailable() - || needLoadAvailable && !backend.isLoadAvailable() || !resourceTags.isEmpty() && !resourceTags.contains( - backend.getLocationTag()) || storageMedium != null && !backend.hasSpecifiedStorageMedium( - storageMedium)) { + if (needScheduleAvailable && !backend.isScheduleAvailable() + || needQueryAvailable && !backend.isQueryAvailable() + || needLoadAvailable && !backend.isLoadAvailable() + || (needResourceTagAvail && !resourceTags.isEmpty() && !resourceTags.contains(backend.getLocationTag())) + || storageMedium != null && !backend.hasSpecifiedStorageMedium(storageMedium)) { if (LOG.isDebugEnabled()) { LOG.debug("Backend [{}] is not match by Other rules, policy: [{}]", backend.getHost(), this); } @@ -176,7 +183,16 @@ private boolean isMatch(Backend backend) { } public List getCandidateBackends(Collection backends) { - List filterBackends = backends.stream().filter(this::isMatch).collect(Collectors.toList()); + boolean needResourceTagAvail = !this.allowResourceTagDowngrade || backends.stream() + .filter(backend -> resourceTags.contains(backend.getLocationTag()) && backend.isAlive()) + .count() != 0; + List filterBackends = new ArrayList<>(); + for (Backend be : backends) { + if (this.isMatch(be, needResourceTagAvail)) { + filterBackends.add(be); + } + } + List preLocationFilterBackends = filterBackends.stream() .filter(iterm -> preferredLocations.contains(iterm.getHost())).collect(Collectors.toList()); // If preLocations were chosen, use the preLocation backends. Otherwise we just ignore this filter. @@ -221,8 +237,9 @@ public List getCandidateBackends(Collection backends) { @Override public String toString() { - return String.format("computeNode=%s | query=%s | load=%s | schedule=%s | tags=%s | medium=%s", + return String.format("computeNode=%s | query=%s | load=%s | schedule=%s | tags=%s(%s) | medium=%s", preferComputeNode, needQueryAvailable, needLoadAvailable, needScheduleAvailable, - resourceTags.stream().map(tag -> tag.toString()).collect(Collectors.joining(",")), storageMedium); + resourceTags.stream().map(tag -> tag.toString()).collect(Collectors.joining(",")), + this.allowResourceTagDowngrade, storageMedium); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java index 850d8b27b062af..9568841134ad45 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java @@ -205,7 +205,7 @@ public void test() throws Exception { Assert.assertEquals(1, userTags.size()); // update connection context and query - connectContext.setResourceTags(userTags); + connectContext.setResourceTags(userTags, false); String queryStr = "explain select * from test.tbl1"; String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); System.out.println(explainString); @@ -221,7 +221,7 @@ public void test() throws Exception { } // update connection context and query, it will failed because no zone1 backend - connectContext.setResourceTags(userTags); + connectContext.setResourceTags(userTags, false); Assert.assertTrue(connectContext.isResourceTagsSet()); queryStr = "explain select * from test.tbl1"; String error = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); @@ -280,7 +280,13 @@ public void test() throws Exception { Assert.assertEquals(1000000, execMemLimit); List> userProps = Env.getCurrentEnv().getAuth().getUserProperties(Auth.ROOT_USER); - Assert.assertEquals(13, userProps.size()); + Assert.assertEquals(14, userProps.size()); + + // set resource tag downgrade + String setResourceTagDownStr = "set property for 'root' 'allow_resource_tag_downgrade' = 'false';"; + ExceptionChecker.expectThrowsNoException(() -> setProperty(setResourceTagDownStr)); + boolean tagDowngrade = Env.getCurrentEnv().getAuth().isAllowResourceTagDowngrade(Auth.ROOT_USER); + Assert.assertTrue(!tagDowngrade); // now : // be1 be2 be3 ==>tag1; diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java index 62ade50c9198f6..f582c37706c77b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java @@ -333,6 +333,29 @@ public void testComputeNodeBackendSelect() throws Exception { Assert.assertEquals(3, infoService.selectBackendIdsByPolicy(policy07, 3).size()); } + @Test + public void testResourceTagDowngrade() throws Exception { + Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga"); + addBackend(10001, "192.168.1.1", 9050); + Backend be1 = infoService.getBackend(10001); + be1.setTagMap(taga.toMap()); + be1.setAlive(true); + + addBackend(10002, "192.168.1.2", 9050); + Backend be2 = infoService.getBackend(10002); + be2.setAlive(true); + + BeSelectionPolicy policy1 = new BeSelectionPolicy.Builder().addTags(Sets.newHashSet(taga)).build(); + Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy1, 1).size()); + + be1.setAlive(false); + Assert.assertEquals(0, infoService.selectBackendIdsByPolicy(policy1, 1).size()); + + BeSelectionPolicy policy2 = new BeSelectionPolicy.Builder().setAllowResourceTagDowngrade(true) + .addTags(Sets.newHashSet(taga)).build(); + Assert.assertEquals(1, infoService.selectBackendIdsByPolicy(policy2, 1).size()); + } + @Test public void testPreferLocationsSelect() throws Exception { Tag taga = Tag.create(Tag.TYPE_LOCATION, "taga"); diff --git a/regression-test/data/workload_manager_p0/skip_rg_test_table.csv b/regression-test/data/workload_manager_p0/skip_rg_test_table.csv new file mode 100644 index 00000000000000..edcc00b603a95e --- /dev/null +++ b/regression-test/data/workload_manager_p0/skip_rg_test_table.csv @@ -0,0 +1,2 @@ +1|2 +3|4 \ No newline at end of file diff --git a/regression-test/suites/workload_manager_p0/test_resource_tag.groovy b/regression-test/suites/workload_manager_p0/test_resource_tag.groovy new file mode 100644 index 00000000000000..d6557645eb8759 --- /dev/null +++ b/regression-test/suites/workload_manager_p0/test_resource_tag.groovy @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_resource_tag") { + sql "drop user if exists test_rg;" + sql "create user test_rg" + sql "GRANT SELECT_PRIV,LOAD_PRIV,ALTER_PRIV,CREATE_PRIV,DROP_PRIV ON *.*.* TO test_rg;" + sql "set property for test_rg 'resource_tags.location' = 'c3p0';" + //cloud-mode + if (isCloudMode()) { + def clusters = sql " SHOW CLUSTERS; " + assertTrue(!clusters.isEmpty()) + def validCluster = clusters[0][0] + sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_rg"""; + } + + // test query + connect(user = 'test_rg', password = '', url = context.config.jdbcUrl) { + sql "drop table if exists test_skip_rg_bad_replica_tab;" + sql """ + CREATE TABLE test_skip_rg_bad_replica_tab + ( + k1 int, + k2 int, + )ENGINE=OLAP + duplicate KEY(k1) + DISTRIBUTED BY HASH (k1) BUCKETS 1 + PROPERTIES( + 'replication_allocation' = 'tag.location.default: 1' + ); + """ + sql """ + insert into test_skip_rg_bad_replica_tab values + (9,10), + (1,2) + """ + test { + sql "select count(1) as t1 from test_skip_rg_bad_replica_tab;" + exception "which is not in user's resource tags: [{\"location\" : \"c3p0\"}], If user specified tag has no queryable replica, you can set property 'allow_resource_tag_downgrade'='true' to skip resource tag." + } + } + sql "set property for test_rg 'allow_resource_tag_downgrade' = 'true';" + + connect(user = 'test_rg', password = '', url = context.config.jdbcUrl) { + sql "select count(1) as t2 from test_skip_rg_bad_replica_tab;" + sql "drop table test_skip_rg_bad_replica_tab"; + } + + + // test stream load + sql "set property for test_rg 'allow_resource_tag_downgrade' = 'false';" + sql """ DROP TABLE IF EXISTS ${context.config.defaultDb}.skip_rg_test_table """ + sql """ + CREATE TABLE IF NOT EXISTS ${context.config.defaultDb}.skip_rg_test_table ( + `k1` int NULL, + `k2` int NULL + ) ENGINE=OLAP + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + def test_failed_command = "curl --location-trusted -u test_rg: -H column_separator:| -H Transfer-Encoding:chunked -H columns:k1,k2 -T ${context.dataPath}/skip_rg_test_table.csv http://${context.config.feHttpAddress}/api/${context.config.defaultDb}/skip_rg_test_table/_stream_load" + log.info("stream load skip_rg_test_table failed test cmd: ${test_failed_command}") + def process = test_failed_command.execute() + code1 = process.waitFor() + out1 = process.text + log.info("stream load skip_rg_test_table failed test result, ${out1}".toString()) + assertTrue("${out1}".toString().contains("No backend load available") || "${out1}".toString().contains("No available backends")) + + sql "set property for test_rg 'allow_resource_tag_downgrade' = 'true';" + + def test_succ_command = "curl --location-trusted -u test_rg: -H column_separator:| -H Transfer-Encoding:chunked -H columns:k1,k2 -T ${context.dataPath}/skip_rg_test_table.csv http://${context.config.feHttpAddress}/api/${context.config.defaultDb}/skip_rg_test_table/_stream_load" + def process2 = test_succ_command.execute() + code2 = process2.waitFor() + out2 = process2.text + jsonRet = parseJson(out2) + log.info("stream load skip_rg_test_table succ test result, ${out2}".toString()) + assertFalse("${out2}".toString().contains("No backend load available")) + assertTrue(jsonRet['Status'] == 'Success') + + + // clear + sql "drop user test_rg" + sql "drop table ${context.config.defaultDb}.skip_rg_test_table" +} \ No newline at end of file