Skip to content

Commit

Permalink
batch publish in declarative clients
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Oct 24, 2024
1 parent ea4b91b commit aef9602
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 4 deletions.
4 changes: 3 additions & 1 deletion docs/guide/src/docs/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ include::{root-dir}/subprojects/micronaut-amazon-awssdk-sqs/src/test/groovy/com/
<6> You can publish a string with custom delay
<7> You can publish a string with custom FIFO queue group
<8> You can publish a string with custom delay and FIFO queue group
<9> You can delete published message using the message ID if the
<9> You can send multiple messages at once when the argument is `Publisher`
<10> If the return type is also publisher type then **you need to subscribe to the publisher to actually send the messages**
<11> You can delete published message using the message ID if the

[source,java,indent=0,options="nowrap",role="secondary"]
.Publishing String Records (AWS SDK 1.x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import io.micronaut.context.BeanContext;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.StringUtils;
import io.micronaut.inject.qualifiers.Qualifiers;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;

Expand Down Expand Up @@ -108,7 +111,7 @@ public Object intercept(MethodInvocationContext<Object, Object> context) {
}

private Object doIntercept(MethodInvocationContext<Object, Object> context, SimpleQueueService service, String queueName, String group, Integer delay) {
Argument[] arguments = context.getArguments();
Argument<?>[] arguments = context.getArguments();
Map<String, Object> params = context.getParameterValueMap();

if (arguments.length == 1 && context.getMethodName().startsWith("delete")) {
Expand Down Expand Up @@ -141,15 +144,40 @@ private Object doIntercept(MethodInvocationContext<Object, Object> context, Simp
return service.sendMessage(queueName, new String((byte[]) message), delay, group);
}

if (Publisher.class.isAssignableFrom(messageType)) {
Publisher<String> messageIdsPublisher;

if (queueArguments.message.getTypeParameters()[0].equalsType(Argument.STRING)) {
messageIdsPublisher = service.sendMessages(queueName, (Publisher<String>) message, delay, group);
} else {
messageIdsPublisher = service.sendMessages(queueName, Flux.from((Publisher<?>) message).map(this::convertMessageToJson), delay, group);
}

if (context.getReturnType().asArgument().isVoid()) {
Flux.from(messageIdsPublisher).subscribe();
return null;
}

if (Publishers.isConvertibleToPublisher(context.getReturnType().getType())) {
return Publishers.convertPublisher(beanContext.getConversionService(), messageIdsPublisher, context.getReturnType().getType());
}

return beanContext.getConversionService().convert(messageIdsPublisher, context.getReturnType().getType());
}

return sendJson(service, queueName, message, delay, group);
}

throw new UnsupportedOperationException("Cannot implement method " + context.getExecutableMethod());
}

private String sendJson(SimpleQueueService service, String queueName, Object message, int delay, String group) {
return service.sendMessage(queueName, convertMessageToJson(message), delay, group);
}

private String convertMessageToJson(Object message) {
try {
return service.sendMessage(queueName, objectMapper.writeValueAsString(message), delay, group);
return objectMapper.writeValueAsString(message);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Failed to marshal " + message + " to JSON", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.agorapulse.micronaut.amazon.awssdk.sqs.annotation.Queue;
import com.agorapulse.micronaut.amazon.awssdk.sqs.annotation.QueueClient;
import org.reactivestreams.Publisher;

@QueueClient // <1>
interface DefaultClient {
Expand All @@ -38,7 +39,11 @@ interface DefaultClient {

String sendMessage(String record, int delay, String group); // <8>

void deleteMessage(String messageId); // <9>
void sendStringMessages(Publisher<String> messages); // <9>

Publisher<String> sendMessages(Publisher<Pogo> messages); // <10>

void deleteMessage(String messageId); // <11>

String OTHER_QUEUE = "OtherQueue";
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package com.agorapulse.micronaut.amazon.awssdk.sqs
import com.fasterxml.jackson.databind.ObjectMapper
import io.micronaut.context.ApplicationContext
import io.micronaut.inject.qualifiers.Qualifiers
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import spock.lang.AutoCleanup
import spock.lang.Specification

Expand Down Expand Up @@ -146,6 +148,29 @@ class QueueClientSpec extends Specification {
1 * defaultService.sendMessage(DEFAULT_QUEUE_NAME, MESSAGE, DELAY, GROUP) >> ID
}

void 'can send multiple messages when publisher is a parameter'() {
given:
List<String> ids = [ID + 1, ID + 2, ID + 3]
DefaultClient client = context.getBean(DefaultClient)

when:
Publisher<String> messages = client.sendMessages(Flux.just(POGO, POGO, POGO))

then:
Flux.from(messages).collectList().block() == ids

1 * defaultService.sendMessages(DEFAULT_QUEUE_NAME, _ as Publisher, 0, null) >> Flux.just(ID + 1, ID + 2, ID + 3)
}

void 'can send multiple string messages and return void'() {
given:
DefaultClient client = context.getBean(DefaultClient)
when:
client.sendStringMessages(Flux.just(MESSAGE, MESSAGE, MESSAGE))
then:
1 * defaultService.sendMessages(DEFAULT_QUEUE_NAME, _ as Publisher, 0, null) >> Flux.just(ID + 1, ID + 2, ID + 3)
}

void 'needs to follow the method convention rules'() {
given:
TestClient client = context.getBean(TestClient)
Expand Down

0 comments on commit aef9602

Please sign in to comment.