From a15d7c8ad29b33b6623fb7282b88f31273189adf Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Fri, 13 Sep 2024 15:31:24 -0600 Subject: [PATCH] #29479 Remove JobResult enum and refactor job handling logic Remove the `JobResult` enum and update job processing methods to use an `AbstractJobResult` interface. This refactor centralizes job result handling, differentiates success, failure, and cancellation cases, and incorporates metadata and error details within the job result. --- .../business/api/JobQueueManagerAPIImpl.java | 109 +++++++++++++----- .../dotcms/jobs/business/job/AbstractJob.java | 43 ++++--- .../jobs/business/job/AbstractJobResult.java | 24 ++++ .../dotcms/jobs/business/job/JobResult.java | 17 --- .../jobs/business/processor/JobProcessor.java | 20 +++- .../business/api/JobQueueManagerAPITest.java | 60 +++++----- 6 files changed, 182 insertions(+), 91 deletions(-) create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJobResult.java delete mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/job/JobResult.java diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index 2012f7b727f1..838b29c762c8 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -13,6 +13,7 @@ import com.dotcms.jobs.business.error.ProcessorNotFoundException; import com.dotcms.jobs.business.error.RetryStrategy; import com.dotcms.jobs.business.job.Job; +import com.dotcms.jobs.business.job.JobResult; import com.dotcms.jobs.business.job.JobState; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.ProgressTracker; @@ -274,12 +275,7 @@ public void cancelJob(final String jobId) { Logger.info(this, "Cancelling job " + jobId); processor.cancel(job); - - Job cancelledJob = job.markAsCancelled(); - jobQueue.updateJobStatus(cancelledJob); - jobCancelledEvent.fire( - new JobCancelledEvent(cancelledJob, LocalDateTime.now()) - ); + handleJobCancellation(job, processor); } catch (Exception e) { final var error = new JobCancellationException(jobId, e.getMessage()); Logger.error(JobQueueManagerAPIImpl.class, error); @@ -508,7 +504,7 @@ private void processJob(final Job job) { try (final CloseableScheduledExecutor closeableExecutor = new CloseableScheduledExecutor()) { - final ProgressTracker progressTracker = processor.progressTracker(runningJob); + final ProgressTracker progressTracker = processor.progressTracker(); // Start a separate thread to periodically update and persist progress ScheduledExecutorService progressUpdater = closeableExecutor.getExecutorService(); @@ -523,43 +519,98 @@ private void processJob(final Job job) { updateJobProgress(runningJob, progressTracker); } - Job completedJob = runningJob.markAsCompleted(); - jobQueue.updateJobStatus(completedJob); - jobCompletedEvent.fire(new JobCompletedEvent(completedJob, LocalDateTime.now())); + handleJobCompletion(runningJob, processor); } catch (Exception e) { Logger.error(this, - "Error processing job " + runningJob.id() + ": " + e.getMessage(), e); - final var errorDetail = ErrorDetail.builder() - .message("Job processing failed") - .exception(e) - .exceptionClass(e.getClass().getName()) - .processingStage("Job execution") - .timestamp(LocalDateTime.now()) - .build(); - handleJobFailure(runningJob, errorDetail); + "Error processing job " + runningJob.id() + ": " + e.getMessage(), e + ); + handleJobFailure( + runningJob, processor, e, + "Job processing failed", "Job execution" + ); } } else { Logger.error(this, "No processor found for queue: " + job.queueName()); - final var errorDetail = ErrorDetail.builder() - .message("No processor found for queue") - .processingStage("Processor selection") - .timestamp(LocalDateTime.now()) - .build(); - handleJobFailure(job, errorDetail); + handleJobFailure(job, null, new ProcessorNotFoundException(job.queueName()), + "No processor found for queue", "Processor selection" + ); } } + /** + * Handles the completion of a job. + * + * @param job The job that completed. + * @param processor The processor that handled the job. + */ + private void handleJobCompletion(final Job job, final JobProcessor processor) { + + final var resultMetadata = processor.getResultMetadata(job); + + JobResult jobResult = null; + if (resultMetadata != null && !resultMetadata.isEmpty()) { + jobResult = JobResult.builder().metadata(resultMetadata).build(); + } + final Job completedJob = job.markAsCompleted(jobResult); + + jobQueue.updateJobStatus(completedJob); + jobCompletedEvent.fire(new JobCompletedEvent(completedJob, LocalDateTime.now())); + } + + /** + * Handles the cancellation of a job. + * + * @param job The job that was cancelled. + * @param processor The processor that handled the job. + */ + private void handleJobCancellation(final Job job, final JobProcessor processor) { + + final var resultMetadata = processor.getResultMetadata(job); + + JobResult jobResult = null; + if (resultMetadata != null && !resultMetadata.isEmpty()) { + jobResult = JobResult.builder().metadata(resultMetadata).build(); + } + Job cancelledJob = job.markAsCancelled(jobResult); + + jobQueue.updateJobStatus(cancelledJob); + jobCancelledEvent.fire( + new JobCancelledEvent(cancelledJob, LocalDateTime.now()) + ); + } + /** * Handles the failure of a job * - * @param job The job that failed. - * @param errorDetail The details of the error that caused the failure. + * @param job The job that failed. + * @param processor The processor that handled the job. + * @param exception The exception that caused the failure. + * @param errorMessage The error message to include in the job result. + * @param processingStage The stage of processing where the failure occurred. */ - private void handleJobFailure(final Job job, final ErrorDetail errorDetail) { + private void handleJobFailure(final Job job, final JobProcessor processor, + final Exception exception, final String errorMessage, final String processingStage) { + + final var errorDetail = ErrorDetail.builder() + .message(errorMessage) + .exception(exception) + .exceptionClass(exception.getClass().getName()) + .processingStage(processingStage) + .timestamp(LocalDateTime.now()) + .build(); + + JobResult jobResult = JobResult.builder().errorDetail(errorDetail).build(); + + if (processor != null) { + final var resultMetadata = processor.getResultMetadata(job); + if (resultMetadata != null && !resultMetadata.isEmpty()) { + jobResult = jobResult.withMetadata(resultMetadata); + } + } - final Job failedJob = job.markAsFailed(errorDetail); + final Job failedJob = job.markAsFailed(jobResult); jobQueue.updateJobStatus(failedJob); jobFailedEvent.fire(new JobFailedEvent(failedJob, LocalDateTime.now())); diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java index 89dce288ee2d..e43bd0f6e579 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java @@ -37,8 +37,6 @@ public interface AbstractJob { Optional lastException(); - Optional errorDetail(); - @Default default int retryCount() { return 0; @@ -67,17 +65,16 @@ default Job incrementRetry() { } /** - * Creates a new Job marked as failed with the given error detail. + * Creates a new Job marked as failed with the result details. * - * @param errorDetail The error detail to set. + * @param result The result details of the failed job. * @return A new Job instance marked as failed. */ - default Job markAsFailed(com.dotcms.jobs.business.error.ErrorDetail errorDetail) { + default Job markAsFailed(final JobResult result) { return Job.builder().from(this) .state(JobState.FAILED) - .result(JobResult.ERROR) - .errorDetail(errorDetail) - .lastException(errorDetail.exception()) + .result(result) + .lastException(result.errorDetail().get().exception()) .build(); } @@ -87,7 +84,7 @@ default Job markAsFailed(com.dotcms.jobs.business.error.ErrorDetail errorDetail) * @param newState The new state to set. * @return A new Job instance with the updated state. */ - default Job withState(JobState newState) { + default Job withState(final JobState newState) { return Job.builder().from(this) .state(newState) .updatedAt(LocalDateTime.now()) @@ -97,12 +94,22 @@ default Job withState(JobState newState) { /** * Creates a new Job marked as completed. * + * @param result The result details of the completed job. + * * @return A new Job instance marked as completed. */ - default Job markAsCompleted() { + default Job markAsCompleted(final JobResult result) { + if (result != null) { + return Job.builder().from(this) + .state(JobState.COMPLETED) + .result(result) + .completedAt(Optional.of(LocalDateTime.now())) + .updatedAt(LocalDateTime.now()) + .build(); + } + return Job.builder().from(this) .state(JobState.COMPLETED) - .result(JobResult.SUCCESS) .completedAt(Optional.of(LocalDateTime.now())) .updatedAt(LocalDateTime.now()) .build(); @@ -111,12 +118,22 @@ default Job markAsCompleted() { /** * Creates a new Job marked as canceled. * + * @param result The result details of the canceled job. + * * @return A new Job instance marked as canceled. */ - default Job markAsCancelled() { + default Job markAsCancelled(final JobResult result) { + if (result != null) { + return Job.builder().from(this) + .state(JobState.CANCELLED) + .result(result) + .completedAt(Optional.of(LocalDateTime.now())) + .updatedAt(LocalDateTime.now()) + .build(); + } + return Job.builder().from(this) .state(JobState.CANCELLED) - .result(JobResult.SUCCESS) .completedAt(Optional.of(LocalDateTime.now())) .updatedAt(LocalDateTime.now()) .build(); diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJobResult.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJobResult.java new file mode 100644 index 000000000000..ecf1f2fa8d35 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJobResult.java @@ -0,0 +1,24 @@ +package com.dotcms.jobs.business.job; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import java.util.Map; +import java.util.Optional; +import org.immutables.value.Value; + +/** + * Abstract interface for an immutable JobResult class. This interface defines the structure for job + * result information in the job processing system. The concrete implementation will be generated as + * an immutable class named JobResult. + */ +@Value.Style(typeImmutable = "*", typeAbstract = "Abstract*") +@Value.Immutable +@JsonSerialize(as = JobResult.class) +@JsonDeserialize(as = JobResult.class) +public interface AbstractJobResult { + + Optional errorDetail(); + + Optional> metadata(); + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobResult.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobResult.java deleted file mode 100644 index cd89c2d811fb..000000000000 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobResult.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.dotcms.jobs.business.job; - -/** - * Represents the final result of a job execution. - */ -public enum JobResult { - - /** - * Indicates that the job completed successfully. - */ - SUCCESS, - - /** - * Indicates that the job encountered an error during execution. - */ - ERROR -} \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/JobProcessor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/JobProcessor.java index 9cd2fbc4c32d..949d1761e957 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/JobProcessor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/JobProcessor.java @@ -3,6 +3,7 @@ import com.dotcms.jobs.business.error.JobCancellationException; import com.dotcms.jobs.business.error.JobProcessingException; import com.dotcms.jobs.business.job.Job; +import java.util.Map; /** * Interface for processing jobs. Implementations of this interface should define how to process, @@ -35,13 +36,22 @@ public interface JobProcessor { void cancel(Job job) throws JobCancellationException; /** - * Provides a progress tracker for the given job. The default implementation returns a new - * instance of DefaultProgressTracker. + * Returns metadata about the job execution. This metadata can be used to provide additional + * information about the job's execution, such as statistics or other details useful for the + * caller. * - * @param job The job for which to provide a progress tracker. - * @return A ProgressTracker instance for the given job. + * @param job The job for which to provide metadata. + * @return A map containing metadata about the job execution. */ - default ProgressTracker progressTracker(Job job) { + Map getResultMetadata(Job job); + + /** + * Provides a progress tracker. The default implementation returns a new instance of + * DefaultProgressTracker. + * + * @return A ProgressTracker instance + */ + default ProgressTracker progressTracker() { return new DefaultProgressTracker(); } diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java index 4f643a2475c1..042c1b018e5c 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java @@ -27,6 +27,7 @@ import com.dotcms.jobs.business.error.JobProcessingException; import com.dotcms.jobs.business.error.RetryStrategy; import com.dotcms.jobs.business.job.Job; +import com.dotcms.jobs.business.job.JobResult; import com.dotcms.jobs.business.job.JobState; import com.dotcms.jobs.business.processor.DefaultProgressTracker; import com.dotcms.jobs.business.processor.JobProcessor; @@ -246,7 +247,7 @@ public void test_watchJob() throws Exception { // Mock JobProcessor behavior ProgressTracker mockProgressTracker = mock(ProgressTracker.class); - when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJobProcessor.progressTracker()).thenReturn(mockProgressTracker); when(mockJob.progress()).thenReturn(0f); when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); @@ -257,7 +258,7 @@ public void test_watchJob() throws Exception { jobState.set(inv.getArgument(0)); return mockJob; }); - when(mockJob.markAsCompleted()).thenAnswer(inv -> { + when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { jobState.set(JobState.COMPLETED); return mockJob; }); @@ -321,7 +322,7 @@ public void test_JobRetry_single_retry() throws Exception { retryCount.incrementAndGet(); return mockJob; }); - when(mockJob.markAsCompleted()).thenAnswer(inv -> { + when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { jobState.set(JobState.COMPLETED); return mockJob; }); @@ -342,7 +343,7 @@ public void test_JobRetry_single_retry() throws Exception { // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); - when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJobProcessor.progressTracker()).thenReturn(mockProgressTracker); when(mockJob.progress()).thenReturn(0f); when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); @@ -353,7 +354,7 @@ public void test_JobRetry_single_retry() throws Exception { throw new RuntimeException("Simulated failure"); } Job job = invocation.getArgument(0); - job.markAsCompleted(); + job.markAsCompleted(any()); return null; }).when(mockJobProcessor).process(any()); @@ -410,7 +411,7 @@ public void test_JobRetry_retry_twice() throws Exception { lastRetryTimestamp.set(System.currentTimeMillis()); return mockJob; }); - when(mockJob.markAsCompleted()).thenAnswer(inv -> { + when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { jobState.set(JobState.COMPLETED); return mockJob; }); @@ -433,7 +434,7 @@ public void test_JobRetry_retry_twice() throws Exception { // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); - when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJobProcessor.progressTracker()).thenReturn(mockProgressTracker); when(mockJob.progress()).thenReturn(0f); when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); @@ -445,7 +446,7 @@ public void test_JobRetry_retry_twice() throws Exception { throw new RuntimeException("Simulated failure"); } Job job = invocation.getArgument(0); - job.markAsCompleted(); + job.markAsCompleted(any()); return null; }).when(mockJobProcessor).process(any()); @@ -468,7 +469,7 @@ public void test_JobRetry_retry_twice() throws Exception { inOrder.verify(mockJob).withState(JobState.RUNNING); inOrder.verify(mockJob).markAsFailed(any()); inOrder.verify(mockJob).withState(JobState.RUNNING); - inOrder.verify(mockJob).markAsCompleted(); + inOrder.verify(mockJob).markAsCompleted(any()); // Verify retry behavior verify(mockRetryStrategy, atLeast(2)).shouldRetry(any(), any()); @@ -531,7 +532,7 @@ public void test_JobRetry_MaxRetryLimit() throws Exception { // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); - when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJobProcessor.progressTracker()).thenReturn(mockProgressTracker); // Configure job processor to always fail doThrow(new RuntimeException("Simulated failure")).when(mockJobProcessor).process(any()); @@ -581,7 +582,7 @@ public void test_Job_SucceedsFirstAttempt() throws Exception { jobState.set(inv.getArgument(0)); return mockJob; }); - when(mockJob.markAsCompleted()).thenAnswer(inv -> { + when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { jobState.set(JobState.COMPLETED); return mockJob; }); @@ -591,14 +592,14 @@ public void test_Job_SucceedsFirstAttempt() throws Exception { // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); - when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJobProcessor.progressTracker()).thenReturn(mockProgressTracker); when(mockJob.progress()).thenReturn(0f); when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); // Configure job processor to succeed doAnswer(inv -> { Job job = inv.getArgument(0); - job.markAsCompleted(); + job.markAsCompleted(any()); return null; }).when(mockJobProcessor).process(any()); @@ -659,7 +660,7 @@ public void test_Job_NotRetryable() throws Exception { // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); - when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJobProcessor.progressTracker()).thenReturn(mockProgressTracker); when(mockJob.progress()).thenReturn(0f); when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); @@ -683,9 +684,9 @@ public void test_Job_NotRetryable() throws Exception { verify(mockJobQueue, times(1)).removeJob(mockJob.id()); // Capture and verify the error details - ArgumentCaptor errorDetailCaptor = ArgumentCaptor.forClass(ErrorDetail.class); - verify(mockJob).markAsFailed(errorDetailCaptor.capture()); - ErrorDetail capturedErrorDetail = errorDetailCaptor.getValue(); + ArgumentCaptor jobResultCaptor = ArgumentCaptor.forClass(JobResult.class); + verify(mockJob).markAsFailed(jobResultCaptor.capture()); + ErrorDetail capturedErrorDetail = jobResultCaptor.getValue().errorDetail().get(); assertEquals("Non-retryable error", capturedErrorDetail.exception().getMessage()); // Stop the job queue @@ -713,7 +714,7 @@ public void test_JobProgressTracking() throws Exception { jobState.set(inv.getArgument(0)); return mockJob; }); - when(mockJob.markAsCompleted()).thenAnswer(inv -> { + when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { jobState.set(JobState.COMPLETED); return mockJob; }); @@ -730,7 +731,7 @@ public void test_JobProgressTracking() throws Exception { // Create a real ProgressTracker ProgressTracker realProgressTracker = new DefaultProgressTracker(); - when(mockJobProcessor.progressTracker(any())).thenReturn(realProgressTracker); + when(mockJobProcessor.progressTracker()).thenReturn(realProgressTracker); // Make the circuit breaker always allow requests when(mockCircuitBreaker.allowRequest()).thenReturn(true); @@ -756,7 +757,7 @@ public void test_JobProgressTracking() throws Exception { } Job job = inv.getArgument(0); - job.markAsCompleted(); + job.markAsCompleted(any()); return null; }).when(mockJobProcessor).process(any()); @@ -836,7 +837,7 @@ public void test_CircuitBreaker_Opens() throws Exception { // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); - when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJobProcessor.progressTracker()).thenReturn(mockProgressTracker); when(failingJob.progress()).thenReturn(0f); when(failingJob.withProgress(anyFloat())).thenReturn(failingJob); @@ -903,7 +904,7 @@ public void test_CircuitBreaker_Closes() throws Exception { // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); - when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJobProcessor.progressTracker()).thenReturn(mockProgressTracker); when(mockJob.progress()).thenReturn(0f); when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); @@ -915,12 +916,12 @@ public void test_CircuitBreaker_Closes() throws Exception { } Job processingJob = inv.getArgument(0); - processingJob.markAsCompleted(); + processingJob.markAsCompleted(any()); return null; }).when(mockJobProcessor).process(any()); AtomicReference jobState = new AtomicReference<>(JobState.PENDING); - when(mockJob.markAsCompleted()).thenAnswer(inv -> { + when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { jobState.set(JobState.COMPLETED); return mockJob; }); @@ -979,7 +980,7 @@ public void test_CircuitBreaker_Reset() throws Exception { // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); - when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJobProcessor.progressTracker()).thenReturn(mockProgressTracker); when(failingJob.progress()).thenReturn(0f); when(failingJob.withProgress(anyFloat())).thenReturn(failingJob); @@ -1081,6 +1082,11 @@ public void cancel(Job job) { cancellationRequested.set(true); } + @Override + public Map getResultMetadata(Job job) { + return null; + } + public boolean awaitProcessingStart(long timeout, TimeUnit unit) throws InterruptedException { return processingStarted.await(timeout, unit); @@ -1111,11 +1117,11 @@ public boolean awaitProcessingCompleted(long timeout, TimeUnit unit) stateUpdates.add(inv.getArgument(0)); return mockJob; }); - when(mockJob.markAsCancelled()).thenAnswer(inv -> { + when(mockJob.markAsCancelled(any())).thenAnswer(inv -> { stateUpdates.add(JobState.CANCELLED); return mockJob; }); - when(mockJob.markAsCompleted()).thenAnswer(inv -> { + when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { stateUpdates.add(JobState.COMPLETED); return mockJob; });