Skip to content

Commit

Permalink
[Enhancement] (nereids)implement adminRebalanceDiskCommand in nereids (
Browse files Browse the repository at this point in the history
…#45108)

Issue Number: close #42844, #42845
  • Loading branch information
DongLiang-0 authored Dec 17, 2024
1 parent 86c3c76 commit d670ec2
Show file tree
Hide file tree
Showing 9 changed files with 328 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,10 @@ unsupportedCancelStatement

supportedAdminStatement
: ADMIN SHOW REPLICA DISTRIBUTION FROM baseTableRef #adminShowReplicaDistribution
| ADMIN REBALANCE DISK (ON LEFT_PAREN backends+=STRING_LITERAL
(COMMA backends+=STRING_LITERAL)* RIGHT_PAREN)? #adminRebalanceDisk
| ADMIN CANCEL REBALANCE DISK (ON LEFT_PAREN backends+=STRING_LITERAL
(COMMA backends+=STRING_LITERAL)* RIGHT_PAREN)? #adminCancelRebalanceDisk
| ADMIN DIAGNOSE TABLET tabletId=INTEGER_VALUE #adminDiagnoseTablet
| ADMIN SHOW REPLICA STATUS FROM baseTableRef (WHERE STATUS EQ|NEQ STRING_LITERAL)? #adminShowReplicaStatus
| ADMIN COMPACT TABLE baseTableRef (WHERE TYPE EQ STRING_LITERAL)? #adminCompactTable
Expand All @@ -520,10 +524,6 @@ unsupportedAdminStatement
| ADMIN CANCEL REPAIR TABLE baseTableRef #adminCancelRepairTable
| ADMIN SET (FRONTEND | (ALL FRONTENDS)) CONFIG
(LEFT_PAREN propertyItemList RIGHT_PAREN)? ALL? #adminSetFrontendConfig
| ADMIN REBALANCE DISK (ON LEFT_PAREN backends+=STRING_LITERAL
(COMMA backends+=STRING_LITERAL) RIGHT_PAREN)? #adminRebalanceDisk
| ADMIN CANCEL REBALANCE DISK (ON LEFT_PAREN backends+=STRING_LITERAL
(COMMA backends+=STRING_LITERAL) RIGHT_PAREN)? #adminCancelRebalanceDisk
| ADMIN SET TABLE name=multipartIdentifier
PARTITION VERSION properties=propertyClause? #adminSetPartitionVersion
| ADMIN COPY TABLET tabletId=INTEGER_VALUE properties=propertyClause? #adminCopyTablet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,14 @@ public synchronized void rebalanceDisk(AdminRebalanceDiskStmt stmt) {
diskRebalancer.addPrioBackends(stmt.getBackends(), stmt.getTimeoutS());
}

public synchronized void rebalanceDisk(List<Backend> backends, long timeoutS) {
diskRebalancer.addPrioBackends(backends, timeoutS);
}

public synchronized void cancelRebalanceDisk(List<Backend> backends) {
diskRebalancer.removePrioBackends(backends);
}

