Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
fabrizzio-dotCMS committed Oct 3, 2024
1 parent 892e5cb commit 69fa89a
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ public interface JobQueueManagerAPI {
* @param queueName The name of the queue
* @param processor The job processor to register
*/
void registerProcessor(String queueName, JobProcessor processor);
void registerProcessor(final String queueName, final Class<? extends JobProcessor> processor);

/**
* Retrieves the job processors for all registered queues.
* @return A map of queue names to job processors
*/
Map<String,JobProcessor> getQueueNames();
Map<String,Class<? extends JobProcessor>> getQueueNames();

/**
* Creates a new job in the specified queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI {

private final CircuitBreaker circuitBreaker;
private final JobQueue jobQueue;
private final Map<String, JobProcessor> processors;
private final Map<String, Class<? extends JobProcessor>> processors;
private final Map<String, JobProcessor> processorInstancesByJobId;
private final int threadPoolSize;
private ExecutorService executorService;
private final Map<String, RetryStrategy> retryStrategies;
Expand Down Expand Up @@ -140,6 +141,7 @@ public JobQueueManagerAPIImpl(JobQueue jobQueue,
this.jobQueue = jobQueue;
this.threadPoolSize = jobQueueConfig.getThreadPoolSize();
this.processors = new ConcurrentHashMap<>();
this.processorInstancesByJobId = new ConcurrentHashMap<>();
this.retryStrategies = new ConcurrentHashMap<>();
this.defaultRetryStrategy = defaultRetryStrategy;
this.circuitBreaker = circuitBreaker;
Expand Down Expand Up @@ -224,19 +226,19 @@ public void close() throws Exception {
}

@Override
public void registerProcessor(final String queueName, final JobProcessor processor) {
public void registerProcessor(final String queueName, final Class<? extends JobProcessor> processor) {
final String queueNameLower = queueName.toLowerCase();
final JobProcessor jobProcessor = processors.get(queueNameLower);
final Class<? extends JobProcessor> jobProcessor = processors.get(queueNameLower);
if (null != jobProcessor) {
Logger.warn(this, String.format(
"Job processor [%s] already registered for queue: [%s] is getting overridden.",
jobProcessor.getClass().getName(), queueName));
jobProcessor.getName(), queueName));
}
processors.put(queueNameLower, processor);
}

@Override
public Map<String,JobProcessor> getQueueNames() {
public Map<String,Class<? extends JobProcessor>> getQueueNames() {
return Map.copyOf(processors);
}

Expand Down Expand Up @@ -319,8 +321,7 @@ public void cancelJob(final String jobId) throws DotDataException {
} catch (JobQueueDataException e) {
throw new DotDataException("Error fetching job", e);
}
final String queueNameLower = job.queueName().toLowerCase();
final var processor = processors.get(queueNameLower);
final var processor = processorInstancesByJobId.get(jobId);
if (processor instanceof Cancellable) {

try {
Expand All @@ -337,7 +338,7 @@ public void cancelJob(final String jobId) throws DotDataException {
} else {

if (processor == null) {
final var error = new JobProcessorNotFoundException(job.queueName());
final var error = new JobProcessorNotFoundException(job.queueName(), jobId);
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
}
Expand Down Expand Up @@ -619,8 +620,11 @@ private void handleNonRetryableFailedJob(final Job job) throws DotDataException
*/
private void processJob(final Job job) throws DotDataException {

JobProcessor processor = processors.get(job.queueName());
if (processor != null) {
Class<? extends JobProcessor> processorClass = processors.get(job.queueName());
if (processorClass != null) {

final JobProcessor processor = newJobProcessorInstance(job,
processorClass);

final ProgressTracker progressTracker = new DefaultProgressTracker();
Job runningJob = job.markAsRunning().withProgressTracker(progressTracker);
Expand Down Expand Up @@ -665,6 +669,8 @@ private void processJob(final Job job) throws DotDataException {
handleJobFailure(
runningJob, processor, e, e.getMessage(), "Job execution"
);
} finally {
processorInstancesByJobId.remove(job.id());
}
} else {

Expand All @@ -675,6 +681,25 @@ private void processJob(final Job job) throws DotDataException {
}
}

/**
* Instantiate a new JobProcessor instance for a specific job. and store the reference in a map.
* @param job
* @param processorClass
* @return
*/
private JobProcessor newJobProcessorInstance(final Job job, final Class<? extends JobProcessor> processorClass) {
//Get an instance and put it in the map
return processorInstancesByJobId.computeIfAbsent(
job.id(), k -> {
try {
return processorClass.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new DotRuntimeException("Error creating job processor", e);
}
}
);
}

/**
* Handles the completion of a job.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,13 @@ public class JobProcessorNotFoundException extends RuntimeException {
public JobProcessorNotFoundException(String queueName) {
super("No job processor found for queue: " + queueName);
}

/**
* Constructs a new JobProcessorNotFoundException with the specified queue name and job ID.
* @param queueName The name of the queue for which no processor was found
* @param jobId The ID of the job for which no processor was found
*/
public JobProcessorNotFoundException(String queueName, String jobId) {
super("No job processor found for queue: " + queueName + " and job id: " + jobId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class JobNotFoundException extends JobQueueException {
*
* @param jobId the ID of the job that was not found
*/
public JobNotFoundException(String jobId) {
public JobNotFoundException(final String jobId) {
super("Job with id: " + jobId + " not found");
}

Expand Down
85 changes: 65 additions & 20 deletions dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.dotcms.jobs.business.api.JobProcessorScanner;
import com.dotcms.jobs.business.api.JobQueueManagerAPI;
import com.dotcms.jobs.business.error.JobProcessorNotFoundException;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobPaginatedResult;
import com.dotcms.jobs.business.processor.JobProcessor;
Expand All @@ -10,6 +11,7 @@
import com.dotcms.rest.api.v1.temp.DotTempFile;
import com.dotcms.rest.api.v1.temp.TempFileAPI;
import com.dotmarketing.business.APILocator;
import com.dotmarketing.exception.DoesNotExistException;
import com.dotmarketing.exception.DotDataException;
import com.dotmarketing.util.Logger;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -51,22 +53,39 @@ public void onInit() {
final List<Class<? extends JobProcessor>> processors = scanner.discoverJobProcessors();
processors.forEach(processor -> {
try {
final Constructor<? extends JobProcessor> declaredConstructor = processor.getDeclaredConstructor();
final JobProcessor jobProcessor = declaredConstructor.newInstance();
if(!testInstantiation(processor)){
return;
}
//registering the processor with the jobQueueManagerAPI
// lower case it to avoid case
if(processor.isAnnotationPresent(Queue.class)){
final Queue queue = processor.getAnnotation(Queue.class);
jobQueueManagerAPI.registerProcessor(queue.value(), jobProcessor);
jobQueueManagerAPI.registerProcessor(queue.value(), processor);
} else {
jobQueueManagerAPI.registerProcessor(processor.getName(), jobProcessor);
jobQueueManagerAPI.registerProcessor(processor.getName(), processor);
}
}catch (Exception e){
Logger.error(this.getClass(), "Unable to register JobProcessor ", e);
}
});
}

/**
* Test if a processor can be instantiated
* @param processor The processor to tested
* @return true if the processor can be instantiated, false otherwise
*/
private boolean testInstantiation(Class<? extends JobProcessor> processor) {
try {
final Constructor<? extends JobProcessor> declaredConstructor = processor.getDeclaredConstructor();
declaredConstructor.newInstance();
return true;
} catch (Exception e) {
Logger.error(this.getClass(), String.format(" JobProcessor [%s] can not be instantiated and will be ignored.",processor.getName()), e);
}
return false;
}

@PreDestroy
public void onDestroy() {
if(jobQueueManagerAPI.isStarted()){
Expand Down Expand Up @@ -98,7 +117,12 @@ String createJob(String queueName, JobParams form, HttpServletRequest request)

final HashMap <String, Object>in = new HashMap<>(form.getParams());
handleUploadIfPresent(form, in, request);
return jobQueueManagerAPI.createJob(queueName, Map.copyOf(in));
try {
return jobQueueManagerAPI.createJob(queueName, Map.copyOf(in));
} catch (JobProcessorNotFoundException e) {
Logger.error(this.getClass(), "Error creating job", e);
throw new DoesNotExistException(e.getMessage());
}
}

/**
Expand All @@ -107,7 +131,7 @@ String createJob(String queueName, JobParams form, HttpServletRequest request)
* @return Job
* @throws DotDataException if there's an error fetching the job
*/
Job getJob(String jobId) throws DotDataException{
Job getJob(String jobId) throws DotDataException {
return jobQueueManagerAPI.getJob(jobId);
}

Expand All @@ -116,16 +140,24 @@ Job getJob(String jobId) throws DotDataException{
* @param jobId The ID of the job
* @throws DotDataException if there's an error cancelling the job
*/
void cancelJob(String jobId) throws DotDataException{
jobQueueManagerAPI.cancelJob(jobId);
void cancelJob(String jobId) throws DotDataException {
try{
jobQueueManagerAPI.cancelJob(jobId);
} catch (JobProcessorNotFoundException e) {
Logger.error(this.getClass(), "Error cancelling job", e);
throw new DoesNotExistException(e.getMessage());
}
}

/**
* watches a job
* @param jobId The ID of the job
* @param watcher The watcher
*/
void watchJob(String jobId, Consumer<Job> watcher){
void watchJob(String jobId, Consumer<Job> watcher) throws DotDataException {
//Validate the job exists
jobQueueManagerAPI.getJob(jobId);
// if it does then watch it
jobQueueManagerAPI.watchJob(jobId, watcher);
}

Expand All @@ -136,8 +168,13 @@ void watchJob(String jobId, Consumer<Job> watcher){
* @return JobPaginatedResult
* @throws DotDataException if there's an error fetching the jobs
*/
JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException{
return jobQueueManagerAPI.getJobs(page, pageSize);
JobPaginatedResult getJobs(int page, int pageSize) {
try {
return jobQueueManagerAPI.getJobs(page, pageSize);
} catch (DotDataException e){
Logger.error(this.getClass(), "Error fetching jobs", e);
}
return JobPaginatedResult.builder().build();
}

/**
Expand All @@ -147,9 +184,13 @@ JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException{
* @return JobPaginatedResult
* @throws DotDataException if there's an error fetching the jobs
*/
JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
throws JobQueueDataException {
return jobQueueManagerAPI.getActiveJobs( queueName, page, pageSize);
JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) {
try {
return jobQueueManagerAPI.getActiveJobs(queueName, page, pageSize);
} catch (JobQueueDataException e) {
Logger.error(this.getClass(), "Error fetching active jobs", e);
}
return JobPaginatedResult.builder().build();
}

/**
Expand All @@ -159,9 +200,13 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
* @return A result object containing the list of completed jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getFailedJobs(int page, int pageSize)
throws JobQueueDataException {
return jobQueueManagerAPI.getFailedJobs(page, pageSize);
JobPaginatedResult getFailedJobs(int page, int pageSize) {
try {
return jobQueueManagerAPI.getFailedJobs(page, pageSize);
} catch (JobQueueDataException e) {
Logger.error(this.getClass(), "Error fetching failed jobs", e);
}
return JobPaginatedResult.builder().build();
}

/**
Expand All @@ -174,9 +219,9 @@ Set<String> getQueueNames(){

/**
* if a file is uploaded, move it to temp location and update params
* @param form
* @param params
* @param request
* @param form The form
* @param params The params
* @param request The request
*/
private void handleUploadIfPresent(final JobParams form, Map<String, Object> params, HttpServletRequest request) {
final InputStream fileInputStream = form.getFileInputStream();
Expand Down
Loading

0 comments on commit 69fa89a

Please sign in to comment.