diff --git a/supplier/spring-jms-supplier/README.adoc b/supplier/spring-jms-supplier/README.adoc index 4aa18754..befc28cc 100644 --- a/supplier/spring-jms-supplier/README.adoc +++ b/supplier/spring-jms-supplier/README.adoc @@ -8,7 +8,7 @@ Users have to subscribe to this `Flux` and then receive the data. ## Beans for injection -You can import the `JmsSupplierConfiguration` in the application and then inject the following bean. +The `JmsSupplierConfiguration` auto-configuration provides the following bean: `jmsSupplier` diff --git a/supplier/spring-jms-supplier/src/main/java/org/springframework/cloud/fn/supplier/jms/JmsSupplierConfiguration.java b/supplier/spring-jms-supplier/src/main/java/org/springframework/cloud/fn/supplier/jms/JmsSupplierConfiguration.java index 5375b5be..5a599aa9 100644 --- a/supplier/spring-jms-supplier/src/main/java/org/springframework/cloud/fn/supplier/jms/JmsSupplierConfiguration.java +++ b/supplier/spring-jms-supplier/src/main/java/org/springframework/cloud/fn/supplier/jms/JmsSupplierConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,11 +23,14 @@ import reactor.core.publisher.Flux; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.jms.AcknowledgeMode; +import org.springframework.boot.autoconfigure.jms.JmsAutoConfiguration; import org.springframework.boot.autoconfigure.jms.JmsProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.fn.common.config.ComponentCustomizer; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; +import org.springframework.integration.JavaUtils; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.jms.dsl.Jms; import org.springframework.integration.jms.dsl.JmsMessageDrivenChannelAdapterSpec; @@ -37,7 +40,14 @@ import org.springframework.lang.Nullable; import org.springframework.messaging.Message; -@Configuration(proxyBeanMethods = false) +/** + * Auto-configuration for JMS supplier. + * + * @author Gary Russell + * @author Soby Chako + * @author Artem Bilan + */ +@AutoConfiguration(after = JmsAutoConfiguration.class) @EnableConfigurationProperties(JmsSupplierProperties.class) public class JmsSupplierConfiguration { @@ -71,47 +81,46 @@ public Publisher> jmsPublisher(AbstractMessageListenerContainer @Bean public AbstractMessageListenerContainer container() { AbstractMessageListenerContainer container; + JmsProperties.Listener listenerProperties = this.jmsProperties.getListener(); + Integer concurrency = listenerProperties.getMinConcurrency(); if (this.properties.isSessionTransacted()) { DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer(); dmlc.setSessionTransacted(true); - if (listenerProperties.getConcurrency() != null) { - dmlc.setConcurrentConsumers(listenerProperties.getConcurrency()); + if (concurrency != null) { + dmlc.setConcurrentConsumers(concurrency); } - if (listenerProperties.getMaxConcurrency() != null) { - dmlc.setMaxConcurrentConsumers(listenerProperties.getMaxConcurrency()); + Integer maxConcurrency = listenerProperties.getMaxConcurrency(); + if (maxConcurrency != null) { + dmlc.setMaxConcurrentConsumers(maxConcurrency); } container = dmlc; } else { SimpleMessageListenerContainer smlc = new SimpleMessageListenerContainer(); smlc.setSessionTransacted(false); - if (listenerProperties != null && listenerProperties.getConcurrency() != null) { - smlc.setConcurrentConsumers(listenerProperties.getConcurrency()); + if (concurrency != null) { + smlc.setConcurrentConsumers(concurrency); } container = smlc; } + container.setConnectionFactory(this.connectionFactory); - if (this.properties.getClientId() != null) { - container.setClientId(this.properties.getClientId()); - } container.setDestinationName(this.properties.getDestination()); - if (this.properties.getMessageSelector() != null) { - container.setMessageSelector(this.properties.getMessageSelector()); - } container.setPubSubDomain(this.jmsProperties.isPubSubDomain()); - if (this.properties.getMessageSelector() != null && listenerProperties.getAcknowledgeMode() != null) { - container.setSessionAcknowledgeMode(listenerProperties.getAcknowledgeMode().getMode()); - } - if (this.properties.getSubscriptionDurable() != null) { - container.setSubscriptionDurable(this.properties.getSubscriptionDurable()); - } - if (this.properties.getSubscriptionName() != null) { - container.setSubscriptionName(this.properties.getSubscriptionName()); - } - if (this.properties.getSubscriptionShared() != null) { - container.setSubscriptionShared(this.properties.getSubscriptionShared()); + + String messageSelector = this.properties.getMessageSelector(); + AcknowledgeMode acknowledgeMode = listenerProperties.getSession().getAcknowledgeMode(); + if (messageSelector != null && acknowledgeMode != null) { + container.setSessionAcknowledgeMode(acknowledgeMode.getMode()); } + + JavaUtils.INSTANCE.acceptIfNotNull(this.properties.getClientId(), container::setClientId) + .acceptIfNotNull(messageSelector, container::setMessageSelector) + .acceptIfNotNull(this.properties.getSubscriptionDurable(), container::setSubscriptionDurable) + .acceptIfNotNull(this.properties.getSubscriptionName(), container::setSubscriptionName) + .acceptIfNotNull(this.properties.getSubscriptionShared(), container::setSubscriptionShared); + return container; } diff --git a/supplier/spring-jms-supplier/src/main/java/org/springframework/cloud/fn/supplier/jms/package-info.java b/supplier/spring-jms-supplier/src/main/java/org/springframework/cloud/fn/supplier/jms/package-info.java new file mode 100644 index 00000000..e1d56957 --- /dev/null +++ b/supplier/spring-jms-supplier/src/main/java/org/springframework/cloud/fn/supplier/jms/package-info.java @@ -0,0 +1,4 @@ +/** + * The JMS supplier auto-configuration support. + */ +package org.springframework.cloud.fn.supplier.jms; diff --git a/supplier/spring-jms-supplier/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/supplier/spring-jms-supplier/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..0f662967 --- /dev/null +++ b/supplier/spring-jms-supplier/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.cloud.fn.supplier.jms.JmsSupplierConfiguration diff --git a/supplier/spring-jms-supplier/src/test/java/org/springframework/cloud/fn/supplier/jms/PropertiesPopulated1Tests.java b/supplier/spring-jms-supplier/src/test/java/org/springframework/cloud/fn/supplier/jms/PropertiesPopulated1Tests.java index 147510d2..e8676e39 100644 --- a/supplier/spring-jms-supplier/src/test/java/org/springframework/cloud/fn/supplier/jms/PropertiesPopulated1Tests.java +++ b/supplier/spring-jms-supplier/src/test/java/org/springframework/cloud/fn/supplier/jms/PropertiesPopulated1Tests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,8 +28,8 @@ @TestPropertySource(properties = { "jms.supplier.sessionTransacted = false", "jms.supplier.destination = topic", "jms.supplier.messageSelector = JMSCorrelationId=foo", "jms.supplier.subscriptionDurable = false", - "jms.supplier.subscriptionShared = false", "spring.jms.listener.acknowledgeMode = DUPS_OK", - "spring.jms.listener.concurrency = 3", "spring.jms.listener.maxConcurrency = 4", + "jms.supplier.subscriptionShared = false", "spring.jms.listener.session.acknowledge-mode = dups_ok", + "spring.jms.listener.min-concurrency = 3", "spring.jms.listener.maxConcurrency = 4", "spring.jms.pubSubDomain = true" }) public class PropertiesPopulated1Tests extends AbstractJmsSupplierTests { diff --git a/supplier/spring-jms-supplier/src/test/java/org/springframework/cloud/fn/supplier/jms/PropertiesPopulated2Tests.java b/supplier/spring-jms-supplier/src/test/java/org/springframework/cloud/fn/supplier/jms/PropertiesPopulated2Tests.java index 33e20107..d6d3d5c3 100644 --- a/supplier/spring-jms-supplier/src/test/java/org/springframework/cloud/fn/supplier/jms/PropertiesPopulated2Tests.java +++ b/supplier/spring-jms-supplier/src/test/java/org/springframework/cloud/fn/supplier/jms/PropertiesPopulated2Tests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,7 @@ @TestPropertySource(properties = { "jms.supplier.sessionTransacted = true", "jms.supplier.clientId = client", "jms.supplier.destination = topic", "jms.supplier.subscriptionName = subName", "jms.supplier.subscriptionDurable = true", "jms.supplier.subscriptionShared = false", - "spring.jms.listener.acknowledgeMode = AUTO", "spring.jms.listener.concurrency = 3", + "spring.jms.listener.session.acknowledge-mode = auto", "spring.jms.listener.min-concurrency = 3", "spring.jms.listener.maxConcurrency = 4" }) public class PropertiesPopulated2Tests extends AbstractJmsSupplierTests { diff --git a/supplier/spring-jms-supplier/src/test/java/org/springframework/cloud/fn/supplier/jms/PropertiesPopulated3Tests.java b/supplier/spring-jms-supplier/src/test/java/org/springframework/cloud/fn/supplier/jms/PropertiesPopulated3Tests.java index e62605ce..33a3e278 100644 --- a/supplier/spring-jms-supplier/src/test/java/org/springframework/cloud/fn/supplier/jms/PropertiesPopulated3Tests.java +++ b/supplier/spring-jms-supplier/src/test/java/org/springframework/cloud/fn/supplier/jms/PropertiesPopulated3Tests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,7 +36,7 @@ @TestPropertySource(properties = { "jms.supplier.sessionTransacted = true", "jms.supplier.destination = jmssource.test.queue", "jms.supplier.messageSelector = JMSCorrelationId=foo", "jms.supplier.subscriptionDurable = false", "jms.supplier.subscriptionShared = false", - "spring.jms.listener.acknowledgeMode = AUTO", "spring.jms.listener.concurrency = 3", + "spring.jms.listener.session.acknowledge-mode = auto", "spring.jms.listener.min-concurrency = 3", "spring.jms.listener.maxConcurrency = 4", "spring.jms.pubSubDomain = false" }) public class PropertiesPopulated3Tests extends AbstractJmsSupplierTests { @@ -47,7 +47,7 @@ public class PropertiesPopulated3Tests extends AbstractJmsSupplierTests { private Supplier>> jmsSupplier; @Test - public void test() throws Exception { + public void test() { AbstractMessageListenerContainer container = TestUtils.getPropertyValue(this.endpoint, "listenerContainer", AbstractMessageListenerContainer.class); assertThat(container).isInstanceOf(DefaultMessageListenerContainer.class); @@ -63,9 +63,10 @@ public void test() throws Exception { final Flux> messageFlux = jmsSupplier.get(); - final StepVerifier stepVerifier = StepVerifier.create(messageFlux).assertNext((message) -> { - assertThat(message.getPayload()).isEqualTo("Hello, world!"); - }).thenCancel().verifyLater(); + final StepVerifier stepVerifier = StepVerifier.create(messageFlux) + .assertNext((message) -> assertThat(message.getPayload()).isEqualTo("Hello, world!")) + .thenCancel() + .verifyLater(); template.convertAndSend("jmssource.test.queue", "Hello, world!");