From aef9602d4bba8719db57777af920f8dae25b3233 Mon Sep 17 00:00:00 2001 From: musketyr Date: Thu, 24 Oct 2024 11:15:44 +0200 Subject: [PATCH] batch publish in declarative clients --- docs/guide/src/docs/asciidoc/sqs.adoc | 4 ++- .../awssdk/sqs/QueueClientIntroduction.java | 32 +++++++++++++++++-- .../amazon/awssdk/sqs/DefaultClient.java | 7 +++- .../amazon/awssdk/sqs/QueueClientSpec.groovy | 25 +++++++++++++++ 4 files changed, 64 insertions(+), 4 deletions(-) diff --git a/docs/guide/src/docs/asciidoc/sqs.adoc b/docs/guide/src/docs/asciidoc/sqs.adoc index 715843d37..88b2523e0 100644 --- a/docs/guide/src/docs/asciidoc/sqs.adoc +++ b/docs/guide/src/docs/asciidoc/sqs.adoc @@ -92,7 +92,9 @@ include::{root-dir}/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/ <6> You can publish a string with custom delay <7> You can publish a string with custom FIFO queue group <8> You can publish a string with custom delay and FIFO queue group -<9> You can delete published message using the message ID if the +<9> You can send multiple messages at once when the argument is `Publisher` +<10> If the return type is also publisher type then **you need to subscribe to the publisher to actually send the messages** +<11> You can delete published message using the message ID if the [source,java,indent=0,options="nowrap",role="secondary"] .Publishing String Records (AWS SDK 1.x) diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientIntroduction.java b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientIntroduction.java index e5048e653..f2ccb6bae 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientIntroduction.java +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/main/java/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientIntroduction.java @@ -28,9 +28,12 @@ import io.micronaut.context.BeanContext; import io.micronaut.context.annotation.Requires; import io.micronaut.core.annotation.AnnotationValue; +import io.micronaut.core.async.publisher.Publishers; import io.micronaut.core.type.Argument; import io.micronaut.core.util.StringUtils; import io.micronaut.inject.qualifiers.Qualifiers; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; @@ -108,7 +111,7 @@ public Object intercept(MethodInvocationContext context) { } private Object doIntercept(MethodInvocationContext context, SimpleQueueService service, String queueName, String group, Integer delay) { - Argument[] arguments = context.getArguments(); + Argument[] arguments = context.getArguments(); Map params = context.getParameterValueMap(); if (arguments.length == 1 && context.getMethodName().startsWith("delete")) { @@ -141,6 +144,27 @@ private Object doIntercept(MethodInvocationContext context, Simp return service.sendMessage(queueName, new String((byte[]) message), delay, group); } + if (Publisher.class.isAssignableFrom(messageType)) { + Publisher messageIdsPublisher; + + if (queueArguments.message.getTypeParameters()[0].equalsType(Argument.STRING)) { + messageIdsPublisher = service.sendMessages(queueName, (Publisher) message, delay, group); + } else { + messageIdsPublisher = service.sendMessages(queueName, Flux.from((Publisher) message).map(this::convertMessageToJson), delay, group); + } + + if (context.getReturnType().asArgument().isVoid()) { + Flux.from(messageIdsPublisher).subscribe(); + return null; + } + + if (Publishers.isConvertibleToPublisher(context.getReturnType().getType())) { + return Publishers.convertPublisher(beanContext.getConversionService(), messageIdsPublisher, context.getReturnType().getType()); + } + + return beanContext.getConversionService().convert(messageIdsPublisher, context.getReturnType().getType()); + } + return sendJson(service, queueName, message, delay, group); } @@ -148,8 +172,12 @@ private Object doIntercept(MethodInvocationContext context, Simp } private String sendJson(SimpleQueueService service, String queueName, Object message, int delay, String group) { + return service.sendMessage(queueName, convertMessageToJson(message), delay, group); + } + + private String convertMessageToJson(Object message) { try { - return service.sendMessage(queueName, objectMapper.writeValueAsString(message), delay, group); + return objectMapper.writeValueAsString(message); } catch (JsonProcessingException e) { throw new IllegalArgumentException("Failed to marshal " + message + " to JSON", e); } diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultClient.java b/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultClient.java index 03b84d4e4..90e92de4d 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultClient.java +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/DefaultClient.java @@ -19,6 +19,7 @@ import com.agorapulse.micronaut.amazon.awssdk.sqs.annotation.Queue; import com.agorapulse.micronaut.amazon.awssdk.sqs.annotation.QueueClient; +import org.reactivestreams.Publisher; @QueueClient // <1> interface DefaultClient { @@ -38,7 +39,11 @@ interface DefaultClient { String sendMessage(String record, int delay, String group); // <8> - void deleteMessage(String messageId); // <9> + void sendStringMessages(Publisher messages); // <9> + + Publisher sendMessages(Publisher messages); // <10> + + void deleteMessage(String messageId); // <11> String OTHER_QUEUE = "OtherQueue"; } diff --git a/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientSpec.groovy b/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientSpec.groovy index ab4816408..1b4ff2702 100644 --- a/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientSpec.groovy +++ b/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/agorapulse/micronaut/amazon/awssdk/sqs/QueueClientSpec.groovy @@ -20,6 +20,8 @@ package com.agorapulse.micronaut.amazon.awssdk.sqs import com.fasterxml.jackson.databind.ObjectMapper import io.micronaut.context.ApplicationContext import io.micronaut.inject.qualifiers.Qualifiers +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux import spock.lang.AutoCleanup import spock.lang.Specification @@ -146,6 +148,29 @@ class QueueClientSpec extends Specification { 1 * defaultService.sendMessage(DEFAULT_QUEUE_NAME, MESSAGE, DELAY, GROUP) >> ID } + void 'can send multiple messages when publisher is a parameter'() { + given: + List ids = [ID + 1, ID + 2, ID + 3] + DefaultClient client = context.getBean(DefaultClient) + + when: + Publisher messages = client.sendMessages(Flux.just(POGO, POGO, POGO)) + + then: + Flux.from(messages).collectList().block() == ids + + 1 * defaultService.sendMessages(DEFAULT_QUEUE_NAME, _ as Publisher, 0, null) >> Flux.just(ID + 1, ID + 2, ID + 3) + } + + void 'can send multiple string messages and return void'() { + given: + DefaultClient client = context.getBean(DefaultClient) + when: + client.sendStringMessages(Flux.just(MESSAGE, MESSAGE, MESSAGE)) + then: + 1 * defaultService.sendMessages(DEFAULT_QUEUE_NAME, _ as Publisher, 0, null) >> Flux.just(ID + 1, ID + 2, ID + 3) + } + void 'needs to follow the method convention rules'() { given: TestClient client = context.getBean(TestClient)