Skip to content

Commit

Permalink
Use Testcontainers for RabbitMQ modules
Browse files Browse the repository at this point in the history
* Remove `ownConnection` feature from RabbitMQ modules.
It does not bring too much value over Spring Boot config.
Plus no one else modules tries to mimic such a behavior.
So, fully rely on Spring Boot auto-configuration for RabbitMQ as well.
  • Loading branch information
artembilan committed Jan 9, 2024
1 parent f509b15 commit 26479fb
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 233 deletions.
2 changes: 1 addition & 1 deletion consumer/spring-rabbit-consumer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ dependencies {
api 'org.springframework.integration:spring-integration-amqp'
api 'org.springframework.boot:spring-boot-starter-amqp'

testImplementation 'org.springframework.amqp:spring-rabbit-junit'
testImplementation 'org.testcontainers:rabbitmq'
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,17 @@

import java.util.function.Consumer;

import com.rabbitmq.client.impl.CredentialsProvider;
import com.rabbitmq.client.impl.CredentialsRefreshService;

import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.CachingConnectionFactoryConfigurer;
import org.springframework.boot.autoconfigure.amqp.ConnectionFactoryCustomizer;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.RabbitConnectionFactoryBeanConfigurer;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ResourceLoader;
import org.springframework.expression.Expression;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.dsl.AmqpOutboundChannelAdapterSpec;
Expand All @@ -61,44 +46,22 @@
*/
@EnableConfigurationProperties(RabbitConsumerProperties.class)
@AutoConfiguration(after = RabbitAutoConfiguration.class)
public class RabbitConsumerConfiguration implements DisposableBean {

@Autowired
private RabbitProperties bootProperties;

@Autowired
private ResourceLoader resourceLoader;

@Autowired
private ObjectProvider<CredentialsProvider> credentialsProvider;

@Autowired
private ObjectProvider<CredentialsRefreshService> credentialsRefreshService;

@Autowired
private ObjectProvider<ConnectionFactoryCustomizer> connectionFactoryCustomizers;
public class RabbitConsumerConfiguration {

@Autowired
private RabbitConsumerProperties properties;

@Value("#{${rabbit.converterBeanName:null}}")
private MessageConverter messageConverter;

private CachingConnectionFactory ownConnectionFactory;

@Bean
public Consumer<Message<?>> rabbitConsumer(@Qualifier("amqpChannelAdapter") MessageHandler messageHandler) {
return messageHandler::handleMessage;
}

@Bean
public AmqpOutboundChannelAdapterSpec amqpChannelAdapter(ConnectionFactory rabbitConnectionFactory,
public AmqpOutboundChannelAdapterSpec amqpChannelAdapter(RabbitTemplate rabbitTemplate,
@Nullable ComponentCustomizer<AmqpOutboundChannelAdapterSpec> amqpOutboundChannelAdapterSpecCustomizer)
throws Exception {

AmqpOutboundChannelAdapterSpec handler = Amqp
.outboundAdapter(rabbitTemplate(
(this.properties.isOwnConnection()) ? buildLocalConnectionFactory() : rabbitConnectionFactory))
AmqpOutboundChannelAdapterSpec handler = Amqp.outboundAdapter(rabbitTemplate)
.mappedRequestHeaders(this.properties.getMappedRequestHeaders())
.defaultDeliveryMode((this.properties.getPersistentDeliveryMode()) ? MessageDeliveryMode.PERSISTENT
: MessageDeliveryMode.NON_PERSISTENT)
Expand Down Expand Up @@ -127,65 +90,10 @@ public AmqpOutboundChannelAdapterSpec amqpChannelAdapter(ConnectionFactory rabbi
return handler;
}

private RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
if (this.messageConverter != null) {
rabbitTemplate.setMessageConverter(this.messageConverter);
}
return rabbitTemplate;
}

@Bean
@ConditionalOnProperty(name = "rabbit.converterBeanName", havingValue = RabbitConsumerProperties.JSON_CONVERTER)
public Jackson2JsonMessageConverter jsonConverter() {
return new Jackson2JsonMessageConverter();
}

@Override
public void destroy() {
if (this.ownConnectionFactory != null) {
this.ownConnectionFactory.destroy();
}
}

private ConnectionFactory buildLocalConnectionFactory() throws Exception {
this.ownConnectionFactory = rabbitConnectionFactory(this.bootProperties, this.resourceLoader,
this.credentialsProvider, this.credentialsRefreshService, this.connectionFactoryCustomizers);
return this.ownConnectionFactory;
}

private CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ResourceLoader resourceLoader,
ObjectProvider<CredentialsProvider> credentialsProvider,
ObjectProvider<CredentialsRefreshService> credentialsRefreshService,
ObjectProvider<ConnectionFactoryCustomizer> connectionFactoryCustomizers) throws Exception {

/*
* NOTE: This is based on RabbitAutoConfiguration.RabbitConnectionFactoryCreator
* https://github.com/spring-projects/spring-boot/blob/
* c820ad01a108d419d8548265b8a34ed7c5591f7c/spring-boot-project/spring-boot-
* autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/
* RabbitAutoConfiguration.java#L95 [UPGRADE_CONSIDERATION] this should stay
* somewhat in sync w/ the functionality provided by its original source.
*/
RabbitConnectionFactoryBean connectionFactoryBean = new RabbitConnectionFactoryBean();
RabbitConnectionFactoryBeanConfigurer connectionFactoryBeanConfigurer = new RabbitConnectionFactoryBeanConfigurer(
resourceLoader, properties);
connectionFactoryBeanConfigurer.setCredentialsProvider(credentialsProvider.getIfUnique());
connectionFactoryBeanConfigurer.setCredentialsRefreshService(credentialsRefreshService.getIfUnique());
connectionFactoryBeanConfigurer.configure(connectionFactoryBean);
connectionFactoryBean.afterPropertiesSet();

com.rabbitmq.client.ConnectionFactory connectionFactory = connectionFactoryBean.getObject();
connectionFactoryCustomizers.orderedStream().forEach((customizer) -> customizer.customize(connectionFactory));

CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
CachingConnectionFactoryConfigurer cachingConnectionFactoryConfigurer = new CachingConnectionFactoryConfigurer(
properties);
cachingConnectionFactoryConfigurer.setConnectionNameStrategy((cf) -> "rabbit.sink.own.connection");
cachingConnectionFactoryConfigurer.configure(cachingConnectionFactory);
cachingConnectionFactory.afterPropertiesSet();

return cachingConnectionFactory;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ public class RabbitConsumerProperties {
*/
private String converterBeanName;

/**
* When true, use a separate connection based on the boot properties.
*/
private boolean ownConnection;

/**
* When mapping headers for the outbound message, determine whether the headers are
* mapped before the message is converted, or afterward.
Expand Down Expand Up @@ -146,14 +141,6 @@ public boolean isRoutingKeyProvided() {
return this.routingKey != null || this.routingKeyExpression != null;
}

public boolean isOwnConnection() {
return this.ownConnection;
}

public void setOwnConnection(boolean ownConnection) {
this.ownConnection = ownConnection;
}

public boolean isHeadersMappedLast() {
return this.headersMappedLast;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,21 @@

import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.annotation.DirtiesContext;

import static org.assertj.core.api.Assertions.assertThat;

@RabbitAvailable(RabbitConsumerTests.TEST_QUEUE)
@SpringBootTest(properties = { "rabbit.consumer.routingKey=" + RabbitConsumerTests.TEST_QUEUE,
"rabbit.consumer.own-connection=true" })
public class RabbitConsumerTests {
@SpringBootTest(properties = "rabbit.consumer.routingKey=" + RabbitConsumerTests.TEST_QUEUE)
@DirtiesContext
public class RabbitConsumerTests implements RabbitTestContainer {

static final String TEST_QUEUE = "test-consumer-queue";

Expand All @@ -53,6 +54,11 @@ void rabbitSupplierReceivesData(@Autowired RabbitTemplate rabbitTemplate,
@SpringBootApplication
public static class TestConfiguration {

@Bean
Queue testQueue() {
return new Queue(TEST_QUEUE);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.fn.consumer.rabbit;

import java.io.IOException;

import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.junit.jupiter.Testcontainers;

import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;

/**
* Provides a static {@link RabbitMQContainer} that can be shared across test classes.
*
* @author Chris Bono
* @author Gary Russell
* @author Artem Bilan
*/
@Testcontainers(disabledWithoutDocker = true)
public interface RabbitTestContainer {

RabbitMQContainer RABBITMQ = new RabbitMQContainer("rabbitmq").withExposedPorts(5672, 5552);

@BeforeAll
static void startContainer() throws IOException, InterruptedException {
RABBITMQ.start();
RABBITMQ.execInContainer("rabbitmq-plugins", "enable", "rabbitmq_stream");
}

@DynamicPropertySource
static void RabbitMqProperties(DynamicPropertyRegistry propertyRegistry) {
propertyRegistry.add("spring.rabbitmq.port", () -> RABBITMQ.getMappedPort(5672));
propertyRegistry.add("spring.rabbitmq.stream.port", () -> RABBITMQ.getMappedPort(5552));
}

}
3 changes: 2 additions & 1 deletion supplier/spring-rabbit-supplier/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ dependencies {
api 'org.springframework.integration:spring-integration-amqp'
api 'org.springframework.boot:spring-boot-starter-amqp'

testImplementation 'org.springframework.amqp:spring-rabbit-junit'
testImplementation 'org.testcontainers:rabbitmq'
testImplementation project(':spring-rabbit-consumer').sourceSets.test.output
}
Loading

0 comments on commit 26479fb

Please sign in to comment.