Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](Nereids) fix cancel task jobs failed #44755

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ supportedJobStatement
|
(AT (atTime=STRING_LITERAL | CURRENT_TIMESTAMP)))
commentSpec?
DO supportedDmlStatement #createScheduledJob
| PAUSE JOB wildWhere? #pauseJob
| DROP JOB (IF EXISTS)? wildWhere? #dropJob
| RESUME JOB wildWhere? #resumeJob
| CANCEL TASK wildWhere? #cancelJobTask
DO supportedDmlStatement #createScheduledJob
| PAUSE JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) #pauseJob
| DROP JOB (IF EXISTS)? WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) #dropJob
| RESUME JOB WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) #resumeJob
| CANCEL TASK WHERE (jobNameKey=identifier) EQ (jobNameValue=STRING_LITERAL) AND (taskIdKey=identifier) EQ (taskIdValue=INTEGER_VALUE) #cancelJobTask
;
constraintStatement
: ALTER TABLE table=multipartIdentifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,39 +772,25 @@ public LogicalPlan visitCreateScheduledJob(DorisParser.CreateScheduledJobContext

@Override
public LogicalPlan visitPauseJob(DorisParser.PauseJobContext ctx) {
Expression wildWhere = null;
if (ctx.wildWhere() != null) {
wildWhere = getWildWhere(ctx.wildWhere());
}
return new PauseJobCommand(wildWhere);
return new PauseJobCommand(stripQuotes(ctx.jobNameValue.getText()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we do not check jobNameKey?

}

@Override
public LogicalPlan visitDropJob(DorisParser.DropJobContext ctx) {
Expression wildWhere = null;
if (ctx.wildWhere() != null) {
wildWhere = getWildWhere(ctx.wildWhere());
}
boolean ifExists = ctx.EXISTS() != null;
return new DropJobCommand(wildWhere, ifExists);
return new DropJobCommand(stripQuotes(ctx.jobNameValue.getText()), ifExists);
}

@Override
public LogicalPlan visitResumeJob(DorisParser.ResumeJobContext ctx) {
Expression wildWhere = null;
if (ctx.wildWhere() != null) {
wildWhere = getWildWhere(ctx.wildWhere());
}
return new ResumeJobCommand(wildWhere);
return new ResumeJobCommand(stripQuotes(ctx.jobNameValue.getText()));
}

@Override
public LogicalPlan visitCancelJobTask(DorisParser.CancelJobTaskContext ctx) {
Expression wildWhere = null;
if (ctx.wildWhere() != null) {
wildWhere = getWildWhere(ctx.wildWhere());
}
return new CancelJobTaskCommand(wildWhere);
String jobName = stripQuotes(ctx.jobNameValue.getText());
Long taskId = Long.valueOf(ctx.taskIdValue.getText());
Comment on lines +802 to +803
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what will happen if sql like cancel task where task_id = '123' and job_name = 123?

return new CancelJobTaskCommand(jobName, taskId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,22 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.literal.StringLikeLiteral;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;

import com.google.common.base.Strings;

/**
* base class for all drop commands
*/
public abstract class AlterJobStatusCommand extends Command implements ForwardWithSync {
// exclude job name prefix, which is used by inner job
private static final String excludeJobNamePrefix = "inner_";
private final Expression wildWhere;
private String jobName;

public AlterJobStatusCommand(PlanType type, Expression wildWhere) {
public AlterJobStatusCommand(PlanType type, String jobName) {
super(type);
this.wildWhere = wildWhere;
this.jobName = jobName;
}

public String getJobName() {
Expand All @@ -69,25 +62,9 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
}

private void validate() throws Exception {
if (!(wildWhere instanceof EqualTo)) {
throw new AnalysisException("Alter job status only support equal condition, but not: " + wildWhere.toSql());
}
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
Expression left = ((EqualTo) wildWhere).left();
Expression right = ((EqualTo) wildWhere).right();
if (!(left instanceof UnboundSlot && ((UnboundSlot) left).getName().equalsIgnoreCase("jobName"))) {
throw new AnalysisException("Current not support left child of where: " + left);
}
if (!(right instanceof StringLikeLiteral)) {
throw new AnalysisException("Value must is string");
}

if (Strings.isNullOrEmpty(((StringLikeLiteral) right).getStringValue())) {
throw new AnalysisException("Value can't is null");
}
this.jobName = ((StringLikeLiteral) right).getStringValue();
if (jobName.startsWith(excludeJobNamePrefix)) {
throw new AnalysisException("Can't alter inner job status");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,10 @@
package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.literal.LargeIntLiteral;
import org.apache.doris.nereids.trees.expressions.literal.StringLikeLiteral;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -37,19 +31,14 @@
* base class for all drop commands
*/
public class CancelJobTaskCommand extends CancelCommand implements ForwardWithSync {
private static final String jobNameKey = "jobName";

private static final String taskIdKey = "taskId";

private String jobName;

private Long taskId;

private Expression expr;

public CancelJobTaskCommand(Expression expr) {
public CancelJobTaskCommand(String jobName, Long taskId) {
super(PlanType.CANCEL_JOB_COMMAND);
this.expr = expr;
this.jobName = jobName;
this.taskId = taskId;
}

@Override
Expand All @@ -59,34 +48,10 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {

@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
validate();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we miss the permission check?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, mistoken delete

doRun(ctx);
}

private void validate() throws AnalysisException {
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
if (!(expr instanceof And)) {
throw new AnalysisException("Only allow compound predicate with operator AND");
}
if (!(expr.child(0).child(0) instanceof UnboundSlot)
&& jobNameKey.equals(((UnboundSlot) expr.child(0).child(0)).getName())) {
throw new AnalysisException("Current not support " + ((UnboundSlot) expr.child(0).child(0)).getName());
}

if (!(expr.child(0).child(1) instanceof StringLikeLiteral)) {
throw new AnalysisException("JobName value must is string");
}
this.jobName = ((StringLikeLiteral) expr.child(0).child(1)).getStringValue();
String taskIdInput = ((StringLikeLiteral) expr.child(1).child(0)).getStringValue();
if (!taskIdKey.equalsIgnoreCase(taskIdInput)) {
throw new AnalysisException("Current not support " + taskIdInput);
}
if (!(expr.child(1).child(1) instanceof LargeIntLiteral)) {
throw new AnalysisException("task id value must is large int");
}
this.taskId = ((LargeIntLiteral) expr.child(1).child(1)).getLongValue();
doRun(ctx);
}

public void doRun(ConnectContext ctx) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.StmtType;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -30,8 +33,8 @@
public class DropJobCommand extends AlterJobStatusCommand implements ForwardWithSync {
private final boolean ifExists;

public DropJobCommand(Expression wildWhere, boolean ifExists) {
super(PlanType.DROP_JOB_COMMAND, wildWhere);
public DropJobCommand(String jobName, boolean ifExists) {
super(PlanType.DROP_JOB_COMMAND, jobName);
this.ifExists = ifExists;
}

Expand All @@ -46,6 +49,9 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
}

public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
ctx.getEnv().getJobManager().unregisterJob(super.getJobName(), ifExists);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -29,8 +32,8 @@
* pause job
*/
public class PauseJobCommand extends AlterJobStatusCommand implements ForwardWithSync {
public PauseJobCommand(Expression wildWhere) {
super(PlanType.PAUSE_JOB_COMMAND, wildWhere);
public PauseJobCommand(String jobName) {
super(PlanType.PAUSE_JOB_COMMAND, jobName);
}

@Override
Expand All @@ -40,6 +43,9 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {

@Override
public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.PAUSED);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.analysis.StmtType;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
Expand All @@ -29,8 +32,8 @@
* pause job
*/
public class ResumeJobCommand extends AlterJobStatusCommand implements ForwardWithSync {
public ResumeJobCommand(Expression wildWhere) {
super(PlanType.RESUME_JOB_COMMAND, wildWhere);
public ResumeJobCommand(String jobName) {
super(PlanType.RESUME_JOB_COMMAND, jobName);
}

@Override
Expand All @@ -40,6 +43,9 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {

@Override
public void doRun(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (!Env.getCurrentEnv().getAccessManager().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "ADMIN");
}
ctx.getEnv().getJobManager().alterJobStatus(super.getJobName(), JobStatus.RUNNING);
}

Expand Down
Loading