Skip to content

Commit

Permalink
fixup!: apache#15981 Missing completion reports on index_parallel tas…
Browse files Browse the repository at this point in the history
…ks (apache#16042)

* initial commit

* comments

* typo

* comments

* comments

* remove var

* initialize global var early

* remove new line

* small test fix

* same fix another test
  • Loading branch information
adithyachakilam authored Mar 6, 2024
1 parent c284142 commit ae022cc
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception

int failCnt = 0;
Map<String, TaskReport> completionReports = new HashMap<>();
for (ParallelIndexSupervisorTask eachSpec : indexTaskSpecs) {
for (int i = 0; i < indexTaskSpecs.size(); i++) {
ParallelIndexSupervisorTask eachSpec = indexTaskSpecs.get(i);
final String json = toolbox.getJsonMapper().writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
if (!currentSubTaskHolder.setTask(eachSpec)) {
String errMsg = "Task was asked to stop. Finish as failed.";
Expand All @@ -518,9 +519,11 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
failCnt++;
log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
}

String reportKeySuffix = "_" + i;
Optional.ofNullable(eachSpec.getCompletionReports())
.ifPresent(reports -> completionReports.putAll(
CollectionUtils.mapKeys(reports, key -> getReportkey(eachSpec.getBaseSubtaskSpecName(), key))));
CollectionUtils.mapKeys(reports, key -> key + reportKeySuffix)));
} else {
failCnt++;
log.warn("indexSpec is not ready: [%s].\nTrying the next indexSpec.", json);
Expand Down Expand Up @@ -572,11 +575,6 @@ private String createIndexTaskSpecId(int i)
return StringUtils.format("%s_%d", getId(), i);
}

private String getReportkey(String baseSequenceName, String currentKey)
{
return StringUtils.format("%s_%s", currentKey, baseSequenceName.substring(baseSequenceName.lastIndexOf('_') + 1));
}

