Skip to content

Commit

Permalink
Make JMS supplier as auto-config
Browse files Browse the repository at this point in the history
* Fix all its Checkstyle violations
  • Loading branch information
artembilan committed Jan 9, 2024
1 parent a0e6d7c commit 84aa306
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 38 deletions.
2 changes: 1 addition & 1 deletion supplier/spring-jms-supplier/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Users have to subscribe to this `Flux` and then receive the data.

## Beans for injection

You can import the `JmsSupplierConfiguration` in the application and then inject the following bean.
The `JmsSupplierConfiguration` auto-configuration provides the following bean:

`jmsSupplier`

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,11 +23,14 @@
import reactor.core.publisher.Flux;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.jms.AcknowledgeMode;
import org.springframework.boot.autoconfigure.jms.JmsAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.JavaUtils;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.integration.jms.dsl.JmsMessageDrivenChannelAdapterSpec;
Expand All @@ -37,7 +40,14 @@
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

@Configuration(proxyBeanMethods = false)
/**
* Auto-configuration for JMS supplier.
*
* @author Gary Russell
* @author Soby Chako
* @author Artem Bilan
*/
@AutoConfiguration(after = JmsAutoConfiguration.class)
@EnableConfigurationProperties(JmsSupplierProperties.class)
public class JmsSupplierConfiguration {

Expand Down Expand Up @@ -71,47 +81,46 @@ public Publisher<Message<byte[]>> jmsPublisher(AbstractMessageListenerContainer
@Bean
public AbstractMessageListenerContainer container() {
AbstractMessageListenerContainer container;

JmsProperties.Listener listenerProperties = this.jmsProperties.getListener();
Integer concurrency = listenerProperties.getMinConcurrency();
if (this.properties.isSessionTransacted()) {
DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
dmlc.setSessionTransacted(true);
if (listenerProperties.getConcurrency() != null) {
dmlc.setConcurrentConsumers(listenerProperties.getConcurrency());
if (concurrency != null) {
dmlc.setConcurrentConsumers(concurrency);
}
if (listenerProperties.getMaxConcurrency() != null) {
dmlc.setMaxConcurrentConsumers(listenerProperties.getMaxConcurrency());
Integer maxConcurrency = listenerProperties.getMaxConcurrency();
if (maxConcurrency != null) {
dmlc.setMaxConcurrentConsumers(maxConcurrency);
}
container = dmlc;
}
else {
SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer();
smlc.setSessionTransacted(false);
if (listenerProperties != null && listenerProperties.getConcurrency() != null) {
smlc.setConcurrentConsumers(listenerProperties.getConcurrency());
if (concurrency != null) {
smlc.setConcurrentConsumers(concurrency);
}
container = smlc;
}

container.setConnectionFactory(this.connectionFactory);
if (this.properties.getClientId() != null) {
container.setClientId(this.properties.getClientId());
}
container.setDestinationName(this.properties.getDestination());
if (this.properties.getMessageSelector() != null) {
container.setMessageSelector(this.properties.getMessageSelector());
}
container.setPubSubDomain(this.jmsProperties.isPubSubDomain());
if (this.properties.getMessageSelector() != null && listenerProperties.getAcknowledgeMode() != null) {
container.setSessionAcknowledgeMode(listenerProperties.getAcknowledgeMode().getMode());
}
if (this.properties.getSubscriptionDurable() != null) {
container.setSubscriptionDurable(this.properties.getSubscriptionDurable());
}
if (this.properties.getSubscriptionName() != null) {
container.setSubscriptionName(this.properties.getSubscriptionName());
}
if (this.properties.getSubscriptionShared() != null) {
container.setSubscriptionShared(this.properties.getSubscriptionShared());

String messageSelector = this.properties.getMessageSelector();
AcknowledgeMode acknowledgeMode = listenerProperties.getSession().getAcknowledgeMode();
if (messageSelector != null && acknowledgeMode != null) {
container.setSessionAcknowledgeMode(acknowledgeMode.getMode());
}

JavaUtils.INSTANCE.acceptIfNotNull(this.properties.getClientId(), container::setClientId)
.acceptIfNotNull(messageSelector, container::setMessageSelector)
.acceptIfNotNull(this.properties.getSubscriptionDurable(), container::setSubscriptionDurable)
.acceptIfNotNull(this.properties.getSubscriptionName(), container::setSubscriptionName)
.acceptIfNotNull(this.properties.getSubscriptionShared(), container::setSubscriptionShared);

return container;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* The JMS supplier auto-configuration support.
*/
package org.springframework.cloud.fn.supplier.jms;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.springframework.cloud.fn.supplier.jms.JmsSupplierConfiguration
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,8 +28,8 @@

@TestPropertySource(properties = { "jms.supplier.sessionTransacted = false", "jms.supplier.destination = topic",
"jms.supplier.messageSelector = JMSCorrelationId=foo", "jms.supplier.subscriptionDurable = false",
"jms.supplier.subscriptionShared = false", "spring.jms.listener.acknowledgeMode = DUPS_OK",
"spring.jms.listener.concurrency = 3", "spring.jms.listener.maxConcurrency = 4",
"jms.supplier.subscriptionShared = false", "spring.jms.listener.session.acknowledge-mode = dups_ok",
"spring.jms.listener.min-concurrency = 3", "spring.jms.listener.maxConcurrency = 4",
"spring.jms.pubSubDomain = true" })
public class PropertiesPopulated1Tests extends AbstractJmsSupplierTests {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,7 +29,7 @@
@TestPropertySource(properties = { "jms.supplier.sessionTransacted = true", "jms.supplier.clientId = client",
"jms.supplier.destination = topic", "jms.supplier.subscriptionName = subName",
"jms.supplier.subscriptionDurable = true", "jms.supplier.subscriptionShared = false",
"spring.jms.listener.acknowledgeMode = AUTO", "spring.jms.listener.concurrency = 3",
"spring.jms.listener.session.acknowledge-mode = auto", "spring.jms.listener.min-concurrency = 3",
"spring.jms.listener.maxConcurrency = 4" })
public class PropertiesPopulated2Tests extends AbstractJmsSupplierTests {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,7 +36,7 @@
@TestPropertySource(properties = { "jms.supplier.sessionTransacted = true",
"jms.supplier.destination = jmssource.test.queue", "jms.supplier.messageSelector = JMSCorrelationId=foo",
"jms.supplier.subscriptionDurable = false", "jms.supplier.subscriptionShared = false",
"spring.jms.listener.acknowledgeMode = AUTO", "spring.jms.listener.concurrency = 3",
"spring.jms.listener.session.acknowledge-mode = auto", "spring.jms.listener.min-concurrency = 3",
"spring.jms.listener.maxConcurrency = 4", "spring.jms.pubSubDomain = false" })
public class PropertiesPopulated3Tests extends AbstractJmsSupplierTests {

Expand All @@ -47,7 +47,7 @@ public class PropertiesPopulated3Tests extends AbstractJmsSupplierTests {
private Supplier<Flux<Message<?>>> jmsSupplier;

@Test
public void test() throws Exception {
public void test() {
AbstractMessageListenerContainer container = TestUtils.getPropertyValue(this.endpoint, "listenerContainer",
AbstractMessageListenerContainer.class);
assertThat(container).isInstanceOf(DefaultMessageListenerContainer.class);
Expand All @@ -63,9 +63,10 @@ public void test() throws Exception {

final Flux<Message<?>> messageFlux = jmsSupplier.get();

final StepVerifier stepVerifier = StepVerifier.create(messageFlux).assertNext((message) -> {
assertThat(message.getPayload()).isEqualTo("Hello, world!");
}).thenCancel().verifyLater();
final StepVerifier stepVerifier = StepVerifier.create(messageFlux)
.assertNext((message) -> assertThat(message.getPayload()).isEqualTo("Hello, world!"))
.thenCancel()
.verifyLater();

template.convertAndSend("jmssource.test.queue", "Hello, world!");

Expand Down

0 comments on commit 84aa306

Please sign in to comment.