Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
fabrizzio-dotCMS committed Oct 2, 2024
1 parent 98f280e commit 892e5cb
Show file tree
Hide file tree
Showing 10 changed files with 702 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.dotcms.jobs.business.api;

import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotmarketing.util.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.enterprise.context.ApplicationScoped;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.Index;
import org.jboss.jandex.IndexReader;

@ApplicationScoped
public class JobProcessorScanner {


public List<Class<? extends JobProcessor>> discoverJobProcessors() {
List<Class<? extends JobProcessor>> jobProcessors = new ArrayList<>();
try {

Index index = getJandexIndex();
DotName jobProcessorInterface = DotName.createSimple(JobProcessor.class.getName());

Collection<ClassInfo> implementors = index.getAllKnownImplementors(jobProcessorInterface);

for (ClassInfo classInfo : implementors) {
String className = classInfo.name().toString();

Class<?> clazz = Class.forName(className);
if (JobProcessor.class.isAssignableFrom(clazz)) {
jobProcessors.add((Class<? extends JobProcessor>) clazz);
}
}

} catch (IOException | ClassNotFoundException e) {
Logger.error(JobProcessorScanner.class, "Error discovering JobProcessors", e);

}
return jobProcessors;
}

private Index getJandexIndex() throws IOException {
InputStream input = getClass().getClassLoader().getResourceAsStream("META-INF/jandex.idx");
if (input == null) {
throw new IOException("Jandex index not found");
}
IndexReader reader = new IndexReader(input);
return reader.read();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.dotcms.jobs.business.job.JobPaginatedResult;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.queue.JobQueue;
import com.dotcms.jobs.business.queue.error.JobQueueDataException;
import com.dotmarketing.exception.DotDataException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -57,6 +58,12 @@ public interface JobQueueManagerAPI {
*/
void registerProcessor(String queueName, JobProcessor processor);

/**
* Retrieves the job processors for all registered queues.
* @return A map of queue names to job processors
*/
Map<String,JobProcessor> getQueueNames();

/**
* Creates a new job in the specified queue.
*
Expand Down Expand Up @@ -88,6 +95,25 @@ String createJob(String queueName, Map<String, Object> parameters)
*/
JobPaginatedResult getJobs(int page, int pageSize) throws DotDataException;

/**
* Retrieves a list of active jobs for a specific queue.
* @param queueName The name of the queue
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of active jobs and pagination information.
* @throws JobQueueDataException if there's an error fetching the jobs
*/
JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) throws JobQueueDataException;

/**
* Retrieves a list of completed jobs for a specific queue within a date range.
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of completed jobs and pagination information.
* @throws JobQueueDataException
*/
JobPaginatedResult getFailedJobs(int page, int pageSize) throws JobQueueDataException;

/**
* Cancels a job.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,22 +225,34 @@ public void close() throws Exception {

@Override
public void registerProcessor(final String queueName, final JobProcessor processor) {
processors.put(queueName, processor);
final String queueNameLower = queueName.toLowerCase();
final JobProcessor jobProcessor = processors.get(queueNameLower);
if (null != jobProcessor) {
Logger.warn(this, String.format(
"Job processor [%s] already registered for queue: [%s] is getting overridden.",
jobProcessor.getClass().getName(), queueName));
}
processors.put(queueNameLower, processor);
}

@Override
public Map<String,JobProcessor> getQueueNames() {
return Map.copyOf(processors);
}

@WrapInTransaction
@Override
public String createJob(final String queueName, final Map<String, Object> parameters)
throws JobProcessorNotFoundException, DotDataException {

if (!processors.containsKey(queueName)) {
final String queueNameLower = queueName.toLowerCase();
if (!processors.containsKey(queueNameLower)) {
final var error = new JobProcessorNotFoundException(queueName);
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
}

try {
String jobId = jobQueue.createJob(queueName, parameters);
String jobId = jobQueue.createJob(queueNameLower, parameters);
eventProducer.getEvent(JobCreatedEvent.class).fire(
new JobCreatedEvent(jobId, queueName, LocalDateTime.now(), parameters)
);
Expand Down Expand Up @@ -272,6 +284,29 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) throws Dot
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
throws JobQueueDataException {
final String queueNameLower = queueName.toLowerCase();
try {
return jobQueue.getActiveJobs(queueNameLower, page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getFailedJobs(int page, int pageSize)
throws JobQueueDataException {
try {
return jobQueue.getFailedJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new JobQueueDataException("Error fetching active jobs", e);
}
}

@WrapInTransaction
@Override
public void cancelJob(final String jobId) throws DotDataException {
Expand All @@ -284,8 +319,8 @@ public void cancelJob(final String jobId) throws DotDataException {
} catch (JobQueueDataException e) {
throw new DotDataException("Error fetching job", e);
}

final var processor = processors.get(job.queueName());
final String queueNameLower = job.queueName().toLowerCase();
final var processor = processors.get(queueNameLower);
if (processor instanceof Cancellable) {

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.dotcms.jobs.business.error;



import com.dotcms.jobs.business.job.AbstractJob;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -44,6 +48,7 @@ public interface AbstractErrorDetail {
*
* @return A LocalDateTime representing when the error was recorded.
*/
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = AbstractJob.DATE_PATTERN)
LocalDateTime timestamp();

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.dotcms.jobs.business.job;

import com.dotcms.jobs.business.processor.ProgressTracker;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.time.LocalDateTime;
Expand All @@ -20,6 +21,8 @@
@JsonDeserialize(as = Job.class)
public interface AbstractJob {

String DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS";

String id();

String queueName();
Expand All @@ -28,12 +31,16 @@ public interface AbstractJob {

Optional<String> executionNode();

@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = DATE_PATTERN)
Optional<LocalDateTime> createdAt();

@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = DATE_PATTERN)
Optional<LocalDateTime> startedAt();

@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = DATE_PATTERN)
Optional<LocalDateTime> updatedAt();

