Skip to content

Commit

Permalink
[fix](Nereids) fix cancel task jobs failed
Browse files Browse the repository at this point in the history
  • Loading branch information
LiBinfeng-01 committed Dec 2, 2024
1 parent 70b0a08 commit 00c6e5b
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ IS_NULL_PRED: 'IS_NULL_PRED';
ISNULL: 'ISNULL';
ISOLATION: 'ISOLATION';
JOB: 'JOB';
JOBNAME: 'JOBNAME';
JOBS: 'JOBS';
JOIN: 'JOIN';
JSON: 'JSON';
Expand Down Expand Up @@ -518,6 +519,7 @@ TABLESAMPLE: 'TABLESAMPLE';
TABLET: 'TABLET';
TABLETS: 'TABLETS';
TASK: 'TASK';
TASKID: 'TASKID';
TASKS: 'TASKS';
TEMPORARY: 'TEMPORARY';
TERMINATED: 'TERMINATED';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ supportedJobStatement
(AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP)))
commentSpec?
DO supportedDmlStatement #createScheduledJob
| PAUSE JOB wildWhere? #pauseJob
| DROP JOB (IF EXISTS)? wildWhere? #dropJob
| RESUME JOB wildWhere? #resumeJob
| CANCEL TASK wildWhere? #cancelJobTask
| PAUSE JOB WHERE JOBNAME EQ STRING_LITERAL #pauseJob
| DROP JOB (IF EXISTS)? WHERE JOBNAME EQ STRING_LITERAL #dropJob
| RESUME JOB WHERE JOBNAME EQ STRING_LITERAL #resumeJob
| CANCEL TASK WHERE JOBNAME EQ STRING_LITERAL AND TASKID EQ INTEGER_VALUE #cancelJobTask
;
constraintStatement
: ALTER TABLE table=multipartIdentifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,39 +772,25 @@ public LogicalPlan visitCreateScheduledJob(DorisParser.CreateScheduledJobContext

@Override
public LogicalPlan visitPauseJob(DorisParser.PauseJobContext ctx) {
Expression wildWhere = null;
if (ctx.wildWhere() != null) {
wildWhere = getWildWhere(ctx.wildWhere());
}
return new PauseJobCommand(wildWhere);
return new PauseJobCommand(stripQuotes(ctx.STRING_LITERAL().getText()));
}

@Override
public LogicalPlan visitDropJob(DorisParser.DropJobContext ctx) {
Expression wildWhere = null;
if (ctx.wildWhere() != null) {
wildWhere = getWildWhere(ctx.wildWhere());
}
boolean ifExists = ctx.EXISTS() != null;
return new DropJobCommand(wildWhere, ifExists);
return new DropJobCommand(stripQuotes(ctx.STRING_LITERAL().getText()), ifExists);
}

@Override
public LogicalPlan visitResumeJob(DorisParser.ResumeJobContext ctx) {
Expression wildWhere = null;
if (ctx.wildWhere() != null) {
wildWhere = getWildWhere(ctx.wildWhere());
}
return new ResumeJobCommand(wildWhere);
return new ResumeJobCommand(stripQuotes(ctx.STRING_LITERAL().getText()));
}

@Override
public LogicalPlan visitCancelJobTask(DorisParser.CancelJobTaskContext ctx) {
Expression wildWhere = null;
if (ctx.wildWhere() != null) {
wildWhere = getWildWhere(ctx.wildWhere());
}
return new CancelJobTaskCommand(wildWhere);
String jobName = stripQuotes(ctx.STRING_LITERAL().getText());
Long taskId = Long.valueOf(ctx.TASKID().getText());
return new CancelJobTaskCommand(jobName, taskId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,22 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.literal.StringLikeLiteral;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;

import com.google.common.base.Strings;

/**
* base class for all drop commands
*/
public abstract class AlterJobStatusCommand extends Command implements ForwardWithSync {
// exclude job name prefix, which is used by inner job
private static final String excludeJobNamePrefix = "inner_";
private final Expression wildWhere;
private String jobName;

public AlterJobStatusCommand(PlanType type, Expression wildWhere) {
public AlterJobStatusCommand(PlanType type, String jobName) {
super(type);
this.wildWhere = wildWhere;
this.jobName = jobName;
}

public String getJobName() {
Expand All @@ -69,25 +62,9 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
}

private void validate() throws Exception {
if (!(wildWhere instanceof EqualTo)) {
throw new AnalysisException("Alter job status only support equal condition, but not: " + wildWhere.toSql());
}
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
Expression left = ((EqualTo) wildWhere).left();
Expression right = ((EqualTo) wildWhere).right();
if (!(left instanceof UnboundSlot && ((UnboundSlot) left).getName().equalsIgnoreCase("jobName"))) {
throw new AnalysisException("Current not support left child of where: " + left);
}
if (!(right instanceof StringLikeLiteral)) {
throw new AnalysisException("Value must is string");
}

if (Strings.isNullOrEmpty(((StringLikeLiteral) right).getStringValue())) {
throw new AnalysisException("Value can't is null");
}
this.jobName = ((StringLikeLiteral) right).getStringValue();
if (jobName.startsWith(excludeJobNamePrefix)) {
throw new AnalysisException("Can't alter inner job status");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,7 @@

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.literal.LargeIntLiteral;
import org.apache.doris.nereids.trees.expressions.literal.StringLikeLiteral;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -37,19 +27,14 @@
* base class for all drop commands
*/
public class CancelJobTaskCommand extends CancelCommand implements ForwardWithSync {
private static final String jobNameKey = "jobName";

private static final String taskIdKey = "taskId";

private String jobName;

private Long taskId;

private Expression expr;

public CancelJobTaskCommand(Expression expr) {
public CancelJobTaskCommand(String jobName, Long taskId) {
super(PlanType.CANCEL_JOB_COMMAND);
this.expr = expr;
this.jobName = jobName;
this.taskId = taskId;
}

@Override
Expand All @@ -59,36 +44,9 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {

@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
validate();
doRun(ctx);
}

private void validate() throws AnalysisException {
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
if (!(expr instanceof And)) {
throw new AnalysisException("Only allow compound predicate with operator AND");
}
if (!(expr.child(0).child(0) instanceof UnboundSlot)
&& jobNameKey.equals(((UnboundSlot) expr.child(0).child(0)).getName())) {
throw new AnalysisException("Current not support " + ((UnboundSlot) expr.child(0).child(0)).getName());
}

if (!(expr.child(0).child(1) instanceof StringLikeLiteral)) {
throw new AnalysisException("JobName value must is string");
}
this.jobName = ((StringLikeLiteral) expr.child(0).child(1)).getStringValue();
String taskIdInput = ((StringLikeLiteral) expr.child(1).child(0)).getStringValue();
if (!taskIdKey.equalsIgnoreCase(taskIdInput)) {
throw new AnalysisException("Current not support " + taskIdInput);
}
if (!(expr.child(1).child(1) instanceof LargeIntLiteral)) {
throw new AnalysisException("task id value must is large int");
}
this.taskId = ((LargeIntLiteral) expr.child(1).child(1)).getLongValue();
}

public void doRun(ConnectContext ctx) throws Exception {
try {
ctx.getEnv().getJobManager().cancelTaskById(jobName, taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.StmtType;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -30,8 +29,8 @@
public class DropJobCommand extends AlterJobStatusCommand implements ForwardWithSync {
private final boolean ifExists;

public DropJobCommand(Expression wildWhere, boolean ifExists) {
super(PlanType.DROP_JOB_COMMAND, wildWhere);
public DropJobCommand(String jobName, boolean ifExists) {
super(PlanType.DROP_JOB_COMMAND, jobName);
this.ifExists = ifExists;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.analysis.StmtType;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -29,8 +28,8 @@
* pause job
*/
public class PauseJobCommand extends AlterJobStatusCommand implements ForwardWithSync {
public PauseJobCommand(Expression wildWhere) {
super(PlanType.PAUSE_JOB_COMMAND, wildWhere);
public PauseJobCommand(String jobName) {
super(PlanType.PAUSE_JOB_COMMAND, jobName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.analysis.StmtType;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -29,8 +28,8 @@
* pause job
*/
public class ResumeJobCommand extends AlterJobStatusCommand implements ForwardWithSync {
public ResumeJobCommand(Expression wildWhere) {
super(PlanType.RESUME_JOB_COMMAND, wildWhere);
public ResumeJobCommand(String jobName) {
super(PlanType.RESUME_JOB_COMMAND, jobName);
}

@Override
Expand Down

0 comments on commit 00c6e5b

Please sign in to comment.