Skip to content

Commit

Permalink
[BugFix] Fix the npe of show routine load (#50963)
Browse files Browse the repository at this point in the history
Signed-off-by: trueeyu <[email protected]>
(cherry picked from commit 5bc037d)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java
#	fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadJobTest.java
  • Loading branch information
trueeyu authored and mergify[bot] committed Sep 19, 2024
1 parent 514420e commit 8e78ff0
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class PulsarRoutineLoadJob extends RoutineLoadJob {
// pulsar properties, property prefix will be mapped to pulsar custom parameters, which can be extended in the future
@SerializedName("cpt")
private Map<String, String> customProperties = Maps.newHashMap();
private Map<String, String> convertedCustomProperties = Maps.newHashMap();
private final Map<String, String> convertedCustomProperties = Maps.newHashMap();

public static final String POSITION_EARLIEST = "POSITION_EARLIEST"; // 1
public static final String POSITION_LATEST = "POSITION_LATEST"; // 0
Expand All @@ -100,6 +100,8 @@ public class PulsarRoutineLoadJob extends RoutineLoadJob {
public PulsarRoutineLoadJob() {
// for serialization, id is dummy
super(-1, LoadDataSourceType.PULSAR);
this.progress = new PulsarProgress();
this.timestampProgress = new PulsarProgress();
}

public PulsarRoutineLoadJob(Long id, String name, long dbId, long tableId,
Expand All @@ -109,6 +111,7 @@ public PulsarRoutineLoadJob(Long id, String name, long dbId, long tableId,
this.topic = topic;
this.subscription = subscription;
this.progress = new PulsarProgress();
this.timestampProgress = new PulsarProgress();
}

public String getTopic() {
Expand Down Expand Up @@ -260,8 +263,7 @@ protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttac
// For compatible reason, the default behavior of empty load is still returning
// "No partitions have data available for loading" and abort transaction.
// In this situation, we also need update commit info.
if (txnStatusChangeReason != null &&
txnStatusChangeReason == TransactionState.TxnStatusChangeReason.NO_PARTITIONS) {
if (txnStatusChangeReason == TransactionState.TxnStatusChangeReason.NO_PARTITIONS) {
// Because the max_filter_ratio of routine load task is always 1.
// Therefore, under normal circumstances, routine load task will not return the error "too many filtered rows".
// If no data is imported, the error "No partitions have data available for loading" may only be returned.
Expand Down Expand Up @@ -385,17 +387,17 @@ protected void unprotectUpdateCurrentPartitions(List<String> newCurrentPartition
@Override
protected String getStatistic() {
Map<String, Object> summary = Maps.newHashMap();
summary.put("totalRows", Long.valueOf(totalRows));
summary.put("loadedRows", Long.valueOf(totalRows - errorRows - unselectedRows));
summary.put("errorRows", Long.valueOf(errorRows));
summary.put("unselectedRows", Long.valueOf(unselectedRows));
summary.put("receivedBytes", Long.valueOf(receivedBytes));
summary.put("taskExecuteTimeMs", Long.valueOf(totalTaskExcutionTimeMs));
summary.put("receivedBytesRate", Long.valueOf(receivedBytes / totalTaskExcutionTimeMs * 1000));
summary.put("totalRows", totalRows);
summary.put("loadedRows", totalRows - errorRows - unselectedRows);
summary.put("errorRows", errorRows);
summary.put("unselectedRows", unselectedRows);
summary.put("receivedBytes", receivedBytes);
summary.put("taskExecuteTimeMs", totalTaskExcutionTimeMs);
summary.put("receivedBytesRate", receivedBytes / totalTaskExcutionTimeMs * 1000);
summary.put("loadRowsRate",
Long.valueOf((totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000));
summary.put("committedTaskNum", Long.valueOf(committedTaskNum));
summary.put("abortedTaskNum", Long.valueOf(abortedTaskNum));
(totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000);
summary.put("committedTaskNum", committedTaskNum);
summary.put("abortedTaskNum", abortedTaskNum);
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(summary);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1574,25 +1574,25 @@ public String jobPropertiesToSql() {
StringBuilder sb = new StringBuilder();
sb.append("(\n");
sb.append("\"").append(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY).append("\"=\"");
sb.append(String.valueOf(desireTaskConcurrentNum)).append("\",\n");
sb.append(desireTaskConcurrentNum).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY).append("\"=\"");
sb.append(String.valueOf(maxErrorNum)).append("\",\n");
sb.append(maxErrorNum).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY).append("\"=\"");
sb.append(String.valueOf(maxFilterRatio)).append("\",\n");
sb.append(maxFilterRatio).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY).append("\"=\"");
sb.append(String.valueOf(taskSchedIntervalS)).append("\",\n");
sb.append(taskSchedIntervalS).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY).append("\"=\"");
sb.append(String.valueOf(maxBatchRows)).append("\",\n");
sb.append(maxBatchRows).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.TASK_CONSUME_SECOND).append("\"=\"");
sb.append(String.valueOf(taskConsumeSecond)).append("\",\n");
sb.append(taskConsumeSecond).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.TASK_TIMEOUT_SECOND).append("\"=\"");
sb.append(String.valueOf(taskTimeoutSecond)).append("\",\n");
sb.append(taskTimeoutSecond).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.FORMAT).append("\"=\"");
sb.append(getFormat()).append("\",\n");
Expand All @@ -1601,27 +1601,27 @@ public String jobPropertiesToSql() {
sb.append(getJsonPaths()).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY).append("\"=\"");
sb.append(Boolean.toString(isStripOuterArray())).append("\",\n");
sb.append(isStripOuterArray()).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.JSONROOT).append("\"=\"");
sb.append(getJsonRoot()).append("\",\n");

sb.append("\"").append(LoadStmt.STRICT_MODE).append("\"=\"");
sb.append(Boolean.toString(isStrictMode())).append("\",\n");
sb.append(isStrictMode()).append("\",\n");

sb.append("\"").append(LoadStmt.TIMEZONE).append("\"=\"");
sb.append(getTimezone()).append("\",\n");

sb.append("\"").append(LoadStmt.PARTIAL_UPDATE).append("\"=\"");
sb.append(Boolean.toString(isPartialUpdate())).append("\",\n");
sb.append(isPartialUpdate()).append("\",\n");

if (getMergeCondition() != null) {
sb.append("\"").append(LoadStmt.MERGE_CONDITION).append("\"=\"");
sb.append(getMergeCondition()).append("\",\n");
}

sb.append("\"").append(CreateRoutineLoadStmt.TRIMSPACE).append("\"=\"");
sb.append(Boolean.toString(isTrimspace())).append("\",\n");
sb.append(isTrimspace()).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.ENCLOSE).append("\"=\"");
sb.append(StringEscapeUtils.escapeJava(String.valueOf(getEnclose()))).append("\",\n");
Expand All @@ -1630,7 +1630,24 @@ public String jobPropertiesToSql() {
sb.append(StringEscapeUtils.escapeJava(String.valueOf(getEscape()))).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.LOG_REJECTED_RECORD_NUM_PROPERTY).append("\"=\"");
<<<<<<< HEAD
sb.append(String.valueOf(getLogRejectedRecordNum())).append("\"\n");
=======
sb.append(getLogRejectedRecordNum());

if (RunMode.getCurrentRunMode() == RunMode.SHARED_DATA) {
sb.append("\",\n");
sb.append("\"").append(PropertyAnalyzer.PROPERTIES_WAREHOUSE).append("\"=\"");
Warehouse warehouse = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(warehouseId);
if (warehouse != null) {
sb.append(warehouse.getName()).append("\"\n");
} else {
sb.append("NULL").append("\"\n");
}
} else {
sb.append("\"\n");
}
>>>>>>> 5bc037dadd ([BugFix] Fix the npe of show routine load (#50963))

sb.append(")\n");
return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,114 @@ void writeUnlock() {
routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString);

Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState());
Assert.assertEquals(new Long(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum"));
Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum"));
Assert.assertTrue(routineLoadJob.getOtherMsg(), routineLoadJob.getOtherMsg().endsWith(txnStatusChangeReasonString));
<<<<<<< HEAD
=======

Assert.assertEquals(Long.valueOf(prevValue + 1), entity.counterRoutineLoadAbortedTasksTotal.getValue());

routineLoadTaskInfoList.clear();
routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString);
Assert.assertEquals(Long.valueOf(2), Deencapsulation.getField(routineLoadJob, "abortedTaskNum"));
Assert.assertEquals(Long.valueOf(prevValue + 2), entity.counterRoutineLoadAbortedTasksTotal.getValue());
}

@Test
public void testAfterCommitted(@Mocked RoutineLoadMgr routineLoadMgr,
@Injectable TransactionState transactionState,
@Injectable KafkaTaskInfo routineLoadTaskInfo) throws UserException {
Deencapsulation.setField(routineLoadTaskInfo, "routineLoadManager", routineLoadMgr);
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Lists.newArrayList();
routineLoadTaskInfoList.add(routineLoadTaskInfo);
long txnId = 1L;

RLTaskTxnCommitAttachment attachment = new RLTaskTxnCommitAttachment();
TKafkaRLTaskProgress tKafkaRLTaskProgress = new TKafkaRLTaskProgress();
tKafkaRLTaskProgress.partitionCmtOffset = Maps.newHashMap();
KafkaProgress kafkaProgress = new KafkaProgress(tKafkaRLTaskProgress.getPartitionCmtOffset());
Deencapsulation.setField(attachment, "progress", kafkaProgress);

KafkaProgress currentProgress = new KafkaProgress(tKafkaRLTaskProgress.getPartitionCmtOffset());

RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();

new Expectations() {
{
transactionState.getTransactionId();
minTimes = 0;
result = txnId;
routineLoadTaskInfo.getTxnId();
minTimes = 0;
result = txnId;
transactionState.getTxnCommitAttachment();
minTimes = 0;
result = attachment;
routineLoadTaskInfo.getPartitions();
minTimes = 0;
result = Lists.newArrayList();
routineLoadTaskInfo.getId();
minTimes = 0;
result = UUID.randomUUID();
routineLoadMgr.getJob(anyLong);
minTimes = 0;
result = routineLoadJob;
}
};

new MockUp<RoutineLoadJob>() {
@Mock
void writeUnlock() {
}
};

Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.RUNNING);
Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList", routineLoadTaskInfoList);
Deencapsulation.setField(routineLoadJob, "progress", currentProgress);
TableMetricsEntity entity =
TableMetricsRegistry.getInstance().getMetricsEntity(routineLoadTaskInfo.getJob().tableId);
long prevValue = entity.counterRoutineLoadCommittedTasksTotal.getValue();
routineLoadJob.afterCommitted(transactionState, true);

Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState());
Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "committedTaskNum"));

Assert.assertEquals(Long.valueOf(prevValue + 1), entity.counterRoutineLoadCommittedTasksTotal.getValue());
>>>>>>> 5bc037dadd ([BugFix] Fix the npe of show routine load (#50963))
}

@Test
public void testPulsarGetShowInfo() {
{
// PAUSE state
PulsarRoutineLoadJob routineLoadJob = new PulsarRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR,
TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString());
Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason);

List<String> showInfo = routineLoadJob.getShowInfo();
Assert.assertTrue(showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity))
.anyMatch(entity -> entity.equals(errorReason.toString())));
}

{
// PAUSE state
PulsarRoutineLoadJob routineLoadJob = new PulsarRoutineLoadJob(
1L, "task1", 1, 1, "http://url", "task-1", "sub-1");
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR,
TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString());
Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason);

List<String> showInfo = routineLoadJob.getShowInfo();
Assert.assertTrue(showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity))
.anyMatch(entity -> entity.equals(errorReason.toString())));
}
}

@Test
public void testGetShowInfo() throws UserException {
public void testKafkaGetShowInfo() throws UserException {
{
// PAUSE state
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Expand Down

0 comments on commit 8e78ff0

Please sign in to comment.