From 00c6e5be060294635b33ce2fe8980f8b8dbf508f Mon Sep 17 00:00:00 2001 From: LiBinfeng Date: Thu, 28 Nov 2024 21:00:18 +0800 Subject: [PATCH] [fix](Nereids) fix cancel task jobs failed --- .../org/apache/doris/nereids/DorisLexer.g4 | 2 + .../org/apache/doris/nereids/DorisParser.g4 | 8 ++-- .../nereids/parser/LogicalPlanBuilder.java | 26 +++------- .../plans/commands/AlterJobStatusCommand.java | 27 +---------- .../plans/commands/CancelJobTaskCommand.java | 48 ++----------------- .../trees/plans/commands/DropJobCommand.java | 5 +- .../trees/plans/commands/PauseJobCommand.java | 5 +- .../plans/commands/ResumeJobCommand.java | 5 +- 8 files changed, 23 insertions(+), 103 deletions(-) diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 index 8ce8d033108367f..33f8b95ab161259 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 @@ -316,6 +316,7 @@ IS_NULL_PRED: 'IS_NULL_PRED'; ISNULL: 'ISNULL'; ISOLATION: 'ISOLATION'; JOB: 'JOB'; +JOBNAME: 'JOBNAME'; JOBS: 'JOBS'; JOIN: 'JOIN'; JSON: 'JSON'; @@ -518,6 +519,7 @@ TABLESAMPLE: 'TABLESAMPLE'; TABLET: 'TABLET'; TABLETS: 'TABLETS'; TASK: 'TASK'; +TASKID: 'TASKID'; TASKS: 'TASKS'; TEMPORARY: 'TEMPORARY'; TERMINATED: 'TERMINATED'; diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 3a377ba05bc7078..293fa8ac2c2f90e 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -118,10 +118,10 @@ supportedJobStatement (AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP))) commentSpec? DO supportedDmlStatement #createScheduledJob - | PAUSE JOB wildWhere? #pauseJob - | DROP JOB (IF EXISTS)? wildWhere? #dropJob - | RESUME JOB wildWhere? #resumeJob - | CANCEL TASK wildWhere? #cancelJobTask + | PAUSE JOB WHERE JOBNAME EQ STRING_LITERAL #pauseJob + | DROP JOB (IF EXISTS)? WHERE JOBNAME EQ STRING_LITERAL #dropJob + | RESUME JOB WHERE JOBNAME EQ STRING_LITERAL #resumeJob + | CANCEL TASK WHERE JOBNAME EQ STRING_LITERAL AND TASKID EQ INTEGER_VALUE #cancelJobTask ; constraintStatement : ALTER TABLE table=multipartIdentifier diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 2d55baffb1ebd91..e398b2806d6c2d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -772,39 +772,25 @@ public LogicalPlan visitCreateScheduledJob(DorisParser.CreateScheduledJobContext @Override public LogicalPlan visitPauseJob(DorisParser.PauseJobContext ctx) { - Expression wildWhere = null; - if (ctx.wildWhere() != null) { - wildWhere = getWildWhere(ctx.wildWhere()); - } - return new PauseJobCommand(wildWhere); + return new PauseJobCommand(stripQuotes(ctx.STRING_LITERAL().getText())); } @Override public LogicalPlan visitDropJob(DorisParser.DropJobContext ctx) { - Expression wildWhere = null; - if (ctx.wildWhere() != null) { - wildWhere = getWildWhere(ctx.wildWhere()); - } boolean ifExists = ctx.EXISTS() != null; - return new DropJobCommand(wildWhere, ifExists); + return new DropJobCommand(stripQuotes(ctx.STRING_LITERAL().getText()), ifExists); } @Override public LogicalPlan visitResumeJob(DorisParser.ResumeJobContext ctx) { - Expression wildWhere = null; - if (ctx.wildWhere() != null) { - wildWhere = getWildWhere(ctx.wildWhere()); - } - return new ResumeJobCommand(wildWhere); + return new ResumeJobCommand(stripQuotes(ctx.STRING_LITERAL().getText())); } @Override public LogicalPlan visitCancelJobTask(DorisParser.CancelJobTaskContext ctx) { - Expression wildWhere = null; - if (ctx.wildWhere() != null) { - wildWhere = getWildWhere(ctx.wildWhere()); - } - return new CancelJobTaskCommand(wildWhere); + String jobName = stripQuotes(ctx.STRING_LITERAL().getText()); + Long taskId = Long.valueOf(ctx.TASKID().getText()); + return new CancelJobTaskCommand(jobName, taskId); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java index c3aafa8a5fe0c5a..46cb24382e682fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobStatusCommand.java @@ -23,29 +23,22 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.nereids.analyzer.UnboundSlot; -import org.apache.doris.nereids.trees.expressions.EqualTo; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.literal.StringLikeLiteral; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; -import com.google.common.base.Strings; - /** * base class for all drop commands */ public abstract class AlterJobStatusCommand extends Command implements ForwardWithSync { // exclude job name prefix, which is used by inner job private static final String excludeJobNamePrefix = "inner_"; - private final Expression wildWhere; private String jobName; - public AlterJobStatusCommand(PlanType type, Expression wildWhere) { + public AlterJobStatusCommand(PlanType type, String jobName) { super(type); - this.wildWhere = wildWhere; + this.jobName = jobName; } public String getJobName() { @@ -69,25 +62,9 @@ public R accept(PlanVisitor visitor, C context) { } private void validate() throws Exception { - if (!(wildWhere instanceof EqualTo)) { - throw new AnalysisException("Alter job status only support equal condition, but not: " + wildWhere.toSql()); - } if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); } - Expression left = ((EqualTo) wildWhere).left(); - Expression right = ((EqualTo) wildWhere).right(); - if (!(left instanceof UnboundSlot && ((UnboundSlot) left).getName().equalsIgnoreCase("jobName"))) { - throw new AnalysisException("Current not support left child of where: " + left); - } - if (!(right instanceof StringLikeLiteral)) { - throw new AnalysisException("Value must is string"); - } - - if (Strings.isNullOrEmpty(((StringLikeLiteral) right).getStringValue())) { - throw new AnalysisException("Value can't is null"); - } - this.jobName = ((StringLikeLiteral) right).getStringValue(); if (jobName.startsWith(excludeJobNamePrefix)) { throw new AnalysisException("Can't alter inner job status"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CancelJobTaskCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CancelJobTaskCommand.java index e02c731911a18eb..157eabaf6287729 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CancelJobTaskCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CancelJobTaskCommand.java @@ -17,17 +17,7 @@ package org.apache.doris.nereids.trees.plans.commands; -import org.apache.doris.catalog.Env; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; -import org.apache.doris.mysql.privilege.PrivPredicate; -import org.apache.doris.nereids.analyzer.UnboundSlot; -import org.apache.doris.nereids.trees.expressions.And; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.literal.LargeIntLiteral; -import org.apache.doris.nereids.trees.expressions.literal.StringLikeLiteral; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; @@ -37,19 +27,14 @@ * base class for all drop commands */ public class CancelJobTaskCommand extends CancelCommand implements ForwardWithSync { - private static final String jobNameKey = "jobName"; - - private static final String taskIdKey = "taskId"; - private String jobName; private Long taskId; - private Expression expr; - - public CancelJobTaskCommand(Expression expr) { + public CancelJobTaskCommand(String jobName, Long taskId) { super(PlanType.CANCEL_JOB_COMMAND); - this.expr = expr; + this.jobName = jobName; + this.taskId = taskId; } @Override @@ -59,36 +44,9 @@ public R accept(PlanVisitor visitor, C context) { @Override public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { - validate(); doRun(ctx); } - private void validate() throws AnalysisException { - if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN"); - } - if (!(expr instanceof And)) { - throw new AnalysisException("Only allow compound predicate with operator AND"); - } - if (!(expr.child(0).child(0) instanceof UnboundSlot) - && jobNameKey.equals(((UnboundSlot) expr.child(0).child(0)).getName())) { - throw new AnalysisException("Current not support " + ((UnboundSlot) expr.child(0).child(0)).getName()); - } - - if (!(expr.child(0).child(1) instanceof StringLikeLiteral)) { - throw new AnalysisException("JobName value must is string"); - } - this.jobName = ((StringLikeLiteral) expr.child(0).child(1)).getStringValue(); - String taskIdInput = ((StringLikeLiteral) expr.child(1).child(0)).getStringValue(); - if (!taskIdKey.equalsIgnoreCase(taskIdInput)) { - throw new AnalysisException("Current not support " + taskIdInput); - } - if (!(expr.child(1).child(1) instanceof LargeIntLiteral)) { - throw new AnalysisException("task id value must is large int"); - } - this.taskId = ((LargeIntLiteral) expr.child(1).child(1)).getLongValue(); - } - public void doRun(ConnectContext ctx) throws Exception { try { ctx.getEnv().getJobManager().cancelTaskById(jobName, taskId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java index 69c00a0b0846948..10bbe910efb07ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DropJobCommand.java @@ -18,7 +18,6 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.analysis.StmtType; -import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; @@ -30,8 +29,8 @@ public class DropJobCommand extends AlterJobStatusCommand implements ForwardWithSync { private final boolean ifExists; - public DropJobCommand(Expression wildWhere, boolean ifExists) { - super(PlanType.DROP_JOB_COMMAND, wildWhere); + public DropJobCommand(String jobName, boolean ifExists) { + super(PlanType.DROP_JOB_COMMAND, jobName); this.ifExists = ifExists; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java index 2954c79b074dfe8..2b22a5272931665 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/PauseJobCommand.java @@ -19,7 +19,6 @@ import org.apache.doris.analysis.StmtType; import org.apache.doris.job.common.JobStatus; -import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; @@ -29,8 +28,8 @@ * pause job */ public class PauseJobCommand extends AlterJobStatusCommand implements ForwardWithSync { - public PauseJobCommand(Expression wildWhere) { - super(PlanType.PAUSE_JOB_COMMAND, wildWhere); + public PauseJobCommand(String jobName) { + super(PlanType.PAUSE_JOB_COMMAND, jobName); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java index 88ee9fe0774186f..fc3758a853e0931 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java @@ -19,7 +19,6 @@ import org.apache.doris.analysis.StmtType; import org.apache.doris.job.common.JobStatus; -import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; @@ -29,8 +28,8 @@ * pause job */ public class ResumeJobCommand extends AlterJobStatusCommand implements ForwardWithSync { - public ResumeJobCommand(Expression wildWhere) { - super(PlanType.RESUME_JOB_COMMAND, wildWhere); + public ResumeJobCommand(String jobName) { + super(PlanType.RESUME_JOB_COMMAND, jobName); } @Override