diff --git a/supplier/spring-jdbc-supplier/src/main/java/org/springframework/cloud/fn/supplier/jdbc/JdbcSupplierConfiguration.java b/supplier/spring-jdbc-supplier/src/main/java/org/springframework/cloud/fn/supplier/jdbc/JdbcSupplierConfiguration.java index 509c4f7b..21aac481 100644 --- a/supplier/spring-jdbc-supplier/src/main/java/org/springframework/cloud/fn/supplier/jdbc/JdbcSupplierConfiguration.java +++ b/supplier/spring-jdbc-supplier/src/main/java/org/springframework/cloud/fn/supplier/jdbc/JdbcSupplierConfiguration.java @@ -32,6 +32,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.integration.core.MessageSource; import org.springframework.integration.jdbc.JdbcPollingChannelAdapter; +import org.springframework.integration.util.IntegrationReactiveUtils; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -55,7 +56,7 @@ public JdbcSupplierConfiguration(JdbcSupplierProperties properties, DataSource d } @Bean - public MessageSource jdbcMessageSource( + public JdbcPollingChannelAdapter jdbcMessageSource( @Nullable ComponentCustomizer jdbcPollingChannelAdapterCustomizer) { JdbcPollingChannelAdapter jdbcPollingChannelAdapter = new JdbcPollingChannelAdapter(this.dataSource, @@ -70,15 +71,10 @@ public MessageSource jdbcMessageSource( @Bean(name = "jdbcSupplier") @ConditionalOnProperty(prefix = "jdbc.supplier", name = "split", matchIfMissing = true) - public Supplier>> splittedSupplier(MessageSource jdbcMessageSource, - Function>, Flux>> splitterFunction) { + public Supplier>> splittedSupplier(JdbcPollingChannelAdapter jdbcMessageSource, + Function>, Flux>> splitterFunction) { - return () -> Flux.>create((sink) -> { - Message received = jdbcMessageSource.receive(); - if (received != null) { - sink.next(received); - } - }).transform(splitterFunction); + return () -> IntegrationReactiveUtils.messageSourceToFlux(jdbcMessageSource).transform(splitterFunction); } @Bean diff --git a/supplier/spring-mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierConfiguration.java b/supplier/spring-mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierConfiguration.java index 72483c13..a4f1392e 100644 --- a/supplier/spring-mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierConfiguration.java +++ b/supplier/spring-mongodb-supplier/src/main/java/org/springframework/cloud/fn/supplier/mongo/MongodbSupplierConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2025 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ import org.springframework.expression.Expression; import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.mongodb.inbound.MongoDbMessageSource; +import org.springframework.integration.util.IntegrationReactiveUtils; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -59,14 +60,9 @@ public MongodbSupplierConfiguration(MongodbSupplierProperties properties, MongoT @Bean(name = "mongodbSupplier") @ConditionalOnProperty(prefix = "mongodb", name = "split", matchIfMissing = true) public Supplier>> splittedSupplier(MongoDbMessageSource mongoDbSource, - Function>, Flux>> splitterFunction) { - - return () -> Flux.>create((sink) -> { - Message received = mongoDbSource.receive(); - if (received != null) { - sink.next(received); - } - }).transform(splitterFunction); + Function>, Flux>> splitterFunction) { + + return () -> IntegrationReactiveUtils.messageSourceToFlux(mongoDbSource).transform(splitterFunction); } @Bean