@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = DATE_PATTERN)
Optional<LocalDateTime> completedAt();

Optional<JobResult> result();
Expand Down
14 changes: 14 additions & 0 deletions dotCMS/src/main/java/com/dotcms/jobs/business/processor/Queue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.dotcms.jobs.business.processor;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Queue {
String value();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package com.dotcms.jobs.business.processor.impl;

import com.dotcms.api.web.HttpServletRequestThreadLocal;
import com.dotcms.jobs.business.error.JobCancellationException;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.processor.Cancellable;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.Queue;
import com.dotcms.mock.request.MockHeaderRequest;
import com.dotcms.mock.request.MockSessionRequest;
import com.dotcms.rest.api.v1.temp.DotTempFile;
import com.dotcms.rest.api.v1.temp.TempFileAPI;
import com.dotmarketing.business.APILocator;
import com.dotmarketing.util.Logger;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.servlet.http.HttpServletRequest;

@Queue("FileReader")
public class FileReaderJob implements JobProcessor, Cancellable {

boolean working = true;

@Override
public void process(Job job) {
// Retrieve job parameters
working = true;
Logger.info(this.getClass(), "Processing job: " + job.id());
Map<String, Object> params = job.parameters();
String tempFileId = (String) params.get("tempFileId");
final Object nLinesRaw = params.get("nLines");
if(!(nLinesRaw instanceof String)) {
Logger.error(this.getClass(), "Parameter 'nLines' is required.");
return;
}

final Object requestFingerPrintRaw = params.get("requestFingerPrint");
if(!(requestFingerPrintRaw instanceof String)) {
Logger.error(this.getClass(), "Parameter 'requestFingerPrint' is required.");
return;
}
final String requestFingerPrint = (String) requestFingerPrintRaw;

int nLines = Integer.parseInt((String) nLinesRaw);
// Validate required parameters
if (tempFileId == null || nLines <= 0) {
Logger.error(this.getClass(), "Parameters 'tempFileId' and 'nLines' (greater than zero) are required.");
return;
}


final TempFileAPI tempFileAPI = APILocator.getTempFileAPI();
final Optional<DotTempFile> tempFile = tempFileAPI.getTempFile(List.of(requestFingerPrint), tempFileId);
if (tempFile.isEmpty()) {
Logger.error(this.getClass(), "Temporary file not found: " + tempFileId);
return;
}
final DotTempFile dotTempFile = tempFile.get();
try (BufferedReader reader = new BufferedReader(new FileReader(dotTempFile.file))) {
String line;
int lineCount = 0;
int totalLines = 0;

Logger.info(this.getClass(), "Starting to read the file: " + dotTempFile.file.getName());

while (working && (line = reader.readLine()) != null) {
lineCount++;
totalLines++;

// Print the line when the counter reaches nLines
if (lineCount == nLines) {
Logger.info(this.getClass(), "Line " + totalLines + ": " + line);
lineCount = 0; // Reset the counter
}
Thread.sleep(1000); // Simulate processing time
}

Logger.info(this.getClass(), "Reading completed. Total lines read: " + totalLines);

} catch (IOException e) {
Logger.error(this.getClass(), "Error reading the file: " + e.getMessage());
} catch (Exception e) {
Logger.error(this.getClass(), "Unexpected error during processing: " + e.getMessage());
}
}

@Override
public Map<String, Object> getResultMetadata(Job job) {
return Map.of();
}

@Override
public void cancel(Job job) throws JobCancellationException {
Logger.info(this.getClass(), "Job cancelled: " + job.id());

working = false;
}



}
Loading

0 comments on commit 892e5cb

Please sign in to comment.