diff --git a/common/spring-function-test-support/src/main/java/org/springframework/cloud/fn/test/support/mqtt/MosquittoContainerTest.java b/common/spring-function-test-support/src/main/java/org/springframework/cloud/fn/test/support/mqtt/MosquittoContainerTest.java new file mode 100644 index 00000000..c1da74d2 --- /dev/null +++ b/common/spring-function-test-support/src/main/java/org/springframework/cloud/fn/test/support/mqtt/MosquittoContainerTest.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.fn.test.support.mqtt; + +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * The base contract for JUnit tests based on the container for MQTT Mosquitto broker. The + * Testcontainers 'reuse' option must be disabled,so, Ryuk container is started and will + * clean all the containers up from this test suite after JVM exit. Since the Mosquitto + * container instance is shared via static property, it is going to be started only once + * per JVM, therefore the target Docker container is reused automatically. + * + * @author Artem Bilan + */ +@Testcontainers(disabledWithoutDocker = true) +public interface MosquittoContainerTest { + + GenericContainer MOSQUITTO_CONTAINER = new GenericContainer<>("eclipse-mosquitto:2.0.13") + .withCommand("mosquitto -c /mosquitto-no-auth.conf") + .withExposedPorts(1883); + + @BeforeAll + static void startContainer() { + MOSQUITTO_CONTAINER.start(); + } + + static String mqttUrl() { + return "tcp://localhost:" + MOSQUITTO_CONTAINER.getFirstMappedPort(); + } + +} diff --git a/common/spring-function-test-support/src/main/java/org/springframework/cloud/fn/test/support/mqtt/package-info.java b/common/spring-function-test-support/src/main/java/org/springframework/cloud/fn/test/support/mqtt/package-info.java new file mode 100644 index 00000000..530d0ce4 --- /dev/null +++ b/common/spring-function-test-support/src/main/java/org/springframework/cloud/fn/test/support/mqtt/package-info.java @@ -0,0 +1,4 @@ +/** + * The MQTT protocol testing support. + */ +package org.springframework.cloud.fn.test.support.mqtt; diff --git a/common/spring-mqtt-common/src/main/java/org/springframework/cloud/fn/common/mqtt/MqttConfiguration.java b/common/spring-mqtt-common/src/main/java/org/springframework/cloud/fn/common/mqtt/MqttConfiguration.java index 7cdad151..eae9fe33 100644 --- a/common/spring-mqtt-common/src/main/java/org/springframework/cloud/fn/common/mqtt/MqttConfiguration.java +++ b/common/spring-mqtt-common/src/main/java/org/springframework/cloud/fn/common/mqtt/MqttConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,28 +23,25 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.util.ObjectUtils; /** - * Generic mqtt configuration. + * The MQTT client auto-configuration. * * @author Janne Valkealahti * @author Artem Bilan */ -@Configuration +@AutoConfiguration +@EnableConfigurationProperties(MqttProperties.class) public class MqttConfiguration { - @Autowired - private MqttProperties mqttProperties; - @Bean - public MqttPahoClientFactory mqttClientFactory() { - + public MqttPahoClientFactory mqttClientFactory(MqttProperties mqttProperties) { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setServerURIs(mqttProperties.getUrl()); mqttConnectOptions.setUserName(mqttProperties.getUsername()); diff --git a/common/spring-mqtt-common/src/main/java/org/springframework/cloud/fn/common/mqtt/MqttProperties.java b/common/spring-mqtt-common/src/main/java/org/springframework/cloud/fn/common/mqtt/MqttProperties.java index ea70f75d..390dfa82 100644 --- a/common/spring-mqtt-common/src/main/java/org/springframework/cloud/fn/common/mqtt/MqttProperties.java +++ b/common/spring-mqtt-common/src/main/java/org/springframework/cloud/fn/common/mqtt/MqttProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,7 @@ import org.springframework.validation.annotation.Validated; /** - * Generic mqtt connection properties. + * The MQTT client properties. * * @author Janne Valkealahti * @author Artem Bilan @@ -81,7 +81,7 @@ public class MqttProperties { @Size(min = 1) public String[] getUrl() { - return url; + return this.url; } public void setUrl(String[] url) { @@ -89,7 +89,7 @@ public void setUrl(String[] url) { } public String getUsername() { - return username; + return this.username; } public void setUsername(String username) { @@ -97,7 +97,7 @@ public void setUsername(String username) { } public String getPassword() { - return password; + return this.password; } public void setPassword(String password) { @@ -105,7 +105,7 @@ public void setPassword(String password) { } public boolean isCleanSession() { - return cleanSession; + return this.cleanSession; } public void setCleanSession(boolean cleanSession) { @@ -113,7 +113,7 @@ public void setCleanSession(boolean cleanSession) { } public int getKeepAliveInterval() { - return keepAliveInterval; + return this.keepAliveInterval; } public void setKeepAliveInterval(int keepAliveInterval) { @@ -121,7 +121,7 @@ public void setKeepAliveInterval(int keepAliveInterval) { } public int getConnectionTimeout() { - return connectionTimeout; + return this.connectionTimeout; } public void setConnectionTimeout(int connectionTimeout) { @@ -129,7 +129,7 @@ public void setConnectionTimeout(int connectionTimeout) { } public String getPersistence() { - return persistence; + return this.persistence; } public void setPersistence(String persistence) { @@ -137,7 +137,7 @@ public void setPersistence(String persistence) { } public String getPersistenceDirectory() { - return persistenceDirectory; + return this.persistenceDirectory; } public void setPersistenceDirectory(String persistenceDirectory) { diff --git a/common/spring-mqtt-common/src/main/java/org/springframework/cloud/fn/common/mqtt/package-info.java b/common/spring-mqtt-common/src/main/java/org/springframework/cloud/fn/common/mqtt/package-info.java new file mode 100644 index 00000000..bb070019 --- /dev/null +++ b/common/spring-mqtt-common/src/main/java/org/springframework/cloud/fn/common/mqtt/package-info.java @@ -0,0 +1,4 @@ +/** + * The MQTT client auto-configuration support. + */ +package org.springframework.cloud.fn.common.mqtt; diff --git a/common/spring-mqtt-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/common/spring-mqtt-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..868ed896 --- /dev/null +++ b/common/spring-mqtt-common/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.cloud.fn.common.mqtt.MqttConfiguration diff --git a/consumer/spring-file-consumer/README.adoc b/consumer/spring-file-consumer/README.adoc index f85a6403..5d1417b7 100644 --- a/consumer/spring-file-consumer/README.adoc +++ b/consumer/spring-file-consumer/README.adoc @@ -5,7 +5,7 @@ The consumer uses the `FileWritingMessageHandler` from Spring Integration. ## Beans for injection -You can import `FileConsumerConfiguration` in the application and then inject the following bean. +The `FileConsumerConfiguration` auto-configuration provides the following bean: `Consumer> fileConsumer` diff --git a/consumer/spring-ftp-consumer/README.adoc b/consumer/spring-ftp-consumer/README.adoc index baa6efd5..332eabcd 100644 --- a/consumer/spring-ftp-consumer/README.adoc +++ b/consumer/spring-ftp-consumer/README.adoc @@ -5,7 +5,7 @@ The consumer uses the `FtpMessageHandler` from Spring Integration. ## Beans for injection -You can import `FtpConsumerConfiguration` in the application and then inject the following bean. +The `FtpConsumerConfiguration` auto-configuration provides the following bean: `Consumer> ftpConsumer` diff --git a/consumer/spring-mqtt-consumer/README.adoc b/consumer/spring-mqtt-consumer/README.adoc index 2bc79d88..8029a68f 100644 --- a/consumer/spring-mqtt-consumer/README.adoc +++ b/consumer/spring-mqtt-consumer/README.adoc @@ -4,7 +4,7 @@ A consumer that allows you to send messages using the MQTT protocol. ## Beans for injection -You can import `MqttConsumerConfiguration` in the application and then inject the following bean. +The `MqttConsumerConfiguration` auto-configuration provides the following bean: `Consumer> mqttConsumer` diff --git a/consumer/spring-mqtt-consumer/build.gradle b/consumer/spring-mqtt-consumer/build.gradle index 65cad74e..cfc152a3 100644 --- a/consumer/spring-mqtt-consumer/build.gradle +++ b/consumer/spring-mqtt-consumer/build.gradle @@ -1,3 +1,5 @@ dependencies { api project(':spring-mqtt-common') + + testImplementation project(':spring-function-test-support') } diff --git a/consumer/spring-mqtt-consumer/src/main/java/org/springframework/cloud/fn/consumer/mqtt/MqttConsumerConfiguration.java b/consumer/spring-mqtt-consumer/src/main/java/org/springframework/cloud/fn/consumer/mqtt/MqttConsumerConfiguration.java index cd8d8a0d..d12f6772 100644 --- a/consumer/spring-mqtt-consumer/src/main/java/org/springframework/cloud/fn/consumer/mqtt/MqttConsumerConfiguration.java +++ b/consumer/spring-mqtt-consumer/src/main/java/org/springframework/cloud/fn/consumer/mqtt/MqttConsumerConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2022 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,14 +19,11 @@ import java.util.function.Consumer; import org.springframework.beans.factory.BeanFactory; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.fn.common.config.ComponentCustomizer; import org.springframework.cloud.fn.common.mqtt.MqttConfiguration; -import org.springframework.cloud.fn.common.mqtt.MqttProperties; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; @@ -35,45 +32,37 @@ import org.springframework.messaging.MessageHandler; /** - * A consumer that sends data to Mqtt. + * A consumer that sends data to MQTT. * * @author Janne Valkealahti - * + * @author Artem Bilan */ -@Configuration(proxyBeanMethods = false) -@EnableConfigurationProperties({ MqttProperties.class, MqttConsumerProperties.class }) -@Import(MqttConfiguration.class) +@EnableConfigurationProperties(MqttConsumerProperties.class) +@AutoConfiguration(after = MqttConfiguration.class) public class MqttConsumerConfiguration { - @Autowired - private MqttConsumerProperties properties; - - @Autowired - private MqttPahoClientFactory mqttClientFactory; - - @Autowired - private BeanFactory beanFactory; - @Bean public Consumer> mqttConsumer(MessageHandler mqttOutbound) { return mqttOutbound::handleMessage; } @Bean - public MessageHandler mqttOutbound( + public MessageHandler mqttOutbound(MqttConsumerProperties properties, MqttPahoClientFactory mqttClientFactory, + BeanFactory beanFactory, @Nullable ComponentCustomizer mqttMessageHandlerCustomizer) { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(properties.getClientId(), mqttClientFactory); messageHandler.setAsync(properties.isAsync()); messageHandler.setDefaultTopic(properties.getTopic()); - messageHandler.setConverter(pahoMessageConverter()); + messageHandler.setConverter(pahoMessageConverter(properties, beanFactory)); if (mqttMessageHandlerCustomizer != null) { mqttMessageHandlerCustomizer.customize(messageHandler); } return messageHandler; } - public DefaultPahoMessageConverter pahoMessageConverter() { + private DefaultPahoMessageConverter pahoMessageConverter(MqttConsumerProperties properties, + BeanFactory beanFactory) { DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(properties.getQos(), properties.isRetained(), properties.getCharset()); converter.setBeanFactory(beanFactory); diff --git a/consumer/spring-mqtt-consumer/src/main/java/org/springframework/cloud/fn/consumer/mqtt/MqttConsumerProperties.java b/consumer/spring-mqtt-consumer/src/main/java/org/springframework/cloud/fn/consumer/mqtt/MqttConsumerProperties.java index c7a3f5a0..511dbf52 100644 --- a/consumer/spring-mqtt-consumer/src/main/java/org/springframework/cloud/fn/consumer/mqtt/MqttConsumerProperties.java +++ b/consumer/spring-mqtt-consumer/src/main/java/org/springframework/cloud/fn/consumer/mqtt/MqttConsumerProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ import org.springframework.validation.annotation.Validated; /** - * Properties for the Mqtt Consumer. + * Properties for the MQTT Consumer. * * @author Janne Valkealahti * @@ -34,32 +34,32 @@ public class MqttConsumerProperties { /** - * identifies the client. + * Identifies the client. */ private String clientId = "stream.client.id.sink"; /** - * the topic to which the sink will publish. + * The topic to which the sink will publish. */ private String topic = "stream.mqtt"; /** - * the quality of service to use. + * The quality of service to use. */ private int qos = 1; /** - * whether to set the 'retained' flag. + * Whether to set the 'retained' flag. */ private boolean retained = false; /** - * the charset used to convert a String payload to byte[]. + * The charset used to convert a String payload to byte[]. */ private String charset = "UTF-8"; /** - * whether or not to use async sends. + * Whether to use async sends. */ private boolean async = false; diff --git a/consumer/spring-mqtt-consumer/src/main/java/org/springframework/cloud/fn/consumer/mqtt/package-info.java b/consumer/spring-mqtt-consumer/src/main/java/org/springframework/cloud/fn/consumer/mqtt/package-info.java new file mode 100644 index 00000000..abcfd3f3 --- /dev/null +++ b/consumer/spring-mqtt-consumer/src/main/java/org/springframework/cloud/fn/consumer/mqtt/package-info.java @@ -0,0 +1,4 @@ +/** + * The MQTT consumer auto-configuration support. + */ +package org.springframework.cloud.fn.consumer.mqtt; diff --git a/consumer/spring-mqtt-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/consumer/spring-mqtt-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..7372bc43 --- /dev/null +++ b/consumer/spring-mqtt-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.cloud.fn.consumer.mqtt.MqttConsumerConfiguration diff --git a/consumer/spring-mqtt-consumer/src/test/java/org/springframework/cloud/fn/consumer/mqtt/MqttConsumerTests.java b/consumer/spring-mqtt-consumer/src/test/java/org/springframework/cloud/fn/consumer/mqtt/MqttConsumerTests.java index 74bf584d..01ea0b2e 100644 --- a/consumer/spring-mqtt-consumer/src/test/java/org/springframework/cloud/fn/consumer/mqtt/MqttConsumerTests.java +++ b/consumer/spring-mqtt-consumer/src/test/java/org/springframework/cloud/fn/consumer/mqtt/MqttConsumerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,21 +16,19 @@ package org.springframework.cloud.fn.consumer.mqtt; -import java.time.Duration; import java.util.Properties; import java.util.function.Consumer; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.internal.security.SSLSocketFactoryFactory; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.GenericContainer; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.fn.test.support.mqtt.MosquittoContainerTest; import org.springframework.context.annotation.Bean; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; @@ -39,6 +37,8 @@ import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; import static org.assertj.core.api.Assertions.assertThat; @@ -46,19 +46,7 @@ "mqtt.ssl-properties.com.ibm.ssl.keyStoreType=TEST" }) @DirtiesContext @Tag("integration") -public class MqttConsumerTests { - - static { - GenericContainer mosquitto = new GenericContainer<>("eclipse-mosquitto:2.0.13") - .withCommand("mosquitto -c /mosquitto-no-auth.conf") - .withReuse(true) - .withExposedPorts(1883) - .withStartupTimeout(Duration.ofSeconds(120)) - .withStartupAttempts(3); - mosquitto.start(); - final Integer mappedPort = mosquitto.getMappedPort(1883); - System.setProperty("mqtt.url", "tcp://localhost:" + mappedPort); - } +public class MqttConsumerTests implements MosquittoContainerTest { @Autowired private MqttPahoMessageDrivenChannelAdapter mqttPahoMessageDrivenChannelAdapter; @@ -69,9 +57,9 @@ public class MqttConsumerTests { @Autowired protected QueueChannel queue; - @AfterAll - public static void cleanup() { - System.clearProperty("mqtt.url"); + @DynamicPropertySource + static void mongoDbProperties(DynamicPropertyRegistry registry) { + registry.add("mqtt.url", () -> "tcp://localhost:" + MOSQUITTO_CONTAINER.getMappedPort(1883)); } @Test @@ -91,11 +79,10 @@ public void testMqttConsumer() { @SpringBootApplication static class MqttConsumerTestApplication { - @Autowired - private MqttPahoClientFactory mqttClientFactory; - @Bean - public MqttPahoMessageDrivenChannelAdapter mqttInbound(BeanFactory beanFactory) { + MqttPahoMessageDrivenChannelAdapter mqttInbound(MqttPahoClientFactory mqttClientFactory, + BeanFactory beanFactory) { + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("test", mqttClientFactory, "test"); adapter.setQos(0); @@ -104,7 +91,7 @@ public MqttPahoMessageDrivenChannelAdapter mqttInbound(BeanFactory beanFactory) return adapter; } - public DefaultPahoMessageConverter pahoMessageConverter(BeanFactory beanFactory) { + DefaultPahoMessageConverter pahoMessageConverter(BeanFactory beanFactory) { DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(1, true, "UTF-8"); converter.setPayloadAsBytes(false); converter.setBeanFactory(beanFactory); @@ -112,7 +99,7 @@ public DefaultPahoMessageConverter pahoMessageConverter(BeanFactory beanFactory) } @Bean - public QueueChannel queue() { + QueueChannel queue() { return new QueueChannel(); } diff --git a/supplier/spring-file-supplier/README.adoc b/supplier/spring-file-supplier/README.adoc index 1179253c..83d2584a 100644 --- a/supplier/spring-file-supplier/README.adoc +++ b/supplier/spring-file-supplier/README.adoc @@ -8,7 +8,7 @@ Users have to subscribe to this `Flux` and receive the data. ## Beans for injection -You can import the `FileSupplierConfiguration` in the application and then inject the following bean. +The `FileSupplierConfiguration` auto-configuration provides the following bean: `fileSupplier` @@ -24,7 +24,7 @@ All configuration properties are prefixed with `file.supplier`. There are also properties that need to be used with the prefix `file.consumer`. For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierProperties.java[FileSupplierProperties]. -See link:src/main/java/org/springframework/cloud/fn/supplier/file/FileConsumerProperties.java[this] also. +See `FileConsumerProperties` also. A `ComponentCustomizer` bean can be added in the target project to provide any custom options for the `FileInboundChannelAdapterSpec` configuration used by the `fileSupplier`. diff --git a/supplier/spring-ftp-supplier/README.adoc b/supplier/spring-ftp-supplier/README.adoc index abca0e51..e9aa7695 100644 --- a/supplier/spring-ftp-supplier/README.adoc +++ b/supplier/spring-ftp-supplier/README.adoc @@ -1,6 +1,6 @@ # FTP Supplier -This module provides a FTP supplier that can be reused and composed in other applications. +This module provides an FTP supplier that can be reused and composed in other applications. The `Supplier` uses the `FtpInboundChannelAdapter` from Spring Integration. `FtpSupplier` is implemented as a `java.util.function.Supplier`. This supplier gives you a reactive stream of files from the provided directory as the supplier has a signature of `Supplier>>`. @@ -8,7 +8,7 @@ Users have to subscribe to this `Flux` and receive the data. ## Beans for injection -You can import the `FtpSupplierConfiguration` in the application and then inject the following bean. +The `FtpSupplierConfiguration` auto-configuration provides the following bean: `ftpSupplier` @@ -24,7 +24,7 @@ All configuration properties are prefixed with `ftp.supplier`. There are also properties that need to be used with the prefix `file.consumer`. For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/supplier/ftp/FtpSupplierProperties.java[FtpSupplierProperties]. -See link:src/main/java/org/springframework/cloud/fn/supplier/file/FileConsumerProperties.java[this] also. +See `FileConsumerProperties` also. A `ComponentCustomizer` bean can be added in the target project to provide any custom options for the `FtpInboundChannelAdapterSpec` configuration used by the `ftpSupplier`. diff --git a/supplier/spring-mqtt-supplier/README.adoc b/supplier/spring-mqtt-supplier/README.adoc index 0fc7b2b7..2bef2c3f 100644 --- a/supplier/spring-mqtt-supplier/README.adoc +++ b/supplier/spring-mqtt-supplier/README.adoc @@ -8,7 +8,7 @@ Users have to subscribe to this `Flux` and then receive the data. ## Beans for injection -You can import the `MqttSupplierConfiguration` in the application and then inject the following bean. +The `MqttSupplierConfiguration` auto-configuration provides the following bean: `mqttSupplier` diff --git a/supplier/spring-mqtt-supplier/build.gradle b/supplier/spring-mqtt-supplier/build.gradle index 65cad74e..cfc152a3 100644 --- a/supplier/spring-mqtt-supplier/build.gradle +++ b/supplier/spring-mqtt-supplier/build.gradle @@ -1,3 +1,5 @@ dependencies { api project(':spring-mqtt-common') + + testImplementation project(':spring-function-test-support') } diff --git a/supplier/spring-mqtt-supplier/src/main/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierConfiguration.java b/supplier/spring-mqtt-supplier/src/main/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierConfiguration.java index 07b3e606..0045b64f 100644 --- a/supplier/spring-mqtt-supplier/src/main/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierConfiguration.java +++ b/supplier/spring-mqtt-supplier/src/main/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,14 +22,11 @@ import reactor.core.publisher.Flux; import org.springframework.beans.factory.BeanFactory; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.fn.common.config.ComponentCustomizer; import org.springframework.cloud.fn.common.mqtt.MqttConfiguration; -import org.springframework.cloud.fn.common.mqtt.MqttProperties; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; @@ -38,38 +35,29 @@ import org.springframework.messaging.Message; /** - * A source module that receives data from Mqtt. + * A supplier that receives data from MQTT. * * @author Janne Valkealahti * @author Soby Chacko */ -@Configuration(proxyBeanMethods = false) -@EnableConfigurationProperties({ MqttProperties.class, MqttSupplierProperties.class }) -@Import(MqttConfiguration.class) +@EnableConfigurationProperties(MqttSupplierProperties.class) +@AutoConfiguration(after = MqttConfiguration.class) public class MqttSupplierConfiguration { - @Autowired - private MqttSupplierProperties properties; - - @Autowired - private MqttPahoClientFactory mqttClientFactory; - - @Autowired - private BeanFactory beanFactory; - @Bean public Supplier>> mqttSupplier(Publisher> mqttPublisher) { return () -> Flux.from(mqttPublisher); } @Bean - public MqttPahoMessageDrivenChannelAdapter mqttInbound( + public MqttPahoMessageDrivenChannelAdapter mqttInbound(MqttSupplierProperties properties, + MqttPahoClientFactory mqttClientFactory, BeanFactory beanFactory, @Nullable ComponentCustomizer mqttMessageProducerCustomizer) { - MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( - this.properties.getClientId(), this.mqttClientFactory, this.properties.getTopics()); - adapter.setQos(this.properties.getQos()); - adapter.setConverter(pahoMessageConverter(this.beanFactory)); + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(properties.getClientId(), + mqttClientFactory, properties.getTopics()); + adapter.setQos(properties.getQos()); + adapter.setConverter(pahoMessageConverter(properties, beanFactory)); adapter.setAutoStartup(false); if (mqttMessageProducerCustomizer != null) { @@ -84,7 +72,8 @@ public Publisher> mqttPublisher(MqttPahoMessageDrivenChannelAdap return IntegrationFlow.from(mqttInbound).toReactivePublisher(true); } - private DefaultPahoMessageConverter pahoMessageConverter(BeanFactory beanFactory) { + private DefaultPahoMessageConverter pahoMessageConverter(MqttSupplierProperties properties, + BeanFactory beanFactory) { DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(properties.getCharset()); converter.setPayloadAsBytes(properties.isBinary()); converter.setBeanFactory(beanFactory); diff --git a/supplier/spring-mqtt-supplier/src/main/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierProperties.java b/supplier/spring-mqtt-supplier/src/main/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierProperties.java index b8ca910b..a0fc67f1 100644 --- a/supplier/spring-mqtt-supplier/src/main/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierProperties.java +++ b/supplier/spring-mqtt-supplier/src/main/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ import org.springframework.validation.annotation.Validated; /** - * Properties for the Mqtt Source. + * Properties for the MQTT supplier. * * @author Janne Valkealahti * @author Soby Chacko @@ -34,28 +34,28 @@ public class MqttSupplierProperties { /** - * identifies the client. + * Identifies the client. */ private String clientId = "stream.client.id.source"; /** - * the topic(s) (comma-delimited) to which the source will subscribe. + * The topic(s) (comma-delimited) to which the source will subscribe. */ private String[] topics = new String[] { "stream.mqtt" }; /** - * the qos; a single value for all topics or a comma-delimited list to match the + * The qos; a single value for all topics or a comma-delimited list to match the * topics. */ private int[] qos = new int[] { 0 }; /** - * true to leave the payload as bytes. + * True to leave the payload as bytes. */ private boolean binary = false; /** - * the charset used to convert bytes to String (when binary is false). + * The charset used to convert bytes to String (when binary is false). */ private String charset = "UTF-8"; diff --git a/supplier/spring-mqtt-supplier/src/main/java/org/springframework/cloud/fn/supplier/mqtt/package-info.java b/supplier/spring-mqtt-supplier/src/main/java/org/springframework/cloud/fn/supplier/mqtt/package-info.java new file mode 100644 index 00000000..6173151c --- /dev/null +++ b/supplier/spring-mqtt-supplier/src/main/java/org/springframework/cloud/fn/supplier/mqtt/package-info.java @@ -0,0 +1,4 @@ +/** + * The MQTT supplier auto-configuration support. + */ +package org.springframework.cloud.fn.supplier.mqtt; diff --git a/supplier/spring-mqtt-supplier/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/supplier/spring-mqtt-supplier/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..90fab247 --- /dev/null +++ b/supplier/spring-mqtt-supplier/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.cloud.fn.supplier.mqtt.MqttSupplierConfiguration diff --git a/supplier/spring-mqtt-supplier/src/test/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierTests.java b/supplier/spring-mqtt-supplier/src/test/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierTests.java index a6e4e08f..7a6603b9 100644 --- a/supplier/spring-mqtt-supplier/src/test/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierTests.java +++ b/supplier/spring-mqtt-supplier/src/test/java/org/springframework/cloud/fn/supplier/mqtt/MqttSupplierTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,22 +16,19 @@ package org.springframework.cloud.fn.supplier.mqtt; -import java.time.Duration; import java.util.Properties; import java.util.function.Supplier; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.internal.security.SSLSocketFactoryFactory; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.GenericContainer; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.fn.test.support.mqtt.MosquittoContainerTest; import org.springframework.context.annotation.Bean; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; @@ -40,6 +37,8 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; import static org.assertj.core.api.Assertions.assertThat; @@ -55,23 +54,11 @@ @SpringBootTest(properties = { "mqtt.supplier.topics=test,fake", "mqtt.supplier.qos=0,0", "mqtt.ssl-properties.com.ibm.ssl.protocol=TLS", "mqtt.ssl-properties.com.ibm.ssl.keyStoreType=TEST" }) @DirtiesContext -public class MqttSupplierTests { - - static { - GenericContainer mosquitto = new GenericContainer<>("eclipse-mosquitto:2.0.13") - .withCommand("mosquitto -c /mosquitto-no-auth.conf") - .withReuse(true) - .withExposedPorts(1883) - .withStartupTimeout(Duration.ofSeconds(120)) - .withStartupAttempts(3); - mosquitto.start(); - final Integer mappedPort = mosquitto.getMappedPort(1883); - System.setProperty("mqtt.url", "tcp://localhost:" + mappedPort); - } +public class MqttSupplierTests implements MosquittoContainerTest { - @AfterAll - public static void cleanup() { - System.clearProperty("mqtt.url"); + @DynamicPropertySource + static void mongoDbProperties(DynamicPropertyRegistry registry) { + registry.add("mqtt.url", () -> "tcp://localhost:" + MOSQUITTO_CONTAINER.getMappedPort(1883)); } @Autowired @@ -91,19 +78,17 @@ public void testBasicFlow() { final Flux> messageFlux = mqttSupplier.get(); - StepVerifier.create(messageFlux).assertNext((message) -> { - assertThat(message.getPayload()).isEqualTo("hello"); - }).thenCancel().verify(); + StepVerifier.create(messageFlux) + .assertNext((message) -> assertThat(message.getPayload()).isEqualTo("hello")) + .thenCancel() + .verify(); } @SpringBootApplication static class MqttSupplierTestApplication { - @Autowired - private MqttPahoClientFactory mqttClientFactory; - @Bean - public MessageHandler mqttOutbound() { + MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("test", mqttClientFactory); messageHandler.setAsync(true); messageHandler.setDefaultTopic("test"); @@ -112,7 +97,7 @@ public MessageHandler mqttOutbound() { } @Bean - public DefaultPahoMessageConverter producerConverter() { + DefaultPahoMessageConverter producerConverter() { return new DefaultPahoMessageConverter(1, true, "UTF-8"); }