[feat](Nereids) support cancel commands #44422

Merged · 3 commits · Nov 28, 2024
@@ -59,6 +59,7 @@ statementBase
| supportedRefreshStatement #supportedRefreshStatementAlias
| supportedShowStatement #supportedShowStatementAlias
| supportedLoadStatement #supportedLoadStatementAlias
| supportedCancelStatement #supportedCancelStatementAlias
| supportedRecoverStatement #supportedRecoverStatementAlias
| unsupportedStatement #unsupported
;
@@ -452,10 +453,14 @@ unsupportedCleanStatement
| CLEAN ALL QUERY STATS #cleanAllQueryStats
;

unsupportedCancelStatement
supportedCancelStatement
: CANCEL LOAD ((FROM | IN) database=identifier)? wildWhere? #cancelLoad
| CANCEL EXPORT ((FROM | IN) database=identifier)? wildWhere? #cancelExport
| CANCEL ALTER TABLE (ROLLUP | (MATERIALIZED VIEW) | COLUMN)
| CANCEL WARM UP JOB wildWhere? #cancelWarmUpJob
;

unsupportedCancelStatement
: CANCEL ALTER TABLE (ROLLUP | (MATERIALIZED VIEW) | COLUMN)
FROM tableName=multipartIdentifier (LEFT_PAREN jobIds+=INTEGER_VALUE
(COMMA jobIds+=INTEGER_VALUE)* RIGHT_PAREN)? #cancelAlterTable
| CANCEL BUILD INDEX ON tableName=multipartIdentifier
@@ -465,7 +470,6 @@ unsupportedCancelStatement
(COMMA hostPorts+=STRING_LITERAL)* #cancelDecommisionBackend
| CANCEL BACKUP ((FROM | IN) database=identifier)? #cancelBackup
| CANCEL RESTORE ((FROM | IN) database=identifier)? #cancelRestore
| CANCEL WARM UP JOB wildWhere? #cancelWarmUp
;

supportedRecoverStatement
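
With cancelLoad, cancelExport, and cancelWarmUpJob moved from unsupportedCancelStatement into the new supportedCancelStatement rule, these CANCEL forms are now parsed by the Nereids planner instead of falling back to the legacy parser. A minimal sketch of issuing them over JDBC; the connection URL, credentials, database name, labels, and WHERE columns below are placeholders, not values taken from this PR:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CancelCommandsSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical FE endpoint and credentials; adjust for a real cluster.
        String url = "jdbc:mysql://127.0.0.1:9030/demo_db";
        try (Connection conn = DriverManager.getConnection(url, "root", "");
             Statement stmt = conn.createStatement()) {
            // The three alternatives now covered by supportedCancelStatement.
            stmt.execute("CANCEL LOAD FROM demo_db WHERE LABEL = 'example_label'");
            stmt.execute("CANCEL EXPORT FROM demo_db WHERE STATE = 'PENDING'");
            stmt.execute("CANCEL WARM UP JOB WHERE id = 1");
        }
    }
}
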
@@ -58,6 +58,15 @@ public CancelExportStmt(String dbName, Expr whereClause) {
this.whereClause = whereClause;
}

public CancelExportStmt(String dbName, Expr whereClause, String label, CompoundPredicate.Operator operator,
String state) {
this.dbName = dbName;
this.whereClause = whereClause;
this.label = label;
this.operator = operator;
this.state = state;
}

private void checkColumn(Expr expr, boolean like) throws AnalysisException {
String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
if (!SUPPORT_COLUMNS.contains(inputCol.toLowerCase())) {
@@ -59,6 +59,15 @@ public CancelLoadStmt(String dbName, Expr whereClause) {
this.SUPPORT_COLUMNS.add("state");
}

public CancelLoadStmt(String dbName, Expr whereClause, String label, CompoundPredicate.Operator operator,
String state) {
this.dbName = dbName;
this.whereClause = whereClause;
this.label = label;
this.operator = operator;
this.state = state;
}

private void checkColumn(Expr expr, boolean like) throws AnalysisException {
String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
if (!SUPPORT_COLUMNS.contains(inputCol)) {
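
Both CancelExportStmt and CancelLoadStmt gain a constructor that accepts the already-extracted label, state, and compound operator, so a caller that holds those pieces does not need to assemble and analyze a raw where-clause Expr first. A hedged sketch of how such a constructor might be invoked; the enclosing class, database name, label, and state are illustrative:

import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CompoundPredicate;

public class NewConstructorSketch {
    // Illustrative only: build the legacy statement from pre-parsed pieces.
    static CancelLoadStmt buildCancelLoad() {
        return new CancelLoadStmt(
                "demo_db",                       // database name (placeholder)
                null,                            // no raw where-clause Expr
                "example_label%",                // label, may contain a % wildcard
                CompoundPredicate.Operator.AND,  // compound operator (AND / OR)
                "LOADING");                      // job state to match
    }
}
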
@@ -41,6 +41,8 @@
import org.apache.doris.job.scheduler.JobScheduler;
import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Lists;
@@ -522,4 +524,91 @@ private static void addNeedCancelLoadJob(String label, String state,
// job.updateLoadingStatus(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
// }
// }

/**
* used for nereids planner
*/
public void cancelLoadJob(String dbName, String label, String state, BinaryOperator operator)
throws JobException, AnalysisException, DdlException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
// List of load jobs waiting to be cancelled
List<InsertJob> unfinishedLoadJob;
readLock();
try {
List<InsertJob> loadJobs = Env.getCurrentEnv().getLabelProcessor().getJobs(db);
List<InsertJob> matchLoadJobs = Lists.newArrayList();
addNeedCancelLoadJob(label, state, operator, loadJobs, matchLoadJobs);
if (matchLoadJobs.isEmpty()) {
throw new JobException("Load job does not exist");
}
// check state here
unfinishedLoadJob =
matchLoadJobs.stream().filter(InsertJob::isRunning)
.collect(Collectors.toList());
if (unfinishedLoadJob.isEmpty()) {
throw new JobException("There is no uncompleted job");
}
} finally {
readUnlock();
}
// check auth
if (unfinishedLoadJob.size() > 1 || unfinishedLoadJob.get(0).getTableNames().isEmpty()) {
if (Env.getCurrentEnv().getAccessManager()
.checkDbPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), dbName);
}
} else {
for (String tableName : unfinishedLoadJob.get(0).getTableNames()) {
if (Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), InternalCatalog.INTERNAL_CATALOG_NAME, dbName,
tableName,
PrivPredicate.LOAD)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "LOAD",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(), dbName + ":" + tableName);
}
}
}
for (InsertJob loadJob : unfinishedLoadJob) {
try {
alterJobStatus(loadJob.getJobId(), JobStatus.STOPPED);
} catch (JobException e) {
log.warn("Fail to cancel job, its label: {}", loadJob.getLabelName());
}
}
}

private static void addNeedCancelLoadJob(String label, String state,
BinaryOperator operator, List<InsertJob> loadJobs,
List<InsertJob> matchLoadJobs)
throws AnalysisException {
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
CaseSensibility.LABEL.getCaseSensibility());
matchLoadJobs.addAll(
loadJobs.stream()
.filter(job -> !job.isCancelled())
.filter(job -> {
if (operator != null) {
// compound
boolean labelFilter =
label.contains("%") ? matcher.match(job.getLabelName())
: job.getLabelName().equalsIgnoreCase(label);
boolean stateFilter = job.getJobStatus().name().equalsIgnoreCase(state);
return operator instanceof And ? labelFilter && stateFilter :
labelFilter || stateFilter;
}
if (StringUtils.isNotEmpty(label)) {
return label.contains("%") ? matcher.match(job.getLabelName())
: job.getLabelName().equalsIgnoreCase(label);
}
if (StringUtils.isNotEmpty(state)) {
return job.getJobStatus().name().equalsIgnoreCase(state);
}
return false;
}).collect(Collectors.toList())
);
}
}
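
The filter inside addNeedCancelLoadJob treats a label containing '%' as a MySQL LIKE pattern, otherwise compares it case-insensitively, matches the state by enum name, and combines the two with AND or OR only when a compound operator is present. A self-contained sketch of that selection logic, using plain strings and a regex in place of Doris's InsertJob and PatternMatcher (regex metacharacters in the label are not escaped here):

import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class CancelMatchSketch {
    enum Op { AND, OR }

    record Job(String label, String state) {}

    // Mirrors the filter in addNeedCancelLoadJob, with a case-insensitive regex
    // standing in for the MySQL-style PatternMatcher.
    static List<Job> match(List<Job> jobs, String label, String state, Op op) {
        Pattern pattern = label != null && label.contains("%")
                ? Pattern.compile("(?i)" + label.replace("%", ".*"))
                : null;
        return jobs.stream().filter(job -> {
            boolean labelHit = pattern != null ? pattern.matcher(job.label()).matches()
                                               : job.label().equalsIgnoreCase(label);
            boolean stateHit = job.state().equalsIgnoreCase(state);
            if (op != null) {                        // compound WHERE: label AND/OR state
                return op == Op.AND ? labelHit && stateHit : labelHit || stateHit;
            }
            if (label != null && !label.isEmpty()) { // label-only WHERE
                return labelHit;
            }
            if (state != null && !state.isEmpty()) { // state-only WHERE
                return stateHit;
            }
            return false;                            // nothing to match on
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Job> jobs = List.of(new Job("nightly_01", "LOADING"),
                                 new Job("adhoc_02", "CANCELLED"));
        System.out.println(match(jobs, "nightly%", "LOADING", Op.AND)); // only nightly_01 matches
    }
}
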
74 changes: 74 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
@@ -38,6 +38,8 @@
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.nereids.trees.expressions.Or;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.scheduler.exception.JobException;

@@ -160,6 +162,78 @@ public void cancelExportJob(CancelExportStmt stmt) throws DdlException, Analysis
}
}

private List<ExportJob> getWaitingCancelJobs(String label, String state, BinaryOperator operator)
throws AnalysisException {
Predicate<ExportJob> jobFilter = buildCancelJobFilter(label, state, operator);
readLock();
try {
return getJobs().stream().filter(jobFilter).collect(Collectors.toList());
} finally {
readUnlock();
}
}

@VisibleForTesting
public static Predicate<ExportJob> buildCancelJobFilter(String label, String state, BinaryOperator operator)
throws AnalysisException {
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
CaseSensibility.LABEL.getCaseSensibility());

return job -> {
boolean labelFilter = true;
boolean stateFilter = true;
if (StringUtils.isNotEmpty(label)) {
labelFilter = label.contains("%") ? matcher.match(job.getLabel()) :
job.getLabel().equalsIgnoreCase(label);
}
if (StringUtils.isNotEmpty(state)) {
stateFilter = job.getState().name().equalsIgnoreCase(state);
}

if (operator != null && operator instanceof Or) {
return labelFilter || stateFilter;
}

return labelFilter && stateFilter;
};
}

/**
* used for Nereids planner
*/
public void cancelExportJob(String label, String state, BinaryOperator operator, String dbName)
throws DdlException, AnalysisException {
// List of export jobs waiting to be cancelled
List<ExportJob> matchExportJobs = getWaitingCancelJobs(label, state, operator);
if (matchExportJobs.isEmpty()) {
throw new DdlException("Export job(s) do not exist");
}
matchExportJobs = matchExportJobs.stream()
.filter(job -> !job.isFinalState()).collect(Collectors.toList());
if (matchExportJobs.isEmpty()) {
throw new DdlException("All export job(s) are at final state (CANCELLED/FINISHED)");
}

// check auth
checkCancelExportJobAuth(InternalCatalog.INTERNAL_CATALOG_NAME, dbName, matchExportJobs);
// Must add lock to protect export job.
// Because job may be cancelled when generating task executors,
// the cancel process may clear the task executor list at same time,
// which will cause ConcurrentModificationException
writeLock();
try {
for (ExportJob exportJob : matchExportJobs) {
// exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
exportJob.updateExportJobState(ExportJobState.CANCELLED, 0L, null,
ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
}
} catch (JobException e) {
throw new AnalysisException(e.getMessage());
} finally {
writeUnlock();
}
}

public void checkCancelExportJobAuth(String ctlName, String dbName, List<ExportJob> jobs) throws AnalysisException {
if (jobs.size() > 1) {
if (!Env.getCurrentEnv().getAccessManager()
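
The comment above writeLock() captures the key constraint in cancelExportJob: matching jobs are collected under the read lock, but flipping their state must happen under the write lock, because a job being cancelled can clear its task-executor list concurrently and provoke a ConcurrentModificationException. A minimal sketch of that two-phase locking discipline with a plain ReentrantReadWriteLock; the Job type and state strings are placeholders, not the real ExportJob API:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

public class CancelLockingSketch {
    // Placeholder for ExportJob: just a mutable state field.
    static final class Job {
        volatile String state = "EXPORTING";
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<Job> jobs = new ArrayList<>();

    // Phase 1: snapshot the cancellable jobs under the read lock only.
    List<Job> findCancellable() {
        lock.readLock().lock();
        try {
            return jobs.stream()
                    .filter(j -> !j.state.equals("FINISHED") && !j.state.equals("CANCELLED"))
                    .collect(Collectors.toList());
        } finally {
            lock.readLock().unlock();
        }
    }

    // Phase 2: mutate job state under the write lock so concurrent cleanup of
    // task executors cannot interleave with the cancellation loop.
    void cancelAll(List<Job> matched) {
        lock.writeLock().lock();
        try {
            for (Job job : matched) {
                job.state = "CANCELLED";
            }
        } finally {
            lock.writeLock().unlock();
        }
    }
}
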
@@ -46,6 +46,8 @@
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.Load;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
@@ -246,23 +248,22 @@ public void recordFinishedLoadJob(String label, long transactionId, String dbNam
* Match need cancel loadJob by stmt.
**/
@VisibleForTesting
public static void addNeedCancelLoadJob(CancelLoadStmt stmt, List<LoadJob> loadJobs, List<LoadJob> matchLoadJobs)
public static void addNeedCancelLoadJob(String label, String state, BinaryOperator operator,
List<LoadJob> loadJobs, List<LoadJob> matchLoadJobs)
throws AnalysisException {
String label = stmt.getLabel();
String state = stmt.getState();
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
CaseSensibility.LABEL.getCaseSensibility());
matchLoadJobs.addAll(
loadJobs.stream()
.filter(job -> job.getState() != JobState.CANCELLED)
.filter(job -> {
if (stmt.getOperator() != null) {
if (operator != null) {
// compound
boolean labelFilter =
label.contains("%") ? matcher.match(job.getLabel())
: job.getLabel().equalsIgnoreCase(label);
boolean stateFilter = job.getState().name().equalsIgnoreCase(state);
return Operator.AND.equals(stmt.getOperator()) ? labelFilter && stateFilter :
return operator instanceof And ? labelFilter && stateFilter :
labelFilter || stateFilter;
}
if (StringUtils.isNotEmpty(label)) {
@@ -280,8 +281,9 @@ public static void addNeedCancelLoadJob(CancelLoadStmt stmt, List<LoadJob> loadJ
/**
* Cancel load job by stmt.
**/
public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbName());
public void cancelLoadJob(String dbName, String label, String state, BinaryOperator operator)
throws DdlException, AnalysisException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
// List of load jobs waiting to be cancelled
List<LoadJob> unfinishedLoadJob;
readLock();
@@ -291,7 +293,7 @@ public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisExce
throw new DdlException("Load job does not exist");
}
List<LoadJob> matchLoadJobs = Lists.newArrayList();
addNeedCancelLoadJob(stmt,
addNeedCancelLoadJob(label, state, operator,
labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
matchLoadJobs);
if (matchLoadJobs.isEmpty()) {
@@ -318,6 +320,82 @@ public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisExce
}
}

/**
* Match need cancel loadJob by stmt.
**/
@VisibleForTesting
public static void addNeedCancelLoadJob(CancelLoadStmt stmt, List<LoadJob> loadJobs, List<LoadJob> matchLoadJobs)
throws AnalysisException {
String label = stmt.getLabel();
String state = stmt.getState();
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
CaseSensibility.LABEL.getCaseSensibility());
matchLoadJobs.addAll(
loadJobs.stream()
.filter(job -> job.getState() != JobState.CANCELLED)
.filter(job -> {
if (stmt.getOperator() != null) {
// compound
boolean labelFilter =
label.contains("%") ? matcher.match(job.getLabel())
: job.getLabel().equalsIgnoreCase(label);
boolean stateFilter = job.getState().name().equalsIgnoreCase(state);
return Operator.AND.equals(stmt.getOperator()) ? labelFilter && stateFilter :
labelFilter || stateFilter;
}
if (StringUtils.isNotEmpty(label)) {
return label.contains("%") ? matcher.match(job.getLabel())
: job.getLabel().equalsIgnoreCase(label);
}
if (StringUtils.isNotEmpty(state)) {
return job.getState().name().equalsIgnoreCase(state);
}
return false;
}).collect(Collectors.toList())
);
}

/**
* Cancel load job by stmt.
**/
public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(stmt.getDbName());
// List of load jobs waiting to be cancelled
List<LoadJob> unfinishedLoadJob;
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
throw new DdlException("Load job does not exist");
}
List<LoadJob> matchLoadJobs = Lists.newArrayList();
addNeedCancelLoadJob(stmt,
labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
matchLoadJobs);
if (matchLoadJobs.isEmpty()) {
throw new DdlException("Load job does not exist");
}
// check state here
unfinishedLoadJob =
matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()).collect(Collectors.toList());
if (unfinishedLoadJob.isEmpty()) {
throw new DdlException("There is no uncompleted job");
}
} finally {
readUnlock();
}
for (LoadJob loadJob : unfinishedLoadJob) {
try {
loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"));
} catch (DdlException e) {
throw new DdlException(
"Cancel load job [" + loadJob.getId() + "] fail, " + "label=[" + loadJob.getLabel()
+
"] failed msg=" + e.getMessage());
}
}
}

/**
* Replay end load job.
**/
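
The legacy overloads keep keying the compound semantics off the analyzer enum (Operator.AND.equals(stmt.getOperator())), while the new overloads receive a Nereids expression node and use an instanceof check against And. A small sketch of the two checks side by side, compiled only against classes that already appear in this diff's imports:

import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;

public class OperatorCheckSketch {
    // Legacy parser path: the operator is the analyzer's CompoundPredicate enum.
    static boolean isAndLegacy(CompoundPredicate.Operator op) {
        return CompoundPredicate.Operator.AND.equals(op);
    }

    // Nereids path: the operator is an expression node, so the check is instanceof.
    static boolean isAndNereids(BinaryOperator op) {
        return op instanceof And;
    }
}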