Skip to content

Commit

Permalink
Enforce Checkstyle failures
Browse files Browse the repository at this point in the history
* Fix `FileSupplierConfiguration` to use `toReactivePublisher(true)`
to defer a start of channel adapters until reactive subscription.
* Also use `IntegrationReactiveUtils.messageSourceToFlux()` API instead of manual Reactor API composition
  • Loading branch information
artembilan committed Jan 16, 2024
1 parent 0f65110 commit 494311b
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 20 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ configure(javaProjects) { subproject ->
checkstyle {
toolVersion = '10.3'
configDirectory = rootProject.file('etc/checkstyle')
ignoreFailures = true
}

// dependencies that are common across all java projects
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
Expand All @@ -35,7 +32,6 @@
import org.springframework.cloud.fn.common.file.FileReadingMode;
import org.springframework.cloud.fn.common.file.FileUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.file.FileReadingMessageSource;
Expand All @@ -47,6 +43,7 @@
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.util.IntegrationReactiveUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.StringUtils;
Expand All @@ -67,11 +64,6 @@ public class FileSupplierConfiguration {

private final FileConsumerProperties fileConsumerProperties;

@Autowired
@Lazy
@Qualifier("fileMessageSource")
private FileReadingMessageSource fileMessageSource;

public FileSupplierConfiguration(FileSupplierProperties fileSupplierProperties,
FileConsumerProperties fileConsumerProperties) {

Expand Down Expand Up @@ -110,21 +102,18 @@ public FileInboundChannelAdapterSpec fileMessageSource(FileListFilter<File> file
}

@Bean
public Flux<Message<?>> fileMessageFlux() {
return Mono
.<Message<?>>create(
(monoSink) -> monoSink.onRequest((value) -> monoSink.success(this.fileMessageSource.receive())))
.subscribeOn(Schedulers.boundedElastic())
.repeatWhenEmpty((it) -> it.delayElements(this.fileSupplierProperties.getDelayWhenEmpty()))
.repeat()
.doOnRequest((r) -> this.fileMessageSource.start());
public Flux<Message<File>> fileMessageFlux(FileReadingMessageSource fileReadingMessageSource) {
return IntegrationReactiveUtils.messageSourceToFlux(fileReadingMessageSource)
.contextWrite(Context.of(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY,
this.fileSupplierProperties.getDelayWhenEmpty()))
.doOnRequest((r) -> fileReadingMessageSource.start());
}

@Bean
@ConditionalOnExpression("environment['file.consumer.mode'] != 'ref'")
public Publisher<Message<Object>> fileReadingFlow(Flux<Message<?>> fileMessageFlux) {
IntegrationFlowBuilder flowBuilder = IntegrationFlow.from(fileMessageFlux);
return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties).toReactivePublisher();
return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties).toReactivePublisher(true);
}

@Bean
Expand Down

0 comments on commit 494311b

Please sign in to comment.