Skip to content

Commit

Permalink
feat(amqp): be liberal and handle invalid configurations
Browse files Browse the repository at this point in the history
In case spring-amqp changes how it handles inputs, the library should still try to do its best
  • Loading branch information
timonback committed Sep 8, 2024
1 parent 3afca12 commit 888a78a
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
* <li> Consumer consumes the message from the queue
* </ol>
*/
// TODO: should this do validation and throw errors when an invalid rabbit configuration is found?
@Slf4j
public class RabbitListenerUtil {
public static final String BINDING_NAME = "amqp";
Expand All @@ -60,47 +59,29 @@ public static String getChannelName(RabbitListener annotation, StringValueResolv
Stream<String> annotationBindingChannelNames = Arrays.stream(annotation.bindings())
.flatMap(binding -> channelNameFromAnnotationBindings(binding, resolver));

return Stream.concat(streamQueueNames(annotation), annotationBindingChannelNames)
.map(resolver::resolveStringValue)
.filter(Objects::nonNull)
.peek(queue -> log.debug("Resolved channel name: {}", queue))
.findFirst()
.orElseThrow(
() -> new IllegalArgumentException(
"No channel name was found in @RabbitListener annotation (neither in queues nor bindings property)"));
Stream<String> stream = Stream.concat(streamQueueNames(annotation), annotationBindingChannelNames);
return resolveFirstValue(stream, resolver, "channel name");
}

public static String getChannelId(RabbitListener annotation, StringValueResolver resolver) {
Stream<String> annotationBindingChannelIds = Arrays.stream(annotation.bindings())
.flatMap(binding -> channelIdFromAnnotationBindings(binding, resolver));

return Stream.concat(streamQueueNames(annotation).map(ReferenceUtil::toValidId), annotationBindingChannelIds)
.map(resolver::resolveStringValue)
.filter(Objects::nonNull)
.peek(queue -> log.debug("Resolved channel id: {}", queue))
.findFirst()
.orElseThrow(
() -> new IllegalArgumentException(
"No channel id was found in @RabbitListener annotation (neither in queues nor bindings property)"));
Stream<String> stream =
Stream.concat(streamQueueNames(annotation).map(ReferenceUtil::toValidId), annotationBindingChannelIds);
return resolveFirstValue(stream, resolver, "channel id");
}

private static String getQueueName(RabbitListener annotation, StringValueResolver resolver) {
Stream<String> annotationBindingChannelNames = Arrays.stream(annotation.bindings())
Stream<String> annotationBindingQueueNames = Arrays.stream(annotation.bindings())
.flatMap(binding -> Stream.of(binding.value().name()));

return Stream.concat(streamQueueNames(annotation), annotationBindingChannelNames)
.map(resolver::resolveStringValue)
.filter(Objects::nonNull)
.peek(queue -> log.debug("Resolved queue name: {}", queue))
.findFirst()
.orElseThrow(
() -> new IllegalArgumentException(
"No queue name was found in @RabbitListener annotation (neither in queues nor bindings property)"));
Stream<String> stream = Stream.concat(streamQueueNames(annotation), annotationBindingQueueNames);
return resolveFirstValue(stream, resolver, "queue name");
}

private static Stream<String> channelNameFromAnnotationBindings(
QueueBinding binding, StringValueResolver resolver) {
String queueName = resolver.resolveStringValue(binding.value().name());
String exchangeName = resolver.resolveStringValue(binding.exchange().name());

String[] routingKeys = binding.key();
Expand All @@ -111,15 +92,6 @@ private static Stream<String> channelNameFromAnnotationBindings(
return Arrays.stream(routingKeys).map(resolver::resolveStringValue).map(routingKey -> exchangeName);
}

private static String exchangeTargetChannelIdFromAnnotationBindings(
RabbitListener annotation, StringValueResolver resolver) {
return Arrays.stream(annotation.bindings())
.map(binding -> binding.value().name() + "-id")
.map(resolver::resolveStringValue)
.findFirst()
.orElse(null);
}

private static Stream<String> channelIdFromAnnotationBindings(QueueBinding binding, StringValueResolver resolver) {
String queueName = resolver.resolveStringValue(binding.value().name());
String exchangeName = resolver.resolveStringValue(binding.exchange().name());
Expand Down Expand Up @@ -170,6 +142,14 @@ public static Map<String, ChannelBinding> buildChannelBinding(
return Map.of(BINDING_NAME, channelBinding.build());
}

private static String exchangeTargetChannelIdFromAnnotationBindings(
RabbitListener annotation, StringValueResolver resolver) {
Stream<String> stream = Arrays.stream(annotation.bindings())
.map(binding -> binding.value().name() + "-id");

return resolveFirstValue(stream, resolver, "exchange target channel id");
}

private static AMQPChannelExchangeProperties buildExchangeProperties(
RabbitListener annotation, String exchangeName, RabbitListenerUtilContext context) {

Expand Down Expand Up @@ -314,4 +294,13 @@ public static Map<String, MessageBinding> buildMessageBinding() {
// currently the feature to define amqp message binding is not implemented.
return Map.of(BINDING_NAME, new AMQPMessageBinding());
}

private static String resolveFirstValue(Stream<String> values, StringValueResolver resolver, String valueType) {
return values.map(resolver::resolveStringValue)
.filter(Objects::nonNull)
.peek(value -> log.debug("Resolved {}: {}", valueType, value))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("No " + valueType
+ " was found in @RabbitListener annotation (neither in queues nor bindings property)"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.github.springwolf.asyncapi.v3.model.channel.ChannelReference;
import org.assertj.core.util.Sets;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.TopicExchange;
Expand Down Expand Up @@ -106,8 +105,10 @@ void buildChannelBinding() {
channelBinding.get("amqp"));
}

/**
* Technically an invalid configuration as queue will be part of the spring context
*/
@Test
@Disabled("TODO: what to do with invalid configuration")
void buildChannelBindingWithEmptyContext() {
// given
RabbitListener annotation = getAnnotation(ClassWithQueuesConfiguration.class);
Expand Down Expand Up @@ -148,8 +149,10 @@ void buildOperationBinding() {
assertEquals(AMQPOperationBinding.builder().build(), operationBinding.get("amqp"));
}

/**
* Technically an invalid configuration as queue will be part of the spring context
*/
@Test
@Disabled("TODO: what to do with invalid configuration")
void buildOperationBindingWithEmptyContext() {
// given
RabbitListener annotation = getAnnotation(ClassWithQueuesConfiguration.class);
Expand All @@ -161,7 +164,7 @@ void buildOperationBindingWithEmptyContext() {
// then
assertEquals(1, operationBinding.size());
assertEquals(Sets.newTreeSet("amqp"), operationBinding.keySet());
assertEquals(AMQPOperationBinding.builder().cc(List.of("queue-1")).build(), operationBinding.get("amqp"));
assertEquals(AMQPOperationBinding.builder().build(), operationBinding.get("amqp"));
}

@Test
Expand Down Expand Up @@ -197,8 +200,10 @@ void getChannelName() {
assertEquals("queue-1", channelName);
}

/**
* Technically an invalid configuration as context should be empty
*/
@Test
@Disabled("TODO: what to do with invalid configuration")
void buildChannelBinding() {
// given
RabbitListener annotation = getAnnotation(ClassWithQueuesToDeclare.class);
Expand All @@ -215,9 +220,9 @@ void buildChannelBinding() {
.is(AMQPChannelType.QUEUE)
.queue(AMQPChannelQueueProperties.builder()
.name("queue-1")
.durable(true)
.autoDelete(false)
.exclusive(false)
.durable(false)
.autoDelete(true)
.exclusive(true)
.vhost("/")
.build())
.build(),
Expand Down Expand Up @@ -250,8 +255,10 @@ void buildChannelBindingWithEmptyContext() {
channelBinding.get("amqp"));
}

/**
* Technically an invalid configuration as context should be empty
*/
@Test
@Disabled("TODO: what to do with invalid configuration")
void buildOperationBinding() {
// given
RabbitListener annotation = getAnnotation(ClassWithQueuesToDeclare.class);
Expand All @@ -263,7 +270,7 @@ void buildOperationBinding() {
// then
assertEquals(1, operationBinding.size());
assertEquals(Sets.newTreeSet("amqp"), operationBinding.keySet());
assertEquals(AMQPOperationBinding.builder().cc(List.of("queue-1")).build(), operationBinding.get("amqp"));
assertEquals(AMQPOperationBinding.builder().build(), operationBinding.get("amqp"));
}

@Test
Expand Down Expand Up @@ -380,8 +387,10 @@ void buildChannelBindingWithExchangeContext() {
channelBinding.get("amqp"));
}

/**
* Technically an invalid configuration as queue and exchange will be part of the spring context
*/
@Test
@Disabled("TODO: what to do with invalid configuration")
void buildChannelBindingWithEmptyContext() {
// given
RabbitListener annotation = getAnnotation(ClassWithBindingConfiguration.class);
Expand All @@ -396,9 +405,11 @@ void buildChannelBindingWithEmptyContext() {
assertEquals(
AMQPChannelBinding.builder()
.is(AMQPChannelType.ROUTING_KEY)
.name("#")
.channel(ChannelReference.fromChannel("queue-1-id"))
.exchange(AMQPChannelExchangeProperties.builder()
.name("exchange-name")
.type(AMQPChannelExchangeType.TOPIC)
.type(AMQPChannelExchangeType.DIRECT)
.durable(true)
.autoDelete(false)
.build())
Expand Down Expand Up @@ -436,8 +447,10 @@ void buildOperationBindingWithExchangeContext() {
assertEquals(AMQPOperationBinding.builder().build(), operationBinding.get("amqp"));
}

/**
* Technically an invalid configuration as queue and exchange will be part of the spring context
*/
@Test
@Disabled("TODO: what to do with invalid configuration")
void buildOperationBindingWithEmptyContext() {
// given
RabbitListener annotation = getAnnotation(ClassWithBindingConfiguration.class);
Expand All @@ -449,7 +462,7 @@ void buildOperationBindingWithEmptyContext() {
// then
assertEquals(1, operationBinding.size());
assertEquals(Sets.newTreeSet("amqp"), operationBinding.keySet());
assertEquals(AMQPOperationBinding.builder().cc(List.of("")).build(), operationBinding.get("amqp"));
assertEquals(AMQPOperationBinding.builder().build(), operationBinding.get("amqp"));
}

@Test
Expand Down Expand Up @@ -528,7 +541,7 @@ void buildChannelBinding() {
}

@Test
@Disabled("TODO: what to do with invalid configuration")
// @Disabled("TODO: what to do with invalid configuration")
void buildChannelBindingWithEmptyContext() {
// given
RabbitListener annotation = getAnnotation(ClassWithBindingsAndRoutingKeyConfiguration.class);
Expand All @@ -543,6 +556,8 @@ void buildChannelBindingWithEmptyContext() {
assertEquals(
AMQPChannelBinding.builder()
.is(AMQPChannelType.ROUTING_KEY)
.name("routing-key")
.channel(ChannelReference.fromChannel("queue-1-id"))
.exchange(AMQPChannelExchangeProperties.builder()
.name("exchange-name")
.type(AMQPChannelExchangeType.DIRECT)
Expand All @@ -569,7 +584,7 @@ void buildOperationBinding() {
}

@Test
@Disabled("TODO: what to do with invalid configuration")
// @Disabled("TODO: what to do with invalid configuration")
void buildOperationBindingWithEmptyContext() {
// given
RabbitListener annotation = getAnnotation(ClassWithBindingsAndRoutingKeyConfiguration.class);
Expand All @@ -581,8 +596,7 @@ void buildOperationBindingWithEmptyContext() {
// then
assertEquals(1, operationBinding.size());
assertEquals(Sets.newTreeSet("amqp"), operationBinding.keySet());
assertEquals(
AMQPOperationBinding.builder().cc(List.of("routing-key")).build(), operationBinding.get("amqp"));
assertEquals(AMQPOperationBinding.builder().build(), operationBinding.get("amqp"));
}

@Test
Expand Down

0 comments on commit 888a78a

Please sign in to comment.