diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 3d797988046108..11796c5df3b5b3 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -495,6 +495,10 @@ unsupportedCancelStatement supportedAdminStatement : ADMIN SHOW REPLICA DISTRIBUTION FROM baseTableRef #adminShowReplicaDistribution + | ADMIN REBALANCE DISK (ON LEFT_PAREN backends+=STRING_LITERAL + (COMMA backends+=STRING_LITERAL)* RIGHT_PAREN)? #adminRebalanceDisk + | ADMIN CANCEL REBALANCE DISK (ON LEFT_PAREN backends+=STRING_LITERAL + (COMMA backends+=STRING_LITERAL)* RIGHT_PAREN)? #adminCancelRebalanceDisk | ADMIN DIAGNOSE TABLET tabletId=INTEGER_VALUE #adminDiagnoseTablet | ADMIN SHOW REPLICA STATUS FROM baseTableRef (WHERE STATUS EQ|NEQ STRING_LITERAL)? #adminShowReplicaStatus | ADMIN COMPACT TABLE baseTableRef (WHERE TYPE EQ STRING_LITERAL)? #adminCompactTable @@ -520,10 +524,6 @@ unsupportedAdminStatement | ADMIN CANCEL REPAIR TABLE baseTableRef #adminCancelRepairTable | ADMIN SET (FRONTEND | (ALL FRONTENDS)) CONFIG (LEFT_PAREN propertyItemList RIGHT_PAREN)? ALL? #adminSetFrontendConfig - | ADMIN REBALANCE DISK (ON LEFT_PAREN backends+=STRING_LITERAL - (COMMA backends+=STRING_LITERAL) RIGHT_PAREN)? #adminRebalanceDisk - | ADMIN CANCEL REBALANCE DISK (ON LEFT_PAREN backends+=STRING_LITERAL - (COMMA backends+=STRING_LITERAL) RIGHT_PAREN)? #adminCancelRebalanceDisk | ADMIN SET TABLE name=multipartIdentifier PARTITION VERSION properties=propertyClause? #adminSetPartitionVersion | ADMIN COPY TABLET tabletId=INTEGER_VALUE properties=propertyClause? #adminCopyTablet diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index bb88afd2b6b040..504e1d36a65af3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -317,6 +317,14 @@ public synchronized void rebalanceDisk(AdminRebalanceDiskStmt stmt) { diskRebalancer.addPrioBackends(stmt.getBackends(), stmt.getTimeoutS()); } + public synchronized void rebalanceDisk(List backends, long timeoutS) { + diskRebalancer.addPrioBackends(backends, timeoutS); + } + + public synchronized void cancelRebalanceDisk(List backends) { + diskRebalancer.removePrioBackends(backends); + } + public synchronized void cancelRebalanceDisk(AdminCancelRebalanceDiskStmt stmt) { diskRebalancer.removePrioBackends(stmt.getBackends()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 921e3014c9053a..085ad4458d0b3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -50,9 +50,11 @@ import org.apache.doris.mtmv.MTMVRefreshTriggerInfo; import org.apache.doris.nereids.DorisParser; import org.apache.doris.nereids.DorisParser.AddConstraintContext; +import org.apache.doris.nereids.DorisParser.AdminCancelRebalanceDiskContext; import org.apache.doris.nereids.DorisParser.AdminCheckTabletsContext; import org.apache.doris.nereids.DorisParser.AdminCompactTableContext; import org.apache.doris.nereids.DorisParser.AdminDiagnoseTabletContext; +import org.apache.doris.nereids.DorisParser.AdminRebalanceDiskContext; import org.apache.doris.nereids.DorisParser.AdminShowReplicaDistributionContext; import org.apache.doris.nereids.DorisParser.AdminShowReplicaStatusContext; import org.apache.doris.nereids.DorisParser.AdminShowTabletStorageFormatContext; @@ -481,9 +483,11 @@ import org.apache.doris.nereids.trees.plans.algebra.Aggregate; import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand; +import org.apache.doris.nereids.trees.plans.commands.AdminCancelRebalanceDiskCommand; import org.apache.doris.nereids.trees.plans.commands.AdminCheckTabletsCommand; import org.apache.doris.nereids.trees.plans.commands.AdminCleanTrashCommand; import org.apache.doris.nereids.trees.plans.commands.AdminCompactTableCommand; +import org.apache.doris.nereids.trees.plans.commands.AdminRebalanceDiskCommand; import org.apache.doris.nereids.trees.plans.commands.AdminShowReplicaStatusCommand; import org.apache.doris.nereids.trees.plans.commands.AlterCatalogCommentCommand; import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand; @@ -4724,6 +4728,26 @@ public LogicalPlan visitShowStorageEngines(ShowStorageEnginesContext ctx) { return new ShowStorageEnginesCommand(); } + @Override + public LogicalPlan visitAdminRebalanceDisk(AdminRebalanceDiskContext ctx) { + if (ctx.ON() != null) { + List backendList = Lists.newArrayList(); + ctx.backends.forEach(backend -> backendList.add(stripQuotes(backend.getText()))); + return new AdminRebalanceDiskCommand(backendList); + } + return new AdminRebalanceDiskCommand(); + } + + @Override + public LogicalPlan visitAdminCancelRebalanceDisk(AdminCancelRebalanceDiskContext ctx) { + if (ctx.ON() != null) { + List backendList = Lists.newArrayList(); + ctx.backends.forEach(backend -> backendList.add(stripQuotes(backend.getText()))); + return new AdminCancelRebalanceDiskCommand(backendList); + } + return new AdminCancelRebalanceDiskCommand(); + } + @Override public LogicalPlan visitShowDiagnoseTablet(ShowDiagnoseTabletContext ctx) { long tabletId = Long.parseLong(ctx.INTEGER_VALUE().getText()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 060dc859fef766..e9488239b8ac2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -251,6 +251,8 @@ public enum PlanType { RECOVER_TABLE_COMMAND, RECOVER_PARTITION_COMMAND, REPLAY_COMMAND, + ADMIN_REBALANCE_DISK_COMMAND, + ADMIN_CANCEL_REBALANCE_DISK_COMMAND, CREATE_ENCRYPTKEY_COMMAND, CREATE_WORKLOAD_GROUP_COMMAND, CREATE_CATALOG_COMMAND, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCancelRebalanceDiskCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCancelRebalanceDiskCommand.java new file mode 100644 index 00000000000000..7f07555cd4b150 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminCancelRebalanceDiskCommand.java @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.util.NetUtils; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.system.Backend; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * admin cancel rebalance disk + */ +public class AdminCancelRebalanceDiskCommand extends Command implements NoForward { + private static final Logger LOG = LogManager.getLogger(AdminCancelRebalanceDiskCommand.class); + private List backends; + + public AdminCancelRebalanceDiskCommand() { + super(PlanType.ADMIN_CANCEL_REBALANCE_DISK_COMMAND); + } + + public AdminCancelRebalanceDiskCommand(List backends) { + super(PlanType.ADMIN_CANCEL_REBALANCE_DISK_COMMAND); + this.backends = backends; + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + // check auth + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + handleCancelRebalanceDisk(); + } + + private void handleCancelRebalanceDisk() throws AnalysisException { + List rebalanceDiskBackends = getNeedRebalanceDiskBackends(backends); + if (rebalanceDiskBackends.isEmpty()) { + LOG.info("The matching be is empty, no be to cancel rebalance disk."); + return; + } + Env.getCurrentEnv().getTabletScheduler().cancelRebalanceDisk(rebalanceDiskBackends); + } + + private List getNeedRebalanceDiskBackends(List backends) throws AnalysisException { + ImmutableMap backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); + List needRebalanceDiskBackends = Lists.newArrayList(); + if (backends == null) { + needRebalanceDiskBackends.addAll(backendsInfo.values()); + } else { + Map backendsID = new HashMap<>(); + for (Backend backend : backendsInfo.values()) { + backendsID.put( + NetUtils.getHostPortInAccessibleFormat(backend.getHost(), backend.getHeartbeatPort()), + backend.getId()); + } + for (String be : backends) { + if (backendsID.containsKey(be)) { + needRebalanceDiskBackends.add(backendsInfo.get(backendsID.get(be))); + backendsID.remove(be); + } + } + } + return needRebalanceDiskBackends; + } + + @Override + protected void checkSupportedInCloudMode(ConnectContext ctx) throws DdlException { + LOG.info("AdminCancelRebalanceDiskCommand not supported in cloud mode"); + throw new DdlException("Unsupported operation"); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitAdminCancelRebalanceDiskCommand(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminRebalanceDiskCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminRebalanceDiskCommand.java new file mode 100644 index 00000000000000..d33b81c262ddc9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AdminRebalanceDiskCommand.java @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.util.NetUtils; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.system.Backend; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * admin rebalance disk + */ +public class AdminRebalanceDiskCommand extends Command implements NoForward { + private static final Logger LOG = LogManager.getLogger(AdminRebalanceDiskCommand.class); + private final long timeoutS = 24 * 3600; // default 24 hours + private List backends; + + public AdminRebalanceDiskCommand() { + super(PlanType.ADMIN_REBALANCE_DISK_COMMAND); + } + + public AdminRebalanceDiskCommand(List backends) { + super(PlanType.ADMIN_REBALANCE_DISK_COMMAND); + this.backends = backends; + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + // check auth + if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); + } + handleRebalanceDisk(); + } + + private void handleRebalanceDisk() throws AnalysisException { + List rebalanceDiskBackends = getNeedRebalanceDiskBackends(backends); + if (rebalanceDiskBackends.isEmpty()) { + LOG.info("The matching be is empty, no be to rebalance disk."); + return; + } + Env.getCurrentEnv().getTabletScheduler().rebalanceDisk(rebalanceDiskBackends, timeoutS); + } + + private List getNeedRebalanceDiskBackends(List backends) throws AnalysisException { + ImmutableMap backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster(); + List needRebalanceDiskBackends = Lists.newArrayList(); + if (backends == null) { + needRebalanceDiskBackends.addAll(backendsInfo.values()); + } else { + Map backendsID = new HashMap<>(); + for (Backend backend : backendsInfo.values()) { + backendsID.put( + NetUtils.getHostPortInAccessibleFormat(backend.getHost(), backend.getHeartbeatPort()), + backend.getId()); + } + for (String be : backends) { + if (backendsID.containsKey(be)) { + needRebalanceDiskBackends.add(backendsInfo.get(backendsID.get(be))); + backendsID.remove(be); + } + } + } + return needRebalanceDiskBackends; + } + + @Override + protected void checkSupportedInCloudMode(ConnectContext ctx) throws DdlException { + LOG.info("AdminRebalanceDiskCommand not supported in cloud mode"); + throw new DdlException("Unsupported operation"); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitAdminRebalanceDiskCommand(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index dc01cdd6d76bd3..ff71331b7afc5e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -18,9 +18,11 @@ package org.apache.doris.nereids.trees.plans.visitor; import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand; +import org.apache.doris.nereids.trees.plans.commands.AdminCancelRebalanceDiskCommand; import org.apache.doris.nereids.trees.plans.commands.AdminCheckTabletsCommand; import org.apache.doris.nereids.trees.plans.commands.AdminCleanTrashCommand; import org.apache.doris.nereids.trees.plans.commands.AdminCompactTableCommand; +import org.apache.doris.nereids.trees.plans.commands.AdminRebalanceDiskCommand; import org.apache.doris.nereids.trees.plans.commands.AdminShowReplicaStatusCommand; import org.apache.doris.nereids.trees.plans.commands.AlterCatalogCommentCommand; import org.apache.doris.nereids.trees.plans.commands.AlterJobStatusCommand; @@ -502,6 +504,14 @@ default R visitShowFrontendsCommand(ShowFrontendsCommand showFrontendsCommand, C return visitCommand(showFrontendsCommand, context); } + default R visitAdminRebalanceDiskCommand(AdminRebalanceDiskCommand adminRebalanceDiskCommand, C context) { + return visitCommand(adminRebalanceDiskCommand, context); + } + + default R visitAdminCancelRebalanceDiskCommand(AdminCancelRebalanceDiskCommand command, C context) { + return visitCommand(command, context); + } + default R visitShowDynamicPartitionCommand(ShowDynamicPartitionCommand showDynamicPartitionCommand, C context) { return visitCommand(showDynamicPartitionCommand, context); } diff --git a/regression-test/suites/auth_call/test_database_management_auth.groovy b/regression-test/suites/auth_call/test_database_management_auth.groovy index de4a55ad5973cf..fb643d9ee089e2 100644 --- a/regression-test/suites/auth_call/test_database_management_auth.groovy +++ b/regression-test/suites/auth_call/test_database_management_auth.groovy @@ -119,11 +119,11 @@ suite("test_database_management_auth","p0,auth_call") { } test { sql """ADMIN REBALANCE DISK;""" - exception "denied" + exception "${error_in_cloud}" } test { sql """ADMIN CANCEL REBALANCE DISK;""" - exception "denied" + exception "${error_in_cloud}" } test { sql """UNSET GLOBAL VARIABLE ALL;""" diff --git a/regression-test/suites/nereids_p0/ddl/rebalance_disk/test_nereids_rebalance_disk.groovy b/regression-test/suites/nereids_p0/ddl/rebalance_disk/test_nereids_rebalance_disk.groovy new file mode 100644 index 00000000000000..fa32edf3ba279e --- /dev/null +++ b/regression-test/suites/nereids_p0/ddl/rebalance_disk/test_nereids_rebalance_disk.groovy @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_nereids_rebalance_disk") { + //cloud-mode + if (isCloudMode()) { + def String error_in_cloud = "Unsupported" + def clusters = sql " SHOW CLUSTERS; " + assertTrue(!clusters.isEmpty()) + + test { + sql """ admin rebalance disk; """ + exception "${error_in_cloud}" + } + test { + sql """ admin rebalance disk ON ("127.0.0.1:9050"); """ + exception "${error_in_cloud}" + } + test { + sql """ admin rebalance disk ON ("192.168.0.1:9050", "127.0.0.1:9050", "192.168.0.2:9050"); """ + exception "${error_in_cloud}" + } + + test { + sql """ admin cancel rebalance disk; """ + exception "${error_in_cloud}" + } + test { + sql """ admin cancel rebalance disk ON ("127.0.0.1:9050"); """ + exception "${error_in_cloud}" + } + test { + sql """ admin cancel rebalance disk ON ("192.168.0.1:9050", "127.0.0.1:9050", "192.168.0.2:9050"); """ + exception "${error_in_cloud}" + } + } else { + // can not use qt command since the output change based on cluster and backend ip + checkNereidsExecute(""" admin rebalance disk; """) + checkNereidsExecute(""" admin rebalance disk ON ("127.0.0.1:9050"); """) + checkNereidsExecute(""" admin rebalance disk ON ("192.168.0.1:9050", "127.0.0.1:9050", "192.168.0.2:9050"); """) + + checkNereidsExecute(""" admin cancel rebalance disk; """) + checkNereidsExecute(""" admin cancel rebalance disk ON ("127.0.0.1:9050"); """) + checkNereidsExecute(""" admin cancel rebalance disk ON ("192.168.0.1:9050", "127.0.0.1:9050", "192.168.0.2:9050"); """) + } + +} \ No newline at end of file