Skip to content

Commit

Permalink
#29479 Remove JobResult enum and refactor job handling logic
Browse files Browse the repository at this point in the history
Remove the `JobResult` enum and update job processing methods to use an `AbstractJobResult` interface. This refactor centralizes job result handling, differentiates success, failure, and cancellation cases, and incorporates metadata and error details within the job result.
  • Loading branch information
jgambarios committed Sep 13, 2024
1 parent acbceef commit a15d7c8
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.dotcms.jobs.business.error.ProcessorNotFoundException;
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobResult;
import com.dotcms.jobs.business.job.JobState;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.ProgressTracker;
Expand Down Expand Up @@ -274,12 +275,7 @@ public void cancelJob(final String jobId) {
Logger.info(this, "Cancelling job " + jobId);

processor.cancel(job);

Job cancelledJob = job.markAsCancelled();
jobQueue.updateJobStatus(cancelledJob);
jobCancelledEvent.fire(
new JobCancelledEvent(cancelledJob, LocalDateTime.now())
);
handleJobCancellation(job, processor);
} catch (Exception e) {
final var error = new JobCancellationException(jobId, e.getMessage());
Logger.error(JobQueueManagerAPIImpl.class, error);
Expand Down Expand Up @@ -508,7 +504,7 @@ private void processJob(final Job job) {

try (final CloseableScheduledExecutor closeableExecutor = new CloseableScheduledExecutor()) {

final ProgressTracker progressTracker = processor.progressTracker(runningJob);
final ProgressTracker progressTracker = processor.progressTracker();

// Start a separate thread to periodically update and persist progress
ScheduledExecutorService progressUpdater = closeableExecutor.getExecutorService();
Expand All @@ -523,43 +519,98 @@ private void processJob(final Job job) {
updateJobProgress(runningJob, progressTracker);
}

Job completedJob = runningJob.markAsCompleted();
jobQueue.updateJobStatus(completedJob);
jobCompletedEvent.fire(new JobCompletedEvent(completedJob, LocalDateTime.now()));
handleJobCompletion(runningJob, processor);
} catch (Exception e) {

Logger.error(this,
"Error processing job " + runningJob.id() + ": " + e.getMessage(), e);
final var errorDetail = ErrorDetail.builder()
.message("Job processing failed")
.exception(e)
.exceptionClass(e.getClass().getName())
.processingStage("Job execution")
.timestamp(LocalDateTime.now())
.build();
handleJobFailure(runningJob, errorDetail);
"Error processing job " + runningJob.id() + ": " + e.getMessage(), e
);
handleJobFailure(
runningJob, processor, e,
"Job processing failed", "Job execution"
);
}
} else {

Logger.error(this, "No processor found for queue: " + job.queueName());
final var errorDetail = ErrorDetail.builder()
.message("No processor found for queue")
.processingStage("Processor selection")
.timestamp(LocalDateTime.now())
.build();
handleJobFailure(job, errorDetail);
handleJobFailure(job, null, new ProcessorNotFoundException(job.queueName()),
"No processor found for queue", "Processor selection"
);
}
}

/**
* Handles the completion of a job.
*
* @param job The job that completed.
* @param processor The processor that handled the job.
*/
private void handleJobCompletion(final Job job, final JobProcessor processor) {

final var resultMetadata = processor.getResultMetadata(job);

JobResult jobResult = null;
if (resultMetadata != null && !resultMetadata.isEmpty()) {
jobResult = JobResult.builder().metadata(resultMetadata).build();
}
final Job completedJob = job.markAsCompleted(jobResult);

jobQueue.updateJobStatus(completedJob);
jobCompletedEvent.fire(new JobCompletedEvent(completedJob, LocalDateTime.now()));
}

/**
* Handles the cancellation of a job.
*
* @param job The job that was cancelled.
* @param processor The processor that handled the job.
*/
private void handleJobCancellation(final Job job, final JobProcessor processor) {

final var resultMetadata = processor.getResultMetadata(job);

JobResult jobResult = null;
if (resultMetadata != null && !resultMetadata.isEmpty()) {
jobResult = JobResult.builder().metadata(resultMetadata).build();
}
Job cancelledJob = job.markAsCancelled(jobResult);

jobQueue.updateJobStatus(cancelledJob);
jobCancelledEvent.fire(
new JobCancelledEvent(cancelledJob, LocalDateTime.now())
);
}

