diff --git a/consumer/spring-rabbit-consumer/build.gradle b/consumer/spring-rabbit-consumer/build.gradle index baa5440a..604573b7 100644 --- a/consumer/spring-rabbit-consumer/build.gradle +++ b/consumer/spring-rabbit-consumer/build.gradle @@ -2,5 +2,5 @@ dependencies { api 'org.springframework.integration:spring-integration-amqp' api 'org.springframework.boot:spring-boot-starter-amqp' - testImplementation 'org.springframework.amqp:spring-rabbit-junit' + testImplementation 'org.testcontainers:rabbitmq' } diff --git a/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerConfiguration.java b/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerConfiguration.java index 147eeba7..f8abe432 100644 --- a/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerConfiguration.java +++ b/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerConfiguration.java @@ -18,32 +18,17 @@ import java.util.function.Consumer; -import com.rabbitmq.client.impl.CredentialsProvider; -import com.rabbitmq.client.impl.CredentialsRefreshService; - import org.springframework.amqp.core.MessageDeliveryMode; -import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; -import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; -import org.springframework.amqp.support.converter.MessageConverter; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.boot.autoconfigure.amqp.CachingConnectionFactoryConfigurer; -import org.springframework.boot.autoconfigure.amqp.ConnectionFactoryCustomizer; import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration; -import org.springframework.boot.autoconfigure.amqp.RabbitConnectionFactoryBeanConfigurer; -import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.fn.common.config.ComponentCustomizer; import org.springframework.context.annotation.Bean; -import org.springframework.core.io.ResourceLoader; import org.springframework.expression.Expression; import org.springframework.integration.amqp.dsl.Amqp; import org.springframework.integration.amqp.dsl.AmqpOutboundChannelAdapterSpec; @@ -61,44 +46,22 @@ */ @EnableConfigurationProperties(RabbitConsumerProperties.class) @AutoConfiguration(after = RabbitAutoConfiguration.class) -public class RabbitConsumerConfiguration implements DisposableBean { - - @Autowired - private RabbitProperties bootProperties; - - @Autowired - private ResourceLoader resourceLoader; - - @Autowired - private ObjectProvider credentialsProvider; - - @Autowired - private ObjectProvider credentialsRefreshService; - - @Autowired - private ObjectProvider connectionFactoryCustomizers; +public class RabbitConsumerConfiguration { @Autowired private RabbitConsumerProperties properties; - @Value("#{${rabbit.converterBeanName:null}}") - private MessageConverter messageConverter; - - private CachingConnectionFactory ownConnectionFactory; - @Bean public Consumer> rabbitConsumer(@Qualifier("amqpChannelAdapter") MessageHandler messageHandler) { return messageHandler::handleMessage; } @Bean - public AmqpOutboundChannelAdapterSpec amqpChannelAdapter(ConnectionFactory rabbitConnectionFactory, + public AmqpOutboundChannelAdapterSpec amqpChannelAdapter(RabbitTemplate rabbitTemplate, @Nullable ComponentCustomizer amqpOutboundChannelAdapterSpecCustomizer) throws Exception { - AmqpOutboundChannelAdapterSpec handler = Amqp - .outboundAdapter(rabbitTemplate( - (this.properties.isOwnConnection()) ? buildLocalConnectionFactory() : rabbitConnectionFactory)) + AmqpOutboundChannelAdapterSpec handler = Amqp.outboundAdapter(rabbitTemplate) .mappedRequestHeaders(this.properties.getMappedRequestHeaders()) .defaultDeliveryMode((this.properties.getPersistentDeliveryMode()) ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT) @@ -127,65 +90,10 @@ public AmqpOutboundChannelAdapterSpec amqpChannelAdapter(ConnectionFactory rabbi return handler; } - private RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) { - RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory); - if (this.messageConverter != null) { - rabbitTemplate.setMessageConverter(this.messageConverter); - } - return rabbitTemplate; - } - @Bean @ConditionalOnProperty(name = "rabbit.converterBeanName", havingValue = RabbitConsumerProperties.JSON_CONVERTER) public Jackson2JsonMessageConverter jsonConverter() { return new Jackson2JsonMessageConverter(); } - @Override - public void destroy() { - if (this.ownConnectionFactory != null) { - this.ownConnectionFactory.destroy(); - } - } - - private ConnectionFactory buildLocalConnectionFactory() throws Exception { - this.ownConnectionFactory = rabbitConnectionFactory(this.bootProperties, this.resourceLoader, - this.credentialsProvider, this.credentialsRefreshService, this.connectionFactoryCustomizers); - return this.ownConnectionFactory; - } - - private CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ResourceLoader resourceLoader, - ObjectProvider credentialsProvider, - ObjectProvider credentialsRefreshService, - ObjectProvider connectionFactoryCustomizers) throws Exception { - - /* - * NOTE: This is based on RabbitAutoConfiguration.RabbitConnectionFactoryCreator - * https://github.com/spring-projects/spring-boot/blob/ - * c820ad01a108d419d8548265b8a34ed7c5591f7c/spring-boot-project/spring-boot- - * autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/ - * RabbitAutoConfiguration.java#L95 [UPGRADE_CONSIDERATION] this should stay - * somewhat in sync w/ the functionality provided by its original source. - */ - RabbitConnectionFactoryBean connectionFactoryBean = new RabbitConnectionFactoryBean(); - RabbitConnectionFactoryBeanConfigurer connectionFactoryBeanConfigurer = new RabbitConnectionFactoryBeanConfigurer( - resourceLoader, properties); - connectionFactoryBeanConfigurer.setCredentialsProvider(credentialsProvider.getIfUnique()); - connectionFactoryBeanConfigurer.setCredentialsRefreshService(credentialsRefreshService.getIfUnique()); - connectionFactoryBeanConfigurer.configure(connectionFactoryBean); - connectionFactoryBean.afterPropertiesSet(); - - com.rabbitmq.client.ConnectionFactory connectionFactory = connectionFactoryBean.getObject(); - connectionFactoryCustomizers.orderedStream().forEach((customizer) -> customizer.customize(connectionFactory)); - - CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory); - CachingConnectionFactoryConfigurer cachingConnectionFactoryConfigurer = new CachingConnectionFactoryConfigurer( - properties); - cachingConnectionFactoryConfigurer.setConnectionNameStrategy((cf) -> "rabbit.sink.own.connection"); - cachingConnectionFactoryConfigurer.configure(cachingConnectionFactory); - cachingConnectionFactory.afterPropertiesSet(); - - return cachingConnectionFactory; - } - } diff --git a/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerProperties.java b/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerProperties.java index 8b0c9b30..779e0779 100644 --- a/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerProperties.java +++ b/consumer/spring-rabbit-consumer/src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerProperties.java @@ -74,11 +74,6 @@ public class RabbitConsumerProperties { */ private String converterBeanName; - /** - * When true, use a separate connection based on the boot properties. - */ - private boolean ownConnection; - /** * When mapping headers for the outbound message, determine whether the headers are * mapped before the message is converted, or afterward. @@ -146,14 +141,6 @@ public boolean isRoutingKeyProvided() { return this.routingKey != null || this.routingKeyExpression != null; } - public boolean isOwnConnection() { - return this.ownConnection; - } - - public void setOwnConnection(boolean ownConnection) { - this.ownConnection = ownConnection; - } - public boolean isHeadersMappedLast() { return this.headersMappedLast; } diff --git a/consumer/spring-rabbit-consumer/src/test/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerTests.java b/consumer/spring-rabbit-consumer/src/test/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerTests.java index 2c66c45d..20ba96e2 100644 --- a/consumer/spring-rabbit-consumer/src/test/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerTests.java +++ b/consumer/spring-rabbit-consumer/src/test/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerTests.java @@ -20,20 +20,21 @@ import org.junit.jupiter.api.Test; +import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.rabbit.junit.RabbitAvailable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.annotation.DirtiesContext; import static org.assertj.core.api.Assertions.assertThat; -@RabbitAvailable(RabbitConsumerTests.TEST_QUEUE) -@SpringBootTest(properties = { "rabbit.consumer.routingKey=" + RabbitConsumerTests.TEST_QUEUE, - "rabbit.consumer.own-connection=true" }) -public class RabbitConsumerTests { +@SpringBootTest(properties = "rabbit.consumer.routingKey=" + RabbitConsumerTests.TEST_QUEUE) +@DirtiesContext +public class RabbitConsumerTests implements RabbitTestContainer { static final String TEST_QUEUE = "test-consumer-queue"; @@ -53,6 +54,11 @@ void rabbitSupplierReceivesData(@Autowired RabbitTemplate rabbitTemplate, @SpringBootApplication public static class TestConfiguration { + @Bean + Queue testQueue() { + return new Queue(TEST_QUEUE); + } + } } diff --git a/consumer/spring-rabbit-consumer/src/test/java/org/springframework/cloud/fn/consumer/rabbit/RabbitTestContainer.java b/consumer/spring-rabbit-consumer/src/test/java/org/springframework/cloud/fn/consumer/rabbit/RabbitTestContainer.java new file mode 100644 index 00000000..6f39e2d4 --- /dev/null +++ b/consumer/spring-rabbit-consumer/src/test/java/org/springframework/cloud/fn/consumer/rabbit/RabbitTestContainer.java @@ -0,0 +1,52 @@ +/* + * Copyright 2022-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.fn.consumer.rabbit; + +import java.io.IOException; + +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.junit.jupiter.Testcontainers; + +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; + +/** + * Provides a static {@link RabbitMQContainer} that can be shared across test classes. + * + * @author Chris Bono + * @author Gary Russell + * @author Artem Bilan + */ +@Testcontainers(disabledWithoutDocker = true) +public interface RabbitTestContainer { + + RabbitMQContainer RABBITMQ = new RabbitMQContainer("rabbitmq").withExposedPorts(5672, 5552); + + @BeforeAll + static void startContainer() throws IOException, InterruptedException { + RABBITMQ.start(); + RABBITMQ.execInContainer("rabbitmq-plugins", "enable", "rabbitmq_stream"); + } + + @DynamicPropertySource + static void RabbitMqProperties(DynamicPropertyRegistry propertyRegistry) { + propertyRegistry.add("spring.rabbitmq.port", () -> RABBITMQ.getMappedPort(5672)); + propertyRegistry.add("spring.rabbitmq.stream.port", () -> RABBITMQ.getMappedPort(5552)); + } + +} diff --git a/supplier/spring-rabbit-supplier/build.gradle b/supplier/spring-rabbit-supplier/build.gradle index baa5440a..8bd14cdc 100644 --- a/supplier/spring-rabbit-supplier/build.gradle +++ b/supplier/spring-rabbit-supplier/build.gradle @@ -2,5 +2,6 @@ dependencies { api 'org.springframework.integration:spring-integration-amqp' api 'org.springframework.boot:spring-boot-starter-amqp' - testImplementation 'org.springframework.amqp:spring-rabbit-junit' + testImplementation 'org.testcontainers:rabbitmq' + testImplementation project(':spring-rabbit-consumer').sourceSets.test.output } diff --git a/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierConfiguration.java b/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierConfiguration.java index 7e584ba6..96488126 100644 --- a/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierConfiguration.java +++ b/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierConfiguration.java @@ -20,34 +20,23 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.impl.CredentialsProvider; -import com.rabbitmq.client.impl.CredentialsRefreshService; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder; -import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer; import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter; import org.springframework.amqp.rabbit.support.MessagePropertiesConverter; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.beans.factory.ObjectProvider; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.boot.autoconfigure.amqp.CachingConnectionFactoryConfigurer; -import org.springframework.boot.autoconfigure.amqp.ConnectionFactoryCustomizer; import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration; -import org.springframework.boot.autoconfigure.amqp.RabbitConnectionFactoryBeanConfigurer; import org.springframework.boot.autoconfigure.amqp.RabbitProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.fn.common.config.ComponentCustomizer; import org.springframework.context.annotation.Bean; -import org.springframework.core.io.ResourceLoader; import org.springframework.integration.amqp.dsl.Amqp; import org.springframework.integration.amqp.dsl.AmqpInboundChannelAdapterSMLCSpec; import org.springframework.integration.dsl.IntegrationFlow; @@ -67,7 +56,7 @@ */ @AutoConfiguration(after = RabbitAutoConfiguration.class) @EnableConfigurationProperties(RabbitSupplierProperties.class) -public class RabbitSupplierConfiguration implements DisposableBean { +public class RabbitSupplierConfiguration { private static final MessagePropertiesConverter INBOUND_MESSAGE_PROPERTIES_CONVERTER = new DefaultMessagePropertiesConverter() { @@ -81,36 +70,14 @@ public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelo }; - @Autowired - private RabbitProperties rabbitProperties; - - @Autowired - private ResourceLoader resourceLoader; - - @Autowired - private ObjectProvider credentialsProvider; - - @Autowired - private ObjectProvider credentialsRefreshService; - - @Autowired - private ObjectProvider connectionFactoryCustomizers; - - @Autowired - private RabbitSupplierProperties properties; - - @Autowired - private ConnectionFactory rabbitConnectionFactory; - - private CachingConnectionFactory ownConnectionFactory; - @Bean - public SimpleMessageListenerContainer container(RetryOperationsInterceptor rabbitSourceRetryInterceptor) { - ConnectionFactory connectionFactory = (this.properties.isOwnConnection()) ? buildLocalConnectionFactory() - : this.rabbitConnectionFactory; + public SimpleMessageListenerContainer container(RabbitProperties rabbitProperties, + RabbitSupplierProperties rabbitSupplierProperties, ConnectionFactory connectionFactory, + RetryOperationsInterceptor rabbitSourceRetryInterceptor) { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setAutoStartup(false); - RabbitProperties.SimpleContainer simpleContainer = this.rabbitProperties.getListener().getSimple(); + RabbitProperties.SimpleContainer simpleContainer = rabbitProperties.getListener().getSimple(); AcknowledgeMode acknowledgeMode = simpleContainer.getAcknowledgeMode(); if (acknowledgeMode != null) { @@ -132,13 +99,13 @@ public SimpleMessageListenerContainer container(RetryOperationsInterceptor rabbi if (transactionSize != null) { container.setBatchSize(transactionSize); } - container.setDefaultRequeueRejected(this.properties.getRequeue()); - container.setChannelTransacted(this.properties.getTransacted()); - String[] queues = this.properties.getQueues(); + container.setDefaultRequeueRejected(rabbitSupplierProperties.getRequeue()); + container.setChannelTransacted(rabbitSupplierProperties.getTransacted()); + String[] queues = rabbitSupplierProperties.getQueues(); Assert.state(queues.length > 0, "At least one queue is required"); Assert.noNullElements(queues, "queues cannot have null elements"); container.setQueueNames(queues); - if (this.properties.isEnableRetry()) { + if (rabbitSupplierProperties.isEnableRetry()) { container.setAdviceChain(rabbitSourceRetryInterceptor); } container.setMessagePropertiesConverter(INBOUND_MESSAGE_PROPERTIES_CONVERTER); @@ -147,10 +114,11 @@ public SimpleMessageListenerContainer container(RetryOperationsInterceptor rabbi @Bean public Publisher> rabbitPublisher(SimpleMessageListenerContainer container, + RabbitSupplierProperties rabbitSupplierProperties, @Nullable ComponentCustomizer amqpMessageProducerCustomizer) { AmqpInboundChannelAdapterSMLCSpec messageProducerSpec = Amqp.inboundAdapter(container) - .mappedRequestHeaders(this.properties.getMappedRequestHeaders()); + .mappedRequestHeaders(rabbitSupplierProperties.getMappedRequestHeaders()); if (amqpMessageProducerCustomizer != null) { amqpMessageProducerCustomizer.customize(messageProducerSpec); @@ -165,67 +133,13 @@ public Supplier>> rabbitSupplier(Publisher> } @Bean - public RetryOperationsInterceptor rabbitSourceRetryInterceptor() { + public RetryOperationsInterceptor rabbitSourceRetryInterceptor(RabbitSupplierProperties rabbitSupplierProperties) { return RetryInterceptorBuilder.stateless() - .maxAttempts(this.properties.getMaxAttempts()) - .backOffOptions(this.properties.getInitialRetryInterval(), this.properties.getRetryMultiplier(), - this.properties.getMaxRetryInterval()) + .maxAttempts(rabbitSupplierProperties.getMaxAttempts()) + .backOffOptions(rabbitSupplierProperties.getInitialRetryInterval(), + rabbitSupplierProperties.getRetryMultiplier(), rabbitSupplierProperties.getMaxRetryInterval()) .recoverer(new RejectAndDontRequeueRecoverer()) .build(); } - @Override - public void destroy() { - if (this.ownConnectionFactory != null) { - this.ownConnectionFactory.destroy(); - } - } - - private ConnectionFactory buildLocalConnectionFactory() { - try { - this.ownConnectionFactory = rabbitConnectionFactory(this.rabbitProperties, this.resourceLoader, - this.credentialsProvider, this.credentialsRefreshService, this.connectionFactoryCustomizers); - - } - catch (Exception exception) { - throw new IllegalStateException("Error building connection factory", exception); - } - - return this.ownConnectionFactory; - } - - /* - * NOTE: This is based on RabbitAutoConfiguration.RabbitConnectionFactoryCreator - * https://github.com/spring-projects/spring-boot/blob/ - * c820ad01a108d419d8548265b8a34ed7c5591f7c/spring-boot-project/spring-boot- - * autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/ - * RabbitAutoConfiguration.java#L95 [UPGRADE_CONSIDERATION] this should stay somewhat - * in sync w/ the functionality provided by its original source. - */ - private static CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, - ResourceLoader resourceLoader, ObjectProvider credentialsProvider, - ObjectProvider credentialsRefreshService, - ObjectProvider connectionFactoryCustomizers) throws Exception { - - RabbitConnectionFactoryBean connectionFactoryBean = new RabbitConnectionFactoryBean(); - RabbitConnectionFactoryBeanConfigurer connectionFactoryBeanConfigurer = new RabbitConnectionFactoryBeanConfigurer( - resourceLoader, properties); - connectionFactoryBeanConfigurer.setCredentialsProvider(credentialsProvider.getIfUnique()); - connectionFactoryBeanConfigurer.setCredentialsRefreshService(credentialsRefreshService.getIfUnique()); - connectionFactoryBeanConfigurer.configure(connectionFactoryBean); - connectionFactoryBean.afterPropertiesSet(); - - com.rabbitmq.client.ConnectionFactory connectionFactory = connectionFactoryBean.getObject(); - connectionFactoryCustomizers.orderedStream().forEach((customizer) -> customizer.customize(connectionFactory)); - - CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory); - CachingConnectionFactoryConfigurer cachingConnectionFactoryConfigurer = new CachingConnectionFactoryConfigurer( - properties); - cachingConnectionFactoryConfigurer.setConnectionNameStrategy((cf) -> "rabbit.supplier.own.connection"); - cachingConnectionFactoryConfigurer.configure(cachingConnectionFactory); - cachingConnectionFactory.afterPropertiesSet(); - - return cachingConnectionFactory; - } - } diff --git a/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierProperties.java b/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierProperties.java index afbd3aa6..1b4c3630 100644 --- a/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierProperties.java +++ b/supplier/spring-rabbit-supplier/src/main/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierProperties.java @@ -76,11 +76,6 @@ public class RabbitSupplierProperties { */ private boolean enableRetry = false; - /** - * When true, use a separate connection based on the boot properties. - */ - private boolean ownConnection; - public boolean getRequeue() { return this.requeue; } @@ -156,12 +151,4 @@ public void setEnableRetry(boolean enableRetry) { this.enableRetry = enableRetry; } - public boolean isOwnConnection() { - return this.ownConnection; - } - - public void setOwnConnection(boolean ownConnection) { - this.ownConnection = ownConnection; - } - } diff --git a/supplier/spring-rabbit-supplier/src/test/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierTests.java b/supplier/spring-rabbit-supplier/src/test/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierTests.java index 10a197a4..25751ba8 100644 --- a/supplier/spring-rabbit-supplier/src/test/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierTests.java +++ b/supplier/spring-rabbit-supplier/src/test/java/org/springframework/cloud/fn/supplier/rabbit/RabbitSupplierTests.java @@ -23,16 +23,19 @@ import reactor.core.publisher.Flux; import reactor.test.StepVerifier; +import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.rabbit.junit.RabbitAvailable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.fn.consumer.rabbit.RabbitTestContainer; +import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; +import org.springframework.test.annotation.DirtiesContext; -@RabbitAvailable(RabbitSupplierTests.TEST_QUEUE) @SpringBootTest(properties = "rabbit.supplier.queues=" + RabbitSupplierTests.TEST_QUEUE) -public class RabbitSupplierTests { +@DirtiesContext +public class RabbitSupplierTests implements RabbitTestContainer { static final String TEST_QUEUE = "test-supplier-queue"; @@ -52,6 +55,11 @@ void rabbitSupplierReceivesData(@Autowired RabbitTemplate rabbitTemplate, @SpringBootApplication public static class TestConfiguration { + @Bean + Queue testQueue() { + return new Queue(TEST_QUEUE); + } + } }