From 105690e2d8d57a06684b8449b7413c03ee0a6d7a Mon Sep 17 00:00:00 2001 From: Jorge Ortiz Date: Fri, 27 Sep 2024 16:03:32 +0200 Subject: [PATCH] MET-5960 debias processing --- .../europeana/metis/sandbox/common/Step.java | 3 +- .../metis/sandbox/config/DeBiasConfig.java | 9 ++- .../metis/sandbox/config/SandboxConfig.java | 8 +++ .../config/amqp/AmqpConfiguration.java | 31 ++++++--- .../config/amqp/DeBiasReadyQueueConfig.java | 63 +++++++++++++++++++ .../executor/workflow/DeBiasExecutor.java | 39 ++++++++++++ .../executor/workflow/StepExecutor.java | 54 +++++++++++++++- .../repository/RecordLogRepository.java | 10 +++ .../service/debias/DeBiasDetectService.java | 11 +++- .../sandbox/service/debias/ReadyState.java | 61 +++++++++++++++--- .../RecordPublishDeBiasQueueService.java | 52 +++++++++++++++ .../service/debias/RecordPublishable.java | 16 +++++ .../workflow/DeBiasProcessService.java | 19 ++++++ .../workflow/DeBiasProcessServiceImpl.java | 33 ++++++++++ src/main/resources/sample.application.yml | 11 +++- 15 files changed, 396 insertions(+), 24 deletions(-) create mode 100644 src/main/java/eu/europeana/metis/sandbox/config/amqp/DeBiasReadyQueueConfig.java create mode 100644 src/main/java/eu/europeana/metis/sandbox/executor/workflow/DeBiasExecutor.java create mode 100644 src/main/java/eu/europeana/metis/sandbox/service/debias/RecordPublishDeBiasQueueService.java create mode 100644 src/main/java/eu/europeana/metis/sandbox/service/debias/RecordPublishable.java create mode 100644 src/main/java/eu/europeana/metis/sandbox/service/workflow/DeBiasProcessService.java create mode 100644 src/main/java/eu/europeana/metis/sandbox/service/workflow/DeBiasProcessServiceImpl.java diff --git a/src/main/java/eu/europeana/metis/sandbox/common/Step.java b/src/main/java/eu/europeana/metis/sandbox/common/Step.java index 0feb59df..83c7735e 100644 --- a/src/main/java/eu/europeana/metis/sandbox/common/Step.java +++ b/src/main/java/eu/europeana/metis/sandbox/common/Step.java @@ -14,7 +14,8 
@@ public enum Step { ENRICH("enrich", 8), MEDIA_PROCESS("process media", 9), PUBLISH("publish", 10), - CLOSE("close", 11); + CLOSE("close", 11), + DEBIAS("debias", 12); private final String value; private final int precedence; diff --git a/src/main/java/eu/europeana/metis/sandbox/config/DeBiasConfig.java b/src/main/java/eu/europeana/metis/sandbox/config/DeBiasConfig.java index c150547f..32925750 100644 --- a/src/main/java/eu/europeana/metis/sandbox/config/DeBiasConfig.java +++ b/src/main/java/eu/europeana/metis/sandbox/config/DeBiasConfig.java @@ -3,9 +3,11 @@ import eu.europeana.metis.debias.detect.client.DeBiasClient; import eu.europeana.metis.sandbox.repository.DatasetRepository; +import eu.europeana.metis.sandbox.repository.RecordLogRepository; import eu.europeana.metis.sandbox.repository.debias.DetectRepository; import eu.europeana.metis.sandbox.service.debias.DeBiasDetectService; import eu.europeana.metis.sandbox.service.debias.DetectService; +import eu.europeana.metis.sandbox.service.debias.RecordPublishable; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -32,8 +34,11 @@ public class DeBiasConfig { * @return the detect service */ @Bean - public DetectService debiasMachine(DetectRepository detectRepository, DatasetRepository datasetRepository) { - return new DeBiasDetectService(detectRepository, datasetRepository); + public DetectService debiasMachine(DetectRepository detectRepository, + DatasetRepository datasetRepository, + RecordLogRepository recordLogRepository, + RecordPublishable recordPublishable) { + return new DeBiasDetectService(detectRepository, datasetRepository, recordLogRepository, recordPublishable); } @Bean diff --git a/src/main/java/eu/europeana/metis/sandbox/config/SandboxConfig.java b/src/main/java/eu/europeana/metis/sandbox/config/SandboxConfig.java index 1f2ea3b0..cdeba3ff 100644 --- 
a/src/main/java/eu/europeana/metis/sandbox/config/SandboxConfig.java +++ b/src/main/java/eu/europeana/metis/sandbox/config/SandboxConfig.java @@ -66,6 +66,9 @@ class SandboxConfig { @Value("${sandbox.rabbitmq.queues.record.created.queue}") private String createdQueue; + @Value("${sandbox.rabbitmq.queues.record.debias.ready.queue}") + private String deBiasReadyQueue; + @Value("${sandbox.rabbitmq.queues.record.transformation.edm.external.queue}") private String transformationToEdmExternalQueue; @@ -134,6 +137,11 @@ String createdQueue() { return createdQueue; } + @Bean(name = "deBiasReadyQueue") + String deBiasReadyQueue() { + return deBiasReadyQueue; + } + @Bean(name = "transformationToEdmExternalQueue") String transformationToEdmExternalQueue() { return transformationToEdmExternalQueue; diff --git a/src/main/java/eu/europeana/metis/sandbox/config/amqp/AmqpConfiguration.java b/src/main/java/eu/europeana/metis/sandbox/config/amqp/AmqpConfiguration.java index 9be7300d..feaf32cc 100644 --- a/src/main/java/eu/europeana/metis/sandbox/config/amqp/AmqpConfiguration.java +++ b/src/main/java/eu/europeana/metis/sandbox/config/amqp/AmqpConfiguration.java @@ -85,6 +85,11 @@ public class AmqpConfiguration { @Value("${sandbox.rabbitmq.queues.record.published.dlq:#{null}}") private String publishedDlq; + @Value("${sandbox.rabbitmq.queues.record.debias.ready.queue:#{null}}") + private String deBiasReadyQueue; + + @Value("${sandbox.rabbitmq.queues.record.debias.ready.dlq:#{null}}") + private String deBiasReadyDlq; public AmqpConfiguration(MessageConverter messageConverter, AmqpAdmin amqpAdmin) { this.messageConverter = messageConverter; @@ -120,7 +125,8 @@ Declarables deadQueues() { QueueBuilder.durable(internalValidatedDlq).build(), QueueBuilder.durable(enrichedDlq).build(), QueueBuilder.durable(mediaProcessedDlq).build(), - QueueBuilder.durable(publishedDlq).build() + QueueBuilder.durable(publishedDlq).build(), + QueueBuilder.durable(deBiasReadyDlq).build() ); } @@ -128,14 +134,14 
@@ Declarables deadQueues() { Declarables bindings() { return getDeclarables(exchange, createdQueue, transformationToEdmExternalQueue, externalValidatedQueue, transformedQueue, normalizedQueue, internalValidatedQueue, - enrichedQueue, mediaProcessedQueue, publishedQueue); + enrichedQueue, mediaProcessedQueue, publishedQueue, deBiasReadyQueue); } @Bean Declarables dlqBindings() { return getDeclarables(exchangeDlq, createdDlq, transformationToEdmExternalDlq, externalValidatedDlq, transformedDlq, normalizedDlq, internalValidatedDlq, enrichedDlq, - mediaProcessedDlq, publishedDlq); + mediaProcessedDlq, publishedDlq, deBiasReadyDlq); } //Suppress: Methods should not have too many parameters warning @@ -145,7 +151,7 @@ Declarables dlqBindings() { private Declarables getDeclarables(String exchange, String created, String transformationToEdmExternal, String externalValidated, String transformed, String normalized, String internalValidated, String enriched, String mediaProcessed, - String published) { + String published, String deBiasReady) { return new Declarables( new Binding(created, DestinationType.QUEUE, exchange, created, null), new Binding(transformationToEdmExternal, DestinationType.QUEUE, exchange, @@ -156,7 +162,8 @@ private Declarables getDeclarables(String exchange, String created, new Binding(internalValidated, DestinationType.QUEUE, exchange, internalValidated, null), new Binding(enriched, DestinationType.QUEUE, exchange, enriched, null), new Binding(mediaProcessed, DestinationType.QUEUE, exchange, mediaProcessed, null), - new Binding(published, DestinationType.QUEUE, exchange, published, null) + new Binding(published, DestinationType.QUEUE, exchange, published, null), + new Binding(deBiasReady, DestinationType.QUEUE, exchange, deBiasReady, null) ); } @@ -181,7 +188,9 @@ Declarables queues() { QueueBuilder.durable(mediaProcessedQueue).deadLetterExchange(exchangeDlq) .deadLetterRoutingKey(mediaProcessedDlq).build(), 
QueueBuilder.durable(publishedQueue).deadLetterExchange(exchangeDlq) - .deadLetterRoutingKey(publishedDlq).build() + .deadLetterRoutingKey(publishedDlq).build(), + QueueBuilder.durable(deBiasReadyQueue).deadLetterExchange(exchangeDlq) + .deadLetterRoutingKey(deBiasReadyDlq).build() ); } @@ -271,6 +280,14 @@ public String getPublishedDlq() { return publishedDlq; } + public String getDeBiasReadyQueue() { + return deBiasReadyQueue; + } + + public String getDeBiasReadyDlq() { + return deBiasReadyDlq; + } + public AmqpAdmin getAmqpAdmin(){ return amqpAdmin; } @@ -279,6 +296,6 @@ public List getAllQueuesNames(){ return List.of(createdQueue, createdDlq, transformationToEdmExternalQueue, transformationToEdmExternalDlq, externalValidatedQueue, externalValidatedDlq, transformedQueue, transformedDlq, internalValidatedQueue, internalValidatedDlq, normalizedQueue, normalizedDlq, enrichedQueue, enrichedDlq, mediaProcessedQueue, - mediaProcessedDlq, publishedQueue, publishedDlq); + mediaProcessedDlq, publishedQueue, publishedDlq, deBiasReadyQueue, deBiasReadyDlq); } } diff --git a/src/main/java/eu/europeana/metis/sandbox/config/amqp/DeBiasReadyQueueConfig.java b/src/main/java/eu/europeana/metis/sandbox/config/amqp/DeBiasReadyQueueConfig.java new file mode 100644 index 00000000..e231f662 --- /dev/null +++ b/src/main/java/eu/europeana/metis/sandbox/config/amqp/DeBiasReadyQueueConfig.java @@ -0,0 +1,63 @@ +package eu.europeana.metis.sandbox.config.amqp; + +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * The type De bias ready 
queue config. + */ +@Configuration +public class DeBiasReadyQueueConfig { + + private final MessageConverter messageConverter; + + @Value("${sandbox.rabbitmq.queues.record.debias.ready.concurrency}") + private int concurrentConsumers; + + @Value("${sandbox.rabbitmq.queues.record.debias.ready.max-concurrency}") + private int maxConsumers; + + @Value("${sandbox.rabbitmq.queues.record.debias.ready.prefetch}") + private int messagePrefetchCount; + + @Value("${sandbox.rabbitmq.queues.record.debias.ready.batch-size}") + private int batchSize; + + /** + * Instantiates a new De bias ready queue config. + * + * @param messageConverter the message converter + */ + public DeBiasReadyQueueConfig(MessageConverter messageConverter) { + this.messageConverter = messageConverter; + } + + /** + * Creates the batch-enabled rabbit listener container factory configured from the debias ready queue settings. + * + * @param configurer the configurer + * @param connectionFactory the connection factory + * @return the simple rabbit listener container factory + */ + @Bean + SimpleRabbitListenerContainerFactory deBiasFactory( + SimpleRabbitListenerContainerFactoryConfigurer configurer, + ConnectionFactory connectionFactory) { + var factory = new SimpleRabbitListenerContainerFactory(); + configurer.configure(factory, connectionFactory); + factory.setConcurrentConsumers(concurrentConsumers); + factory.setMaxConcurrentConsumers(maxConsumers); + factory.setPrefetchCount(messagePrefetchCount); + factory.setMessageConverter(messageConverter); + factory.setConsumerBatchEnabled(true); + factory.setBatchListener(true); + factory.setBatchSize(batchSize); + return factory; + + } +} diff --git a/src/main/java/eu/europeana/metis/sandbox/executor/workflow/DeBiasExecutor.java b/src/main/java/eu/europeana/metis/sandbox/executor/workflow/DeBiasExecutor.java new file mode 100644 index 00000000..fce81d53 --- /dev/null +++ b/src/main/java/eu/europeana/metis/sandbox/executor/workflow/DeBiasExecutor.java @@ -0,0 +1,39 @@ +package 
eu.europeana.metis.sandbox.executor.workflow; + +import eu.europeana.metis.sandbox.common.Step; +import eu.europeana.metis.sandbox.domain.RecordProcessEvent; +import eu.europeana.metis.sandbox.service.workflow.DeBiasProcessService; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * Consumes debias events and performs processing to the contained record
the result will be stored on database + */ +@Component +class DeBiasExecutor extends StepExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(DeBiasExecutor.class); + private final DeBiasProcessService service; + @Value("${sandbox.rabbitmq.queues.record.debias.ready.queue}") + private String routingKey; + + public DeBiasExecutor(AmqpTemplate amqpTemplate, + DeBiasProcessService service) { + super(amqpTemplate); + this.service = service; + } + + @RabbitListener(queues = "${sandbox.rabbitmq.queues.record.debias.ready.queue}", + containerFactory = "deBiasFactory", + autoStartup = "${sandbox.rabbitmq.queues.record.debias.ready.auto-start:true}") + public void debiasProcess(List<RecordProcessEvent> input) { + input.forEach(r -> LOGGER.info("pulling record {} from queue", r.getRecord().getRecordId())); + consumeBatch(routingKey, input, Step.DEBIAS, + () -> service.process(input.stream().map(RecordProcessEvent::getRecord).toList())); + } +} diff --git a/src/main/java/eu/europeana/metis/sandbox/executor/workflow/StepExecutor.java b/src/main/java/eu/europeana/metis/sandbox/executor/workflow/StepExecutor.java index 63b9f1b6..3c36ea7d 100644 --- a/src/main/java/eu/europeana/metis/sandbox/executor/workflow/StepExecutor.java +++ b/src/main/java/eu/europeana/metis/sandbox/executor/workflow/StepExecutor.java @@ -6,6 +6,7 @@ import eu.europeana.metis.sandbox.domain.RecordError; import eu.europeana.metis.sandbox.domain.RecordInfo; import eu.europeana.metis.sandbox.domain.RecordProcessEvent; +import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; import org.slf4j.Logger; @@ -20,10 +21,23 @@ class StepExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(StepExecutor.class); private final AmqpTemplate amqpTemplate; + /** + * Instantiates a new Step executor. + * + * @param amqpTemplate the amqp template + */ StepExecutor(AmqpTemplate amqpTemplate) { this.amqpTemplate = amqpTemplate; } + /** + * Consume. 
+ * + * @param routingKey the routing key + * @param input the input + * @param step the step + * @param recordInfoSupplier the record info supplier + */ public void consume(String routingKey, RecordProcessEvent input, Step step, Supplier recordInfoSupplier) { if (input.getStatus() == Status.FAIL) { @@ -48,10 +62,51 @@ public void consume(String routingKey, RecordProcessEvent input, Step step, } } + /** + * Runs a step over a whole batch and publishes one result event per record. + * + * @param routingKey the routing key every result event is sent to + * @param input the batch of events consumed from the queue + * @param step the step recorded on every result event + * @param recordInfoSupplier supplies the processed record info; if it throws, every input event is failed + */ + public void consumeBatch(String routingKey, List<RecordProcessEvent> input, Step step, + Supplier<List<RecordInfo>> recordInfoSupplier) { + List<RecordProcessEvent> outputRecordProcessEvents = new ArrayList<>(); + try { + var listRecordInfo = recordInfoSupplier.get(); + for (RecordInfo recordInfo : listRecordInfo) { + var status = recordInfo.getErrors().isEmpty() ? Status.SUCCESS : Status.WARN; + outputRecordProcessEvents.add(new RecordProcessEvent(recordInfo, step, status)); + } + } catch (RecordProcessingException ex) { + for (RecordProcessEvent recordProcessEvent : input) { + outputRecordProcessEvents.add(createFailEvent(recordProcessEvent, step, ex)); + } + } catch (Exception ex) { + for (RecordProcessEvent recordProcessEvent : input) { + outputRecordProcessEvents.add( + createFailEvent(recordProcessEvent, step, + new RecordProcessingException(Long.toString(recordProcessEvent.getRecord().getRecordId()), ex) + ) + ); + } + } + for (RecordProcessEvent output : outputRecordProcessEvents) { + // Guard each send separately so one failed publish does not drop the rest of the batch. + try { + amqpTemplate.convertAndSend(routingKey, output); + } catch (RuntimeException rabbitException) { + LOGGER.error("Queue step execution error", rabbitException); + } + } + } + private RecordProcessEvent createFailEvent(RecordProcessEvent input, Step step, RecordProcessingException ex) { final String stepName = step.value(); final RecordError recordError = new RecordError(ex); - final RecordProcessEvent output = new RecordProcessEvent(new RecordInfo(input.getRecord(), 
List.of(recordError)), step, Status.FAIL); + final RecordProcessEvent output = new RecordProcessEvent(new RecordInfo(input.getRecord(), List.of(recordError)), step, + Status.FAIL); LOGGER.error("Exception while performing step: [{}]. ", stepName, ex); return output; } diff --git a/src/main/java/eu/europeana/metis/sandbox/repository/RecordLogRepository.java b/src/main/java/eu/europeana/metis/sandbox/repository/RecordLogRepository.java index f4ca6734..b8ad69ba 100644 --- a/src/main/java/eu/europeana/metis/sandbox/repository/RecordLogRepository.java +++ b/src/main/java/eu/europeana/metis/sandbox/repository/RecordLogRepository.java @@ -68,6 +68,16 @@ public interface RecordLogRepository extends JpaRepository findRecordLogByRecordIdDatasetIdAndStepIn(String recordId, String datasetId, Set steps); + /** + * Find record log by dataset id and step set. + * + * @param datasetId the dataset id + * @param step the step + * @return the set + */ + @Query("SELECT rle FROM RecordLogEntity rle WHERE rle.recordId.datasetId = ?1 AND rle.step = ?2") + Set findRecordLogByDatasetIdAndStep(String datasetId, Step step); + /** * Delete records that belong to the given dataset id * diff --git a/src/main/java/eu/europeana/metis/sandbox/service/debias/DeBiasDetectService.java b/src/main/java/eu/europeana/metis/sandbox/service/debias/DeBiasDetectService.java index 3ced5127..fb374ef5 100644 --- a/src/main/java/eu/europeana/metis/sandbox/service/debias/DeBiasDetectService.java +++ b/src/main/java/eu/europeana/metis/sandbox/service/debias/DeBiasDetectService.java @@ -3,6 +3,7 @@ import eu.europeana.metis.sandbox.dto.debias.DetectionInfoDto; import eu.europeana.metis.sandbox.entity.debias.DetectionEntity; import eu.europeana.metis.sandbox.repository.DatasetRepository; +import eu.europeana.metis.sandbox.repository.RecordLogRepository; import eu.europeana.metis.sandbox.repository.debias.DetectRepository; import java.time.ZonedDateTime; import 
org.springframework.transaction.annotation.Transactional; @@ -24,9 +25,15 @@ public class DeBiasDetectService implements DetectService { * Instantiates a new DeBias detect service. * * @param detectRepository the detect repository + * @param datasetRepository the dataset repository + * @param recordLogRepository the record log repository + * @param recordPublishable the record publishable */ - public DeBiasDetectService(DetectRepository detectRepository, DatasetRepository datasetRepository) { - this.ready = new ReadyState(this, detectRepository, datasetRepository); + public DeBiasDetectService(DetectRepository detectRepository, + DatasetRepository datasetRepository, + RecordLogRepository recordLogRepository, + RecordPublishable recordPublishable) { + this.ready = new ReadyState(this, detectRepository, datasetRepository, recordLogRepository, recordPublishable); this.processing = new ProcessingState(this, detectRepository); this.completed = new CompletedState(this, detectRepository); this.error = new ErrorState(this, detectRepository); diff --git a/src/main/java/eu/europeana/metis/sandbox/service/debias/ReadyState.java b/src/main/java/eu/europeana/metis/sandbox/service/debias/ReadyState.java index aab26719..675d0d7a 100644 --- a/src/main/java/eu/europeana/metis/sandbox/service/debias/ReadyState.java +++ b/src/main/java/eu/europeana/metis/sandbox/service/debias/ReadyState.java @@ -1,9 +1,16 @@ package eu.europeana.metis.sandbox.service.debias; +import eu.europeana.metis.sandbox.common.Step; +import eu.europeana.metis.sandbox.domain.Record.RecordBuilder; +import eu.europeana.metis.sandbox.domain.RecordInfo; import eu.europeana.metis.sandbox.entity.DatasetEntity; import eu.europeana.metis.sandbox.entity.debias.DetectionEntity; import eu.europeana.metis.sandbox.repository.DatasetRepository; +import eu.europeana.metis.sandbox.repository.RecordLogRepository; import eu.europeana.metis.sandbox.repository.debias.DetectRepository; +import java.nio.charset.StandardCharsets; 
+import java.util.ArrayList; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.transaction.annotation.Transactional; @@ -16,6 +23,8 @@ public class ReadyState extends State implements Stateful { private static final Logger LOGGER = LoggerFactory.getLogger(ReadyState.class); private static final String STATE_NAME = "READY"; private final DatasetRepository datasetRepository; + private final RecordLogRepository recordLogRepository; + private final RecordPublishable recordPublishable; /** * Instantiates a new Ready state. @@ -24,11 +33,16 @@ public class ReadyState extends State implements Stateful { * @param detectRepository the detect repository */ public ReadyState(DetectService debiasMachine, - DetectRepository detectRepository, DatasetRepository datasetRepository) { + DetectRepository detectRepository, + DatasetRepository datasetRepository, + RecordLogRepository recordLogRepository, + RecordPublishable recordPublishable) { this.stateMachine = debiasMachine; this.name = STATE_NAME; this.detectRepository = detectRepository; this.datasetRepository = datasetRepository; + this.recordLogRepository = recordLogRepository; + this.recordPublishable = recordPublishable; this.terminalState = false; } @@ -47,15 +61,12 @@ public void success(Integer datasetId) { public boolean process(Integer datasetId) { LOGGER.info("{} {}", STATE_NAME, datasetId); try { - DatasetEntity dataset = datasetRepository.findById(datasetId).orElseThrow(); - DetectionEntity detectionEntity = detectRepository.findDetectionEntityByDatasetId_DatasetId(datasetId); - if (detectionEntity == null) { - detectionEntity = new DetectionEntity(dataset, STATE_NAME); - detectRepository.save(detectionEntity); - } else { - detectRepository.updateState(datasetId, STATE_NAME); - } + DatasetEntity dataset = getDatasetAndProcessDetectionEntity(datasetId); + + processDatasetAndPublishToDeBiasReadyQueue(dataset); + success(datasetId); + 
LOGGER.info("success {} {}", STATE_NAME, datasetId); } catch (RuntimeException e) { fail(datasetId); @@ -64,4 +75,36 @@ public boolean process(Integer datasetId) { } return this.stateMachine.process(datasetId); } + + private @NotNull DatasetEntity getDatasetAndProcessDetectionEntity(Integer datasetId) { + DatasetEntity dataset = datasetRepository.findById(datasetId).orElseThrow(); + DetectionEntity detectionEntity = detectRepository.findDetectionEntityByDatasetId_DatasetId(datasetId); + if (detectionEntity == null) { + detectionEntity = new DetectionEntity(dataset, STATE_NAME); + detectRepository.save(detectionEntity); + } else { + detectRepository.updateState(datasetId, STATE_NAME); + } + return dataset; + } + + private void processDatasetAndPublishToDeBiasReadyQueue(DatasetEntity dataset) { + this.recordLogRepository.findRecordLogByDatasetIdAndStep(dataset.getDatasetId().toString(), Step.VALIDATE_INTERNAL) + .parallelStream() + .map(r -> { + LOGGER.info("Records in: {} :: {}", STATE_NAME, r.getRecordId()); + return new RecordInfo(new RecordBuilder() + .recordId(r.getRecordId().getId()) + .providerId(r.getRecordId().getProviderId()) + .europeanaId(r.getRecordId().getEuropeanaId()) + .datasetId(r.getRecordId().getDatasetId()) + .datasetName(dataset.getDatasetName()) + .country(dataset.getCountry()) + .language(dataset.getLanguage()) + .content(r.getContent().getBytes(StandardCharsets.UTF_8)) + .build(), new ArrayList<>());} + ) + .forEach(recordPublishable::publishToDeBiasQueue); + } + } diff --git a/src/main/java/eu/europeana/metis/sandbox/service/debias/RecordPublishDeBiasQueueService.java b/src/main/java/eu/europeana/metis/sandbox/service/debias/RecordPublishDeBiasQueueService.java new file mode 100644 index 00000000..f23e2491 --- /dev/null +++ b/src/main/java/eu/europeana/metis/sandbox/service/debias/RecordPublishDeBiasQueueService.java @@ -0,0 +1,52 @@ +package eu.europeana.metis.sandbox.service.debias; + +import eu.europeana.metis.sandbox.common.Status; 
+import eu.europeana.metis.sandbox.common.Step; +import eu.europeana.metis.sandbox.domain.RecordInfo; +import eu.europeana.metis.sandbox.domain.RecordProcessEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Service; + +/** + * The type Record publish de bias queue service. + */ +@Service +public class RecordPublishDeBiasQueueService implements RecordPublishable { + + private static final Logger LOGGER = LoggerFactory.getLogger( + RecordPublishDeBiasQueueService.class); + + private final AmqpTemplate amqpTemplate; + private final String deBiasReadyQueue; + + /** + * Instantiates a new Record publish de bias queue service. + * + * @param amqpTemplate the amqp template + * @param deBiasReadyQueue the de bias ready queue + */ + public RecordPublishDeBiasQueueService( + AmqpTemplate amqpTemplate, + @Qualifier("deBiasReadyQueue") String deBiasReadyQueue) { + this.amqpTemplate = amqpTemplate; + this.deBiasReadyQueue = deBiasReadyQueue; + } + + /** + * Publish to de bias queue. 
+ * + * @param recordToAnalyse the record to publish + */ + @Override + public void publishToDeBiasQueue(RecordInfo recordToAnalyse) { + try { + amqpTemplate.convertAndSend(deBiasReadyQueue, new RecordProcessEvent(recordToAnalyse, Step.DEBIAS, Status.SUCCESS)); + } catch (AmqpException e) { + LOGGER.error("There was an issue publishing the record: {} ", recordToAnalyse.getRecord().getRecordId(), e); + } + } +} diff --git a/src/main/java/eu/europeana/metis/sandbox/service/debias/RecordPublishable.java b/src/main/java/eu/europeana/metis/sandbox/service/debias/RecordPublishable.java new file mode 100644 index 00000000..b9cc3dec --- /dev/null +++ b/src/main/java/eu/europeana/metis/sandbox/service/debias/RecordPublishable.java @@ -0,0 +1,16 @@ +package eu.europeana.metis.sandbox.service.debias; + +import eu.europeana.metis.sandbox.domain.RecordInfo; + +/** + * The interface Record publishable. + */ +public interface RecordPublishable { + + /** + * Publish to de bias queue. + * + * @param recordToPublish the record to publish + */ + void publishToDeBiasQueue(RecordInfo recordToPublish); +} diff --git a/src/main/java/eu/europeana/metis/sandbox/service/workflow/DeBiasProcessService.java b/src/main/java/eu/europeana/metis/sandbox/service/workflow/DeBiasProcessService.java new file mode 100644 index 00000000..2e0727ca --- /dev/null +++ b/src/main/java/eu/europeana/metis/sandbox/service/workflow/DeBiasProcessService.java @@ -0,0 +1,19 @@ +package eu.europeana.metis.sandbox.service.workflow; + +import eu.europeana.metis.sandbox.domain.Record; +import eu.europeana.metis.sandbox.domain.RecordInfo; +import java.util.List; + +/** + * The interface De bias process service. + */ +public interface DeBiasProcessService { + + /** + * Process record info. 
+ * + * @param recordToProcess the records to process + * @return the resulting list of record info + */ + List<RecordInfo> process(List<Record> recordToProcess); +} diff --git a/src/main/java/eu/europeana/metis/sandbox/service/workflow/DeBiasProcessServiceImpl.java b/src/main/java/eu/europeana/metis/sandbox/service/workflow/DeBiasProcessServiceImpl.java new file mode 100644 index 00000000..3f6fcb24 --- /dev/null +++ b/src/main/java/eu/europeana/metis/sandbox/service/workflow/DeBiasProcessServiceImpl.java @@ -0,0 +1,33 @@ +package eu.europeana.metis.sandbox.service.workflow; + +import eu.europeana.metis.sandbox.domain.Record; +import eu.europeana.metis.sandbox.domain.RecordInfo; +import java.util.List; +import java.util.Objects; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.integration.support.locks.LockRegistry; +import org.springframework.stereotype.Service; + +@Service +class DeBiasProcessServiceImpl implements DeBiasProcessService { + + private static final Logger LOGGER = LoggerFactory.getLogger(DeBiasProcessServiceImpl.class); + + private final LockRegistry lockRegistry; + + public DeBiasProcessServiceImpl(LockRegistry lockRegistry) { + this.lockRegistry = lockRegistry; + } + + @Override + public List<RecordInfo> process(List<Record> recordToProcess) { + Objects.requireNonNull(recordToProcess, "List of records is required"); + recordToProcess.forEach(record -> { + + LOGGER.info("DeBias Execution over: {}", record.getRecordId()); + }); + + return List.of(); + } +} diff --git a/src/main/resources/sample.application.yml b/src/main/resources/sample.application.yml index 77e7a52a..ceb6e22c 100644 --- a/src/main/resources/sample.application.yml +++ b/src/main/resources/sample.application.yml @@ -96,8 +96,6 @@ sandbox: url: connectTimeout: 300 requestTimeout: 300 - batchSize: 20 - workers: 2 metrics: frequency: '*/5 * * * * *' # every five seconds. 
validation: @@ -233,6 +231,15 @@ sandbox: concurrency: 2 max-concurrency: 2 prefetch: 1 + debias: + ready: + queue: sandbox.record.debias.ready + dlq: ${sandbox.rabbitmq.queues.record.debias.ready.queue}.dlq + auto-start: true + concurrency: 2 + max-concurrency: 2 + prefetch: 1 + batch-size: 20 elastic: apm: service_name: metis-sandbox-rest-dev