Skip to content

Commit

Permalink
[BugFix] Fix transaction of insert load job can not be aborted when j…
Browse files Browse the repository at this point in the history
…ob has been cancelled (#48059)

(cherry picked from commit d952829)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/load/loadv2/InsertLoadJob.java
#	fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadMgr.java
  • Loading branch information
caneGuy authored and mergify[bot] committed Jul 31, 2024
1 parent 511aaf2 commit e7d6432
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,15 @@ public void readFields(DataInput in) throws IOException {
super.readFields(in);
tableId = in.readLong();
}
<<<<<<< HEAD
=======

public void setEstimateScanRow(long rows) {
this.estimateScanRow = rows;
}

public void setTransactionId(long txnId) {
this.transactionId = txnId;
}
>>>>>>> d9528297c4 ([BugFix] Fix transaction of insert load job can not be aborted when job has been cancelled (#48059))
}
15 changes: 15 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,17 @@ public void recordFinishedOrCacnelledLoadJob(long jobId, EtlJobType jobType, Str
}
}

<<<<<<< HEAD
public long registerLoadJob(String label, String dbName, long tableId, EtlJobType jobType,
long createTimestamp, long estimateScanRows, TLoadJobType type, long timeout,
Coordinator coordinator) throws UserException {
=======
public long registerLoadJob(String label, String dbName, long tableId, long txnId, EtlJobType jobType,
long createTimestamp, long estimateScanRows,
int estimateFileNum, long estimateFileSize,
TLoadJobType type, long timeout, Coordinator coordinator)
throws UserException {
>>>>>>> d9528297c4 ([BugFix] Fix transaction of insert load job can not be aborted when job has been cancelled (#48059))

// get db id
Database db = GlobalStateMgr.getCurrentState().getDb(dbName);
Expand All @@ -243,8 +251,15 @@ public long registerLoadJob(String label, String dbName, long tableId, EtlJobTyp

LoadJob loadJob;
if (Objects.requireNonNull(jobType) == EtlJobType.INSERT) {
<<<<<<< HEAD
loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp, estimateScanRows, type, timeout,
coordinator);
=======
loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp, type, timeout, coordinator);
loadJob.setLoadFileInfo(estimateFileNum, estimateFileSize);
loadJob.setEstimateScanRow(estimateScanRows);
loadJob.setTransactionId(txnId);
>>>>>>> d9528297c4 ([BugFix] Fix transaction of insert load job can not be aborted when job has been cancelled (#48059))
} else {
throw new LoadException("Unknown job type [" + jobType.name() + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1976,6 +1976,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
label,
database.getFullName(),
targetTable.getId(),
transactionId,
EtlJobType.INSERT,
createTime,
estimateScanRows,
Expand Down

0 comments on commit e7d6432

Please sign in to comment.