public synchronized void cancelRebalanceDisk(AdminCancelRebalanceDiskStmt stmt) {
diskRebalancer.removePrioBackends(stmt.getBackends());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@
import org.apache.doris.mtmv.MTMVRefreshTriggerInfo;
import org.apache.doris.nereids.DorisParser;
import org.apache.doris.nereids.DorisParser.AddConstraintContext;
import org.apache.doris.nereids.DorisParser.AdminCancelRebalanceDiskContext;
import org.apache.doris.nereids.DorisParser.AdminCheckTabletsContext;
import org.apache.doris.nereids.DorisParser.AdminCompactTableContext;
import org.apache.doris.nereids.DorisParser.AdminDiagnoseTabletContext;
import org.apache.doris.nereids.DorisParser.AdminRebalanceDiskContext;
import org.apache.doris.nereids.DorisParser.AdminShowReplicaDistributionContext;
import org.apache.doris.nereids.DorisParser.AdminShowReplicaStatusContext;
import org.apache.doris.nereids.DorisParser.AdminShowTabletStorageFormatContext;
Expand Down Expand Up @@ -481,9 +483,11 @@
import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminCancelRebalanceDiskCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminCheckTabletsCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminCleanTrashCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminCompactTableCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminRebalanceDiskCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminShowReplicaStatusCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterCatalogCommentCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
Expand Down Expand Up @@ -4724,6 +4728,26 @@ public LogicalPlan visitShowStorageEngines(ShowStorageEnginesContext ctx) {
return new ShowStorageEnginesCommand();
}

@Override
public LogicalPlan visitAdminRebalanceDisk(AdminRebalanceDiskContext ctx) {
if (ctx.ON() != null) {
List<String> backendList = Lists.newArrayList();
ctx.backends.forEach(backend -> backendList.add(stripQuotes(backend.getText())));
return new AdminRebalanceDiskCommand(backendList);
}
return new AdminRebalanceDiskCommand();
}

@Override
public LogicalPlan visitAdminCancelRebalanceDisk(AdminCancelRebalanceDiskContext ctx) {
if (ctx.ON() != null) {
List<String> backendList = Lists.newArrayList();
ctx.backends.forEach(backend -> backendList.add(stripQuotes(backend.getText())));
return new AdminCancelRebalanceDiskCommand(backendList);
}
return new AdminCancelRebalanceDiskCommand();
}

@Override
public LogicalPlan visitShowDiagnoseTablet(ShowDiagnoseTabletContext ctx) {
long tabletId = Long.parseLong(ctx.INTEGER_VALUE().getText());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ public enum PlanType {
RECOVER_TABLE_COMMAND,
RECOVER_PARTITION_COMMAND,
REPLAY_COMMAND,
ADMIN_REBALANCE_DISK_COMMAND,
ADMIN_CANCEL_REBALANCE_DISK_COMMAND,
CREATE_ENCRYPTKEY_COMMAND,
CREATE_WORKLOAD_GROUP_COMMAND,
CREATE_CATALOG_COMMAND,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* admin cancel rebalance disk
*/
public class AdminCancelRebalanceDiskCommand extends Command implements NoForward {
private static final Logger LOG = LogManager.getLogger(AdminCancelRebalanceDiskCommand.class);
private List<String> backends;

public AdminCancelRebalanceDiskCommand() {
super(PlanType.ADMIN_CANCEL_REBALANCE_DISK_COMMAND);
}

public AdminCancelRebalanceDiskCommand(List<String> backends) {
super(PlanType.ADMIN_CANCEL_REBALANCE_DISK_COMMAND);
this.backends = backends;
}

@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
// check auth
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
handleCancelRebalanceDisk();
}

private void handleCancelRebalanceDisk() throws AnalysisException {
List<Backend> rebalanceDiskBackends = getNeedRebalanceDiskBackends(backends);
if (rebalanceDiskBackends.isEmpty()) {
LOG.info("The matching be is empty, no be to cancel rebalance disk.");
return;
}
Env.getCurrentEnv().getTabletScheduler().cancelRebalanceDisk(rebalanceDiskBackends);
}

private List<Backend> getNeedRebalanceDiskBackends(List<String> backends) throws AnalysisException {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
List<Backend> needRebalanceDiskBackends = Lists.newArrayList();
if (backends == null) {
needRebalanceDiskBackends.addAll(backendsInfo.values());
} else {
Map<String, Long> backendsID = new HashMap<>();
for (Backend backend : backendsInfo.values()) {
backendsID.put(
NetUtils.getHostPortInAccessibleFormat(backend.getHost(), backend.getHeartbeatPort()),
backend.getId());
}
for (String be : backends) {
if (backendsID.containsKey(be)) {
needRebalanceDiskBackends.add(backendsInfo.get(backendsID.get(be)));
backendsID.remove(be);
}
}
}
return needRebalanceDiskBackends;
}

@Override
protected void checkSupportedInCloudMode(ConnectContext ctx) throws DdlException {
LOG.info("AdminCancelRebalanceDiskCommand not supported in cloud mode");
throw new DdlException("Unsupported operation");
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitAdminCancelRebalanceDiskCommand(this, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* admin rebalance disk
*/
public class AdminRebalanceDiskCommand extends Command implements NoForward {
private static final Logger LOG = LogManager.getLogger(AdminRebalanceDiskCommand.class);
private final long timeoutS = 24 * 3600; // default 24 hours
private List<String> backends;

public AdminRebalanceDiskCommand() {
super(PlanType.ADMIN_REBALANCE_DISK_COMMAND);
}

public AdminRebalanceDiskCommand(List<String> backends) {
super(PlanType.ADMIN_REBALANCE_DISK_COMMAND);
this.backends = backends;
}

@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
// check auth
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
handleRebalanceDisk();
}

private void handleRebalanceDisk() throws AnalysisException {
List<Backend> rebalanceDiskBackends = getNeedRebalanceDiskBackends(backends);
if (rebalanceDiskBackends.isEmpty()) {
LOG.info("The matching be is empty, no be to rebalance disk.");
return;
}
Env.getCurrentEnv().getTabletScheduler().rebalanceDisk(rebalanceDiskBackends, timeoutS);
}

private List<Backend> getNeedRebalanceDiskBackends(List<String> backends) throws AnalysisException {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
List<Backend> needRebalanceDiskBackends = Lists.newArrayList();
if (backends == null) {
needRebalanceDiskBackends.addAll(backendsInfo.values());
} else {
Map<String, Long> backendsID = new HashMap<>();
for (Backend backend : backendsInfo.values()) {
backendsID.put(
NetUtils.getHostPortInAccessibleFormat(backend.getHost(), backend.getHeartbeatPort()),
backend.getId());
}
for (String be : backends) {
if (backendsID.containsKey(be)) {
needRebalanceDiskBackends.add(backendsInfo.get(backendsID.get(be)));
backendsID.remove(be);
}
}
}
return needRebalanceDiskBackends;
}

@Override
protected void checkSupportedInCloudMode(ConnectContext ctx) throws DdlException {
LOG.info("AdminRebalanceDiskCommand not supported in cloud mode");
throw new DdlException("Unsupported operation");
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitAdminRebalanceDiskCommand(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.doris.nereids.trees.plans.visitor;

import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminCancelRebalanceDiskCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminCheckTabletsCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminCleanTrashCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminCompactTableCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminRebalanceDiskCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminShowReplicaStatusCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterCatalogCommentCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterJobStatusCommand;
Expand Down Expand Up @@ -502,6 +504,14 @@ default R visitShowFrontendsCommand(ShowFrontendsCommand showFrontendsCommand, C
return visitCommand(showFrontendsCommand, context);
}

default R visitAdminRebalanceDiskCommand(AdminRebalanceDiskCommand adminRebalanceDiskCommand, C context) {
return visitCommand(adminRebalanceDiskCommand, context);
}

default R visitAdminCancelRebalanceDiskCommand(AdminCancelRebalanceDiskCommand command, C context) {
return visitCommand(command, context);
}

default R visitShowDynamicPartitionCommand(ShowDynamicPartitionCommand showDynamicPartitionCommand, C context) {
return visitCommand(showDynamicPartitionCommand, context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ suite("test_database_management_auth","p0,auth_call") {
}
test {
sql """ADMIN REBALANCE DISK;"""
exception "denied"
exception "${error_in_cloud}"
}
test {
sql """ADMIN CANCEL REBALANCE DISK;"""
exception "denied"
exception "${error_in_cloud}"
}
test {
sql """UNSET GLOBAL VARIABLE ALL;"""
Expand Down
Loading

0 comments on commit d670ec2

Please sign in to comment.