Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
fabrizzio-dotCMS committed Oct 2, 2024
1 parent 98f280e commit 37c6345
Show file tree
Hide file tree
Showing 8 changed files with 665 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.dotcms.jobs.business.api;

import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotmarketing.util.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.enterprise.context.ApplicationScoped;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.Index;
import org.jboss.jandex.IndexReader;

@ApplicationScoped
public class JobProcessorScanner {


public List<Class<? extends JobProcessor>> discoverJobProcessors() {
List<Class<? extends JobProcessor>> jobProcessors = new ArrayList<>();
try {

Index index = getJandexIndex();
DotName jobProcessorInterface = DotName.createSimple(JobProcessor.class.getName());

Collection<ClassInfo> implementors = index.getAllKnownImplementors(jobProcessorInterface);

for (ClassInfo classInfo : implementors) {
String className = classInfo.name().toString();

Class<?> clazz = Class.forName(className);
if (JobProcessor.class.isAssignableFrom(clazz)) {
jobProcessors.add((Class<? extends JobProcessor>) clazz);
}
}

} catch (IOException | ClassNotFoundException e) {
Logger.error(JobProcessorScanner.class, "Error discovering JobProcessors", e);

}
return jobProcessors;
}

private Index getJandexIndex() throws IOException {
InputStream input = getClass().getClassLoader().getResourceAsStream("META-INF/jandex.idx");
if (input == null) {
throw new IOException("Jandex index not found");
}
IndexReader reader = new IndexReader(input);
return reader.read();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.dotcms.jobs.business.job.JobPaginatedResult;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.queue.JobQueue;
import com.dotcms.jobs.business.queue.error.JobQueueDataException;
import com.dotmarketing.exception.DotDataException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -57,6 +58,12 @@ public interface JobQueueManagerAPI {
*/
void registerProcessor(String queueName, JobProcessor processor);

/**
* Retrieves the job processors for all registered queues.
* @return A map of queue names to job processors
*/
Map<String,JobProcessor> getQueueNames();

/**
* Creates a new job in the specified queue.
*
Expand Down Expand Up @@ -88,6 +95,26 @@ String createJob(String queueName, Map<String, Object> parameters)
*/
JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException;

/**
* Retrieves a list of active jobs for a specific queue.
* @param queueName The name of the queue
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of active jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of completed jobs for a specific queue within a date range.
* @param queueName The name of the queue
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of completed jobs and pagination information.
* @throws JobQueueDataException
*/
JobPaginatedResult getFailedJobs(String queueName, int page, int pageSize) throws JobQueueDataException;

/**
* Cancels a job.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,22 +225,34 @@ public void close() throws Exception {

@Override
public void registerProcessor(final String queueName, final JobProcessor processor) {
processors.put(queueName, processor);
final String queueNameLower = queueName.toLowerCase();
final JobProcessor jobProcessor = processors.get(queueNameLower);
if (null != jobProcessor) {
Logger.warn(this, String.format(
"Job processor [%s] already registered for queue: [%s] is getting overridden.",
jobProcessor.getClass().getName(), queueName));
}
processors.put(queueNameLower, processor);
}

@Override
public Map<String,JobProcessor> getQueueNames() {
return Map.copyOf(processors);
}

@WrapInTransaction
@Override
public String createJob(final String queueName, final Map<String, Object> parameters)
throws JobProcessorNotFoundException, DotDataException {

if (!processors.containsKey(queueName)) {
final String queueNameLower = queueName.toLowerCase();
if (!processors.containsKey(queueNameLower)) {
final var error = new JobProcessorNotFoundException(queueName);
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
}

try {
String jobId = jobQueue.createJob(queueName, parameters);
String jobId = jobQueue.createJob(queueNameLower, parameters);
eventProducer.getEvent(JobCreatedEvent.class).fire(
new JobCreatedEvent(jobId, queueName, LocalDateTime.now(), parameters)
);
Expand Down Expand Up @@ -272,6 +284,30 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) throws Dot
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
throws JobQueueDataException {
final String queueNameLower = queueName.toLowerCase();
try {
return jobQueue.getActiveJobs(queueNameLower, page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getFailedJobs(String queueName, int page, int pageSize)
throws JobQueueDataException {
final String queueNameLower = queueName.toLowerCase();
try {
return jobQueue.getFailedJobs(queueNameLower, page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
}
}

@WrapInTransaction
@Override
public void cancelJob(final String jobId) throws DotDataException {
Expand All @@ -284,8 +320,8 @@ public void cancelJob(final String jobId) throws DotDataException {
} catch (JobQueueDataException e) {
throw new DotDataException("Error fetching job", e);
}

final var processor = processors.get(job.queueName());
final String queueNameLower = job.queueName().toLowerCase();
final var processor = processors.get(queueNameLower);
if (processor instanceof Cancellable) {

try {
Expand Down
14 changes: 14 additions & 0 deletions dotCMS/src/main/java/com/dotcms/jobs/business/processor/Queue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.dotcms.jobs.business.processor;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Queue {
String value();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.dotcms.jobs.business.processor.impl;

import com.dotcms.jobs.business.error.JobCancellationException;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.processor.Cancellable;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.Queue;
import com.dotmarketing.util.Logger;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

@Queue("FileReader")
public class FileReaderJob implements JobProcessor, Cancellable {

boolean working = true;

@Override
public void process(Job job) {
// Retrieve job parameters
Logger.info(this.getClass(), "Processing job: " + job.id());
Map<String, Object> params = job.parameters();
String filePath = (String) params.get("filePath");
final Object nLinesRaw = params.get("nLines");
if(!(nLinesRaw instanceof String)) {
Logger.error(this.getClass(), "Parameter 'nLines' is required.");
return;
}
int nLines = Integer.parseInt((String) nLinesRaw);
// Validate required parameters
if (filePath == null || nLines <= 0) {
Logger.error(this.getClass(), "Parameters 'filePath' and 'nLines' (greater than zero) are required.");
return;
}

try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
String line;
int lineCount = 0;
int totalLines = 0;

Logger.info(this.getClass(), "Starting to read the file: " + filePath);

while (working && (line = reader.readLine()) != null) {
lineCount++;
totalLines++;

// Print the line when the counter reaches nLines
if (lineCount == nLines) {
Logger.info(this.getClass(), "Line " + totalLines + ": " + line);
lineCount = 0; // Reset the counter
}
Thread.sleep(1000); // Simulate processing time
}

Logger.info(this.getClass(), "Reading completed. Total lines read: " + totalLines);

} catch (IOException e) {
Logger.error(this.getClass(), "Error reading the file: " + e.getMessage());
} catch (Exception e) {
Logger.error(this.getClass(), "Unexpected error during processing: " + e.getMessage());
}
}

@Override
public Map<String, Object> getResultMetadata(Job job) {
return Map.of();
}


@Override
public void cancel(Job job) throws JobCancellationException {
Logger.info(this.getClass(), "Job cancelled: " + job.id());
working = false;
}
}
58 changes: 58 additions & 0 deletions dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobParams.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.dotcms.rest.api.v1.job;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.InputStream;
import java.util.Map;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;

public class JobParams {
@FormDataParam("file")
private InputStream fileInputStream;

@FormDataParam("file")
private FormDataContentDisposition contentDisposition;

@FormDataParam("params")
private String jsonParams;

@FormDataParam("params")
private Map params;

public InputStream getFileInputStream() {
return fileInputStream;
}

public void setFileInputStream(InputStream fileInputStream) {
this.fileInputStream = fileInputStream;
}

public FormDataContentDisposition getContentDisposition() {
return contentDisposition;
}

public void setContentDisposition(FormDataContentDisposition contentDisposition) {
this.contentDisposition = contentDisposition;
}

public void setJsonParams(String jsonParams) {
this.jsonParams = jsonParams;
}

public Map getParams() throws JsonProcessingException {
if (null == params) {
params = new ObjectMapper().readValue(jsonParams, Map.class);
}
return params;
}

public void setParams(Map<Object, Object> params) {
this.params = params;
}

public String getJsonParams() {
return jsonParams;
}

}
Loading

0 comments on commit 37c6345

Please sign in to comment.