Skip to content

Commit

Permalink
#29479 Refactor job cancellation handling
Browse files Browse the repository at this point in the history
Decoupled job cancellation logic by introducing a `Cancellable` interface. This improves clarity and separation of concerns, ensuring only processors capable of cancellation implement the relevant method. Updated existing tests to support the new interface and ensured compatibility with the JobQueueManagerAPI.
  • Loading branch information
jgambarios committed Sep 25, 2024
1 parent fd872ba commit a05c6ed
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* Defines the contract for interacting with the job queue system. This interface provides methods
* for managing jobs, processors, and the overall state of the job queue.
*/
public interface JobQueueManagerAPI extends AutoCloseable {
public interface JobQueueManagerAPI {

/**
* Starts the job queue manager, initializing the thread pool for job processing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.dotcms.jobs.business.job.JobPaginatedResult;
import com.dotcms.jobs.business.job.JobResult;
import com.dotcms.jobs.business.job.JobState;
import com.dotcms.jobs.business.processor.Cancellable;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.ProgressTracker;
import com.dotcms.jobs.business.queue.JobQueue;
Expand Down Expand Up @@ -300,13 +301,13 @@ public void cancelJob(final String jobId) throws DotDataException {
}

final var processor = processors.get(job.queueName());
if (processor != null && processor.canCancel(job)) {
if (processor instanceof Cancellable) {

try {

Logger.info(this, "Cancelling job " + jobId);

processor.cancel(job);
((Cancellable) processor).cancel(job);
handleJobCancellation(job, processor);
} catch (Exception e) {
final var error = new DotDataException("Error cancelling job " + jobId, e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.dotcms.jobs.business.processor;

import com.dotcms.jobs.business.error.JobCancellationException;
import com.dotcms.jobs.business.job.Job;

/**
* The Cancellable interface represents a contract for objects that can be cancelled,
* typically long-running operations or jobs.
* <p>
* Implementations of this interface should provide a mechanism to interrupt or
* stop their execution in a controlled manner when the cancel method is invoked.
* <p>
* It's important to note that implementing this interface indicates that the object
* supports cancellation. There is no separate method to check if cancellation is possible;
* the presence of this interface implies that it is.
*/
public interface Cancellable {

/**
* Attempts to cancel the execution of this object.
* <p>
* The exact behavior of this method depends on the specific implementation,
* but it should generally attempt to stop the ongoing operation as quickly
* and safely as possible. This might involve interrupting threads, closing
* resources, or setting flags to stop loops.
* <p>
* Implementations should ensure that this method can be called safely from
* another thread while the operation is in progress.
* <p>
* After this method is called, the object should make its best effort to
* terminate, but there's no guarantee about when the termination will occur.
*
* @throws JobCancellationException if there is an error during the cancellation process.
* This could occur if the job is in a state where it cannot be cancelled,
* or if there's an unexpected error while attempting to cancel.
*/
void cancel(Job job) throws JobCancellationException;

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.dotcms.jobs.business.processor;

import com.dotcms.jobs.business.error.JobCancellationException;
import com.dotcms.jobs.business.error.JobProcessingException;
import com.dotcms.jobs.business.job.Job;
import java.util.Map;
Expand All @@ -19,22 +18,6 @@ public interface JobProcessor {
*/
void process(Job job) throws JobProcessingException;

/**
* Determines if the given job can be cancelled.
*
* @param job The job to check for cancellation capability.
* @return true if the job can be cancelled, false otherwise.
*/
boolean canCancel(Job job);

/**
* Cancels the given job.
*
* @param job The job to cancel.
* @throws JobCancellationException if an error occurs during cancellation.
*/
void cancel(Job job) throws JobCancellationException;

/**
* Returns metadata about the job execution. This metadata can be used to provide additional
* information about the job's execution, such as statistics or other details useful for the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static void logDebug(Class cl, String action, String msg, String hostName
}

private static String getHostName(String hostNameOrId){
if (!UtilMethods.isSet(hostNameOrId)) {
if (!UtilMethods.isSet(hostNameOrId) || "SYSTEM_HOST".equals(hostNameOrId)) {
return "system";
}
Host h = new Host();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import com.dotcms.jobs.business.api.events.RealTimeJobMonitor;
import com.dotcms.jobs.business.error.CircuitBreaker;
import com.dotcms.jobs.business.error.ErrorDetail;
import com.dotcms.jobs.business.error.JobCancellationException;
import com.dotcms.jobs.business.error.JobProcessingException;
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobPaginatedResult;
import com.dotcms.jobs.business.job.JobResult;
import com.dotcms.jobs.business.job.JobState;
import com.dotcms.jobs.business.processor.Cancellable;
import com.dotcms.jobs.business.processor.DefaultProgressTracker;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.ProgressTracker;
Expand Down Expand Up @@ -1036,20 +1038,43 @@ public void test_CircuitBreaker_Reset() throws Exception {
* ExpectedResult: Job is successfully cancelled and its status is updated
*/
@Test
public void test_simple_cancelJob()
throws DotDataException, JobQueueDataException, JobNotFoundException {
public void test_simple_cancelJob2()
throws DotDataException, JobQueueDataException, JobNotFoundException, JobCancellationException {

class TestJobProcessor implements JobProcessor, Cancellable {

@Override
public void process(Job job) throws JobProcessingException {
}

@Override
public void cancel(Job job) {
}

@Override
public Map<String, Object> getResultMetadata(Job job) {
return null;
}
}

// Create a mock job
Job mockJob = mock(Job.class);
when(mockJobQueue.getJob("job123")).thenReturn(mockJob);
when(mockJob.queueName()).thenReturn("testQueue");
when(mockJob.id()).thenReturn("job123");
when(mockJob.withState(any())).thenReturn(mockJob);

when(mockJobProcessor.canCancel(mockJob)).thenReturn(true);
// Create a mock CancellableJobProcessor
TestJobProcessor mockCancellableProcessor = mock(TestJobProcessor.class);

// Set up the job queue manager to return our mock cancellable processor
jobQueueManagerAPI.registerProcessor("testQueue", mockCancellableProcessor);

// Perform the cancellation
jobQueueManagerAPI.cancelJob("job123");

verify(mockJobProcessor).cancel(mockJob);
// Verify that the cancel method was called on our mock processor
verify(mockCancellableProcessor).cancel(mockJob);
}

/**
Expand All @@ -1060,7 +1085,7 @@ public void test_simple_cancelJob()
@Test
public void test_complex_cancelJob() throws Exception {

class TestJobProcessor implements JobProcessor {
class TestJobProcessor implements JobProcessor, Cancellable {

private final AtomicBoolean cancellationRequested = new AtomicBoolean(false);
private final CountDownLatch processingStarted = new CountDownLatch(1);
Expand All @@ -1084,11 +1109,6 @@ public void process(Job job) throws JobProcessingException {
}
}

@Override
public boolean canCancel(Job job) {
return true;
}

@Override
public void cancel(Job job) {
cancellationRequested.set(true);
Expand Down

0 comments on commit a05c6ed

Please sign in to comment.