Skip to content

Commit

Permalink
Make RabbitMQ modules as auto-configuration
Browse files Browse the repository at this point in the history
* Fix all the Checkstyle violations and compiler warnings
* Add tests to these modules
  • Loading branch information
artembilan committed Jan 9, 2024
1 parent 80e0b80 commit f509b15
Show file tree
Hide file tree
Showing 14 changed files with 178 additions and 43 deletions.
4 changes: 2 additions & 2 deletions consumer/spring-rabbit-consumer/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ A consumer that allows you to send messages to RabbitMQ.

## Beans for injection

You can import `RabbitConsumerConfiguration` in the application and then inject the following bean.
The `RabbitConsumerConfiguration` auto-configuration provides the following bean:

`Function<Message<?>, Object> rabbitConsumer`

You can use `rabbitConsumer` as a qualifier when injecting.

## Configuration Options

All configuration properties are prefixed with `rabbit`.
All configuration properties are prefixed with `rabbit.consumer`.

For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/consumer/rabbit/RabbitConsumerProperties.java[RabbitConsumerProperties].

Expand Down
3 changes: 2 additions & 1 deletion consumer/spring-rabbit-consumer/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
dependencies {
api 'org.springframework.integration:spring-integration-amqp'
api 'org.springframework.boot:spring-boot-starter-amqp'
api 'org.springframework.boot:spring-boot-starter-web'

testImplementation 'org.springframework.amqp:spring-rabbit-junit'
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

package org.springframework.cloud.fn.consumer.rabbit;

import java.util.function.Function;
import java.util.function.Consumer;

import com.rabbitmq.client.impl.CredentialsProvider;
import com.rabbitmq.client.impl.CredentialsRefreshService;
Expand All @@ -33,15 +33,16 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.CachingConnectionFactoryConfigurer;
import org.springframework.boot.autoconfigure.amqp.ConnectionFactoryCustomizer;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.RabbitConnectionFactoryBeanConfigurer;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ResourceLoader;
import org.springframework.expression.Expression;
import org.springframework.integration.amqp.dsl.Amqp;
Expand All @@ -51,15 +52,15 @@
import org.springframework.messaging.MessageHandler;

/**
* A configuration for RabbitMQ Consumer function. Uses a
* Auto-configuration for RabbitMQ Consumer function. Uses a
* {@link AmqpOutboundChannelAdapterSpec} to save payload contents to RabbitMQ.
*
* @author Soby Chako
* @author Nicolas Labrot
* @author Chris Bono
*/
@EnableConfigurationProperties(RabbitConsumerProperties.class)
@Configuration
@AutoConfiguration(after = RabbitAutoConfiguration.class)
public class RabbitConsumerConfiguration implements DisposableBean {

@Autowired
Expand All @@ -86,11 +87,8 @@ public class RabbitConsumerConfiguration implements DisposableBean {
private CachingConnectionFactory ownConnectionFactory;

@Bean
public Function<Message<?>, Object> rabbitConsumer(@Qualifier("amqpChannelAdapter") MessageHandler messageHandler) {
return o -> {
messageHandler.handleMessage(o);
return "";
};
public Consumer<Message<?>> rabbitConsumer(@Qualifier("amqpChannelAdapter") MessageHandler messageHandler) {
return messageHandler::handleMessage;
}

@Bean
Expand All @@ -100,9 +98,9 @@ public AmqpOutboundChannelAdapterSpec amqpChannelAdapter(ConnectionFactory rabbi

AmqpOutboundChannelAdapterSpec handler = Amqp
.outboundAdapter(rabbitTemplate(
this.properties.isOwnConnection() ? buildLocalConnectionFactory() : rabbitConnectionFactory))
.mappedRequestHeaders(properties.getMappedRequestHeaders())
.defaultDeliveryMode(properties.getPersistentDeliveryMode() ? MessageDeliveryMode.PERSISTENT
(this.properties.isOwnConnection()) ? buildLocalConnectionFactory() : rabbitConnectionFactory))
.mappedRequestHeaders(this.properties.getMappedRequestHeaders())
.defaultDeliveryMode((this.properties.getPersistentDeliveryMode()) ? MessageDeliveryMode.PERSISTENT
: MessageDeliveryMode.NON_PERSISTENT)
.headersMappedLast(this.properties.isHeadersMappedLast());

Expand All @@ -129,8 +127,7 @@ public AmqpOutboundChannelAdapterSpec amqpChannelAdapter(ConnectionFactory rabbi
return handler;
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) {
private RabbitTemplate rabbitTemplate(ConnectionFactory rabbitConnectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
if (this.messageConverter != null) {
rabbitTemplate.setMessageConverter(this.messageConverter);
Expand Down Expand Up @@ -184,7 +181,7 @@ private CachingConnectionFactory rabbitConnectionFactory(RabbitProperties proper
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
CachingConnectionFactoryConfigurer cachingConnectionFactoryConfigurer = new CachingConnectionFactoryConfigurer(
properties);
cachingConnectionFactoryConfigurer.setConnectionNameStrategy(cf -> "rabbit.sink.own.connection");
cachingConnectionFactoryConfigurer.setConnectionNameStrategy((cf) -> "rabbit.sink.own.connection");
cachingConnectionFactoryConfigurer.configure(cachingConnectionFactory);
cachingConnectionFactory.afterPropertiesSet();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2020 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,12 @@
import org.springframework.expression.Expression;
import org.springframework.validation.annotation.Validated;

@ConfigurationProperties("rabbit")
/**
* Configuration properties for RabbitMQ consumer.
*
* @author Soby Chacko
*/
@ConfigurationProperties("rabbit.consumer")
@Validated
public class RabbitConsumerProperties {

Expand Down Expand Up @@ -76,7 +81,7 @@ public class RabbitConsumerProperties {

/**
* When mapping headers for the outbound message, determine whether the headers are
* mapped before the message is converted, or afterwards.
* mapped before the message is converted, or afterward.
*/
private boolean headersMappedLast = true;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* The RabbitMQ consumer auto-configuration support.
*/
package org.springframework.cloud.fn.consumer.rabbit;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.springframework.cloud.fn.consumer.rabbit.RabbitConsumerConfiguration
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.fn.consumer.rabbit;

import java.util.function.Consumer;

import org.junit.jupiter.api.Test;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

import static org.assertj.core.api.Assertions.assertThat;

@RabbitAvailable(RabbitConsumerTests.TEST_QUEUE)
@SpringBootTest(properties = { "rabbit.consumer.routingKey=" + RabbitConsumerTests.TEST_QUEUE,
"rabbit.consumer.own-connection=true" })
public class RabbitConsumerTests {

static final String TEST_QUEUE = "test-consumer-queue";

@Test
void rabbitSupplierReceivesData(@Autowired RabbitTemplate rabbitTemplate,
@Autowired Consumer<Message<?>> rabbitConsumer) {

rabbitConsumer.accept(new GenericMessage<>("test data1"));
rabbitConsumer.accept(new GenericMessage<>("test data2"));

Object received = rabbitTemplate.receiveAndConvert(TEST_QUEUE, 10_000);
assertThat(received).isEqualTo("test data1");
received = rabbitTemplate.receiveAndConvert(TEST_QUEUE, 10_000);
assertThat(received).isEqualTo("test data2");
}

@SpringBootApplication
public static class TestConfiguration {

}

}
2 changes: 1 addition & 1 deletion supplier/spring-rabbit-supplier/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Users have to subscribe to this `Flux` and receive the data.

## Beans for injection

You can import the `RabbitSupplierConfiguration` in the application and then inject the following bean.
The `RabbitSupplierConfiguration` auto-configuration provides the following bean:

`rabbitSupplier`

Expand Down
3 changes: 2 additions & 1 deletion supplier/spring-rabbit-supplier/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
dependencies {
api 'org.springframework.integration:spring-integration-amqp'
api 'org.springframework.boot:spring-boot-starter-amqp'
api 'org.springframework.boot:spring-boot-starter-web'

testImplementation 'org.springframework.amqp:spring-rabbit-junit'
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,14 +38,15 @@
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.CachingConnectionFactoryConfigurer;
import org.springframework.boot.autoconfigure.amqp.ConnectionFactoryCustomizer;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
import org.springframework.boot.autoconfigure.amqp.RabbitConnectionFactoryBeanConfigurer;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ResourceLoader;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.amqp.dsl.AmqpInboundChannelAdapterSMLCSpec;
Expand All @@ -56,19 +57,19 @@
import org.springframework.util.Assert;

/**
* A source module that receives data from RabbitMQ.
* Auto-configuration for supplier that receives data from RabbitMQ.
*
* @author Gary Russell
* @author Chris Schaefer
* @author Roger Perez
* @author Chris Bono
* @author Artem Bilan
*/
@Configuration(proxyBeanMethods = false)
@AutoConfiguration(after = RabbitAutoConfiguration.class)
@EnableConfigurationProperties(RabbitSupplierProperties.class)
public class RabbitSupplierConfiguration implements DisposableBean {

private static final MessagePropertiesConverter inboundMessagePropertiesConverter = new DefaultMessagePropertiesConverter() {
private static final MessagePropertiesConverter INBOUND_MESSAGE_PROPERTIES_CONVERTER = new DefaultMessagePropertiesConverter() {

@Override
public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelope envelope, String charset) {
Expand Down Expand Up @@ -105,7 +106,7 @@ public MessageProperties toMessageProperties(AMQP.BasicProperties source, Envelo

@Bean
public SimpleMessageListenerContainer container(RetryOperationsInterceptor rabbitSourceRetryInterceptor) {
ConnectionFactory connectionFactory = this.properties.isOwnConnection() ? buildLocalConnectionFactory()
ConnectionFactory connectionFactory = (this.properties.isOwnConnection()) ? buildLocalConnectionFactory()
: this.rabbitConnectionFactory;
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setAutoStartup(false);
Expand Down Expand Up @@ -140,7 +141,7 @@ public SimpleMessageListenerContainer container(RetryOperationsInterceptor rabbi
if (this.properties.isEnableRetry()) {
container.setAdviceChain(rabbitSourceRetryInterceptor);
}
container.setMessagePropertiesConverter(inboundMessagePropertiesConverter);
container.setMessagePropertiesConverter(INBOUND_MESSAGE_PROPERTIES_CONVERTER);
return container;
}

Expand All @@ -149,7 +150,7 @@ public Publisher<Message<byte[]>> rabbitPublisher(SimpleMessageListenerContainer
@Nullable ComponentCustomizer<AmqpInboundChannelAdapterSMLCSpec> amqpMessageProducerCustomizer) {

AmqpInboundChannelAdapterSMLCSpec messageProducerSpec = Amqp.inboundAdapter(container)
.mappedRequestHeaders(properties.getMappedRequestHeaders());
.mappedRequestHeaders(this.properties.getMappedRequestHeaders());

if (amqpMessageProducerCustomizer != null) {
amqpMessageProducerCustomizer.customize(messageProducerSpec);
Expand Down Expand Up @@ -220,7 +221,7 @@ private static CachingConnectionFactory rabbitConnectionFactory(RabbitProperties
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
CachingConnectionFactoryConfigurer cachingConnectionFactoryConfigurer = new CachingConnectionFactoryConfigurer(
properties);
cachingConnectionFactoryConfigurer.setConnectionNameStrategy(cf -> "rabbit.supplier.own.connection");
cachingConnectionFactoryConfigurer.setConnectionNameStrategy((cf) -> "rabbit.supplier.own.connection");
cachingConnectionFactoryConfigurer.configure(cachingConnectionFactory);
cachingConnectionFactory.afterPropertiesSet();

Expand Down
Loading

0 comments on commit f509b15

Please sign in to comment.