Skip to content

Commit

Permalink
Fix SftpSupplier bean naming race condition
Browse files Browse the repository at this point in the history
Since we create several `MessageSource` bean in the configuration
it is better to be more specific with their injection via `@Qualifier`
  • Loading branch information
artembilan committed Jan 8, 2024
1 parent 56f75fb commit 476bb6d
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public class SftpSupplierConfiguration {
private static final String FILE_MODIFIED_TIME_HEADER = "FILE_MODIFIED_TIME";

@Bean
public Supplier<Flux<? extends Message<?>>> sftpSupplier(MessageSource<?> sftpMessageSource,
public Supplier<Flux<? extends Message<?>>> sftpSupplier(
@Qualifier("sftpMessageSource") MessageSource<?> sftpMessageSource,
@Nullable Publisher<Message<Object>> sftpReadingFlow, SftpSupplierProperties sftpSupplierProperties) {

Flux<? extends Message<?>> flux = (sftpReadingFlow != null) ? Flux.from(sftpReadingFlow)
Expand All @@ -115,8 +116,8 @@ public Supplier<Flux<? extends Message<?>>> sftpSupplier(MessageSource<?> sftpMe

@Bean
@Primary
public MessageSource<?> sftpMessageSource(MessageSource<?> messageSource, BeanFactory beanFactory,
@Nullable List<ReceiveMessageAdvice> receiveMessageAdvice) {
public MessageSource<?> sftpMessageSource(@Qualifier("targetMessageSource") MessageSource<?> messageSource,
BeanFactory beanFactory, @Nullable List<ReceiveMessageAdvice> receiveMessageAdvice) {

if (CollectionUtils.isEmpty(receiveMessageAdvice)) {
return messageSource;
Expand Down Expand Up @@ -157,7 +158,8 @@ else if (sftpSupplierProperties.getFilenameRegex() != null) {
/*
* Create a Flux from a MessageSource that will be used by the supplier.
*/
private Flux<? extends Message<?>> sftpMessageFlux(MessageSource<?> sftpMessageSource,
private Flux<? extends Message<?>> sftpMessageFlux(
@Qualifier("sftpMessageSource") MessageSource<?> sftpMessageSource,
SftpSupplierProperties sftpSupplierProperties) {

return IntegrationReactiveUtils.messageSourceToFlux(sftpMessageSource)
Expand Down Expand Up @@ -201,7 +203,7 @@ SftpStreamingInboundChannelAdapterSpec targetMessageSource(SftpRemoteFileTemplat
}

@Bean
Publisher<Message<Object>> sftpReadingFlow(MessageSource<?> sftpMessageSource,
Publisher<Message<Object>> sftpReadingFlow(@Qualifier("sftpMessageSource") MessageSource<?> sftpMessageSource,
SftpSupplierProperties sftpSupplierProperties, FileConsumerProperties fileConsumerProperties) {

return FileUtils
Expand Down Expand Up @@ -247,7 +249,7 @@ static class NonStreamingConfiguration {
*/
@Bean
@ConditionalOnExpression("environment['file.consumer.mode']!='ref' && environment['sftp.supplier.list-only']!='true'")
Publisher<Message<Object>> sftpReadingFlow(MessageSource<?> sftpMessageSource,
Publisher<Message<Object>> sftpReadingFlow(@Qualifier("sftpMessageSource") MessageSource<?> sftpMessageSource,
SftpSupplierProperties sftpSupplierProperties, FileConsumerProperties fileConsumerProperties,
@Nullable @Qualifier("renameRemoteFileHandler") MessageHandler renameRemoteFileHandler) {

Expand Down

0 comments on commit 476bb6d

Please sign in to comment.