From f531bf26d42b58e9e1c2aa9e5f458da94b8730d5 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 19 Dec 2024 16:55:17 -0500 Subject: [PATCH] GH-124: Fix auto-wiring ambiguity in the `ZeroMqSupplierConfiguration` Fixes: https://github.com/spring-cloud/spring-functions-catalog/issues/124 Also use `toReactivePublisher(true)` to avoid manual lifecycle management. --- .../zeromq/ZeroMqSupplierConfiguration.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/supplier/spring-zeromq-supplier/src/main/java/org/springframework/cloud/fn/supplier/zeromq/ZeroMqSupplierConfiguration.java b/supplier/spring-zeromq-supplier/src/main/java/org/springframework/cloud/fn/supplier/zeromq/ZeroMqSupplierConfiguration.java index e4b22e8b..b40a5858 100644 --- a/supplier/spring-zeromq-supplier/src/main/java/org/springframework/cloud/fn/supplier/zeromq/ZeroMqSupplierConfiguration.java +++ b/supplier/spring-zeromq-supplier/src/main/java/org/springframework/cloud/fn/supplier/zeromq/ZeroMqSupplierConfiguration.java @@ -19,6 +19,7 @@ import java.util.function.Consumer; import java.util.function.Supplier; +import org.reactivestreams.Publisher; import org.zeromq.SocketType; import org.zeromq.ZContext; import org.zeromq.ZMQ; @@ -28,7 +29,7 @@ import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; -import org.springframework.integration.channel.FluxMessageChannel; +import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.zeromq.inbound.ZeroMqMessageProducer; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; @@ -37,21 +38,19 @@ * A supplier auto-configuration that receives data from ZeroMQ. * * @author Daniel Frey - * @since 3.1.0 + * @author Artem Bilan */ @AutoConfiguration @EnableConfigurationProperties(ZeroMqSupplierProperties.class) public class ZeroMqSupplierConfiguration { - private FluxMessageChannel output = new FluxMessageChannel(); - @Bean public ZContext zContext() { return new ZContext(); } @Bean - public ZeroMqMessageProducer adapter(ZeroMqSupplierProperties properties, ZContext zContext, + public ZeroMqMessageProducer zeroMqSupplierMessageProducer(ZeroMqSupplierProperties properties, ZContext zContext, @Autowired(required = false) Consumer socketConfigurer) { ZeroMqMessageProducer zeroMqMessageProducer = new ZeroMqMessageProducer(zContext, properties.getSocketType()); @@ -70,15 +69,17 @@ else if (properties.getBindPort() > 0) { if (socketConfigurer != null) { zeroMqMessageProducer.setSocketConfigurer(socketConfigurer); } - zeroMqMessageProducer.setOutputChannel(this.output); - zeroMqMessageProducer.setAutoStartup(false); - return zeroMqMessageProducer; } @Bean - public Supplier>> zeromqSupplier(ZeroMqMessageProducer adapter) { - return () -> Flux.from(this.output).doOnSubscribe((subscription) -> adapter.start()); + Publisher> zeroMqSupplierFlow(ZeroMqMessageProducer zeroMqSupplierMessageProducer) { + return IntegrationFlow.from(zeroMqSupplierMessageProducer).toReactivePublisher(true); + } + + @Bean + public Supplier>> zeromqSupplier(Publisher> zeroMqSupplierFlow) { + return () -> Flux.from(zeroMqSupplierFlow); } }