Skip to content

Commit

Permalink
cancel cmd
Browse files Browse the repository at this point in the history
  • Loading branch information
englefly committed Nov 28, 2024
1 parent 0d50ede commit 955d5a5
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.doris.load.loadv2.JobState;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -528,7 +527,8 @@ private static void addNeedCancelLoadJob(String label, String state,
/**
* used for nereids planner
*/
public void cancelLoadJob(String dbName, String label, String state, BinaryOperator operator)
public void cancelLoadJob(String dbName, String label, String state,
org.apache.doris.nereids.trees.expressions.CompoundPredicate operator)
throws JobException, AnalysisException, DdlException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
// List of load jobs waiting to be cancelled
Expand Down Expand Up @@ -582,7 +582,8 @@ public void cancelLoadJob(String dbName, String label, String state, BinaryOpera
}

private static void addNeedCancelLoadJob(String label, String state,
BinaryOperator operator, List<InsertJob> loadJobs,
org.apache.doris.nereids.trees.expressions.CompoundPredicate operator,
List<InsertJob> loadJobs,
List<InsertJob> matchLoadJobs)
throws AnalysisException {
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
Expand Down
14 changes: 10 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.nereids.trees.expressions.Or;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.scheduler.exception.JobException;
Expand Down Expand Up @@ -162,7 +161,9 @@ public void cancelExportJob(CancelExportStmt stmt) throws DdlException, Analysis
}
}

private List<ExportJob> getWaitingCancelJobs(String label, String state, BinaryOperator operator)
private List<ExportJob> getWaitingCancelJobs(
String label, String state,
org.apache.doris.nereids.trees.expressions.CompoundPredicate operator)
throws AnalysisException {
Predicate<ExportJob> jobFilter = buildCancelJobFilter(label, state, operator);
readLock();
Expand All @@ -174,7 +175,9 @@ private List<ExportJob> getWaitingCancelJobs(String label, String state, BinaryO
}

@VisibleForTesting
public static Predicate<ExportJob> buildCancelJobFilter(String label, String state, BinaryOperator operator)
public static Predicate<ExportJob> buildCancelJobFilter(
String label, String state,
org.apache.doris.nereids.trees.expressions.CompoundPredicate operator)
throws AnalysisException {
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
CaseSensibility.LABEL.getCaseSensibility());
Expand All @@ -201,7 +204,10 @@ public static Predicate<ExportJob> buildCancelJobFilter(String label, String sta
/**
* used for Nereids planner
*/
public void cancelExportJob(String label, String state, BinaryOperator operator, String dbName)
public void cancelExportJob(
String label,
String state,
org.apache.doris.nereids.trees.expressions.CompoundPredicate operator, String dbName)
throws DdlException, AnalysisException {
// List of export jobs waiting to be cancelled
List<ExportJob> matchExportJobs = getWaitingCancelJobs(label, state, operator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.apache.doris.load.Load;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.expressions.And;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.nereids.trees.expressions.CompoundPredicate;
import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
Expand Down Expand Up @@ -248,7 +248,7 @@ public void recordFinishedLoadJob(String label, long transactionId, String dbNam
* Match need cancel loadJob by stmt.
**/
@VisibleForTesting
public static void addNeedCancelLoadJob(String label, String state, BinaryOperator operator,
public static void addNeedCancelLoadJob(String label, String state, CompoundPredicate operator,
List<LoadJob> loadJobs, List<LoadJob> matchLoadJobs)
throws AnalysisException {
PatternMatcher matcher = PatternMatcherWrapper.createMysqlPattern(label,
Expand Down Expand Up @@ -281,7 +281,7 @@ public static void addNeedCancelLoadJob(String label, String state, BinaryOperat
/**
* Cancel load job by stmt.
**/
public void cancelLoadJob(String dbName, String label, String state, BinaryOperator operator)
public void cancelLoadJob(String dbName, String label, String state, CompoundPredicate operator)
throws DdlException, AnalysisException {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
// List of load jobs waiting to be cancelled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.nereids.trees.expressions.CompoundPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
Expand Down Expand Up @@ -55,7 +55,7 @@ public CancelExportCommand(String dbName, Expression whereClause) {
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
validate(ctx);
ctx.getEnv().getExportMgr().cancelExportJob(label, state, (BinaryOperator) whereClause, dbName);
ctx.getEnv().getExportMgr().cancelExportJob(label, state, (CompoundPredicate) whereClause, dbName);
}

private void validate(ConnectContext ctx) throws UserException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.nereids.trees.expressions.BinaryOperator;
import org.apache.doris.nereids.trees.expressions.CompoundPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
Expand Down Expand Up @@ -57,9 +57,9 @@ public CancelLoadCommand(String dbName, Expression whereClause) {
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
validate(ctx);
try {
ctx.getEnv().getJobManager().cancelLoadJob(dbName, label, state, (BinaryOperator) whereClause);
ctx.getEnv().getJobManager().cancelLoadJob(dbName, label, state, (CompoundPredicate) whereClause);
} catch (JobException e) {
ctx.getEnv().getLoadManager().cancelLoadJob(dbName, label, state, (BinaryOperator) whereClause);
ctx.getEnv().getLoadManager().cancelLoadJob(dbName, label, state, (CompoundPredicate) whereClause);
}
}

Expand Down

0 comments on commit 955d5a5

Please sign in to comment.