Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix the NPE of show routine load (backport #50963) #51142

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class PulsarRoutineLoadJob extends RoutineLoadJob {
// pulsar properties, property prefix will be mapped to pulsar custom parameters, which can be extended in the future
@SerializedName("cpt")
private Map<String, String> customProperties = Maps.newHashMap();
private Map<String, String> convertedCustomProperties = Maps.newHashMap();
private final Map<String, String> convertedCustomProperties = Maps.newHashMap();

public static final String POSITION_EARLIEST = "POSITION_EARLIEST"; // 1
public static final String POSITION_LATEST = "POSITION_LATEST"; // 0
Expand All @@ -100,6 +100,8 @@ public class PulsarRoutineLoadJob extends RoutineLoadJob {
public PulsarRoutineLoadJob() {
// for serialization, id is dummy
super(-1, LoadDataSourceType.PULSAR);
this.progress = new PulsarProgress();
this.timestampProgress = new PulsarProgress();
}

public PulsarRoutineLoadJob(Long id, String name, long dbId, long tableId,
Expand All @@ -109,6 +111,7 @@ public PulsarRoutineLoadJob(Long id, String name, long dbId, long tableId,
this.topic = topic;
this.subscription = subscription;
this.progress = new PulsarProgress();
this.timestampProgress = new PulsarProgress();
}

public String getTopic() {
Expand Down Expand Up @@ -260,8 +263,7 @@ protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttac
// For compatible reason, the default behavior of empty load is still returning
// "No partitions have data available for loading" and abort transaction.
// In this situation, we also need update commit info.
if (txnStatusChangeReason != null &&
txnStatusChangeReason == TransactionState.TxnStatusChangeReason.NO_PARTITIONS) {
if (txnStatusChangeReason == TransactionState.TxnStatusChangeReason.NO_PARTITIONS) {
// Because the max_filter_ratio of routine load task is always 1.
// Therefore, under normal circumstances, routine load task will not return the error "too many filtered rows".
// If no data is imported, the error "No partitions have data available for loading" may only be returned.
Expand Down Expand Up @@ -385,17 +387,17 @@ protected void unprotectUpdateCurrentPartitions(List<String> newCurrentPartition
@Override
protected String getStatistic() {
Map<String, Object> summary = Maps.newHashMap();
summary.put("totalRows", Long.valueOf(totalRows));
summary.put("loadedRows", Long.valueOf(totalRows - errorRows - unselectedRows));
summary.put("errorRows", Long.valueOf(errorRows));
summary.put("unselectedRows", Long.valueOf(unselectedRows));
summary.put("receivedBytes", Long.valueOf(receivedBytes));
summary.put("taskExecuteTimeMs", Long.valueOf(totalTaskExcutionTimeMs));
summary.put("receivedBytesRate", Long.valueOf(receivedBytes / totalTaskExcutionTimeMs * 1000));
summary.put("totalRows", totalRows);
summary.put("loadedRows", totalRows - errorRows - unselectedRows);
summary.put("errorRows", errorRows);
summary.put("unselectedRows", unselectedRows);
summary.put("receivedBytes", receivedBytes);
summary.put("taskExecuteTimeMs", totalTaskExcutionTimeMs);
summary.put("receivedBytesRate", receivedBytes / totalTaskExcutionTimeMs * 1000);
summary.put("loadRowsRate",
Long.valueOf((totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000));
summary.put("committedTaskNum", Long.valueOf(committedTaskNum));
summary.put("abortedTaskNum", Long.valueOf(abortedTaskNum));
(totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000);
summary.put("committedTaskNum", committedTaskNum);
summary.put("abortedTaskNum", abortedTaskNum);
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(summary);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1574,25 +1574,25 @@ public String jobPropertiesToSql() {
StringBuilder sb = new StringBuilder();
sb.append("(\n");
sb.append("\"").append(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY).append("\"=\"");
sb.append(String.valueOf(desireTaskConcurrentNum)).append("\",\n");
sb.append(desireTaskConcurrentNum).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY).append("\"=\"");
sb.append(String.valueOf(maxErrorNum)).append("\",\n");
sb.append(maxErrorNum).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY).append("\"=\"");
sb.append(String.valueOf(maxFilterRatio)).append("\",\n");
sb.append(maxFilterRatio).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY).append("\"=\"");
sb.append(String.valueOf(taskSchedIntervalS)).append("\",\n");
sb.append(taskSchedIntervalS).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY).append("\"=\"");
sb.append(String.valueOf(maxBatchRows)).append("\",\n");
sb.append(maxBatchRows).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.TASK_CONSUME_SECOND).append("\"=\"");
sb.append(String.valueOf(taskConsumeSecond)).append("\",\n");
sb.append(taskConsumeSecond).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.TASK_TIMEOUT_SECOND).append("\"=\"");
sb.append(String.valueOf(taskTimeoutSecond)).append("\",\n");
sb.append(taskTimeoutSecond).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.FORMAT).append("\"=\"");
sb.append(getFormat()).append("\",\n");
Expand All @@ -1601,27 +1601,27 @@ public String jobPropertiesToSql() {
sb.append(getJsonPaths()).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY).append("\"=\"");
sb.append(Boolean.toString(isStripOuterArray())).append("\",\n");
sb.append(isStripOuterArray()).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.JSONROOT).append("\"=\"");
sb.append(getJsonRoot()).append("\",\n");

sb.append("\"").append(LoadStmt.STRICT_MODE).append("\"=\"");
sb.append(Boolean.toString(isStrictMode())).append("\",\n");
sb.append(isStrictMode()).append("\",\n");

sb.append("\"").append(LoadStmt.TIMEZONE).append("\"=\"");
sb.append(getTimezone()).append("\",\n");

sb.append("\"").append(LoadStmt.PARTIAL_UPDATE).append("\"=\"");
sb.append(Boolean.toString(isPartialUpdate())).append("\",\n");
sb.append(isPartialUpdate()).append("\",\n");

if (getMergeCondition() != null) {
sb.append("\"").append(LoadStmt.MERGE_CONDITION).append("\"=\"");
sb.append(getMergeCondition()).append("\",\n");
}

sb.append("\"").append(CreateRoutineLoadStmt.TRIMSPACE).append("\"=\"");
sb.append(Boolean.toString(isTrimspace())).append("\",\n");
sb.append(isTrimspace()).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.ENCLOSE).append("\"=\"");
sb.append(StringEscapeUtils.escapeJava(String.valueOf(getEnclose()))).append("\",\n");
Expand All @@ -1630,7 +1630,7 @@ public String jobPropertiesToSql() {
sb.append(StringEscapeUtils.escapeJava(String.valueOf(getEscape()))).append("\",\n");

sb.append("\"").append(CreateRoutineLoadStmt.LOG_REJECTED_RECORD_NUM_PROPERTY).append("\"=\"");
sb.append(String.valueOf(getLogRejectedRecordNum())).append("\"\n");
sb.append(getLogRejectedRecordNum()).append("\"\n");

sb.append(")\n");
return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,42 @@ void writeUnlock() {
routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString);

Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState());
Assert.assertEquals(new Long(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum"));
Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum"));
Assert.assertTrue(routineLoadJob.getOtherMsg(), routineLoadJob.getOtherMsg().endsWith(txnStatusChangeReasonString));
}

@Test
public void testGetShowInfo() throws UserException {
public void testPulsarGetShowInfo() {
{
// PAUSE state
PulsarRoutineLoadJob routineLoadJob = new PulsarRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR,
TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString());
Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason);

List<String> showInfo = routineLoadJob.getShowInfo();
Assert.assertTrue(showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity))
.anyMatch(entity -> entity.equals(errorReason.toString())));
}

{
// PAUSE state
PulsarRoutineLoadJob routineLoadJob = new PulsarRoutineLoadJob(
1L, "task1", 1, 1, "http://url", "task-1", "sub-1");
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR,
TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString());
Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason);

List<String> showInfo = routineLoadJob.getShowInfo();
Assert.assertTrue(showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity))
.anyMatch(entity -> entity.equals(errorReason.toString())));
}
}

@Test
public void testKafkaGetShowInfo() throws UserException {
{
// PAUSE state
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Expand Down
Loading