Skip to content

Commit

Permalink
Use IntegrationReactiveUtils.messageSourceToFlux() API
Browse files Browse the repository at this point in the history
The `IntegrationReactiveUtils.messageSourceToFlux()` provides convenient API to represent a `MessageSource`
as a `Flux` to poll this source.
The API has an error handling logic and delay when no data emitted by the source
  • Loading branch information
artembilan committed Dec 27, 2024
1 parent 18b0995 commit 9360bac
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

Expand All @@ -55,7 +56,7 @@ public JdbcSupplierConfiguration(JdbcSupplierProperties properties, DataSource d
}

@Bean
public MessageSource<Object> jdbcMessageSource(
public JdbcPollingChannelAdapter jdbcMessageSource(
@Nullable ComponentCustomizer<JdbcPollingChannelAdapter> jdbcPollingChannelAdapterCustomizer) {

JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(this.dataSource,
Expand All @@ -70,15 +71,10 @@ public MessageSource<Object> jdbcMessageSource(

@Bean(name = "jdbcSupplier")
@ConditionalOnProperty(prefix = "jdbc.supplier", name = "split", matchIfMissing = true)
public Supplier<Flux<Message<?>>> splittedSupplier(MessageSource<Object> jdbcMessageSource,
Function<Flux<Message<?>>, Flux<Message<?>>> splitterFunction) {
public Supplier<Flux<Message<?>>> splittedSupplier(JdbcPollingChannelAdapter jdbcMessageSource,
Function<Flux<Message<Object>>, Flux<Message<?>>> splitterFunction) {

return () -> Flux.<Message<?>>create((sink) -> {
Message<?> received = jdbcMessageSource.receive();
if (received != null) {
sink.next(received);
}
}).transform(splitterFunction);
return () -> IntegrationReactiveUtils.messageSourceToFlux(jdbcMessageSource).transform(splitterFunction);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2025 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.mongodb.inbound.MongoDbMessageSource;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

Expand Down Expand Up @@ -59,14 +60,9 @@ public MongodbSupplierConfiguration(MongodbSupplierProperties properties, MongoT
@Bean(name = "mongodbSupplier")
@ConditionalOnProperty(prefix = "mongodb", name = "split", matchIfMissing = true)
public Supplier<Flux<Message<?>>> splittedSupplier(MongoDbMessageSource mongoDbSource,
Function<Flux<Message<?>>, Flux<Message<?>>> splitterFunction) {

return () -> Flux.<Message<?>>create((sink) -> {
Message<?> received = mongoDbSource.receive();
if (received != null) {
sink.next(received);
}
}).transform(splitterFunction);
Function<Flux<Message<Object>>, Flux<Message<?>>> splitterFunction) {

return () -> IntegrationReactiveUtils.messageSourceToFlux(mongoDbSource).transform(splitterFunction);
}

@Bean
Expand Down

0 comments on commit 9360bac

Please sign in to comment.