/**
* Generate {@link ParallelIndexIngestionSpec} from input segments.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ private static String makeGroupId(String dataSource, IngestionMode ingestionMode

private IngestionState ingestionState;

// used to specify if indextask.run() is run as a part of another task
// skips writing reports and cleanup if not a standalone task
private boolean isStandAloneTask;

@MonotonicNonNull
Expand Down Expand Up @@ -217,6 +215,10 @@ public IndexTask(
);
}

/**
* @param isStandAloneTask used to specify if indextask.run() is run as a part of another task
* skips writing reports and cleanup if not a standalone task
*/
public IndexTask(
String id,
String groupId,
Expand Down Expand Up @@ -567,8 +569,7 @@ public TaskStatus runTask(final TaskToolbox toolbox)
catch (Exception e) {
log.error(e, "Encountered exception in %s.", ingestionState);
errorMsg = Throwables.getStackTraceAsString(e);
completionReports = getTaskCompletionReports();
writeCompletionReports(toolbox);
updateAndWriteCompletionReports(toolbox);
return TaskStatus.failure(
getId(),
errorMsg
Expand All @@ -580,8 +581,10 @@ public TaskStatus runTask(final TaskToolbox toolbox)
}
}

private void writeCompletionReports(TaskToolbox toolbox)

private void updateAndWriteCompletionReports(TaskToolbox toolbox)
{
completionReports = getTaskCompletionReports();
if (isStandAloneTask) {
toolbox.getTaskReportFileWriter().write(getId(), completionReports);
}
Expand Down Expand Up @@ -1043,8 +1046,7 @@ private TaskStatus generateAndPublishSegments(
if (published == null) {
log.error("Failed to publish segments, aborting!");
errorMsg = "Failed to publish segments.";
completionReports = getTaskCompletionReports();
writeCompletionReports(toolbox);
updateAndWriteCompletionReports(toolbox);
return TaskStatus.failure(
getId(),
errorMsg
Expand All @@ -1067,8 +1069,7 @@ private TaskStatus generateAndPublishSegments(

log.debugSegments(published.getSegments(), "Published segments");

completionReports = getTaskCompletionReports();
writeCompletionReports(toolbox);
updateAndWriteCompletionReports(toolbox);
return TaskStatus.success(getId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen

private IngestionState ingestionState;
private Map<String, TaskReport> completionReports;
private final Boolean isCompactionTask;
private final boolean isCompactionTask;


@JsonCreator
Expand All @@ -225,7 +225,7 @@ public ParallelIndexSupervisorTask(
ParallelIndexIngestionSpec ingestionSchema,
@Nullable String baseSubtaskSpecName,
Map<String, Object> context,
Boolean isCompactionTask
boolean isCompactionTask
)
{
super(
Expand Down Expand Up @@ -303,13 +303,6 @@ public Map<String, TaskReport> getCompletionReports()
return completionReports;
}

@Nullable
@JsonIgnore
public String getBaseSubtaskSpecName()
{
return baseSubtaskSpecName;
}

@JsonProperty("spec")
public ParallelIndexIngestionSpec getIngestionSchema()
{
Expand Down Expand Up @@ -534,13 +527,12 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
try {

initializeSubTaskCleaner();
this.toolbox = toolbox;

if (isParallelMode()) {
// emit metric for parallel batch ingestion mode:
emitMetric(toolbox.getEmitter(), "ingest/count", 1);

this.toolbox = toolbox;

if (isGuaranteedRollup(
getIngestionMode(),
ingestionSchema.getTuningConfig()
Expand Down Expand Up @@ -669,8 +661,7 @@ private TaskStatus runSinglePhaseParallel(TaskToolbox toolbox) throws Exception
}
taskStatus = TaskStatus.failure(getId(), errorMessage);
}
completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted);
writeCompletionReports(toolbox);
updateAndWriteCompletionReports(taskStatus);
return taskStatus;
}

Expand Down Expand Up @@ -837,8 +828,7 @@ TaskStatus runHashPartitionMultiPhaseParallel(TaskToolbox toolbox) throws Except
taskStatus = TaskStatus.failure(getId(), errMsg);
}

completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted);
writeCompletionReports(toolbox);
updateAndWriteCompletionReports(taskStatus);
return taskStatus;
}

Expand Down Expand Up @@ -935,8 +925,7 @@ TaskStatus runRangePartitionMultiPhaseParallel(TaskToolbox toolbox) throws Excep
taskStatus = TaskStatus.failure(getId(), errMsg);
}

completionReports = getTaskCompletionReports(taskStatus, segmentAvailabilityConfirmationCompleted);
writeCompletionReports(toolbox);
updateAndWriteCompletionReports(taskStatus);
return taskStatus;
}

Expand Down Expand Up @@ -1225,6 +1214,7 @@ private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
&& sequentialIndexTask.isReady(toolbox.getTaskActionClient())) {
TaskStatus status = sequentialIndexTask.run(toolbox);
completionReports = sequentialIndexTask.getCompletionReports();
writeCompletionReports();
return status;
} else {
String msg = "Task was asked to stop. Finish as failed";
Expand Down Expand Up @@ -1261,7 +1251,13 @@ private Map<String, TaskReport> getTaskCompletionReports(TaskStatus taskStatus,
);
}

private void writeCompletionReports(TaskToolbox toolbox)
private void updateAndWriteCompletionReports(TaskStatus status)
{
completionReports = getTaskCompletionReports(status, segmentAvailabilityConfirmationCompleted);
writeCompletionReports();
}

private void writeCompletionReports()
{
if (!isCompactionTask) {
toolbox.getTaskReportFileWriter().write(getId(), completionReports);
Expand Down Expand Up @@ -1842,6 +1838,12 @@ static Map<String, Object> getTaskReport(final OverlordClient overlordClient, fi
}
}

@VisibleForTesting
public void setToolbox(TaskToolbox toolbox)
{
this.toolbox = toolbox;
}

/**
* Represents a partition uniquely identified by an Interval and a bucketId.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,52 @@ public void testSerde() throws Exception
Assert.assertEquals(reportMap1, reportMap2);
}

@Test
public void testSerializationOnMissingPartitionStats() throws Exception
{
String json = "{\n"
+ " \"type\": \"ingestionStatsAndErrors\",\n"
+ " \"taskId\": \"ingestionStatsAndErrors\",\n"
+ " \"payload\": {\n"
+ " \"ingestionState\": \"COMPLETED\",\n"
+ " \"unparseableEvents\": {\n"
+ " \"hello\": \"world\"\n"
+ " },\n"
+ " \"rowStats\": {\n"
+ " \"number\": 1234\n"
+ " },\n"
+ " \"errorMsg\": \"an error message\",\n"
+ " \"segmentAvailabilityConfirmed\": true,\n"
+ " \"segmentAvailabilityWaitTimeMs\": 1000\n"
+ " }\n"
+ "}";

IngestionStatsAndErrorsTaskReport expected = new IngestionStatsAndErrorsTaskReport(
IngestionStatsAndErrorsTaskReport.REPORT_KEY,
new IngestionStatsAndErrorsTaskReportData(
IngestionState.COMPLETED,
ImmutableMap.of(
"hello", "world"
),
ImmutableMap.of(
"number", 1234
),
"an error message",
true,
1000L,
null
)
);


Assert.assertEquals(expected, jsonMapper.readValue(
json,
new TypeReference<TaskReport>()
{
}
));
}

@Test
public void testExceptionWhileWritingReport() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void failsInThirdPhase() throws Exception
Assert.assertTrue(task.isReady(actionClient));
task.stopGracefully(null);


task.setToolbox(toolbox);
TaskStatus taskStatus = task.runHashPartitionMultiPhaseParallel(toolbox);

Assert.assertTrue(taskStatus.isFailure());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public void failsThirdPhase() throws Exception
Assert.assertTrue(task.isReady(actionClient));
task.stopGracefully(null);


task.setToolbox(toolbox);
TaskStatus taskStatus = task.runRangePartitionMultiPhaseParallel(toolbox);

Assert.assertTrue(taskStatus.isFailure());
Expand Down

0 comments on commit ae022cc

Please sign in to comment.