From 69fa89a82eb1ea4b5cc26edeafff5321d45fbe7b Mon Sep 17 00:00:00 2001 From: fabrizzio-dotCMS Date: Wed, 2 Oct 2024 22:10:28 -0600 Subject: [PATCH] #29480 --- .../jobs/business/api/JobQueueManagerAPI.java | 4 +- .../business/api/JobQueueManagerAPIImpl.java | 45 ++++++-- .../error/JobProcessorNotFoundException.java | 9 ++ .../queue/error/JobNotFoundException.java | 2 +- .../rest/api/v1/job/JobQueueHelper.java | 85 ++++++++++---- .../rest/api/v1/job/JobQueueResource.java | 108 ++++++++++-------- 6 files changed, 173 insertions(+), 80 deletions(-) diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java index 6619750d140d..958e206224c8 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java @@ -56,13 +56,13 @@ public interface JobQueueManagerAPI { * @param queueName The name of the queue * @param processor The job processor to register */ - void registerProcessor(String queueName, JobProcessor processor); + void registerProcessor(final String queueName, final Class processor); /** * Retrieves the job processors for all registered queues. * @return A map of queue names to job processors */ - Map getQueueNames(); + Map> getQueueNames(); /** * Creates a new job in the specified queue. diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index 65f437dc283f..2bddb7bc72b4 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -99,7 +99,8 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI { private final CircuitBreaker circuitBreaker; private final JobQueue jobQueue; - private final Map processors; + private final Map> processors; + private final Map processorInstancesByJobId; private final int threadPoolSize; private ExecutorService executorService; private final Map retryStrategies; @@ -140,6 +141,7 @@ public JobQueueManagerAPIImpl(JobQueue jobQueue, this.jobQueue = jobQueue; this.threadPoolSize = jobQueueConfig.getThreadPoolSize(); this.processors = new ConcurrentHashMap<>(); + this.processorInstancesByJobId = new ConcurrentHashMap<>(); this.retryStrategies = new ConcurrentHashMap<>(); this.defaultRetryStrategy = defaultRetryStrategy; this.circuitBreaker = circuitBreaker; @@ -224,19 +226,19 @@ public void close() throws Exception { } @Override - public void registerProcessor(final String queueName, final JobProcessor processor) { + public void registerProcessor(final String queueName, final Class processor) { final String queueNameLower = queueName.toLowerCase(); - final JobProcessor jobProcessor = processors.get(queueNameLower); + final Class jobProcessor = processors.get(queueNameLower); if (null != jobProcessor) { Logger.warn(this, String.format( "Job processor [%s] already registered for queue: [%s] is getting overridden.", - jobProcessor.getClass().getName(), queueName)); + jobProcessor.getName(), queueName)); } processors.put(queueNameLower, processor); } @Override - public Map getQueueNames() { + public Map> getQueueNames() { return Map.copyOf(processors); } @@ -319,8 +321,7 @@ public void cancelJob(final String jobId) throws DotDataException { } catch (JobQueueDataException e) { throw new DotDataException("Error fetching job", e); } - final String queueNameLower = job.queueName().toLowerCase(); - final var processor = processors.get(queueNameLower); + final var processor = processorInstancesByJobId.get(jobId); if (processor instanceof Cancellable) { try { @@ -337,7 +338,7 @@ public void cancelJob(final String jobId) throws DotDataException { } else { if (processor == null) { - final var error = new JobProcessorNotFoundException(job.queueName()); + final var error = new JobProcessorNotFoundException(job.queueName(), jobId); Logger.error(JobQueueManagerAPIImpl.class, error); throw error; } @@ -619,8 +620,11 @@ private void handleNonRetryableFailedJob(final Job job) throws DotDataException */ private void processJob(final Job job) throws DotDataException { - JobProcessor processor = processors.get(job.queueName()); - if (processor != null) { + Class processorClass = processors.get(job.queueName()); + if (processorClass != null) { + + final JobProcessor processor = newJobProcessorInstance(job, + processorClass); final ProgressTracker progressTracker = new DefaultProgressTracker(); Job runningJob = job.markAsRunning().withProgressTracker(progressTracker); @@ -665,6 +669,8 @@ private void processJob(final Job job) throws DotDataException { handleJobFailure( runningJob, processor, e, e.getMessage(), "Job execution" ); + } finally { + processorInstancesByJobId.remove(job.id()); } } else { @@ -675,6 +681,25 @@ private void processJob(final Job job) throws DotDataException { } } + /** + * Instantiate a new JobProcessor instance for a specific job. and store the reference in a map. + * @param job + * @param processorClass + * @return + */ + private JobProcessor newJobProcessorInstance(final Job job, final Class processorClass) { + //Get an instance and put it in the map + return processorInstancesByJobId.computeIfAbsent( + job.id(), k -> { + try { + return processorClass.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw new DotRuntimeException("Error creating job processor", e); + } + } + ); + } + /** * Handles the completion of a job. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessorNotFoundException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessorNotFoundException.java index c1b51d621bd1..aba5ff391e8a 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessorNotFoundException.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessorNotFoundException.java @@ -14,4 +14,13 @@ public class JobProcessorNotFoundException extends RuntimeException { public JobProcessorNotFoundException(String queueName) { super("No job processor found for queue: " + queueName); } + + /** + * Constructs a new JobProcessorNotFoundException with the specified queue name and job ID. + * @param queueName The name of the queue for which no processor was found + * @param jobId The ID of the job for which no processor was found + */ + public JobProcessorNotFoundException(String queueName, String jobId) { + super("No job processor found for queue: " + queueName + " and job id: " + jobId); + } } \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobNotFoundException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobNotFoundException.java index 977cf8198cf3..555b17242af8 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobNotFoundException.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/error/JobNotFoundException.java @@ -11,7 +11,7 @@ public class JobNotFoundException extends JobQueueException { * * @param jobId the ID of the job that was not found */ - public JobNotFoundException(String jobId) { + public JobNotFoundException(final String jobId) { super("Job with id: " + jobId + " not found"); } diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java index e181ec7e7854..902d5c5cfb70 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java @@ -2,6 +2,7 @@ import com.dotcms.jobs.business.api.JobProcessorScanner; import com.dotcms.jobs.business.api.JobQueueManagerAPI; +import com.dotcms.jobs.business.error.JobProcessorNotFoundException; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobPaginatedResult; import com.dotcms.jobs.business.processor.JobProcessor; @@ -10,6 +11,7 @@ import com.dotcms.rest.api.v1.temp.DotTempFile; import com.dotcms.rest.api.v1.temp.TempFileAPI; import com.dotmarketing.business.APILocator; +import com.dotmarketing.exception.DoesNotExistException; import com.dotmarketing.exception.DotDataException; import com.dotmarketing.util.Logger; import com.fasterxml.jackson.core.JsonProcessingException; @@ -51,15 +53,16 @@ public void onInit() { final List> processors = scanner.discoverJobProcessors(); processors.forEach(processor -> { try { - final Constructor declaredConstructor = processor.getDeclaredConstructor(); - final JobProcessor jobProcessor = declaredConstructor.newInstance(); + if(!testInstantiation(processor)){ + return; + } //registering the processor with the jobQueueManagerAPI // lower case it to avoid case if(processor.isAnnotationPresent(Queue.class)){ final Queue queue = processor.getAnnotation(Queue.class); - jobQueueManagerAPI.registerProcessor(queue.value(), jobProcessor); + jobQueueManagerAPI.registerProcessor(queue.value(), processor); } else { - jobQueueManagerAPI.registerProcessor(processor.getName(), jobProcessor); + jobQueueManagerAPI.registerProcessor(processor.getName(), processor); } }catch (Exception e){ Logger.error(this.getClass(), "Unable to register JobProcessor ", e); @@ -67,6 +70,22 @@ public void onInit() { }); } + /** + * Test if a processor can be instantiated + * @param processor The processor to tested + * @return true if the processor can be instantiated, false otherwise + */ + private boolean testInstantiation(Class processor) { + try { + final Constructor declaredConstructor = processor.getDeclaredConstructor(); + declaredConstructor.newInstance(); + return true; + } catch (Exception e) { + Logger.error(this.getClass(), String.format(" JobProcessor [%s] can not be instantiated and will be ignored.",processor.getName()), e); + } + return false; + } + @PreDestroy public void onDestroy() { if(jobQueueManagerAPI.isStarted()){ @@ -98,7 +117,12 @@ String createJob(String queueName, JobParams form, HttpServletRequest request) final HashMap in = new HashMap<>(form.getParams()); handleUploadIfPresent(form, in, request); - return jobQueueManagerAPI.createJob(queueName, Map.copyOf(in)); + try { + return jobQueueManagerAPI.createJob(queueName, Map.copyOf(in)); + } catch (JobProcessorNotFoundException e) { + Logger.error(this.getClass(), "Error creating job", e); + throw new DoesNotExistException(e.getMessage()); + } } /** @@ -107,7 +131,7 @@ String createJob(String queueName, JobParams form, HttpServletRequest request) * @return Job * @throws DotDataException if there's an error fetching the job */ - Job getJob(String jobId) throws DotDataException{ + Job getJob(String jobId) throws DotDataException { return jobQueueManagerAPI.getJob(jobId); } @@ -116,8 +140,13 @@ Job getJob(String jobId) throws DotDataException{ * @param jobId The ID of the job * @throws DotDataException if there's an error cancelling the job */ - void cancelJob(String jobId) throws DotDataException{ - jobQueueManagerAPI.cancelJob(jobId); + void cancelJob(String jobId) throws DotDataException { + try{ + jobQueueManagerAPI.cancelJob(jobId); + } catch (JobProcessorNotFoundException e) { + Logger.error(this.getClass(), "Error cancelling job", e); + throw new DoesNotExistException(e.getMessage()); + } } /** @@ -125,7 +154,10 @@ void cancelJob(String jobId) throws DotDataException{ * @param jobId The ID of the job * @param watcher The watcher */ - void watchJob(String jobId, Consumer watcher){ + void watchJob(String jobId, Consumer watcher) throws DotDataException { + //Validate the job exists + jobQueueManagerAPI.getJob(jobId); + // if it does then watch it jobQueueManagerAPI.watchJob(jobId, watcher); } @@ -136,8 +168,13 @@ void watchJob(String jobId, Consumer watcher){ * @return JobPaginatedResult * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException{ - return jobQueueManagerAPI.getJobs(page, pageSize); + JobPaginatedResult getJobs(int page, int pageSize) { + try { + return jobQueueManagerAPI.getJobs(page, pageSize); + } catch (DotDataException e){ + Logger.error(this.getClass(), "Error fetching jobs", e); + } + return JobPaginatedResult.builder().build(); } /** @@ -147,9 +184,13 @@ JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException{ * @return JobPaginatedResult * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) - throws JobQueueDataException { - return jobQueueManagerAPI.getActiveJobs( queueName, page, pageSize); + JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) { + try { + return jobQueueManagerAPI.getActiveJobs(queueName, page, pageSize); + } catch (JobQueueDataException e) { + Logger.error(this.getClass(), "Error fetching active jobs", e); + } + return JobPaginatedResult.builder().build(); } /** @@ -159,9 +200,13 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * @return A result object containing the list of completed jobs and pagination information. * @throws JobQueueDataException if there's an error fetching the jobs */ - JobPaginatedResult getFailedJobs(int page, int pageSize) - throws JobQueueDataException { - return jobQueueManagerAPI.getFailedJobs(page, pageSize); + JobPaginatedResult getFailedJobs(int page, int pageSize) { + try { + return jobQueueManagerAPI.getFailedJobs(page, pageSize); + } catch (JobQueueDataException e) { + Logger.error(this.getClass(), "Error fetching failed jobs", e); + } + return JobPaginatedResult.builder().build(); } /** @@ -174,9 +219,9 @@ Set getQueueNames(){ /** * if a file is uploaded, move it to temp location and update params - * @param form - * @param params - * @param request + * @param form The form + * @param params The params + * @param request The request */ private void handleUploadIfPresent(final JobParams form, Map params, HttpServletRequest request) { final InputStream fileInputStream = form.getFileInputStream(); diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java index fd2372ed672c..bbc9394097c1 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java @@ -3,7 +3,6 @@ import com.dotcms.cdi.CDIUtils; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobPaginatedResult; -import com.dotcms.rest.InitDataObject; import com.dotcms.rest.ResponseEntityView; import com.dotcms.rest.WebResource; import com.dotcms.rest.annotation.NoCache; @@ -54,7 +53,12 @@ public Response createJob( @Context HttpServletRequest request, @PathParam("queueName") String queueName, @BeanParam JobParams form) throws JsonProcessingException, DotDataException { - webResource.init(null, true, request, true, null); + new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); final String jobId = helper.createJob(queueName, form, request); return Response.ok(new ResponseEntityView<>(jobId)).build(); } @@ -63,21 +67,28 @@ public Response createJob( @Path("/queues") @Produces(MediaType.APPLICATION_JSON) public Response getQueues(@Context HttpServletRequest request) { - try { - webResource.init(null, true, request, true, null); + new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); return Response.ok(new ResponseEntityView<>(helper.getQueueNames())).build(); - } catch (Exception e) { - Logger.error(this, "Error getting job status", e); - return Response.serverError().entity(new ResponseEntityView<>(e.getMessage())).build(); - } + } @GET @Path("/{jobId}/status") @Produces(MediaType.APPLICATION_JSON) - public Response getJobStatus(@Context HttpServletRequest request, @PathParam("jobId") String jobId) { - try { - webResource.init(null, true, request, true, null); + public Response getJobStatus(@Context HttpServletRequest request, @PathParam("jobId") String jobId) + throws DotDataException { + + new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); Job job = helper.getJob(jobId); Map statusInfo = Map.of( @@ -87,24 +98,22 @@ public Response getJobStatus(@Context HttpServletRequest request, @PathParam("jo ); return Response.ok(new ResponseEntityView<>(statusInfo)).build(); - } catch (Exception e) { - Logger.error(this, "Error getting job status", e); - return Response.serverError().entity(new ResponseEntityView<>(e.getMessage())).build(); - } + } @POST @Path("/{jobId}/cancel") @Produces(MediaType.APPLICATION_JSON) - public Response cancelJob(@Context HttpServletRequest request, @PathParam("jobId") String jobId) { - try { - webResource.init(null, true, request, true, null); + public Response cancelJob(@Context HttpServletRequest request, @PathParam("jobId") String jobId) + throws DotDataException { + new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); helper.cancelJob(jobId); return Response.ok(new ResponseEntityView<>("Job cancelled successfully")).build(); - } catch (Exception e) { - Logger.error(this, "Error cancelling job", e); - return Response.serverError().entity(new ResponseEntityView<>(e.getMessage())).build(); - } } @GET @@ -112,16 +121,15 @@ public Response cancelJob(@Context HttpServletRequest request, @PathParam("jobId public Response listJobs(@Context HttpServletRequest request, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { - try { - InitDataObject initData = webResource.init(null, true, request, true, null); - initData.getUser(); - - JobPaginatedResult result = helper.getJobs(page, pageSize); + new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); + final JobPaginatedResult result = helper.getJobs(page, pageSize); return Response.ok(new ResponseEntityView<>(result)).build(); - } catch (Exception e) { - Logger.error(this, "Error listing jobs", e); - return Response.serverError().entity(new ResponseEntityView<>(e.getMessage())).build(); - } + } @GET @@ -130,14 +138,15 @@ public Response listJobs(@Context HttpServletRequest request, public Response activeJobs(@Context HttpServletRequest request, @PathParam("queueName") String queueName, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { - try { - webResource.init(null, true, request, true, null); - JobPaginatedResult result = helper.getActiveJobs(queueName, page, pageSize); + new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); + final JobPaginatedResult result = helper.getActiveJobs(queueName, page, pageSize); return Response.ok(new ResponseEntityView<>(result)).build(); - } catch (Exception e) { - Logger.error(this, "Error listing active jobs", e); - return Response.serverError().entity(new ResponseEntityView<>(e.getMessage())).build(); - } + } @GET @@ -146,14 +155,14 @@ public Response activeJobs(@Context HttpServletRequest request, @PathParam("queu public Response failedJobs(@Context HttpServletRequest request, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { - try { - webResource.init(null, true, request, true, null); - JobPaginatedResult result = helper.getFailedJobs(page, pageSize); + new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); + final JobPaginatedResult result = helper.getFailedJobs(page, pageSize); return Response.ok(new ResponseEntityView<>(result)).build(); - } catch (Exception e) { - Logger.error(this, "Error listing failed jobs", e); - return Response.serverError().entity(new ResponseEntityView<>(e.getMessage())).build(); - } } @@ -162,9 +171,14 @@ public Response failedJobs(@Context HttpServletRequest request, @Produces(SseFeature.SERVER_SENT_EVENTS) @NoCache public EventOutput monitorJob(@Context HttpServletRequest request, @PathParam("jobId") String jobId) { - EventOutput eventOutput = new EventOutput(); + final EventOutput eventOutput = new EventOutput(); + new WebResource.InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); try { - webResource.init(null, true, request, true, null); helper.watchJob(jobId, job -> { try {