Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#29480 #30219

Closed
wants to merge 1 commit into from
Closed

#29480 #30219

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.dotcms.jobs.business.api;

import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotmarketing.util.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.enterprise.context.ApplicationScoped;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.Index;
import org.jboss.jandex.IndexReader;

@ApplicationScoped
public class JobProcessorScanner {


public List<Class<? extends JobProcessor>> discoverJobProcessors() {
List<Class<? extends JobProcessor>> jobProcessors = new ArrayList<>();
try {

Index index = getJandexIndex();
DotName jobProcessorInterface = DotName.createSimple(JobProcessor.class.getName());

Collection<ClassInfo> implementors = index.getAllKnownImplementors(jobProcessorInterface);

for (ClassInfo classInfo : implementors) {
String className = classInfo.name().toString();

Class<?> clazz = Class.forName(className);
if (JobProcessor.class.isAssignableFrom(clazz)) {
jobProcessors.add((Class<? extends JobProcessor>) clazz);
}
}

} catch (IOException | ClassNotFoundException e) {
Logger.error(JobProcessorScanner.class, "Error discovering JobProcessors", e);

}
return jobProcessors;
}

private Index getJandexIndex() throws IOException {
InputStream input = getClass().getClassLoader().getResourceAsStream("META-INF/jandex.idx");
if (input == null) {
throw new IOException("Jandex index not found");
}
IndexReader reader = new IndexReader(input);
return reader.read();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.dotcms.jobs.business.job.JobPaginatedResult;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.queue.JobQueue;
import com.dotcms.jobs.business.queue.error.JobQueueDataException;
import com.dotmarketing.exception.DotDataException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -57,6 +58,12 @@ public interface JobQueueManagerAPI {
*/
void registerProcessor(String queueName, JobProcessor processor);

/**
* Retrieves the job processors for all registered queues.
* @return A map of queue names to job processors
*/
Map<String,JobProcessor> getQueueNames();

/**
* Creates a new job in the specified queue.
*
Expand Down Expand Up @@ -88,6 +95,26 @@ String createJob(String queueName, Map<String, Object> parameters)
*/
JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException;

/**
* Retrieves a list of active jobs for a specific queue.
* @param queueName The name of the queue
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of active jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of completed jobs for a specific queue within a date range.
* @param queueName The name of the queue
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of completed jobs and pagination information.
* @throws JobQueueDataException
*/
JobPaginatedResult getFailedJobs(String queueName, int page, int pageSize) throws JobQueueDataException;

/**
* Cancels a job.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,22 +225,34 @@ public void close() throws Exception {

@Override
public void registerProcessor(final String queueName, final JobProcessor processor) {
processors.put(queueName, processor);
final String queueNameLower = queueName.toLowerCase();
final JobProcessor jobProcessor = processors.get(queueNameLower);
if (null != jobProcessor) {
Logger.warn(this, String.format(
"Job processor [%s] already registered for queue: [%s] is getting overridden.",
jobProcessor.getClass().getName(), queueName));
}
processors.put(queueNameLower, processor);
}

@Override
public Map<String,JobProcessor> getQueueNames() {
return Map.copyOf(processors);
}

@WrapInTransaction
@Override
public String createJob(final String queueName, final Map<String, Object> parameters)
throws JobProcessorNotFoundException, DotDataException {

if (!processors.containsKey(queueName)) {
final String queueNameLower = queueName.toLowerCase();
if (!processors.containsKey(queueNameLower)) {
final var error = new JobProcessorNotFoundException(queueName);
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
}

try {
String jobId = jobQueue.createJob(queueName, parameters);
String jobId = jobQueue.createJob(queueNameLower, parameters);
eventProducer.getEvent(JobCreatedEvent.class).fire(
new JobCreatedEvent(jobId, queueName, LocalDateTime.now(), parameters)
);
Expand Down Expand Up @@ -272,6 +284,30 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) throws Dot
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
throws JobQueueDataException {
final String queueNameLower = queueName.toLowerCase();
try {
return jobQueue.getActiveJobs(queueNameLower, page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getFailedJobs(String queueName, int page, int pageSize)
throws JobQueueDataException {
final String queueNameLower = queueName.toLowerCase();
try {
return jobQueue.getFailedJobs(queueNameLower, page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
}
}

@WrapInTransaction
@Override
public void cancelJob(final String jobId) throws DotDataException {
Expand All @@ -284,8 +320,8 @@ public void cancelJob(final String jobId) throws DotDataException {
} catch (JobQueueDataException e) {
throw new DotDataException("Error fetching job", e);
}

final var processor = processors.get(job.queueName());
final String queueNameLower = job.queueName().toLowerCase();
final var processor = processors.get(queueNameLower);
if (processor instanceof Cancellable) {

try {
Expand Down
14 changes: 14 additions & 0 deletions dotCMS/src/main/java/com/dotcms/jobs/business/processor/Queue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.dotcms.jobs.business.processor;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Queue {
String value();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.dotcms.jobs.business.processor.impl;

import com.dotcms.jobs.business.error.JobCancellationException;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.processor.Cancellable;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.Queue;
import com.dotmarketing.util.Logger;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

@Queue("FileReader")
public class FileReaderJob implements JobProcessor, Cancellable {

boolean working = true;

@Override
public void process(Job job) {
// Retrieve job parameters
Logger.info(this.getClass(), "Processing job: " + job.id());
Map<String, Object> params = job.parameters();
String filePath = (String) params.get("filePath");
final Object nLinesRaw = params.get("nLines");
if(!(nLinesRaw instanceof String)) {
Logger.error(this.getClass(), "Parameter 'nLines' is required.");
return;
}
int nLines = Integer.parseInt((String) nLinesRaw);
// Validate required parameters
if (filePath == null || nLines <= 0) {
Logger.error(this.getClass(), "Parameters 'filePath' and 'nLines' (greater than zero) are required.");
return;
}

try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
String line;
int lineCount = 0;
int totalLines = 0;

Logger.info(this.getClass(), "Starting to read the file: " + filePath);

while (working && (line = reader.readLine()) != null) {
lineCount++;
totalLines++;

// Print the line when the counter reaches nLines
if (lineCount == nLines) {
Logger.info(this.getClass(), "Line " + totalLines + ": " + line);
lineCount = 0; // Reset the counter
}
Thread.sleep(1000); // Simulate processing time
}

Logger.info(this.getClass(), "Reading completed. Total lines read: " + totalLines);

} catch (IOException e) {
Logger.error(this.getClass(), "Error reading the file: " + e.getMessage());
} catch (Exception e) {
Logger.error(this.getClass(), "Unexpected error during processing: " + e.getMessage());
}
}

@Override
public Map<String, Object> getResultMetadata(Job job) {
return Map.of();
}


@Override
public void cancel(Job job) throws JobCancellationException {
Logger.info(this.getClass(), "Job cancelled: " + job.id());
working = false;
}
}
58 changes: 58 additions & 0 deletions dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobParams.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.dotcms.rest.api.v1.job;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.InputStream;
import java.util.Map;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;

public class JobParams {
@FormDataParam("file")
private InputStream fileInputStream;

@FormDataParam("file")
private FormDataContentDisposition contentDisposition;

@FormDataParam("params")
private String jsonParams;

@FormDataParam("params")
private Map params;

public InputStream getFileInputStream() {
return fileInputStream;
}

public void setFileInputStream(InputStream fileInputStream) {
this.fileInputStream = fileInputStream;
}

public FormDataContentDisposition getContentDisposition() {
return contentDisposition;
}

public void setContentDisposition(FormDataContentDisposition contentDisposition) {
this.contentDisposition = contentDisposition;
}

public void setJsonParams(String jsonParams) {
this.jsonParams = jsonParams;
}

public Map getParams() throws JsonProcessingException {
if (null == params) {
params = new ObjectMapper().readValue(jsonParams, Map.class);
}
return params;
}

public void setParams(Map<Object, Object> params) {
this.params = params;
}

public String getJsonParams() {
return jsonParams;
}

}
Loading
Loading