/**
* Handles the failure of a job
*
* @param job The job that failed.
* @param errorDetail The details of the error that caused the failure.
* @param job The job that failed.
* @param processor The processor that handled the job.
* @param exception The exception that caused the failure.
* @param errorMessage The error message to include in the job result.
* @param processingStage The stage of processing where the failure occurred.
*/
private void handleJobFailure(final Job job, final ErrorDetail errorDetail) {
private void handleJobFailure(final Job job, final JobProcessor processor,
final Exception exception, final String errorMessage, final String processingStage) {

final var errorDetail = ErrorDetail.builder()
.message(errorMessage)
.exception(exception)
.exceptionClass(exception.getClass().getName())
.processingStage(processingStage)
.timestamp(LocalDateTime.now())
.build();

JobResult jobResult = JobResult.builder().errorDetail(errorDetail).build();

if (processor != null) {
final var resultMetadata = processor.getResultMetadata(job);
if (resultMetadata != null && !resultMetadata.isEmpty()) {
jobResult = jobResult.withMetadata(resultMetadata);
}
}

final Job failedJob = job.markAsFailed(errorDetail);
final Job failedJob = job.markAsFailed(jobResult);
jobQueue.updateJobStatus(failedJob);
jobFailedEvent.fire(new JobFailedEvent(failedJob, LocalDateTime.now()));

Expand Down
43 changes: 30 additions & 13 deletions dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ public interface AbstractJob {

Optional<Throwable> lastException();

Optional<com.dotcms.jobs.business.error.ErrorDetail> errorDetail();

@Default
default int retryCount() {
return 0;
Expand Down Expand Up @@ -67,17 +65,16 @@ default Job incrementRetry() {
}

/**
* Creates a new Job marked as failed with the given error detail.
* Creates a new Job marked as failed with the result details.
*
* @param errorDetail The error detail to set.
* @param result The result details of the failed job.
* @return A new Job instance marked as failed.
*/
default Job markAsFailed(com.dotcms.jobs.business.error.ErrorDetail errorDetail) {
default Job markAsFailed(final JobResult result) {
return Job.builder().from(this)
.state(JobState.FAILED)
.result(JobResult.ERROR)
.errorDetail(errorDetail)
.lastException(errorDetail.exception())
.result(result)
.lastException(result.errorDetail().get().exception())
.build();
}

Expand All @@ -87,7 +84,7 @@ default Job markAsFailed(com.dotcms.jobs.business.error.ErrorDetail errorDetail)
* @param newState The new state to set.
* @return A new Job instance with the updated state.
*/
default Job withState(JobState newState) {
default Job withState(final JobState newState) {
return Job.builder().from(this)
.state(newState)
.updatedAt(LocalDateTime.now())
Expand All @@ -97,12 +94,22 @@ default Job withState(JobState newState) {
/**
* Creates a new Job marked as completed.
*
* @param result The result details of the completed job.
*
* @return A new Job instance marked as completed.
*/
default Job markAsCompleted() {
default Job markAsCompleted(final JobResult result) {
if (result != null) {
return Job.builder().from(this)
.state(JobState.COMPLETED)
.result(result)
.completedAt(Optional.of(LocalDateTime.now()))
.updatedAt(LocalDateTime.now())
.build();
}

return Job.builder().from(this)
.state(JobState.COMPLETED)
.result(JobResult.SUCCESS)
.completedAt(Optional.of(LocalDateTime.now()))
.updatedAt(LocalDateTime.now())
.build();
Expand All @@ -111,12 +118,22 @@ default Job markAsCompleted() {
/**
* Creates a new Job marked as canceled.
*
* @param result The result details of the canceled job.
*
* @return A new Job instance marked as canceled.
*/
default Job markAsCancelled() {
default Job markAsCancelled(final JobResult result) {
if (result != null) {
return Job.builder().from(this)
.state(JobState.CANCELLED)
.result(result)
.completedAt(Optional.of(LocalDateTime.now()))
.updatedAt(LocalDateTime.now())
.build();
}

return Job.builder().from(this)
.state(JobState.CANCELLED)
.result(JobResult.SUCCESS)
.completedAt(Optional.of(LocalDateTime.now()))
.updatedAt(LocalDateTime.now())
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.dotcms.jobs.business.job;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.Map;
import java.util.Optional;
import org.immutables.value.Value;

/**
* Abstract interface for an immutable JobResult class. This interface defines the structure for job
* result information in the job processing system. The concrete implementation will be generated as
* an immutable class named JobResult.
*/
@Value.Style(typeImmutable = "*", typeAbstract = "Abstract*")
@Value.Immutable
@JsonSerialize(as = JobResult.class)
@JsonDeserialize(as = JobResult.class)
public interface AbstractJobResult {

Optional<com.dotcms.jobs.business.error.ErrorDetail> errorDetail();

Optional<Map<String, Object>> metadata();

}
17 changes: 0 additions & 17 deletions dotCMS/src/main/java/com/dotcms/jobs/business/job/JobResult.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.dotcms.jobs.business.error.JobCancellationException;
import com.dotcms.jobs.business.error.JobProcessingException;
import com.dotcms.jobs.business.job.Job;
import java.util.Map;

/**
* Interface for processing jobs. Implementations of this interface should define how to process,
Expand Down Expand Up @@ -35,13 +36,22 @@ public interface JobProcessor {
void cancel(Job job) throws JobCancellationException;

/**
* Provides a progress tracker for the given job. The default implementation returns a new
* instance of DefaultProgressTracker.
* Returns metadata about the job execution. This metadata can be used to provide additional
* information about the job's execution, such as statistics or other details useful for the
* caller.
*
* @param job The job for which to provide a progress tracker.
* @return A ProgressTracker instance for the given job.
* @param job The job for which to provide metadata.
* @return A map containing metadata about the job execution.
*/
default ProgressTracker progressTracker(Job job) {
Map<String, Object> getResultMetadata(Job job);

/**
* Provides a progress tracker. The default implementation returns a new instance of
* DefaultProgressTracker.
*
* @return A ProgressTracker instance
*/
default ProgressTracker progressTracker() {
return new DefaultProgressTracker();
}

Expand Down
Loading

0 comments on commit a15d7c8

Please sign in to comment.