Skip to content

Commit

Permalink
GH-124: Fix auto-wiring ambiguity in the ZeroMqSupplierConfiguration
Browse files Browse the repository at this point in the history
Fixes: #124

Also use `toReactivePublisher(true)` to avoid manual lifecycle management.
  • Loading branch information
artembilan committed Dec 19, 2024
1 parent 34892ae commit f531bf2
Showing 1 changed file with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.reactivestreams.Publisher;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
Expand All @@ -28,7 +29,7 @@
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.zeromq.inbound.ZeroMqMessageProducer;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
Expand All @@ -37,21 +38,19 @@
* A supplier auto-configuration that receives data from ZeroMQ.
*
* @author Daniel Frey
* @since 3.1.0
* @author Artem Bilan
*/
@AutoConfiguration
@EnableConfigurationProperties(ZeroMqSupplierProperties.class)
public class ZeroMqSupplierConfiguration {

private FluxMessageChannel output = new FluxMessageChannel();

@Bean
public ZContext zContext() {
return new ZContext();
}

@Bean
public ZeroMqMessageProducer adapter(ZeroMqSupplierProperties properties, ZContext zContext,
public ZeroMqMessageProducer zeroMqSupplierMessageProducer(ZeroMqSupplierProperties properties, ZContext zContext,
@Autowired(required = false) Consumer<ZMQ.Socket> socketConfigurer) {

ZeroMqMessageProducer zeroMqMessageProducer = new ZeroMqMessageProducer(zContext, properties.getSocketType());
Expand All @@ -70,15 +69,17 @@ else if (properties.getBindPort() > 0) {
if (socketConfigurer != null) {
zeroMqMessageProducer.setSocketConfigurer(socketConfigurer);
}
zeroMqMessageProducer.setOutputChannel(this.output);
zeroMqMessageProducer.setAutoStartup(false);

return zeroMqMessageProducer;
}

@Bean
public Supplier<Flux<Message<?>>> zeromqSupplier(ZeroMqMessageProducer adapter) {
return () -> Flux.from(this.output).doOnSubscribe((subscription) -> adapter.start());
Publisher<Message<Object>> zeroMqSupplierFlow(ZeroMqMessageProducer zeroMqSupplierMessageProducer) {
return IntegrationFlow.from(zeroMqSupplierMessageProducer).toReactivePublisher(true);
}

@Bean
public Supplier<Flux<Message<?>>> zeromqSupplier(Publisher<Message<Object>> zeroMqSupplierFlow) {
return () -> Flux.from(zeroMqSupplierFlow);
}

}

0 comments on commit f531bf2

Please sign in to comment.