diff --git a/consumer/spring-rabbit-consumer/README.adoc b/consumer/spring-rabbit-consumer/README.adoc index c0531b36..6edd8681 100644 --- a/consumer/spring-rabbit-consumer/README.adoc +++ b/consumer/spring-rabbit-consumer/README.adoc @@ -4,7 +4,7 @@ A consumer that allows you to send messages to RabbitMQ. ## Beans for injection -You can import `RabbitConsumerConfiguration` in the application and then inject the following bean. +The `RabbitConsumerConfiguration` auto-configuration provides the following bean: `Function, Object> rabbitConsumer` @@ -12,7 +12,7 @@ You can use `rabbitConsumer` as a qualifier when injecting. ## Configuration Options -All configuration properties are prefixed with `rabbit`. +All configuration properties are prefixed with `rabbit.consumer`. For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerProperties.java[RabbitConsumerProperties]. diff --git a/consumer/spring-rabbit-consumer/build.gradle b/consumer/spring-rabbit-consumer/build.gradle index e008d507..baa5440a 100644 --- a/consumer/spring-rabbit-consumer/build.gradle +++ b/consumer/spring-rabbit-consumer/build.gradle @@ -1,5 +1,6 @@ dependencies { api 'org.springframework.integration:spring-integration-amqp' api 'org.springframework.boot:spring-boot-starter-amqp' - api 'org.springframework.boot:spring-boot-starter-web' + + testImplementation 'org.springframework.amqp:spring-rabbit-junit' } diff --git a/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerConfiguration.java b/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerConfiguration.java index 9b010f71..147eeba7 100644 --- a/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerConfiguration.java +++ b/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ package org.springframework.cloud.fn.consumer.rabbit; -import java.util.function.Function; +import java.util.function.Consumer; import com.rabbitmq.client.impl.CredentialsProvider; import com.rabbitmq.client.impl.CredentialsRefreshService; @@ -33,15 +33,16 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.amqp.CachingConnectionFactoryConfigurer; import org.springframework.boot.autoconfigure.amqp.ConnectionFactoryCustomizer; +import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration; import org.springframework.boot.autoconfigure.amqp.RabbitConnectionFactoryBeanConfigurer; import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.fn.common.config.ComponentCustomizer; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ResourceLoader; import org.springframework.expression.Expression; import org.springframework.integration.amqp.dsl.Amqp; @@ -51,7 +52,7 @@ import org.springframework.messaging.MessageHandler; /** - * A configuration for RabbitMQ Consumer function. Uses a + * Auto-configuration for RabbitMQ Consumer function. Uses a * {@link AmqpOutboundChannelAdapterSpec} to save payload contents to RabbitMQ. * * @author Soby Chako @@ -59,7 +60,7 @@ * @author Chris Bono */ @EnableConfigurationProperties(RabbitConsumerProperties.class) -@Configuration +@AutoConfiguration(after = RabbitAutoConfiguration.class) public class RabbitConsumerConfiguration implements DisposableBean { @Autowired @@ -86,11 +87,8 @@ public class RabbitConsumerConfiguration implements DisposableBean { private CachingConnectionFactory ownConnectionFactory; @Bean - public Function, Object> rabbitConsumer(@Qualifier("amqpChannelAdapter") MessageHandler messageHandler) { - return o -> { - messageHandler.handleMessage(o); - return ""; - }; + public Consumer> rabbitConsumer(@Qualifier("amqpChannelAdapter") MessageHandler messageHandler) { + return messageHandler::handleMessage; } @Bean @@ -100,9 +98,9 @@ public AmqpOutboundChannelAdapterSpec amqpChannelAdapter(ConnectionFactory rabbi AmqpOutboundChannelAdapterSpec handler = Amqp .outboundAdapter(rabbitTemplate( - this.properties.isOwnConnection() ? buildLocalConnectionFactory() : rabbitConnectionFactory)) - .mappedRequestHeaders(properties.getMappedRequestHeaders()) - .defaultDeliveryMode(properties.getPersistentDeliveryMode() ? MessageDeliveryMode.PERSISTENT + (this.properties.isOwnConnection()) ? buildLocalConnectionFactory() : rabbitConnectionFactory)) + .mappedRequestHeaders(this.properties.getMappedRequestHeaders()) + .defaultDeliveryMode((this.properties.getPersistentDeliveryMode()) ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT) .headersMappedLast(this.properties.isHeadersMappedLast()); @@ -129,8 +127,7 @@ public AmqpOutboundChannelAdapterSpec amqpChannelAdapter(ConnectionFactory rabbi return handler; } - @Bean - public RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) { + private RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); if (this.messageConverter != null) { rabbitTemplate.setMessageConverter(this.messageConverter); @@ -184,7 +181,7 @@ private CachingConnectionFactory rabbitConnectionFactory(RabbitProperties proper CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory); CachingConnectionFactoryConfigurer cachingConnectionFactoryConfigurer = new CachingConnectionFactoryConfigurer( properties); - cachingConnectionFactoryConfigurer.setConnectionNameStrategy(cf -> "rabbit.sink.own.connection"); + cachingConnectionFactoryConfigurer.setConnectionNameStrategy((cf) -> "rabbit.sink.own.connection"); cachingConnectionFactoryConfigurer.configure(cachingConnectionFactory); cachingConnectionFactory.afterPropertiesSet(); diff --git a/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerProperties.java b/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerProperties.java index c5c05bfe..8b0c9b30 100644 --- a/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerProperties.java +++ b/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,12 @@ import org.springframework.expression.Expression; import org.springframework.validation.annotation.Validated; -@ConfigurationProperties("rabbit") +/** + * Configuration properties for RabbitMQ consumer. + * + * @author Soby Chacko + */ +@ConfigurationProperties("rabbit.consumer") @Validated public class RabbitConsumerProperties { @@ -76,7 +81,7 @@ public class RabbitConsumerProperties { /** * When mapping headers for the outbound message, determine whether the headers are - * mapped before the message is converted, or afterwards. + * mapped before the message is converted, or afterward. */ private boolean headersMappedLast = true; diff --git a/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/package-info.java b/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/package-info.java new file mode 100644 index 00000000..0a43bff8 --- /dev/null +++ b/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/package-info.java @@ -0,0 +1,4 @@ +/** + * The RabbitMQ consumer auto-configuration support. + */ +package org.springframework.cloud.fn.consumer.rabbit; diff --git a/consumer/spring-rabbit-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/consumer/spring-rabbit-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..f0617a48 --- /dev/null +++ b/consumer/spring-rabbit-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.cloud.fn.consumer.rabbit.RabbitConsumerConfiguration diff --git a/consumer/spring-rabbit-consumer/src/test/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerTests.java b/consumer/spring-rabbit-consumer/src/test/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerTests.java new file mode 100644 index 00000000..2c66c45d --- /dev/null +++ b/consumer/spring-rabbit-consumer/src/test/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerTests.java @@ -0,0 +1,58 @@ +/* + * Copyright 2016-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.fn.consumer.rabbit; + +import java.util.function.Consumer; + +import org.junit.jupiter.api.Test; + +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.junit.RabbitAvailable; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; + +import static org.assertj.core.api.Assertions.assertThat; + +@RabbitAvailable(RabbitConsumerTests.TEST_QUEUE) +@SpringBootTest(properties = { "rabbit.consumer.routingKey=" + RabbitConsumerTests.TEST_QUEUE, + "rabbit.consumer.own-connection=true" }) +public class RabbitConsumerTests { + + static final String TEST_QUEUE = "test-consumer-queue"; + + @Test + void rabbitSupplierReceivesData(@Autowired RabbitTemplate rabbitTemplate, + @Autowired Consumer> rabbitConsumer) { + + rabbitConsumer.accept(new GenericMessage<>("test data1")); + rabbitConsumer.accept(new GenericMessage<>("test data2")); + + Object received = rabbitTemplate.receiveAndConvert(TEST_QUEUE, 10_000); + assertThat(received).isEqualTo("test data1"); + received = rabbitTemplate.receiveAndConvert(TEST_QUEUE, 10_000); + assertThat(received).isEqualTo("test data2"); + } + + @SpringBootApplication + public static class TestConfiguration { + + } + +} diff --git a/supplier/spring-rabbit-supplier/README.adoc b/supplier/spring-rabbit-supplier/README.adoc index c8015051..175367f7 100644 --- a/supplier/spring-rabbit-supplier/README.adoc +++ b/supplier/spring-rabbit-supplier/README.adoc @@ -8,7 +8,7 @@ Users have to subscribe to this `Flux` and receive the data. ## Beans for injection -You can import the `RabbitSupplierConfiguration` in the application and then inject the following bean. +The `RabbitSupplierConfiguration` auto-configuration provides the following bean: `rabbitSupplier` diff --git a/supplier/spring-rabbit-supplier/build.gradle b/supplier/spring-rabbit-supplier/build.gradle index e008d507..baa5440a 100644 --- a/supplier/spring-rabbit-supplier/build.gradle +++ b/supplier/spring-rabbit-supplier/build.gradle @@ -1,5 +1,6 @@ dependencies { api 'org.springframework.integration:spring-integration-amqp' api 'org.springframework.boot:spring-boot-starter-amqp' - api 'org.springframework.boot:spring-boot-starter-web' + + testImplementation 'org.springframework.amqp:spring-rabbit-junit' } diff --git a/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierConfiguration.java b/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierConfiguration.java index e28234d0..7e584ba6 100644 --- a/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierConfiguration.java +++ b/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,14 +38,15 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.amqp.CachingConnectionFactoryConfigurer; import org.springframework.boot.autoconfigure.amqp.ConnectionFactoryCustomizer; +import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration; import org.springframework.boot.autoconfigure.amqp.RabbitConnectionFactoryBeanConfigurer; import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.fn.common.config.ComponentCustomizer; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ResourceLoader; import org.springframework.integration.amqp.dsl.Amqp; import org.springframework.integration.amqp.dsl.AmqpInboundChannelAdapterSMLCSpec; @@ -56,7 +57,7 @@ import org.springframework.util.Assert; /** - * A source module that receives data from RabbitMQ. + * Auto-configuration for supplier that receives data from RabbitMQ. * * @author Gary Russell * @author Chris Schaefer @@ -64,11 +65,11 @@ * @author Chris Bono * @author Artem Bilan */ -@Configuration(proxyBeanMethods = false) +@AutoConfiguration(after = RabbitAutoConfiguration.class) @EnableConfigurationProperties(RabbitSupplierProperties.class) public class RabbitSupplierConfiguration implements DisposableBean { - private static final MessagePropertiesConverter inboundMessagePropertiesConverter = new DefaultMessagePropertiesConverter() { + private static final MessagePropertiesConverter INBOUND_MESSAGE_PROPERTIES_CONVERTER = new DefaultMessagePropertiesConverter() { @Override public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelope envelope, String charset) { @@ -105,7 +106,7 @@ public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelo @Bean public SimpleMessageListenerContainer container(RetryOperationsInterceptor rabbitSourceRetryInterceptor) { - ConnectionFactory connectionFactory = this.properties.isOwnConnection() ? buildLocalConnectionFactory() + ConnectionFactory connectionFactory = (this.properties.isOwnConnection()) ? buildLocalConnectionFactory() : this.rabbitConnectionFactory; SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setAutoStartup(false); @@ -140,7 +141,7 @@ public SimpleMessageListenerContainer container(RetryOperationsInterceptor rabbi if (this.properties.isEnableRetry()) { container.setAdviceChain(rabbitSourceRetryInterceptor); } - container.setMessagePropertiesConverter(inboundMessagePropertiesConverter); + container.setMessagePropertiesConverter(INBOUND_MESSAGE_PROPERTIES_CONVERTER); return container; } @@ -149,7 +150,7 @@ public Publisher> rabbitPublisher(SimpleMessageListenerContainer @Nullable ComponentCustomizer amqpMessageProducerCustomizer) { AmqpInboundChannelAdapterSMLCSpec messageProducerSpec = Amqp.inboundAdapter(container) - .mappedRequestHeaders(properties.getMappedRequestHeaders()); + .mappedRequestHeaders(this.properties.getMappedRequestHeaders()); if (amqpMessageProducerCustomizer != null) { amqpMessageProducerCustomizer.customize(messageProducerSpec); @@ -220,7 +221,7 @@ private static CachingConnectionFactory rabbitConnectionFactory(RabbitProperties CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory); CachingConnectionFactoryConfigurer cachingConnectionFactoryConfigurer = new CachingConnectionFactoryConfigurer( properties); - cachingConnectionFactoryConfigurer.setConnectionNameStrategy(cf -> "rabbit.supplier.own.connection"); + cachingConnectionFactoryConfigurer.setConnectionNameStrategy((cf) -> "rabbit.supplier.own.connection"); cachingConnectionFactoryConfigurer.configure(cachingConnectionFactory); cachingConnectionFactory.afterPropertiesSet(); diff --git a/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierProperties.java b/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierProperties.java index 3def1023..afbd3aa6 100644 --- a/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierProperties.java +++ b/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,11 @@ import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; +/** + * The RabbitMQ supplier configuration properties. + * + * @author Gary Russell + */ @ConfigurationProperties("rabbit.supplier") @Validated public class RabbitSupplierProperties { @@ -77,7 +82,7 @@ public class RabbitSupplierProperties { private boolean ownConnection; public boolean getRequeue() { - return requeue; + return this.requeue; } public void setRequeue(boolean requeue) { @@ -85,7 +90,7 @@ public void setRequeue(boolean requeue) { } public boolean getTransacted() { - return transacted; + return this.transacted; } public void setTransacted(boolean transacted) { @@ -95,7 +100,7 @@ public void setTransacted(boolean transacted) { @NotNull(message = "queue(s) are required") @Size(min = 1, message = "At least one queue is required") public String[] getQueues() { - return queues; + return this.queues; } public void setQueues(String[] queues) { @@ -104,7 +109,7 @@ public void setQueues(String[] queues) { @NotNull public String[] getMappedRequestHeaders() { - return mappedRequestHeaders; + return this.mappedRequestHeaders; } public void setMappedRequestHeaders(String[] mappedRequestHeaders) { @@ -112,7 +117,7 @@ public void setMappedRequestHeaders(String[] mappedRequestHeaders) { } public int getInitialRetryInterval() { - return initialRetryInterval; + return this.initialRetryInterval; } public void setInitialRetryInterval(int initialRetryInterval) { @@ -120,7 +125,7 @@ public void setInitialRetryInterval(int initialRetryInterval) { } public int getMaxRetryInterval() { - return maxRetryInterval; + return this.maxRetryInterval; } public void setMaxRetryInterval(int maxRetryInterval) { @@ -128,7 +133,7 @@ public void setMaxRetryInterval(int maxRetryInterval) { } public double getRetryMultiplier() { - return retryMultiplier; + return this.retryMultiplier; } public void setRetryMultiplier(double retryMultiplier) { @@ -136,7 +141,7 @@ public void setRetryMultiplier(double retryMultiplier) { } public int getMaxAttempts() { - return maxAttempts; + return this.maxAttempts; } public void setMaxAttempts(int maxAttempts) { @@ -144,7 +149,7 @@ public void setMaxAttempts(int maxAttempts) { } public boolean isEnableRetry() { - return enableRetry; + return this.enableRetry; } public void setEnableRetry(boolean enableRetry) { diff --git a/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/package-info.java b/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/package-info.java new file mode 100644 index 00000000..652febac --- /dev/null +++ b/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/package-info.java @@ -0,0 +1,4 @@ +/** + * The RabbitMQ supplier auto-configuration support. + */ +package org.springframework.cloud.fn.supplier.rabbit; diff --git a/supplier/spring-rabbit-supplier/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/supplier/spring-rabbit-supplier/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..159ed225 --- /dev/null +++ b/supplier/spring-rabbit-supplier/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.cloud.fn.supplier.rabbit.RabbitSupplierConfiguration diff --git a/supplier/spring-rabbit-supplier/src/test/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierTests.java b/supplier/spring-rabbit-supplier/src/test/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierTests.java new file mode 100644 index 00000000..10a197a4 --- /dev/null +++ b/supplier/spring-rabbit-supplier/src/test/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierTests.java @@ -0,0 +1,57 @@ +/* + * Copyright 2016-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.fn.supplier.rabbit; + +import java.time.Duration; +import java.util.function.Supplier; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.junit.RabbitAvailable; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.messaging.Message; + +@RabbitAvailable(RabbitSupplierTests.TEST_QUEUE) +@SpringBootTest(properties = "rabbit.supplier.queues=" + RabbitSupplierTests.TEST_QUEUE) +public class RabbitSupplierTests { + + static final String TEST_QUEUE = "test-supplier-queue"; + + @Test + void rabbitSupplierReceivesData(@Autowired RabbitTemplate rabbitTemplate, + @Autowired Supplier>> rabbitSupplier) { + + Flux mapped = rabbitSupplier.get().map(Message::getPayload).map(String::new); + StepVerifier stepVerifier = StepVerifier.create(mapped).expectNext("test1", "test2").thenCancel().verifyLater(); + + rabbitTemplate.convertAndSend(TEST_QUEUE, "test1".getBytes()); + rabbitTemplate.convertAndSend(TEST_QUEUE, "test2".getBytes()); + + stepVerifier.verify(Duration.ofSeconds(10)); + } + + @SpringBootApplication + public static class TestConfiguration { + + } + +}