From 494311bcdd770989524e6aced0719d441dffa649 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 16 Jan 2024 17:57:38 -0500 Subject: [PATCH] Enforce Checkstyle failures * Fix `FileSupplierConfiguration` to use `toReactivePublisher(true)` to defer a start of channel adapters until reactive subscription. * Also use `IntegrationReactiveUtils.messageSourceToFlux()` API instead of manual Reactor API composition --- build.gradle | 1 - .../file/FileSupplierConfiguration.java | 27 ++++++------------- 2 files changed, 8 insertions(+), 20 deletions(-) diff --git a/build.gradle b/build.gradle index 5324799a..b4d87717 100644 --- a/build.gradle +++ b/build.gradle @@ -114,7 +114,6 @@ configure(javaProjects) { subproject -> checkstyle { toolVersion = '10.3' configDirectory = rootProject.file('etc/checkstyle') - ignoreFailures = true } // dependencies that are common across all java projects diff --git a/supplier/spring-file-supplier/src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierConfiguration.java b/supplier/spring-file-supplier/src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierConfiguration.java index a493f56f..fff34fc8 100644 --- a/supplier/spring-file-supplier/src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierConfiguration.java +++ b/supplier/spring-file-supplier/src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierConfiguration.java @@ -21,12 +21,9 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; +import reactor.util.context.Context; import org.springframework.beans.factory.BeanInitializationException; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -35,7 +32,6 @@ import org.springframework.cloud.fn.common.file.FileReadingMode; import org.springframework.cloud.fn.common.file.FileUtils; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Lazy; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlowBuilder; import org.springframework.integration.file.FileReadingMessageSource; @@ -47,6 +43,7 @@ import org.springframework.integration.file.filters.RegexPatternFileListFilter; import org.springframework.integration.file.filters.SimplePatternFileListFilter; import org.springframework.integration.metadata.ConcurrentMetadataStore; +import org.springframework.integration.util.IntegrationReactiveUtils; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.util.StringUtils; @@ -67,11 +64,6 @@ public class FileSupplierConfiguration { private final FileConsumerProperties fileConsumerProperties; - @Autowired - @Lazy - @Qualifier("fileMessageSource") - private FileReadingMessageSource fileMessageSource; - public FileSupplierConfiguration(FileSupplierProperties fileSupplierProperties, FileConsumerProperties fileConsumerProperties) { @@ -110,21 +102,18 @@ public FileInboundChannelAdapterSpec fileMessageSource(FileListFilter file } @Bean - public Flux> fileMessageFlux() { - return Mono - .>create( - (monoSink) -> monoSink.onRequest((value) -> monoSink.success(this.fileMessageSource.receive()))) - .subscribeOn(Schedulers.boundedElastic()) - .repeatWhenEmpty((it) -> it.delayElements(this.fileSupplierProperties.getDelayWhenEmpty())) - .repeat() - .doOnRequest((r) -> this.fileMessageSource.start()); + public Flux> fileMessageFlux(FileReadingMessageSource fileReadingMessageSource) { + return IntegrationReactiveUtils.messageSourceToFlux(fileReadingMessageSource) + .contextWrite(Context.of(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY, + this.fileSupplierProperties.getDelayWhenEmpty())) + .doOnRequest((r) -> fileReadingMessageSource.start()); } @Bean @ConditionalOnExpression("environment['file.consumer.mode'] != 'ref'") public Publisher> fileReadingFlow(Flux> fileMessageFlux) { IntegrationFlowBuilder flowBuilder = IntegrationFlow.from(fileMessageFlux); - return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties).toReactivePublisher(); + return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties).toReactivePublisher(true); } @Bean