diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarRoutineLoadJob.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarRoutineLoadJob.java index a1cc44f0026b7..39af6206ee015 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/PulsarRoutineLoadJob.java @@ -88,7 +88,7 @@ public class PulsarRoutineLoadJob extends RoutineLoadJob { // pulsar properties, property prefix will be mapped to pulsar custom parameters, which can be extended in the future @SerializedName("cpt") private Map customProperties = Maps.newHashMap(); - private Map convertedCustomProperties = Maps.newHashMap(); + private final Map convertedCustomProperties = Maps.newHashMap(); public static final String POSITION_EARLIEST = "POSITION_EARLIEST"; // 1 public static final String POSITION_LATEST = "POSITION_LATEST"; // 0 @@ -100,6 +100,8 @@ public class PulsarRoutineLoadJob extends RoutineLoadJob { public PulsarRoutineLoadJob() { // for serialization, id is dummy super(-1, LoadDataSourceType.PULSAR); + this.progress = new PulsarProgress(); + this.timestampProgress = new PulsarProgress(); } public PulsarRoutineLoadJob(Long id, String name, long dbId, long tableId, @@ -109,6 +111,7 @@ public PulsarRoutineLoadJob(Long id, String name, long dbId, long tableId, this.topic = topic; this.subscription = subscription; this.progress = new PulsarProgress(); + this.timestampProgress = new PulsarProgress(); } public String getTopic() { @@ -260,8 +263,7 @@ protected boolean checkCommitInfo(RLTaskTxnCommitAttachment rlTaskTxnCommitAttac // For compatible reason, the default behavior of empty load is still returning // "No partitions have data available for loading" and abort transaction. // In this situation, we also need update commit info. - if (txnStatusChangeReason != null && - txnStatusChangeReason == TransactionState.TxnStatusChangeReason.NO_PARTITIONS) { + if (txnStatusChangeReason == TransactionState.TxnStatusChangeReason.NO_PARTITIONS) { // Because the max_filter_ratio of routine load task is always 1. // Therefore, under normal circumstances, routine load task will not return the error "too many filtered rows". // If no data is imported, the error "No partitions have data available for loading" may only be returned. @@ -385,17 +387,17 @@ protected void unprotectUpdateCurrentPartitions(List newCurrentPartition @Override protected String getStatistic() { Map summary = Maps.newHashMap(); - summary.put("totalRows", Long.valueOf(totalRows)); - summary.put("loadedRows", Long.valueOf(totalRows - errorRows - unselectedRows)); - summary.put("errorRows", Long.valueOf(errorRows)); - summary.put("unselectedRows", Long.valueOf(unselectedRows)); - summary.put("receivedBytes", Long.valueOf(receivedBytes)); - summary.put("taskExecuteTimeMs", Long.valueOf(totalTaskExcutionTimeMs)); - summary.put("receivedBytesRate", Long.valueOf(receivedBytes / totalTaskExcutionTimeMs * 1000)); + summary.put("totalRows", totalRows); + summary.put("loadedRows", totalRows - errorRows - unselectedRows); + summary.put("errorRows", errorRows); + summary.put("unselectedRows", unselectedRows); + summary.put("receivedBytes", receivedBytes); + summary.put("taskExecuteTimeMs", totalTaskExcutionTimeMs); + summary.put("receivedBytesRate", receivedBytes / totalTaskExcutionTimeMs * 1000); summary.put("loadRowsRate", - Long.valueOf((totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000)); - summary.put("committedTaskNum", Long.valueOf(committedTaskNum)); - summary.put("abortedTaskNum", Long.valueOf(abortedTaskNum)); + (totalRows - errorRows - unselectedRows) / totalTaskExcutionTimeMs * 1000); + summary.put("committedTaskNum", committedTaskNum); + summary.put("abortedTaskNum", abortedTaskNum); Gson gson = new GsonBuilder().disableHtmlEscaping().create(); return gson.toJson(summary); } diff --git a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java index 7e1f0f1c51a27..86e582e360549 100644 --- a/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/load/routineload/RoutineLoadJob.java @@ -1574,25 +1574,25 @@ public String jobPropertiesToSql() { StringBuilder sb = new StringBuilder(); sb.append("(\n"); sb.append("\"").append(CreateRoutineLoadStmt.DESIRED_CONCURRENT_NUMBER_PROPERTY).append("\"=\""); - sb.append(String.valueOf(desireTaskConcurrentNum)).append("\",\n"); + sb.append(desireTaskConcurrentNum).append("\",\n"); sb.append("\"").append(CreateRoutineLoadStmt.MAX_ERROR_NUMBER_PROPERTY).append("\"=\""); - sb.append(String.valueOf(maxErrorNum)).append("\",\n"); + sb.append(maxErrorNum).append("\",\n"); sb.append("\"").append(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY).append("\"=\""); - sb.append(String.valueOf(maxFilterRatio)).append("\",\n"); + sb.append(maxFilterRatio).append("\",\n"); sb.append("\"").append(CreateRoutineLoadStmt.MAX_BATCH_INTERVAL_SEC_PROPERTY).append("\"=\""); - sb.append(String.valueOf(taskSchedIntervalS)).append("\",\n"); + sb.append(taskSchedIntervalS).append("\",\n"); sb.append("\"").append(CreateRoutineLoadStmt.MAX_BATCH_ROWS_PROPERTY).append("\"=\""); - sb.append(String.valueOf(maxBatchRows)).append("\",\n"); + sb.append(maxBatchRows).append("\",\n"); sb.append("\"").append(CreateRoutineLoadStmt.TASK_CONSUME_SECOND).append("\"=\""); - sb.append(String.valueOf(taskConsumeSecond)).append("\",\n"); + sb.append(taskConsumeSecond).append("\",\n"); sb.append("\"").append(CreateRoutineLoadStmt.TASK_TIMEOUT_SECOND).append("\"=\""); - sb.append(String.valueOf(taskTimeoutSecond)).append("\",\n"); + sb.append(taskTimeoutSecond).append("\",\n"); sb.append("\"").append(CreateRoutineLoadStmt.FORMAT).append("\"=\""); sb.append(getFormat()).append("\",\n"); @@ -1601,19 +1601,19 @@ public String jobPropertiesToSql() { sb.append(getJsonPaths()).append("\",\n"); sb.append("\"").append(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY).append("\"=\""); - sb.append(Boolean.toString(isStripOuterArray())).append("\",\n"); + sb.append(isStripOuterArray()).append("\",\n"); sb.append("\"").append(CreateRoutineLoadStmt.JSONROOT).append("\"=\""); sb.append(getJsonRoot()).append("\",\n"); sb.append("\"").append(LoadStmt.STRICT_MODE).append("\"=\""); - sb.append(Boolean.toString(isStrictMode())).append("\",\n"); + sb.append(isStrictMode()).append("\",\n"); sb.append("\"").append(LoadStmt.TIMEZONE).append("\"=\""); sb.append(getTimezone()).append("\",\n"); sb.append("\"").append(LoadStmt.PARTIAL_UPDATE).append("\"=\""); - sb.append(Boolean.toString(isPartialUpdate())).append("\",\n"); + sb.append(isPartialUpdate()).append("\",\n"); if (getMergeCondition() != null) { sb.append("\"").append(LoadStmt.MERGE_CONDITION).append("\"=\""); @@ -1621,7 +1621,7 @@ public String jobPropertiesToSql() { } sb.append("\"").append(CreateRoutineLoadStmt.TRIMSPACE).append("\"=\""); - sb.append(Boolean.toString(isTrimspace())).append("\",\n"); + sb.append(isTrimspace()).append("\",\n"); sb.append("\"").append(CreateRoutineLoadStmt.ENCLOSE).append("\"=\""); sb.append(StringEscapeUtils.escapeJava(String.valueOf(getEnclose()))).append("\",\n"); @@ -1630,7 +1630,7 @@ public String jobPropertiesToSql() { sb.append(StringEscapeUtils.escapeJava(String.valueOf(getEscape()))).append("\",\n"); sb.append("\"").append(CreateRoutineLoadStmt.LOG_REJECTED_RECORD_NUM_PROPERTY).append("\"=\""); - sb.append(String.valueOf(getLogRejectedRecordNum())).append("\"\n"); + sb.append(getLogRejectedRecordNum()).append("\"\n"); sb.append(")\n"); return sb.toString(); diff --git a/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadJobTest.java b/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadJobTest.java index f7d7b7e30e50d..f6476ccd96b3a 100644 --- a/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadJobTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/load/routineload/RoutineLoadJobTest.java @@ -159,12 +159,42 @@ void writeUnlock() { routineLoadJob.afterAborted(transactionState, true, txnStatusChangeReasonString); Assert.assertEquals(RoutineLoadJob.JobState.RUNNING, routineLoadJob.getState()); - Assert.assertEquals(new Long(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum")); + Assert.assertEquals(Long.valueOf(1), Deencapsulation.getField(routineLoadJob, "abortedTaskNum")); Assert.assertTrue(routineLoadJob.getOtherMsg(), routineLoadJob.getOtherMsg().endsWith(txnStatusChangeReasonString)); } @Test - public void testGetShowInfo() throws UserException { + public void testPulsarGetShowInfo() { + { + // PAUSE state + PulsarRoutineLoadJob routineLoadJob = new PulsarRoutineLoadJob(); + Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED); + ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR, + TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString()); + Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason); + + List showInfo = routineLoadJob.getShowInfo(); + Assert.assertTrue(showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity)) + .anyMatch(entity -> entity.equals(errorReason.toString()))); + } + + { + // PAUSE state + PulsarRoutineLoadJob routineLoadJob = new PulsarRoutineLoadJob( + 1L, "task1", 1, 1, "http://url", "task-1", "sub-1"); + Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED); + ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR, + TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString()); + Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason); + + List showInfo = routineLoadJob.getShowInfo(); + Assert.assertTrue(showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity)) + .anyMatch(entity -> entity.equals(errorReason.toString()))); + } + } + + @Test + public void testKafkaGetShowInfo() throws UserException { { // PAUSE state KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();