Skip to content

Commit

Permalink
MET-5960 debias processing
Browse files Browse the repository at this point in the history
  • Loading branch information
jeortizquan committed Sep 27, 2024
1 parent 3d5ddc1 commit 105690e
Show file tree
Hide file tree
Showing 15 changed files with 396 additions and 24 deletions.
3 changes: 2 additions & 1 deletion src/main/java/eu/europeana/metis/sandbox/common/Step.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ public enum Step {
ENRICH("enrich", 8),
MEDIA_PROCESS("process media", 9),
PUBLISH("publish", 10),
CLOSE("close", 11);
CLOSE("close", 11),
DEBIAS("debias", 12);

private final String value;
private final int precedence;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@

import eu.europeana.metis.debias.detect.client.DeBiasClient;
import eu.europeana.metis.sandbox.repository.DatasetRepository;
import eu.europeana.metis.sandbox.repository.RecordLogRepository;
import eu.europeana.metis.sandbox.repository.debias.DetectRepository;
import eu.europeana.metis.sandbox.service.debias.DeBiasDetectService;
import eu.europeana.metis.sandbox.service.debias.DetectService;
import eu.europeana.metis.sandbox.service.debias.RecordPublishable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -32,8 +34,11 @@ public class DeBiasConfig {
* @return the detect service
*/
@Bean
public DetectService debiasMachine(DetectRepository detectRepository, DatasetRepository datasetRepository) {
return new DeBiasDetectService(detectRepository, datasetRepository);
public DetectService debiasMachine(DetectRepository detectRepository,
DatasetRepository datasetRepository,
RecordLogRepository recordLogRepository,
RecordPublishable recordPublishable) {
return new DeBiasDetectService(detectRepository, datasetRepository, recordLogRepository, recordPublishable);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ class SandboxConfig {
@Value("${sandbox.rabbitmq.queues.record.created.queue}")
private String createdQueue;

@Value("${sandbox.rabbitmq.queues.record.debias.ready.queue}")
private String deBiasReadyQueue;

@Value("${sandbox.rabbitmq.queues.record.transformation.edm.external.queue}")
private String transformationToEdmExternalQueue;

Expand Down Expand Up @@ -134,6 +137,11 @@ String createdQueue() {
return createdQueue;
}

/**
 * Exposes the DeBias ready queue name, read from
 * {@code sandbox.rabbitmq.queues.record.debias.ready.queue}, as the bean {@code deBiasReadyQueue}.
 *
 * @return the configured DeBias ready queue name
 */
@Bean(name = "deBiasReadyQueue")
String deBiasReadyQueue() {
return deBiasReadyQueue;
}

@Bean(name = "transformationToEdmExternalQueue")
String transformationToEdmExternalQueue() {
return transformationToEdmExternalQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public class AmqpConfiguration {
@Value("${sandbox.rabbitmq.queues.record.published.dlq:#{null}}")
private String publishedDlq;

@Value("${sandbox.rabbitmq.queues.record.debias.ready.queue:#{null}}")
private String deBiasReadyQueue;

@Value("${sandbox.rabbitmq.queues.record.debias.ready.dlq:#{null}}")
private String deBiasReadyDlq;

public AmqpConfiguration(MessageConverter messageConverter, AmqpAdmin amqpAdmin) {
this.messageConverter = messageConverter;
Expand Down Expand Up @@ -120,22 +125,23 @@ Declarables deadQueues() {
QueueBuilder.durable(internalValidatedDlq).build(),
QueueBuilder.durable(enrichedDlq).build(),
QueueBuilder.durable(mediaProcessedDlq).build(),
QueueBuilder.durable(publishedDlq).build()
QueueBuilder.durable(publishedDlq).build(),
QueueBuilder.durable(deBiasReadyDlq).build()
);
}

@Bean
Declarables bindings() {
return getDeclarables(exchange, createdQueue, transformationToEdmExternalQueue,
externalValidatedQueue, transformedQueue, normalizedQueue, internalValidatedQueue,
enrichedQueue, mediaProcessedQueue, publishedQueue);
enrichedQueue, mediaProcessedQueue, publishedQueue, deBiasReadyQueue);
}

@Bean
Declarables dlqBindings() {
return getDeclarables(exchangeDlq, createdDlq, transformationToEdmExternalDlq,
externalValidatedDlq, transformedDlq, normalizedDlq, internalValidatedDlq, enrichedDlq,
mediaProcessedDlq, publishedDlq);
mediaProcessedDlq, publishedDlq, deBiasReadyDlq);
}

//Suppress: Methods should not have too many parameters warning
Expand All @@ -145,7 +151,7 @@ Declarables dlqBindings() {
private Declarables getDeclarables(String exchange, String created,
String transformationToEdmExternal, String externalValidated, String transformed,
String normalized, String internalValidated, String enriched, String mediaProcessed,
String published) {
String published, String deBiasReady) {
return new Declarables(
new Binding(created, DestinationType.QUEUE, exchange, created, null),
new Binding(transformationToEdmExternal, DestinationType.QUEUE, exchange,
Expand All @@ -156,7 +162,8 @@ private Declarables getDeclarables(String exchange, String created,
new Binding(internalValidated, DestinationType.QUEUE, exchange, internalValidated, null),
new Binding(enriched, DestinationType.QUEUE, exchange, enriched, null),
new Binding(mediaProcessed, DestinationType.QUEUE, exchange, mediaProcessed, null),
new Binding(published, DestinationType.QUEUE, exchange, published, null)
new Binding(published, DestinationType.QUEUE, exchange, published, null),
new Binding(deBiasReady, DestinationType.QUEUE, exchange, deBiasReady, null)
);
}

Expand All @@ -181,7 +188,9 @@ Declarables queues() {
QueueBuilder.durable(mediaProcessedQueue).deadLetterExchange(exchangeDlq)
.deadLetterRoutingKey(mediaProcessedDlq).build(),
QueueBuilder.durable(publishedQueue).deadLetterExchange(exchangeDlq)
.deadLetterRoutingKey(publishedDlq).build()
.deadLetterRoutingKey(publishedDlq).build(),
// Dead letters are routed through the shared DLQ exchange, exactly like the other queues;
// deBiasReadyDlq is a queue name and is only valid as the dead letter routing key.
QueueBuilder.durable(deBiasReadyQueue).deadLetterExchange(exchangeDlq)
.deadLetterRoutingKey(deBiasReadyDlq).build()
);
}

Expand Down Expand Up @@ -271,6 +280,14 @@ public String getPublishedDlq() {
return publishedDlq;
}

/**
 * Gets the configured DeBias ready queue name.
 *
 * @return the queue name, or {@code null} when the property is not set
 */
public String getDeBiasReadyQueue() {
return deBiasReadyQueue;
}

/**
 * Gets the configured dead letter queue name of the DeBias ready queue.
 *
 * @return the dead letter queue name, or {@code null} when the property is not set
 */
public String getDeBiasReadyDlq() {
return deBiasReadyDlq;
}

/**
 * Gets the AMQP admin this configuration was constructed with.
 *
 * @return the AMQP admin
 */
public AmqpAdmin getAmqpAdmin(){
return amqpAdmin;
}
Expand All @@ -279,6 +296,6 @@ public List<String> getAllQueuesNames(){
return List.of(createdQueue, createdDlq, transformationToEdmExternalQueue, transformationToEdmExternalDlq,
externalValidatedQueue, externalValidatedDlq, transformedQueue, transformedDlq, internalValidatedQueue,
internalValidatedDlq, normalizedQueue, normalizedDlq, enrichedQueue, enrichedDlq, mediaProcessedQueue,
mediaProcessedDlq, publishedQueue, publishedDlq);
mediaProcessedDlq, publishedQueue, publishedDlq, deBiasReadyQueue, deBiasReadyDlq);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package eu.europeana.metis.sandbox.config.amqp;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that provides the listener container factory for the DeBias ready queue.
 *
 * <p>Consumer counts, prefetch and batch size are injected from the
 * {@code sandbox.rabbitmq.queues.record.debias.ready.*} properties.</p>
 */
@Configuration
public class DeBiasReadyQueueConfig {

  private final MessageConverter messageConverter;

  @Value("${sandbox.rabbitmq.queues.record.debias.ready.concurrency}")
  private int concurrentConsumers;

  @Value("${sandbox.rabbitmq.queues.record.debias.ready.max-concurrency}")
  private int maxConsumers;

  @Value("${sandbox.rabbitmq.queues.record.debias.ready.prefetch}")
  private int messagePrefetchCount;

  @Value("${sandbox.rabbitmq.queues.record.debias.ready.batch-size}")
  private int batchSize;

  /**
   * Creates the configuration.
   *
   * @param messageConverter the converter set on the listener container factory
   */
  public DeBiasReadyQueueConfig(MessageConverter messageConverter) {
    this.messageConverter = messageConverter;
  }

  /**
   * Builds the {@code deBiasFactory} listener container factory: a batch listener factory with
   * consumer-side batching enabled, tuned with the injected concurrency, prefetch and batch size.
   *
   * @param configurer the Spring Boot configurer that applies the default settings first
   * @param connectionFactory the connection factory
   * @return the configured listener container factory
   */
  @Bean
  SimpleRabbitListenerContainerFactory deBiasFactory(
      SimpleRabbitListenerContainerFactoryConfigurer configurer,
      ConnectionFactory connectionFactory) {
    final SimpleRabbitListenerContainerFactory listenerFactory = new SimpleRabbitListenerContainerFactory();
    // Defaults first, so the explicit settings below take precedence.
    configurer.configure(listenerFactory, connectionFactory);
    applyConsumerSettings(listenerFactory);
    listenerFactory.setMessageConverter(messageConverter);
    applyBatchSettings(listenerFactory);
    return listenerFactory;
  }

  // Applies the consumer count limits and the prefetch count.
  private void applyConsumerSettings(SimpleRabbitListenerContainerFactory listenerFactory) {
    listenerFactory.setConcurrentConsumers(concurrentConsumers);
    listenerFactory.setMaxConcurrentConsumers(maxConsumers);
    listenerFactory.setPrefetchCount(messagePrefetchCount);
  }

  // Turns the factory into a batch listener factory with the configured batch size.
  private void applyBatchSettings(SimpleRabbitListenerContainerFactory listenerFactory) {
    listenerFactory.setConsumerBatchEnabled(true);
    listenerFactory.setBatchListener(true);
    listenerFactory.setBatchSize(batchSize);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package eu.europeana.metis.sandbox.executor.workflow;

import eu.europeana.metis.sandbox.common.Step;
import eu.europeana.metis.sandbox.domain.RecordProcessEvent;
import eu.europeana.metis.sandbox.service.workflow.DeBiasProcessService;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Consumes batches of DeBias ready events, hands the contained records to the
 * {@link DeBiasProcessService} and publishes the resulting events for the {@link Step#DEBIAS} step.
 */
@Component
class DeBiasExecutor extends StepExecutor {

  private static final Logger LOGGER = LoggerFactory.getLogger(DeBiasExecutor.class);
  private final DeBiasProcessService service;
  // NOTE(review): this is the same queue the listener below consumes from, so the resulting events
  // are published back onto the queue they were read from - confirm this is intended.
  @Value("${sandbox.rabbitmq.queues.record.debias.ready.queue}")
  private String routingKey;

  /**
   * Instantiates a new DeBias executor.
   *
   * @param amqpTemplate the template used to publish the resulting events
   * @param service the service that processes the records of a batch
   */
  public DeBiasExecutor(AmqpTemplate amqpTemplate,
      DeBiasProcessService service) {
    super(amqpTemplate);
    this.service = service;
  }

  /**
   * Processes one batch of events pulled from the DeBias ready queue.
   *
   * <p>Auto start is controlled by this queue's own {@code debias.ready.auto-start} property
   * (default {@code true}) instead of the switch of the transformed queue.</p>
   *
   * @param input the batch of events delivered by the {@code deBiasFactory} batch listener
   */
  @RabbitListener(queues = "${sandbox.rabbitmq.queues.record.debias.ready.queue}",
      containerFactory = "deBiasFactory",
      autoStartup = "${sandbox.rabbitmq.queues.record.debias.ready.auto-start:true}")
  public void debiasProcess(List<RecordProcessEvent> input) {
    input.forEach(r -> LOGGER.info("pulling record {} from queue", r.getRecord().getRecordId()));
    consumeBatch(routingKey, input, Step.DEBIAS,
        () -> service.process(input.stream().map(RecordProcessEvent::getRecord).toList()));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import eu.europeana.metis.sandbox.domain.RecordError;
import eu.europeana.metis.sandbox.domain.RecordInfo;
import eu.europeana.metis.sandbox.domain.RecordProcessEvent;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.slf4j.Logger;
Expand All @@ -20,10 +21,23 @@ class StepExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(StepExecutor.class);
private final AmqpTemplate amqpTemplate;

/**
 * Instantiates a new step executor.
 *
 * @param amqpTemplate the amqp template used to send the events produced by a step
 */
StepExecutor(AmqpTemplate amqpTemplate) {
this.amqpTemplate = amqpTemplate;
}

/**
* Consume.
*
* @param routingKey the routing key
* @param input the input
* @param step the step
* @param recordInfoSupplier the record info supplier
*/
public void consume(String routingKey, RecordProcessEvent input, Step step,
Supplier<RecordInfo> recordInfoSupplier) {
if (input.getStatus() == Status.FAIL) {
Expand All @@ -48,10 +62,48 @@ public void consume(String routingKey, RecordProcessEvent input, Step step,
}
}

/**
 * Processes a batch of events for the given step and publishes one resulting event per record.
 *
 * <p>The supplier is invoked once for the whole batch. Every returned {@link RecordInfo} becomes a
 * {@code SUCCESS} event, or a {@code WARN} event when it carries errors. If the supplier or the
 * mapping of its result throws, every input event is turned into a {@code FAIL} event instead and
 * any partially mapped result is discarded, so a record never gets both a success and a fail
 * event. A failure while publishing is logged per event and does not stop the remaining events
 * from being sent.</p>
 *
 * @param routingKey the routing key the resulting events are sent with
 * @param input the events of the batch
 * @param step the step being executed
 * @param recordInfoSupplier supplies the processing result for the whole batch
 */
public void consumeBatch(String routingKey, List<RecordProcessEvent> input, Step step,
    Supplier<List<RecordInfo>> recordInfoSupplier) {
  final List<RecordProcessEvent> outputRecordProcessEvents = new ArrayList<>();
  try {
    for (RecordInfo recordInfo : recordInfoSupplier.get()) {
      var status = recordInfo.getErrors().isEmpty() ? Status.SUCCESS : Status.WARN;
      outputRecordProcessEvents.add(new RecordProcessEvent(recordInfo, step, status));
    }
  } catch (RecordProcessingException ex) {
    // Drop events mapped before the failure; the whole batch is reported as failed.
    outputRecordProcessEvents.clear();
    for (RecordProcessEvent recordProcessEvent : input) {
      outputRecordProcessEvents.add(createFailEvent(recordProcessEvent, step, ex));
    }
  } catch (Exception ex) {
    // Drop events mapped before the failure; the whole batch is reported as failed.
    outputRecordProcessEvents.clear();
    for (RecordProcessEvent recordProcessEvent : input) {
      outputRecordProcessEvents.add(
          createFailEvent(recordProcessEvent, step,
              new RecordProcessingException(Long.toString(recordProcessEvent.getRecord().getRecordId()), ex)
          )
      );
    }
  }
  for (RecordProcessEvent output : outputRecordProcessEvents) {
    // Guard each send on its own so one failing publish does not drop the rest of the batch.
    try {
      amqpTemplate.convertAndSend(routingKey, output);
    } catch (RuntimeException rabbitException) {
      LOGGER.error("Queue step execution error", rabbitException);
    }
  }
}

/**
 * Builds the {@code FAIL} event for an input event whose processing threw, and logs the failure.
 *
 * @param input the event that was being processed
 * @param step the step that failed
 * @param ex the exception that caused the failure; it is attached to the event as a record error
 * @return a fail event carrying the input record and the error
 */
private RecordProcessEvent createFailEvent(RecordProcessEvent input, Step step, RecordProcessingException ex) {
  final String stepName = step.value();
  final RecordError recordError = new RecordError(ex);
  // Single declaration: the pre-change one-line variant of this statement is dropped.
  final RecordProcessEvent output = new RecordProcessEvent(new RecordInfo(input.getRecord(), List.of(recordError)), step,
      Status.FAIL);
  LOGGER.error("Exception while performing step: [{}]. ", stepName, ex);
  return output;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public interface RecordLogRepository extends JpaRepository<RecordLogEntity, Long
"AND rle.recordId.datasetId = ?2 AND rle.step IN ?3")
Set<RecordLogEntity> findRecordLogByRecordIdDatasetIdAndStepIn(String recordId, String datasetId, Set<Step> steps);

/**
 * Finds the record logs of a dataset that belong to a single step.
 *
 * @param datasetId the dataset id, matched against the dataset id of the record
 * @param step the step to match
 * @return the matching record log entities
 */
@Query("SELECT rle FROM RecordLogEntity rle WHERE rle.recordId.datasetId = ?1 AND rle.step = ?2")
Set<RecordLogEntity> findRecordLogByDatasetIdAndStep(String datasetId, Step step);

/**
* Delete records that belong to the given dataset id
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import eu.europeana.metis.sandbox.dto.debias.DetectionInfoDto;
import eu.europeana.metis.sandbox.entity.debias.DetectionEntity;
import eu.europeana.metis.sandbox.repository.DatasetRepository;
import eu.europeana.metis.sandbox.repository.RecordLogRepository;
import eu.europeana.metis.sandbox.repository.debias.DetectRepository;
import java.time.ZonedDateTime;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -24,9 +25,15 @@ public class DeBiasDetectService implements DetectService {
* Instantiates a new DeBias detect service.
*
* @param detectRepository the detect repository
* @param datasetRepository the dataset repository
* @param recordLogRepository the record log repository
* @param recordPublishable the record publishable
*/
public DeBiasDetectService(DetectRepository detectRepository, DatasetRepository datasetRepository) {
this.ready = new ReadyState(this, detectRepository, datasetRepository);
public DeBiasDetectService(DetectRepository detectRepository,
DatasetRepository datasetRepository,
RecordLogRepository recordLogRepository,
RecordPublishable recordPublishable) {
this.ready = new ReadyState(this, detectRepository, datasetRepository, recordLogRepository, recordPublishable);
this.processing = new ProcessingState(this, detectRepository);
this.completed = new CompletedState(this, detectRepository);
this.error = new ErrorState(this, detectRepository);
Expand Down
Loading

0 comments on commit 105690e

Please sign in to comment.