diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobProcessorScanner.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobProcessorScanner.java new file mode 100644 index 000000000000..d04e0c13772b --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobProcessorScanner.java @@ -0,0 +1,54 @@ +package com.dotcms.jobs.business.api; + +import com.dotcms.jobs.business.processor.JobProcessor; +import com.dotmarketing.util.Logger; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import javax.enterprise.context.ApplicationScoped; +import org.jboss.jandex.ClassInfo; +import org.jboss.jandex.DotName; +import org.jboss.jandex.Index; +import org.jboss.jandex.IndexReader; + +@ApplicationScoped +public class JobProcessorScanner { + + + public List> discoverJobProcessors() { + List> jobProcessors = new ArrayList<>(); + try { + + Index index = getJandexIndex(); + DotName jobProcessorInterface = DotName.createSimple(JobProcessor.class.getName()); + + Collection implementors = index.getAllKnownImplementors(jobProcessorInterface); + + for (ClassInfo classInfo : implementors) { + String className = classInfo.name().toString(); + + Class clazz = Class.forName(className); + if (JobProcessor.class.isAssignableFrom(clazz)) { + jobProcessors.add((Class) clazz); + } + } + + } catch (IOException | ClassNotFoundException e) { + Logger.error(JobProcessorScanner.class, "Error discovering JobProcessors", e); + + } + return jobProcessors; + } + + private Index getJandexIndex() throws IOException { + InputStream input = getClass().getClassLoader().getResourceAsStream("META-INF/jandex.idx"); + if (input == null) { + throw new IOException("Jandex index not found"); + } + IndexReader reader = new IndexReader(input); + return reader.read(); + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java index aa74dd88257f..391c27b4d97d 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java @@ -7,6 +7,7 @@ import com.dotcms.jobs.business.job.JobPaginatedResult; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.queue.JobQueue; +import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotmarketing.exception.DotDataException; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -57,6 +58,12 @@ public interface JobQueueManagerAPI { */ void registerProcessor(String queueName, JobProcessor processor); + /** + * Retrieves the job processors for all registered queues. + * @return A map of queue names to job processors + */ + Map getQueueNames(); + /** * Creates a new job in the specified queue. * @@ -88,6 +95,26 @@ String createJob(String queueName, Map parameters) */ JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException; + /** + * Retrieves a list of active jobs for a specific queue. + * @param queueName The name of the queue + * @param page The page number + * @param pageSize The number of jobs per page + * @return A result object containing the list of active jobs and pagination information. + * @throws JobQueueDataException if there's an error fetching the jobs + */ + JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) throws JobQueueDataException; + + /** + * Retrieves a list of completed jobs for a specific queue within a date range. + * @param queueName The name of the queue + * @param page The page number + * @param pageSize The number of jobs per page + * @return A result object containing the list of completed jobs and pagination information. + * @throws JobQueueDataException + */ + JobPaginatedResult getFailedJobs(String queueName, int page, int pageSize) throws JobQueueDataException; + /** * Cancels a job. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index f675c331d21f..e3d743b2f812 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -225,22 +225,34 @@ public void close() throws Exception { @Override public void registerProcessor(final String queueName, final JobProcessor processor) { - processors.put(queueName, processor); + final String queueNameLower = queueName.toLowerCase(); + final JobProcessor jobProcessor = processors.get(queueNameLower); + if (null != jobProcessor) { + Logger.warn(this, String.format( + "Job processor [%s] already registered for queue: [%s] is getting overridden.", + jobProcessor.getClass().getName(), queueName)); + } + processors.put(queueNameLower, processor); + } + + @Override + public Map getQueueNames() { + return Map.copyOf(processors); } @WrapInTransaction @Override public String createJob(final String queueName, final Map parameters) throws JobProcessorNotFoundException, DotDataException { - - if (!processors.containsKey(queueName)) { + final String queueNameLower = queueName.toLowerCase(); + if (!processors.containsKey(queueNameLower)) { final var error = new JobProcessorNotFoundException(queueName); Logger.error(JobQueueManagerAPIImpl.class, error); throw error; } try { - String jobId = jobQueue.createJob(queueName, parameters); + String jobId = jobQueue.createJob(queueNameLower, parameters); eventProducer.getEvent(JobCreatedEvent.class).fire( new JobCreatedEvent(jobId, queueName, LocalDateTime.now(), parameters) ); @@ -272,6 +284,30 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) throws Dot } } + @CloseDBIfOpened + @Override + public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) + throws JobQueueDataException { + final String queueNameLower = queueName.toLowerCase(); + try { + return jobQueue.getActiveJobs(queueNameLower, page, pageSize); + } catch (JobQueueDataException e) { + throw new JobQueueDataException("Error fetching active jobs", e); + } + } + + @CloseDBIfOpened + @Override + public JobPaginatedResult getFailedJobs(String queueName, int page, int pageSize) + throws JobQueueDataException { + final String queueNameLower = queueName.toLowerCase(); + try { + return jobQueue.getFailedJobs(queueNameLower, page, pageSize); + } catch (JobQueueDataException e) { + throw new JobQueueDataException("Error fetching active jobs", e); + } + } + @WrapInTransaction @Override public void cancelJob(final String jobId) throws DotDataException { @@ -284,8 +320,8 @@ public void cancelJob(final String jobId) throws DotDataException { } catch (JobQueueDataException e) { throw new DotDataException("Error fetching job", e); } - - final var processor = processors.get(job.queueName()); + final String queueNameLower = job.queueName().toLowerCase(); + final var processor = processors.get(queueNameLower); if (processor instanceof Cancellable) { try { diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/Queue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/Queue.java new file mode 100644 index 000000000000..f6463528b8f7 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/Queue.java @@ -0,0 +1,14 @@ +package com.dotcms.jobs.business.processor; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD}) +public @interface Queue { + String value(); +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/FileReaderJob.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/FileReaderJob.java new file mode 100644 index 000000000000..bd22b2ed3243 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/FileReaderJob.java @@ -0,0 +1,76 @@ +package com.dotcms.jobs.business.processor.impl; + +import com.dotcms.jobs.business.error.JobCancellationException; +import com.dotcms.jobs.business.job.Job; +import com.dotcms.jobs.business.processor.Cancellable; +import com.dotcms.jobs.business.processor.JobProcessor; +import com.dotcms.jobs.business.processor.Queue; +import com.dotmarketing.util.Logger; +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.util.Map; + +@Queue("FileReader") +public class FileReaderJob implements JobProcessor, Cancellable { + + boolean working = true; + + @Override + public void process(Job job) { + // Retrieve job parameters + Logger.info(this.getClass(), "Processing job: " + job.id()); + Map params = job.parameters(); + String filePath = (String) params.get("filePath"); + final Object nLinesRaw = params.get("nLines"); + if(!(nLinesRaw instanceof String)) { + Logger.error(this.getClass(), "Parameter 'nLines' is required."); + return; + } + int nLines = Integer.parseInt((String) nLinesRaw); + // Validate required parameters + if (filePath == null || nLines <= 0) { + Logger.error(this.getClass(), "Parameters 'filePath' and 'nLines' (greater than zero) are required."); + return; + } + + try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) { + String line; + int lineCount = 0; + int totalLines = 0; + + Logger.info(this.getClass(), "Starting to read the file: " + filePath); + + while (working && (line = reader.readLine()) != null) { + lineCount++; + totalLines++; + + // Print the line when the counter reaches nLines + if (lineCount == nLines) { + Logger.info(this.getClass(), "Line " + totalLines + ": " + line); + lineCount = 0; // Reset the counter + } + Thread.sleep(1000); // Simulate processing time + } + + Logger.info(this.getClass(), "Reading completed. Total lines read: " + totalLines); + + } catch (IOException e) { + Logger.error(this.getClass(), "Error reading the file: " + e.getMessage()); + } catch (Exception e) { + Logger.error(this.getClass(), "Unexpected error during processing: " + e.getMessage()); + } + } + + @Override + public Map getResultMetadata(Job job) { + return Map.of(); + } + + + @Override + public void cancel(Job job) throws JobCancellationException { + Logger.info(this.getClass(), "Job cancelled: " + job.id()); + working = false; + } +} diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobParams.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobParams.java new file mode 100644 index 000000000000..aa5bada7c1c7 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobParams.java @@ -0,0 +1,58 @@ +package com.dotcms.rest.api.v1.job; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.InputStream; +import java.util.Map; +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; +import org.glassfish.jersey.media.multipart.FormDataParam; + +public class JobParams { + @FormDataParam("file") + private InputStream fileInputStream; + + @FormDataParam("file") + private FormDataContentDisposition contentDisposition; + + @FormDataParam("params") + private String jsonParams; + + @FormDataParam("params") + private Map params; + + public InputStream getFileInputStream() { + return fileInputStream; + } + + public void setFileInputStream(InputStream fileInputStream) { + this.fileInputStream = fileInputStream; + } + + public FormDataContentDisposition getContentDisposition() { + return contentDisposition; + } + + public void setContentDisposition(FormDataContentDisposition contentDisposition) { + this.contentDisposition = contentDisposition; + } + + public void setJsonParams(String jsonParams) { + this.jsonParams = jsonParams; + } + + public Map getParams() throws JsonProcessingException { + if (null == params) { + params = new ObjectMapper().readValue(jsonParams, Map.class); + } + return params; + } + + public void setParams(Map params) { + this.params = params; + } + + public String getJsonParams() { + return jsonParams; + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java new file mode 100644 index 000000000000..baf193bee6cf --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java @@ -0,0 +1,194 @@ +package com.dotcms.rest.api.v1.job; + +import com.dotcms.jobs.business.api.JobProcessorScanner; +import com.dotcms.jobs.business.api.JobQueueManagerAPI; +import com.dotcms.jobs.business.job.Job; +import com.dotcms.jobs.business.job.JobPaginatedResult; +import com.dotcms.jobs.business.processor.JobProcessor; +import com.dotcms.jobs.business.processor.Queue; +import com.dotcms.jobs.business.queue.error.JobQueueDataException; +import com.dotcms.rest.api.v1.temp.DotTempFile; +import com.dotcms.rest.api.v1.temp.TempFileAPI; +import com.dotmarketing.business.APILocator; +import com.dotmarketing.exception.DotDataException; +import com.dotmarketing.util.Logger; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.InputStream; +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.servlet.http.HttpServletRequest; +import org.glassfish.jersey.media.multipart.FormDataContentDisposition; + +@ApplicationScoped +public class JobQueueHelper { + + JobQueueManagerAPI jobQueueManagerAPI; + + JobProcessorScanner scanner; + + public JobQueueHelper() { + //default constructor Mandatory for CDI + } + + @PostConstruct + public void onInit() { + + if(!jobQueueManagerAPI.isStarted()){ + jobQueueManagerAPI.start(); + Logger.info(this.getClass(), "JobQueueManagerAPI started"); + } + final List> processors = scanner.discoverJobProcessors(); + processors.forEach(processor -> { + try { + final Constructor declaredConstructor = processor.getDeclaredConstructor(); + final JobProcessor jobProcessor = declaredConstructor.newInstance(); + //registering the processor with the jobQueueManagerAPI + // lower case it to avoid case + if(processor.isAnnotationPresent(Queue.class)){ + final Queue queue = processor.getAnnotation(Queue.class); + jobQueueManagerAPI.registerProcessor(queue.value(), jobProcessor); + } else { + jobQueueManagerAPI.registerProcessor(processor.getName(), jobProcessor); + } + }catch (Exception e){ + Logger.error(this.getClass(), "Unable to register JobProcessor ", e); + } + }); + } + + @PreDestroy + public void onDestroy() { + if(jobQueueManagerAPI.isStarted()){ + try { + jobQueueManagerAPI.close(); + Logger.info(this.getClass(), "JobQueueManagerAPI successfully closed"); + } catch (Exception e) { + Logger.error(this.getClass(), e.getMessage(), e); + } + } + } + + @Inject + public JobQueueHelper(JobQueueManagerAPI jobQueueManagerAPI, JobProcessorScanner scanner) { + this.jobQueueManagerAPI = jobQueueManagerAPI; + this.scanner = scanner; + } + + /** + * creates a job + * @param queueName + * @param form + * @return jobId + * @throws JsonProcessingException + * @throws DotDataException + */ + String createJob(String queueName, JobParams form, HttpServletRequest request) + throws JsonProcessingException, DotDataException { + + final HashMap in = new HashMap<>(form.getParams()); + handleUploadIfPresent(form, in, request); + return jobQueueManagerAPI.createJob(queueName, Map.copyOf(in)); + } + + /** + * gets a job + * @param jobId + * @return + * @throws DotDataException + */ + Job getJob(String jobId) throws DotDataException{ + return jobQueueManagerAPI.getJob(jobId); + } + + /** + * cancels a job + * @param jobId + * @throws DotDataException + */ + void cancelJob(String jobId) throws DotDataException{ + jobQueueManagerAPI.cancelJob(jobId); + } + + /** + * watches a job + * @param jobId + * @param watcher + */ + void watchJob(String jobId, Consumer watcher){ + jobQueueManagerAPI.watchJob(jobId, watcher); + } + + /** + * Retrieves a list of jobs. + * @param page + * @param pageSize + * @return JobPaginatedResult + * @throws DotDataException + */ + JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException{ + return jobQueueManagerAPI.getJobs(page, pageSize); + } + + /** + * Retrieves a list of jobs. + * @param page + * @param pageSize + * @return JobPaginatedResult + * @throws DotDataException + */ + JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) + throws JobQueueDataException { + return jobQueueManagerAPI.getActiveJobs( queueName, page, pageSize); + } + + /** + * Retrieves a list of completed jobs for a specific queue within a date range. + * @param queueName The name of the queue + * @param page The page number + * @param pageSize The number of jobs per page + * @return A result object containing the list of completed jobs and pagination information. + * @throws JobQueueDataException if there's an error fetching the jobs + */ + JobPaginatedResult getFailedJobs(String queueName, int page, int pageSize) + throws JobQueueDataException { + return jobQueueManagerAPI.getFailedJobs( queueName, page, pageSize); + } + + /** + * Retrieves a list of active jobs for a specific queue. + * @return JobPaginatedResult + */ + Set getQueueNames(){ + return jobQueueManagerAPI.getQueueNames().keySet(); + } + + /** + * if a file is uploaded, move it to temp location and update params + * @param form + * @param params + * @param request + */ + private void handleUploadIfPresent(final JobParams form, Map params, HttpServletRequest request) { + final InputStream fileInputStream = form.getFileInputStream(); + final FormDataContentDisposition contentDisposition = form.getContentDisposition(); + if (null != fileInputStream && null != contentDisposition) { + final TempFileAPI tempFileAPI = APILocator.getTempFileAPI(); + try { + final DotTempFile tempFile = tempFileAPI.createTempFile(contentDisposition.getFileName(), request, fileInputStream); + final String path = tempFile.file.getPath(); + Logger.info(this.getClass(), "File uploaded to temp location: " + path); + params.put("filePath", path); + } catch (Exception e) { + Logger.error(this.getClass(), "Error saving temp file", e); + } + } + } +} diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java new file mode 100644 index 000000000000..97874c0cb2de --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java @@ -0,0 +1,200 @@ +package com.dotcms.rest.api.v1.job; + +import com.dotcms.cdi.CDIUtils; +import com.dotcms.jobs.business.job.Job; +import com.dotcms.jobs.business.job.JobPaginatedResult; +import com.dotcms.rest.InitDataObject; +import com.dotcms.rest.ResponseEntityView; +import com.dotcms.rest.WebResource; +import com.dotcms.rest.annotation.NoCache; +import com.dotmarketing.exception.DotDataException; +import com.dotmarketing.util.Logger; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.BeanParam; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.glassfish.jersey.media.sse.EventOutput; +import org.glassfish.jersey.media.sse.OutboundEvent; +import org.glassfish.jersey.media.sse.SseFeature; + +@Path("/v1/jobs") +public class JobQueueResource { + + private final WebResource webResource; + + private final JobQueueHelper helper; + + public JobQueueResource() { + this(new WebResource(), CDIUtils.getBean(JobQueueHelper.class).orElseThrow(()->new IllegalStateException("JobQueueHelper Bean not found"))); + } + + public JobQueueResource(WebResource webResource, JobQueueHelper helper) { + this.webResource = webResource; + this.helper = helper; + } + + @POST + @Path("/{queueName}") + @Consumes(MediaType.MULTIPART_FORM_DATA) + @Produces(MediaType.APPLICATION_JSON) + public Response createJob( + @Context HttpServletRequest request, + @PathParam("queueName") String queueName, + @BeanParam JobParams form) throws JsonProcessingException, DotDataException { + webResource.init(null, true, request, true, null); + final String jobId = helper.createJob(queueName, form, request); + return Response.ok(new ResponseEntityView<>(jobId)).build(); + } + + @GET + @Path("/queues") + @Produces(MediaType.APPLICATION_JSON) + public Response getQueues(@Context HttpServletRequest request) { + try { + webResource.init(null, true, request, true, null); + return Response.ok(new ResponseEntityView<>(helper.getQueueNames())).build(); + } catch (Exception e) { + Logger.error(this, "Error getting job status", e); + return Response.serverError().entity(new ResponseEntityView<>(e.getMessage())).build(); + } + } + + @GET + @Path("/{jobId}/status") + @Produces(MediaType.APPLICATION_JSON) + public Response getJobStatus(@Context HttpServletRequest request, @PathParam("jobId") String jobId) { + try { + webResource.init(null, true, request, true, null); + + Job job = helper.getJob(jobId); + Map statusInfo = Map.of( + "state", job.state(), + "progress", job.progress(), + "executionNode", job.executionNode().orElse("N/A") + ); + + return Response.ok(new ResponseEntityView<>(statusInfo)).build(); + } catch (Exception e) { + Logger.error(this, "Error getting job status", e); + return Response.serverError().entity(new ResponseEntityView<>(e.getMessage())).build(); + } + } + + @POST + @Path("/{jobId}/cancel") + @Produces(MediaType.APPLICATION_JSON) + public Response cancelJob(@Context HttpServletRequest request, @PathParam("jobId") String jobId) { + try { + webResource.init(null, true, request, true, null); + helper.cancelJob(jobId); + return Response.ok(new ResponseEntityView<>("Job cancelled successfully")).build(); + } catch (Exception e) { + Logger.error(this, "Error cancelling job", e); + return Response.serverError().entity(new ResponseEntityView<>(e.getMessage())).build(); + } + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + public Response listJobs(@Context HttpServletRequest request, + @QueryParam("page") @DefaultValue("1") int page, + @QueryParam("pageSize") @DefaultValue("20") int pageSize) { + try { + InitDataObject initData = webResource.init(null, true, request, true, null); + initData.getUser(); + + JobPaginatedResult result = helper.getJobs(page, pageSize); + return Response.ok(new ResponseEntityView<>(result)).build(); + } catch (Exception e) { + Logger.error(this, "Error listing jobs", e); + return Response.serverError().entity(new ResponseEntityView<>(e.getMessage())).build(); + } + } + + @GET + @Path("/active/{queueName}") + @Produces(MediaType.APPLICATION_JSON) + public Response activeJobs(@Context HttpServletRequest request, @PathParam("queueName") String queueName, + @QueryParam("page") @DefaultValue("1") int page, + @QueryParam("pageSize") @DefaultValue("20") int pageSize) { + try { + InitDataObject initData = webResource.init(null, true, request, true, null); + initData.getUser(); + + JobPaginatedResult result = helper.getActiveJobs(queueName, page, pageSize); + return Response.ok(new ResponseEntityView<>(result)).build(); + } catch (Exception e) { + Logger.error(this, "Error listing active jobs", e); + return Response.serverError().entity(new ResponseEntityView<>(e.getMessage())).build(); + } + } + + @GET + @Path("/failed/{queueName}") + @Produces(MediaType.APPLICATION_JSON) + public Response activeJobs(@Context HttpServletRequest request, @PathParam("queueName") String queueName, + @QueryParam("page") @DefaultValue("1") int page, + @QueryParam("pageSize") @DefaultValue("20") int pageSize) { + try { + InitDataObject initData = webResource.init(null, true, request, true, null); + initData.getUser(); + + JobPaginatedResult result = helper.getFailedJobs(queueName, page, pageSize); + return Response.ok(new ResponseEntityView<>(result)).build(); + } catch (Exception e) { + Logger.error(this, "Error listing failed jobs", e); + return Response.serverError().entity(new ResponseEntityView<>(e.getMessage())).build(); + } + } + + + @GET + @Path("/{jobId}/monitor") + @Produces(SseFeature.SERVER_SENT_EVENTS) + @NoCache + public EventOutput monitorJob(@Context HttpServletRequest request, @PathParam("jobId") String jobId) { + EventOutput eventOutput = new EventOutput(); + try { + webResource.init(null, true, request, true, null); + + helper.watchJob(jobId, job -> { + try { + OutboundEvent.Builder eventBuilder = new OutboundEvent.Builder(); + eventBuilder.name("job-update"); + eventBuilder.data(Job.class, job); + eventOutput.write(eventBuilder.build()); + } catch (IOException e) { + Logger.error(this, "Error writing SSE event", e); + } + }); + + // Keep the connection open for a reasonable time (e.g., 5 minutes) + if (!eventOutput.isClosed()) { + Thread.sleep(TimeUnit.MINUTES.toMillis(5)); + } + } catch (Exception e) { + Logger.error(this, "Error monitoring job", e); + Thread.currentThread().interrupt(); + } finally { + try { + eventOutput.close(); + } catch (IOException e) { + Logger.error(this, "Error closing SSE connection", e); + } + } + return eventOutput; + } +} \ No newline at end of file