From 56f75fb6dbda02daad7ce094e3d7907f9303ac07 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 8 Jan 2024 11:37:05 -0500 Subject: [PATCH] GH-2: Expose a `@MessagingGateway` for `sftpConsumer` Fixes: #2 * To avoid `@Lazy` and other possible injection problems when an `IntegrationFlow` is not ready yet, provide a `@MessagingGateway(name = "sftpConsumer")` explicitly for the `MessageConsumer` in the `SftpConsumerConfiguration` * Make SFTP modules as auto-configuration * Fix Checkstyle violations in those modules --- consumer/spring-sftp-consumer/README.adoc | 2 +- .../sftp/SftpConsumerConfiguration.java | 18 ++- .../consumer/sftp/SftpConsumerProperties.java | 10 +- ...tpConsumerSessionFactoryConfiguration.java | 10 +- .../cloud/fn/consumer/sftp/package-info.java | 4 + ...ot.autoconfigure.AutoConfiguration.imports | 1 + .../sftp/SftpConsumerPropertiesTests.java | 59 +-------- supplier/spring-sftp-supplier/README.adoc | 6 +- .../sftp/SftpSupplierConfiguration.java | 123 +++++++++--------- .../SftpSupplierFactoryConfiguration.java | 25 ++-- .../supplier/sftp/SftpSupplierProperties.java | 60 +++++---- .../fn/supplier/sftp/SftpSupplierRotator.java | 9 +- .../cloud/fn/supplier/sftp/package-info.java | 4 + ...ot.autoconfigure.AutoConfiguration.imports | 1 + .../sftp/SftpSupplierApplicationTests.java | 80 ++++++------ 15 files changed, 180 insertions(+), 232 deletions(-) create mode 100644 consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/package-info.java create mode 100644 consumer/spring-sftp-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports create mode 100644 supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/package-info.java create mode 100644 supplier/spring-sftp-supplier/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports diff --git a/consumer/spring-sftp-consumer/README.adoc b/consumer/spring-sftp-consumer/README.adoc index 0baa33fb..f7527f6f 100644 --- a/consumer/spring-sftp-consumer/README.adoc +++ b/consumer/spring-sftp-consumer/README.adoc @@ -4,7 +4,7 @@ A consumer that allows you to SFTP files. ## Beans for injection -You can import `SftpConsumerConfiguration` in the application and then inject the following bean. +The `SftpConsumerConfiguration` auto-configuration provides the following bean: `Consumer> sftpConsumer` diff --git a/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerConfiguration.java b/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerConfiguration.java index c1c006da..1ac1aee7 100644 --- a/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerConfiguration.java +++ b/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2023 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,13 +20,13 @@ import org.apache.sshd.sftp.client.SftpClient; +import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.fn.common.config.ComponentCustomizer; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.dsl.IntegrationFlow; -import org.springframework.integration.dsl.IntegrationFlowBuilder; import org.springframework.integration.file.remote.session.SessionFactory; import org.springframework.integration.sftp.dsl.Sftp; import org.springframework.integration.sftp.dsl.SftpMessageHandlerSpec; @@ -35,12 +35,12 @@ import org.springframework.messaging.Message; /** - * Configuration for SFTP Consumer. + * Auto-configuration for SFTP Consumer. * * @author Soby Chacko * @author Corneil du Plessis */ -@Configuration(proxyBeanMethods = false) +@AutoConfiguration @EnableConfigurationProperties(SftpConsumerProperties.class) @Import(SftpConsumerSessionFactoryConfiguration.class) public class SftpConsumerConfiguration { @@ -50,9 +50,6 @@ public IntegrationFlow ftpOutboundFlow(SftpConsumerProperties properties, SessionFactory ftpSessionFactory, @Nullable ComponentCustomizer sftpMessageHandlerSpecCustomizer) { - IntegrationFlowBuilder integrationFlowBuilder = IntegrationFlow.from(MessageConsumer.class, - (gateway) -> gateway.beanName("sftpConsumer")); - SftpMessageHandlerSpec handlerSpec = Sftp .outboundAdapter(new SftpRemoteFileTemplate(ftpSessionFactory), properties.getMode()) .remoteDirectory(properties.getRemoteDir()) @@ -69,10 +66,11 @@ public IntegrationFlow ftpOutboundFlow(SftpConsumerProperties properties, sftpMessageHandlerSpecCustomizer.customize(handlerSpec); } - return integrationFlowBuilder.handle(handlerSpec).get(); + return (flow) -> flow.handle(handlerSpec); } - private interface MessageConsumer extends Consumer> { + @MessagingGateway(name = "sftpConsumer", defaultRequestChannel = "ftpOutboundFlow.input") + public interface MessageConsumer extends Consumer> { } diff --git a/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerProperties.java b/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerProperties.java index 84fd95af..210b8f84 100644 --- a/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerProperties.java +++ b/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,8 @@ import org.springframework.validation.annotation.Validated; /** + * The configuration properties for SFTP consumer. + * * @author Gary Russell * @author Artem Bilan * @author Corneil du Plessis @@ -122,7 +124,7 @@ public void setFilenameExpression(String filenameExpression) { @NotBlank public String getRemoteDir() { - return remoteDir; + return this.remoteDir; } public final void setRemoteDir(String remoteDir) { @@ -131,7 +133,7 @@ public final void setRemoteDir(String remoteDir) { @NotBlank public String getTmpFileSuffix() { - return tmpFileSuffix; + return this.tmpFileSuffix; } public void setTmpFileSuffix(String tmpFileSuffix) { @@ -140,7 +142,7 @@ public void setTmpFileSuffix(String tmpFileSuffix) { @NotBlank public String getRemoteFileSeparator() { - return remoteFileSeparator; + return this.remoteFileSeparator; } public void setRemoteFileSeparator(String remoteFileSeparator) { diff --git a/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerSessionFactoryConfiguration.java b/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerSessionFactoryConfiguration.java index 61742d82..bfdd7c7f 100644 --- a/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerSessionFactoryConfiguration.java +++ b/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerSessionFactoryConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.core.io.Resource; -import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.expression.Expression; import org.springframework.integration.file.remote.session.CachingSessionFactory; import org.springframework.integration.file.remote.session.SessionFactory; import org.springframework.integration.sftp.session.DefaultSftpSessionFactory; @@ -51,9 +51,9 @@ public SessionFactory sftpSessionFactory(SftpConsumerProper sftpSessionFactory.setPrivateKey(factory.getPrivateKey()); sftpSessionFactory.setPrivateKeyPassphrase(factory.getPassPhrase()); sftpSessionFactory.setAllowUnknownKeys(factory.isAllowUnknownKeys()); - if (factory.getKnownHostsExpression() != null) { - String knownHostsLocation = factory.getKnownHostsExpression() - .getValue(IntegrationContextUtils.getEvaluationContext(applicationContext), String.class); + Expression knownHostsExpression = factory.getKnownHostsExpression(); + if (knownHostsExpression != null) { + String knownHostsLocation = knownHostsExpression.getValue(String.class); Resource knownHostsResource = applicationContext.getResource(knownHostsLocation); sftpSessionFactory.setKnownHostsResource(knownHostsResource); } diff --git a/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/package-info.java b/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/package-info.java new file mode 100644 index 00000000..786bb15e --- /dev/null +++ b/consumer/spring-sftp-consumer/src/main/java/org/springframework/cloud/fn/consumer/sftp/package-info.java @@ -0,0 +1,4 @@ +/** + * The SFTP consumer auto-configuration support. + */ +package org.springframework.cloud.fn.consumer.sftp; diff --git a/consumer/spring-sftp-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/consumer/spring-sftp-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..ebd5ba62 --- /dev/null +++ b/consumer/spring-sftp-consumer/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.cloud.fn.consumer.sftp.SftpConsumerConfiguration diff --git a/consumer/spring-sftp-consumer/src/test/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerPropertiesTests.java b/consumer/spring-sftp-consumer/src/test/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerPropertiesTests.java index ab0a703a..885a5666 100644 --- a/consumer/spring-sftp-consumer/src/test/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerPropertiesTests.java +++ b/consumer/spring-sftp-consumer/src/test/java/org/springframework/cloud/fn/consumer/sftp/SftpConsumerPropertiesTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,26 +20,14 @@ import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.boot.context.properties.ConfigurationPropertiesBinding; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.test.util.TestPropertyValues; +import org.springframework.cloud.fn.common.config.SpelExpressionConverterConfiguration; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import org.springframework.context.annotation.Lazy; -import org.springframework.core.convert.converter.Converter; -import org.springframework.expression.EvaluationContext; -import org.springframework.expression.Expression; -import org.springframework.expression.ParseException; -import org.springframework.expression.spel.standard.SpelExpression; -import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.config.EnableIntegration; -import org.springframework.integration.config.IntegrationConverter; -import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.file.remote.session.SessionFactory; import org.springframework.integration.file.support.FileExistsMode; import org.springframework.integration.test.util.TestUtils; @@ -160,50 +148,9 @@ static class Conf { @Configuration @EnableConfigurationProperties(SftpConsumerProperties.class) @EnableIntegration - @Import(SftpConsumerSessionFactoryConfiguration.class) + @Import({ SpelExpressionConverterConfiguration.class, SftpConsumerSessionFactoryConfiguration.class }) static class Factory { - @Bean - @ConfigurationPropertiesBinding - @IntegrationConverter - public Converter spelConverter() { - return new SpelConverter(); - } - - /** - * TODO: This needs to be refactored into a generic place for any functions to - * use. - * - * A simple converter from String to Expression. - * - * @author Eric Bottard - */ - public static class SpelConverter implements Converter { - - private SpelExpressionParser parser = new SpelExpressionParser(); - - @Autowired - @Qualifier(IntegrationContextUtils.INTEGRATION_EVALUATION_CONTEXT_BEAN_NAME) - @Lazy - private EvaluationContext evaluationContext; - - @Override - public Expression convert(String source) { - try { - Expression expression = this.parser.parseExpression(source); - if (expression instanceof SpelExpression) { - ((SpelExpression) expression).setEvaluationContext(this.evaluationContext); - } - return expression; - } - catch (ParseException e) { - throw new IllegalArgumentException( - String.format("Could not convert '%s' into a SpEL expression", source), e); - } - } - - } - } } diff --git a/supplier/spring-sftp-supplier/README.adoc b/supplier/spring-sftp-supplier/README.adoc index b1748b1b..35b58148 100644 --- a/supplier/spring-sftp-supplier/README.adoc +++ b/supplier/spring-sftp-supplier/README.adoc @@ -25,7 +25,7 @@ When configuring the `sftp.factory.known-hosts-expression` option, the root obje ## Idempotency -By default the supplier uses a https://docs.spring.io/spring-integration/api/org/springframework/integration/metadata/SimpleMetadataStore.html[SimpleMetadataStore], storing the last modified time to track files that have already been processed in memory. +By default, the supplier uses a https://docs.spring.io/spring-integration/api/org/springframework/integration/metadata/SimpleMetadataStore.html[SimpleMetadataStore], storing the last modified time to track files that have already been processed in memory. If an application using this supplier is restarted, any existing files will be reprocessed. You can inject on of the persistent https://docs.spring.io/spring-integration/reference/html/meta-data-store.html[MetadataStore implementations] provided by Spring Integration, or your own of course, to maintain this state permanently. See also link:../../common/metadata-store-common/README.adoc[`MetadataStore`] options for possible shared persistent store configuration for the `SftpPersistentAcceptOnceFileListFilter` used in the SFTP Source. @@ -60,7 +60,7 @@ Users have to subscribe to this `Flux` and receive the data. ## Beans for injection -You can import the `SftpSupplierConfiguration` in the application and then inject the following bean. +The `SftpSupplierConfiguration` auto-configuration provides the following bean: `sftpSupplier` @@ -76,7 +76,7 @@ All configuration properties are prefixed with `sftp.supplier`. There are also properties that need to be used with the prefix `file.consumer`. For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierProperties.java[SftpSupplierProperties]. -Also see link:src/main/java/org/springframework/cloud/fn/supplier/file/FileConsumerProperties.java[FileConsumerProperties]. +Also see `FileConsumerProperties`. ## Examples diff --git a/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierConfiguration.java b/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierConfiguration.java index a3bf615c..e7447b6a 100644 --- a/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierConfiguration.java +++ b/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,13 +26,13 @@ import org.apache.sshd.sftp.client.SftpClient; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -import reactor.core.publisher.MonoProcessor; import reactor.util.context.Context; import org.springframework.aop.framework.ProxyFactoryBean; import org.springframework.aop.support.NameMatchMethodPointcutAdvisor; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -40,6 +40,7 @@ import org.springframework.cloud.fn.common.file.FileUtils; import org.springframework.cloud.fn.common.file.remote.RemoteFileDeletingAdvice; import org.springframework.cloud.fn.common.file.remote.RemoteFileRenamingAdvice; +import org.springframework.context.Lifecycle; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @@ -61,8 +62,8 @@ import org.springframework.integration.metadata.ConcurrentMetadataStore; import org.springframework.integration.sftp.dsl.Sftp; import org.springframework.integration.sftp.dsl.SftpInboundChannelAdapterSpec; -import org.springframework.integration.sftp.dsl.SftpStreamingInboundChannelAdapterSpec; import org.springframework.integration.sftp.dsl.SftpOutboundGatewaySpec; +import org.springframework.integration.sftp.dsl.SftpStreamingInboundChannelAdapterSpec; import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter; import org.springframework.integration.sftp.filters.SftpRegexPatternFileListFilter; import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter; @@ -70,7 +71,6 @@ import org.springframework.integration.util.IntegrationReactiveUtils; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessagingException; @@ -80,6 +80,8 @@ import org.springframework.util.StringUtils; /** + * The auto-configuration for SFTP supplier. + * * @author Gary Russell * @author Artem Bilan * @author Chris Schaefer @@ -88,7 +90,7 @@ * @author Corneil du Plessis */ -@Configuration +@AutoConfiguration @EnableConfigurationProperties({ SftpSupplierProperties.class, FileConsumerProperties.class }) @Import({ SftpSupplierFactoryConfiguration.class }) public class SftpSupplierConfiguration { @@ -97,21 +99,18 @@ public class SftpSupplierConfiguration { private static final String FILE_MODIFIED_TIME_HEADER = "FILE_MODIFIED_TIME"; - @Bean - public MonoProcessor subscriptionBarrier() { - return MonoProcessor.create(); - } - @Bean public Supplier>> sftpSupplier(MessageSource sftpMessageSource, - @Nullable Publisher> sftpReadingFlow, MonoProcessor subscriptionBarrier, - SftpSupplierProperties sftpSupplierProperties) { + @Nullable Publisher> sftpReadingFlow, SftpSupplierProperties sftpSupplierProperties) { - Flux> flux = sftpReadingFlow == null - ? sftpMessageFlux(sftpMessageSource, sftpSupplierProperties, subscriptionBarrier) - : Flux.from(sftpReadingFlow); + Flux> flux = (sftpReadingFlow != null) ? Flux.from(sftpReadingFlow) + : sftpMessageFlux(sftpMessageSource, sftpSupplierProperties); - return () -> flux.doOnRequest(l -> subscriptionBarrier.onNext(true)); + return () -> flux.doOnSubscribe((sub) -> { + if (sftpMessageSource instanceof Lifecycle lifecycle) { + lifecycle.start(); + } + }); } @Bean @@ -126,7 +125,7 @@ public MessageSource sftpMessageSource(MessageSource messageSource, BeanFa ProxyFactoryBean proxyFactoryBean = new ProxyFactoryBean(); proxyFactoryBean.setTarget(messageSource); proxyFactoryBean.setBeanFactory(beanFactory); - receiveMessageAdvice.stream().map(advice -> { + receiveMessageAdvice.stream().map((advice) -> { NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(advice); advisor.addMethodName("receive"); return advisor; @@ -159,37 +158,39 @@ else if (sftpSupplierProperties.getFilenameRegex() != null) { * Create a Flux from a MessageSource that will be used by the supplier. */ private Flux> sftpMessageFlux(MessageSource sftpMessageSource, - SftpSupplierProperties sftpSupplierProperties, MonoProcessor subscriptionBarrier) { + SftpSupplierProperties sftpSupplierProperties) { return IntegrationReactiveUtils.messageSourceToFlux(sftpMessageSource) - .delaySubscription(subscriptionBarrier) .contextWrite(Context.of(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY, sftpSupplierProperties.getDelayWhenEmpty())); } private static String remoteDirectory(SftpSupplierProperties sftpSupplierProperties) { - return sftpSupplierProperties.isMultiSource() + return (sftpSupplierProperties.isMultiSource()) ? SftpSupplierProperties.keyDirectories(sftpSupplierProperties).get(0).getDirectory() : sftpSupplierProperties.getRemoteDir(); } - @Configuration + @Configuration(proxyBeanMethods = false) @ConditionalOnProperty(prefix = "sftp.supplier", name = "stream") static class StreamingConfiguration { @Bean - public SftpRemoteFileTemplate sftpTemplate(SftpSupplierFactoryConfiguration.DelegatingFactoryWrapper wrapper) { + SftpRemoteFileTemplate sftpTemplate(SftpSupplierFactoryConfiguration.DelegatingFactoryWrapper wrapper) { return new SftpRemoteFileTemplate(wrapper.getFactory()); } /** * Streaming {@link MessageSource} that provides an InputStream for each remote * file. It does not synchronize files to a local directory. + * @param sftpTemplate the {@link SftpRemoteFileTemplate} to use. + * @param sftpSupplierProperties the {@link SftpSupplierProperties} to use. + * @param fileListFilter the {@link FileListFilter} to use. * @return a {@link MessageSource}. */ @Bean - public SftpStreamingInboundChannelAdapterSpec targetMessageSource(SftpRemoteFileTemplate sftpTemplate, + SftpStreamingInboundChannelAdapterSpec targetMessageSource(SftpRemoteFileTemplate sftpTemplate, SftpSupplierProperties sftpSupplierProperties, FileListFilter fileListFilter) { return Sftp.inboundStreamingAdapter(sftpTemplate) @@ -200,23 +201,21 @@ public SftpStreamingInboundChannelAdapterSpec targetMessageSource(SftpRemoteFile } @Bean - public Publisher> sftpReadingFlow(MessageSource sftpMessageSource, - MonoProcessor subscriptionBarrier, SftpSupplierProperties sftpSupplierProperties, - FileConsumerProperties fileConsumerProperties) { + Publisher> sftpReadingFlow(MessageSource sftpMessageSource, + SftpSupplierProperties sftpSupplierProperties, FileConsumerProperties fileConsumerProperties) { return FileUtils .enhanceStreamFlowForReadingMode( IntegrationFlow.from(IntegrationReactiveUtils.messageSourceToFlux(sftpMessageSource) - .delaySubscription(subscriptionBarrier) .contextWrite(Context.of(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY, sftpSupplierProperties.getDelayWhenEmpty()))), fileConsumerProperties) - .toReactivePublisher(); + .toReactivePublisher(true); } @Bean @ConditionalOnProperty(prefix = "sftp.supplier", value = "delete-remote-files") - public RemoteFileDeletingAdvice remoteFileDeletingAdvice(SftpRemoteFileTemplate sftpTemplate, + RemoteFileDeletingAdvice remoteFileDeletingAdvice(SftpRemoteFileTemplate sftpTemplate, SftpSupplierProperties sftpSupplierProperties) { return new RemoteFileDeletingAdvice(sftpTemplate, sftpSupplierProperties.getRemoteFileSeparator()); @@ -224,7 +223,7 @@ public RemoteFileDeletingAdvice remoteFileDeletingAdvice(SftpRemoteFileTemplate @Bean @ConditionalOnProperty(prefix = "sftp.supplier", value = "rename-remote-files-to") - public RemoteFileRenamingAdvice remoteFileRenamingAdvice(SftpRemoteFileTemplate sftpTemplate, + RemoteFileRenamingAdvice remoteFileRenamingAdvice(SftpRemoteFileTemplate sftpTemplate, SftpSupplierProperties sftpSupplierProperties) { return new RemoteFileRenamingAdvice(sftpTemplate, sftpSupplierProperties.getRemoteFileSeparator(), @@ -233,46 +232,50 @@ public RemoteFileRenamingAdvice remoteFileRenamingAdvice(SftpRemoteFileTemplate } - @Configuration - @ConditionalOnExpression("environment['sftp.supplier.stream']!='true'") + @Configuration(proxyBeanMethods = false) + @ConditionalOnExpression("environment['sftp.supplier.stream'] != 'true'") static class NonStreamingConfiguration { /** * Enrich the flow to provide some standard headers, depending on * {@link FileConsumerProperties}, when consuming file contents. * @param sftpMessageSource the {@link MessageSource}. - * @param fileConsumerProperties the {@code FileConsumerProperties}. + * @param sftpSupplierProperties the {@link SftpSupplierProperties} to use. + * @param fileConsumerProperties the {@link FileConsumerProperties}. + * @param renameRemoteFileHandler the {@link MessageHandler} for SFTP protocol. * @return a {@code Publisher}. */ @Bean @ConditionalOnExpression("environment['file.consumer.mode']!='ref' && environment['sftp.supplier.list-only']!='true'") - public Publisher> sftpReadingFlow(MessageSource sftpMessageSource, - MonoProcessor subscriptionBarrier, SftpSupplierProperties sftpSupplierProperties, - FileConsumerProperties fileConsumerProperties, + Publisher> sftpReadingFlow(MessageSource sftpMessageSource, + SftpSupplierProperties sftpSupplierProperties, FileConsumerProperties fileConsumerProperties, @Nullable @Qualifier("renameRemoteFileHandler") MessageHandler renameRemoteFileHandler) { IntegrationFlowBuilder flowBuilder = FileUtils.enhanceFlowForReadingMode( IntegrationFlow.from(IntegrationReactiveUtils.messageSourceToFlux(sftpMessageSource) - .delaySubscription(subscriptionBarrier) .contextWrite(Context.of(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY, sftpSupplierProperties.getDelayWhenEmpty()))), fileConsumerProperties); if (renameRemoteFileHandler != null) { - flowBuilder.publishSubscribeChannel( - pubsub -> pubsub.subscribe(subFlow -> subFlow.handle(renameRemoteFileHandler).nullChannel())); + flowBuilder.publishSubscribeChannel((pubsub) -> pubsub + .subscribe((subFlow) -> subFlow.handle(renameRemoteFileHandler).nullChannel())); } - return flowBuilder.toReactivePublisher(); + return flowBuilder.toReactivePublisher(true); } /** * A {@link MessageSource} that synchronizes files to a local directory. + * @param sftpSupplierProperties the properties. + * @param delegatingFactoryWrapper the + * {@link SftpSupplierFactoryConfiguration.DelegatingFactoryWrapper} to use. + * @param fileListFilter the {@link FileListFilter} to use. * @return the {code MessageSource}. */ @ConditionalOnExpression("environment['sftp.supplier.list-only'] != 'true'") @Bean - public SftpInboundChannelAdapterSpec targetMessageSource(SftpSupplierProperties sftpSupplierProperties, + SftpInboundChannelAdapterSpec targetMessageSource(SftpSupplierProperties sftpSupplierProperties, SftpSupplierFactoryConfiguration.DelegatingFactoryWrapper delegatingFactoryWrapper, FileListFilter fileListFilter) { @@ -291,7 +294,7 @@ public SftpInboundChannelAdapterSpec targetMessageSource(SftpSupplierProperties @Bean @ConditionalOnProperty(prefix = "sftp.supplier", value = "rename-remote-files-to") - public SftpOutboundGatewaySpec renameRemoteFileHandler( + SftpOutboundGatewaySpec renameRemoteFileHandler( SftpSupplierFactoryConfiguration.DelegatingFactoryWrapper delegatingFactoryWrapper, SftpSupplierProperties sftpSupplierProperties) { @@ -308,28 +311,24 @@ public SftpOutboundGatewaySpec renameRemoteFileHandler( /* * List only configuration */ - @Configuration + @Configuration(proxyBeanMethods = false) @ConditionalOnProperty(prefix = "sftp.supplier", name = "list-only") static class ListingOnlyConfiguration { - @Bean - PollableChannel listingChannel() { - return new QueueChannel(); - } + private final PollableChannel listingChannel = new QueueChannel(); @Bean @SuppressWarnings("unchecked") - public MessageSource targetMessageSource(PollableChannel listingChannel, - SftpListingMessageProducer sftpListingMessageProducer) { + MessageSource targetMessageSource(SftpListingMessageProducer sftpListingMessageProducer) { return () -> { sftpListingMessageProducer.listNames(); - return (Message) listingChannel.receive(); + return (Message) this.listingChannel.receive(); }; } @Bean - public SftpListingMessageProducer sftpListingMessageProducer(SftpSupplierProperties sftpSupplierProperties, + SftpListingMessageProducer sftpListingMessageProducer(SftpSupplierProperties sftpSupplierProperties, SftpSupplierFactoryConfiguration.DelegatingFactoryWrapper delegatingFactoryWrapper) { return new SftpListingMessageProducer(delegatingFactoryWrapper.getFactory(), @@ -339,7 +338,7 @@ public SftpListingMessageProducer sftpListingMessageProducer(SftpSupplierPropert @Bean GenericSelector listOnlyFilter(SftpSupplierProperties sftpSupplierProperties) { - Predicate predicate = s -> true; + Predicate predicate = (s) -> true; if (StringUtils.hasText(sftpSupplierProperties.getFilenamePattern())) { predicate = Pattern.compile(sftpSupplierProperties.getFilenamePattern()).asPredicate(); } @@ -351,7 +350,7 @@ else if (sftpSupplierProperties.getFilenameRegex() != null) { } @Bean - public IntegrationFlow listingFlow(MessageProducerSupport listingMessageProducer, MessageChannel listingChannel, + IntegrationFlow listingFlow(MessageProducerSupport listingMessageProducer, MessageProcessor lsEntryToStringTransformer, GenericSelector> duplicateFilter, GenericSelector listOnlyFilter) { @@ -360,14 +359,13 @@ public IntegrationFlow listingFlow(MessageProducerSupport listingMessageProducer .transform(lsEntryToStringTransformer) .filter(duplicateFilter) .filter(listOnlyFilter) - .channel(listingChannel) + .channel(this.listingChannel) .get(); } @Bean - public MessageProcessor> lsEntryToStringTransformer() { + MessageProcessor> lsEntryToStringTransformer() { return (Message message) -> { - SftpClient.DirEntry dirEntry = (SftpClient.DirEntry) message.getPayload(); String fileName = message.getHeaders().get(FileHeaders.REMOTE_DIRECTORY) + dirEntry.getFilename(); @@ -383,6 +381,7 @@ public MessageProcessor> lsEntryToStringTransformer() { @Bean GenericSelector> duplicateFilter(ConcurrentMetadataStore metadataStore) { + // Must be a specific type return new GenericSelector>() { @Override public boolean accept(Message message) { @@ -420,18 +419,18 @@ static class SftpListingMessageProducer extends MessageProducerSupport { this.sort = sort; } - public void listNames() { + void listNames() { Stream stream; try { stream = Stream.of(this.sessionFactory.getSession().list(this.remoteDirectory)) - .filter(x -> !(x.getAttributes().isDirectory() || x.getAttributes().isSymbolicLink())); + .filter((x) -> !(x.getAttributes().isDirectory() || x.getAttributes().isSymbolicLink())); - if (sort != null) { - stream = stream.sorted(sort.comparator()); + if (this.sort != null) { + stream = stream.sorted(this.sort.comparator()); } } - catch (IOException e) { - throw new MessagingException(e.getMessage(), e); + catch (IOException ex) { + throw new MessagingException(ex.getMessage(), ex); } sendMessage(MessageBuilder.withPayload(stream) .setHeader(FileHeaders.REMOTE_DIRECTORY, this.remoteDirectory + this.remoteFileSeparator) diff --git a/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierFactoryConfiguration.java b/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierFactoryConfiguration.java index e50be7f9..f66850b3 100644 --- a/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierFactoryConfiguration.java +++ b/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierFactoryConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.core.io.Resource; -import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.expression.Expression; import org.springframework.integration.file.remote.aop.StandardRotationPolicy; import org.springframework.integration.file.remote.session.CachingSessionFactory; import org.springframework.integration.file.remote.session.DelegatingSessionFactory; @@ -63,14 +63,15 @@ public DelegatingFactoryWrapper delegatingFactoryWrapper(SftpSupplierProperties @Bean StandardRotationPolicy rotationPolicy(SftpSupplierProperties properties, DelegatingFactoryWrapper factory) { - return properties.isMultiSource() ? new StandardRotationPolicy(factory.getFactory(), + return (properties.isMultiSource()) ? new StandardRotationPolicy(factory.getFactory(), SftpSupplierProperties.keyDirectories(properties), properties.isFair()) : null; } @Bean public SftpSupplierRotator rotatingAdvice(SftpSupplierProperties properties, @Nullable StandardRotationPolicy rotationPolicy) { - return properties.isMultiSource() ? new SftpSupplierRotator(properties, rotationPolicy) : null; + + return (properties.isMultiSource()) ? new SftpSupplierRotator(rotationPolicy) : null; } static SessionFactory buildFactory(ApplicationContext applicationContext, @@ -84,9 +85,9 @@ static SessionFactory buildFactory(ApplicationContext appli sftpSessionFactory.setPrivateKey(factory.getPrivateKey()); sftpSessionFactory.setPrivateKeyPassphrase(factory.getPassPhrase()); sftpSessionFactory.setAllowUnknownKeys(factory.isAllowUnknownKeys()); - if (factory.getKnownHostsExpression() != null) { - String knownHostsLocation = factory.getKnownHostsExpression() - .getValue(IntegrationContextUtils.getEvaluationContext(applicationContext), String.class); + Expression knownHostsExpression = factory.getKnownHostsExpression(); + if (knownHostsExpression != null) { + String knownHostsLocation = knownHostsExpression.getValue(String.class); Resource knownHostsResource = applicationContext.getResource(knownHostsLocation); sftpSessionFactory.setKnownHostsResource(knownHostsResource); } @@ -94,7 +95,7 @@ static SessionFactory buildFactory(ApplicationContext appli return new CachingSessionFactory<>(sftpSessionFactory); } - public final static class DelegatingFactoryWrapper implements DisposableBean { + public static final class DelegatingFactoryWrapper implements DisposableBean { private final DelegatingSessionFactory delegatingSessionFactory; @@ -115,12 +116,12 @@ public DelegatingSessionFactory getFactory() { @Override public void destroy() { - this.factories.values().forEach(f -> { - if (f instanceof DisposableBean) { + this.factories.values().forEach((f) -> { + if (f instanceof DisposableBean disposableBean) { try { - ((DisposableBean) f).destroy(); + disposableBean.destroy(); } - catch (Exception e) { + catch (Exception ex) { // empty } } diff --git a/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierProperties.java b/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierProperties.java index 9fde6a4b..7cb1a349 100644 --- a/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierProperties.java +++ b/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,6 +40,8 @@ import org.springframework.validation.annotation.Validated; /** + * The configuration properties for SFTP supplier. + * * @author Gary Russell * @author Artem Bilan * @author Chris Schaefer @@ -153,7 +155,7 @@ public class SftpSupplierProperties { @NotBlank public String getRemoteDir() { - return remoteDir; + return this.remoteDir; } public void setRemoteDir(String remoteDir) { @@ -162,7 +164,7 @@ public void setRemoteDir(String remoteDir) { @NotBlank public String getTmpFileSuffix() { - return tmpFileSuffix; + return this.tmpFileSuffix; } public void setTmpFileSuffix(String tmpFileSuffix) { @@ -171,7 +173,7 @@ public void setTmpFileSuffix(String tmpFileSuffix) { @NotBlank public String getRemoteFileSeparator() { - return remoteFileSeparator; + return this.remoteFileSeparator; } public void setRemoteFileSeparator(String remoteFileSeparator) { @@ -179,7 +181,7 @@ public void setRemoteFileSeparator(String remoteFileSeparator) { } public boolean isAutoCreateLocalDir() { - return autoCreateLocalDir; + return this.autoCreateLocalDir; } public void setAutoCreateLocalDir(boolean autoCreateLocalDir) { @@ -187,7 +189,7 @@ public void setAutoCreateLocalDir(boolean autoCreateLocalDir) { } public boolean isDeleteRemoteFiles() { - return deleteRemoteFiles; + return this.deleteRemoteFiles; } public void setDeleteRemoteFiles(boolean deleteRemoteFiles) { @@ -195,7 +197,7 @@ public void setDeleteRemoteFiles(boolean deleteRemoteFiles) { } public Expression getRenameRemoteFilesTo() { - return renameRemoteFilesTo; + return this.renameRemoteFilesTo; } public void setRenameRemoteFilesTo(Expression renameRemoteFilesTo) { @@ -204,7 +206,7 @@ public void setRenameRemoteFilesTo(Expression renameRemoteFilesTo) { @NotNull public File getLocalDir() { - return localDir; + return this.localDir; } public final void setLocalDir(File localDir) { @@ -212,7 +214,7 @@ public final void setLocalDir(File localDir) { } public String getFilenamePattern() { - return filenamePattern; + return this.filenamePattern; } public void setFilenamePattern(String filenamePattern) { @@ -220,7 +222,7 @@ public void setFilenamePattern(String filenamePattern) { } public Pattern getFilenameRegex() { - return filenameRegex; + return this.filenameRegex; } public void setFilenameRegex(Pattern filenameRegex) { @@ -228,7 +230,7 @@ public void setFilenameRegex(Pattern filenameRegex) { } public boolean isPreserveTimestamp() { - return preserveTimestamp; + return this.preserveTimestamp; } public void setPreserveTimestamp(boolean preserveTimestamp) { @@ -241,7 +243,7 @@ public boolean isExclusivePatterns() { } public boolean isListOnly() { - return listOnly; + return this.listOnly; } public void setListOnly(boolean listOnly) { @@ -253,7 +255,7 @@ public boolean isMultiSource() { } public int getMaxFetch() { - return maxFetch; + return this.maxFetch; } public void setMaxFetch(int maxFetch) { @@ -285,7 +287,7 @@ public void setDirectories(String[] directories) { } public boolean isStream() { - return stream; + return this.stream; } public void setStream(boolean stream) { @@ -293,11 +295,11 @@ public void setStream(boolean stream) { } public Factory getFactory() { - return factory; + return this.factory; } public Duration getDelayWhenEmpty() { - return delayWhenEmpty; + return this.delayWhenEmpty; } public void setDelayWhenEmpty(Duration delayWhenEmpty) { @@ -317,7 +319,7 @@ static List keyDirectories(SftpSupplierProperties p @Valid public SftpSupplierProperties.SortSpec getSortBy() { - return sortBy; + return this.sortBy; } public void setSortBy(SortSpec sortBy) { @@ -326,7 +328,7 @@ public void setSortBy(SortSpec sortBy) { @AssertTrue(message = "deleteRemoteFiles must be 'false' when renameRemoteFilesTo is set") public boolean isRenameRemoteFilesValid() { - return renameRemoteFilesTo == null || !deleteRemoteFiles; + return this.renameRemoteFilesTo == null || !this.deleteRemoteFiles; } public static class Factory { @@ -456,7 +458,7 @@ public static class SortSpec { @NotNull public Attribute getAttribute() { - return attribute; + return this.attribute; } public void setAttribute(Attribute attribute) { @@ -465,7 +467,7 @@ public void setAttribute(Attribute attribute) { @NotNull public Dir getDir() { - return dir; + return this.dir; } public void setDir(Dir dir) { @@ -506,21 +508,17 @@ public enum Dir { } private Comparator getAttributeComparator() { - switch (attribute) { - case FILENAME: - return Comparator.comparing(SftpClient.DirEntry::getFilename); - case ATIME: - return Comparator.comparing(x -> x.getAttributes().getAccessTime()); - case MTIME: - return Comparator.comparing(x -> x.getAttributes().getModifyTime()); - } - - throw new UnsupportedOperationException("Unsupported sortBy attribute: " + attribute); + return switch (this.attribute) { + case FILENAME -> Comparator.comparing(SftpClient.DirEntry::getFilename); + case ATIME -> Comparator.comparing((x) -> x.getAttributes().getAccessTime()); + case MTIME -> Comparator.comparing((x) -> x.getAttributes().getModifyTime()); + }; + } public Comparator comparator() { Comparator comparator = getAttributeComparator(); - return dir == Dir.ASC ? comparator : comparator.reversed(); + return (this.dir == Dir.ASC) ? comparator : comparator.reversed(); } } diff --git a/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierRotator.java b/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierRotator.java index 56743445..24cbbb48 100644 --- a/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierRotator.java +++ b/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierRotator.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,15 +31,12 @@ */ public class SftpSupplierRotator extends RotatingServerAdvice { - private static String SFTP_SELECTED_SERVER_PROPERTY_KEY = "sftp_selectedServer"; - - private final SftpSupplierProperties properties; + private static final String SFTP_SELECTED_SERVER_PROPERTY_KEY = "sftp_selectedServer"; private final StandardRotationPolicy rotationPolicy; - public SftpSupplierRotator(SftpSupplierProperties properties, StandardRotationPolicy rotationPolicy) { + public SftpSupplierRotator(StandardRotationPolicy rotationPolicy) { super(rotationPolicy); - this.properties = properties; this.rotationPolicy = rotationPolicy; } diff --git a/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/package-info.java b/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/package-info.java new file mode 100644 index 00000000..a65d39f1 --- /dev/null +++ b/supplier/spring-sftp-supplier/src/main/java/org/springframework/cloud/fn/supplier/sftp/package-info.java @@ -0,0 +1,4 @@ +/** + * The SFTP supplier auto-configuration support. + */ +package org.springframework.cloud.fn.supplier.sftp; diff --git a/supplier/spring-sftp-supplier/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/supplier/spring-sftp-supplier/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 00000000..a49e0a85 --- /dev/null +++ b/supplier/spring-sftp-supplier/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.springframework.cloud.fn.supplier.sftp.SftpSupplierConfiguration diff --git a/supplier/spring-sftp-supplier/src/test/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierApplicationTests.java b/supplier/spring-sftp-supplier/src/test/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierApplicationTests.java index e5f3fac6..15535b6c 100644 --- a/supplier/spring-sftp-supplier/src/test/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierApplicationTests.java +++ b/supplier/spring-sftp-supplier/src/test/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierApplicationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2023 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -70,7 +70,7 @@ void setUpDefaultProperties() { @Test @SuppressWarnings("unchecked") void supplierForListOnly() { - defaultApplicationContextRunner.withPropertyValues("sftp.supplier.listOnly=true").run(context -> { + defaultApplicationContextRunner.withPropertyValues("sftp.supplier.listOnly=true").run((context) -> { Supplier>> sftpSupplier = context.getBean("sftpSupplier", Supplier.class); SftpSupplierProperties properties = context.getBean(SftpSupplierProperties.class); HashSet fileNames = new HashSet<>(); @@ -79,11 +79,11 @@ void supplierForListOnly() { fileNames .add(String.join(properties.getRemoteFileSeparator(), properties.getRemoteDir(), "sftpSource2.txt")); final AtomicReference> expectedFileNames = new AtomicReference<>(fileNames); - StepVerifier.create(sftpSupplier.get()).assertNext(message -> { + StepVerifier.create(sftpSupplier.get()).assertNext((message) -> { assertThat(expectedFileNames.get()).contains(message.getPayload()); expectedFileNames.get().remove(message.getPayload()); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE)).isEqualTo(MediaType.TEXT_PLAIN); - }).assertNext(message -> { + }).assertNext((message) -> { assertThat(expectedFileNames.get()).contains(message.getPayload()); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE)).isEqualTo(MediaType.TEXT_PLAIN); }).thenCancel().verify(Duration.ofSeconds(30)); @@ -96,12 +96,11 @@ void supplierForListOnly() { void supplierForListOnlyWithPatternFilter() { defaultApplicationContextRunner .withPropertyValues("sftp.supplier.listOnly=true", "sftp.supplier.file-name-pattern=.*1.txt") - .run(context -> { + .run((context) -> { Supplier>> sftpSupplier = context.getBean("sftpSupplier", Supplier.class); - SftpSupplierProperties properties = context.getBean(SftpSupplierProperties.class); StepVerifier.create(sftpSupplier.get()) - .assertNext(message -> assertThat(message.getPayload()).contains("sftpSource1.txt")) + .assertNext((message) -> assertThat(message.getPayload()).contains("sftpSource1.txt")) .thenCancel() .verify(Duration.ofSeconds(30)); @@ -114,7 +113,7 @@ void supplierForListSortedByFilenameAsc() { defaultApplicationContextRunner .withPropertyValues("sftp.supplier.listOnly=true", "sftp.supplier.sortBy.attribute=filename", "sftp.supplier.sortBy.dir=asc") - .run(context -> { + .run((context) -> { Supplier>> sftpSupplier = context.getBean("sftpSupplier", Supplier.class); SftpSupplierProperties properties = context.getBean(SftpSupplierProperties.class); List fileNames = new ArrayList<>(); @@ -123,10 +122,10 @@ void supplierForListSortedByFilenameAsc() { fileNames.add( String.join(properties.getRemoteFileSeparator(), properties.getRemoteDir(), "sftpSource2.txt")); final AtomicReference> expectedFileNames = new AtomicReference<>(fileNames); - StepVerifier.create(sftpSupplier.get()).assertNext(message -> { + StepVerifier.create(sftpSupplier.get()).assertNext((message) -> { assertThat(message.getPayload()).isEqualTo(expectedFileNames.get().get(0)); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE)).isEqualTo(MediaType.TEXT_PLAIN); - }).assertNext(message -> { + }).assertNext((message) -> { assertThat(message.getPayload()).isEqualTo(expectedFileNames.get().get(1)); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE)).isEqualTo(MediaType.TEXT_PLAIN); }).thenCancel().verify(Duration.ofSeconds(30)); @@ -140,7 +139,7 @@ void supplierForListSortedByFilenameDesc() { defaultApplicationContextRunner .withPropertyValues("sftp.supplier.listOnly=true", "sftp.supplier.sortBy.attribute=filename", "sftp.supplier.sortBy.dir=desc") - .run(context -> { + .run((context) -> { Supplier>> sftpSupplier = context.getBean("sftpSupplier", Supplier.class); SftpSupplierProperties properties = context.getBean(SftpSupplierProperties.class); List fileNames = new ArrayList<>(); @@ -149,10 +148,10 @@ void supplierForListSortedByFilenameDesc() { fileNames.add( String.join(properties.getRemoteFileSeparator(), properties.getRemoteDir(), "sftpSource1.txt")); final AtomicReference> expectedFileNames = new AtomicReference<>(fileNames); - StepVerifier.create(sftpSupplier.get()).assertNext(message -> { + StepVerifier.create(sftpSupplier.get()).assertNext((message) -> { assertThat(message.getPayload()).isEqualTo(expectedFileNames.get().get(0)); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE)).isEqualTo(MediaType.TEXT_PLAIN); - }).assertNext(message -> { + }).assertNext((message) -> { assertThat(message.getPayload()).isEqualTo(expectedFileNames.get().get(1)); assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE)).isEqualTo(MediaType.TEXT_PLAIN); }).thenCancel().verify(Duration.ofSeconds(30)); @@ -166,7 +165,7 @@ void supplierForFileRef() { defaultApplicationContextRunner .withPropertyValues("sftp.supplier.localDir=" + getTargetLocalDirectory().getAbsolutePath(), "file.consumer.mode=ref") - .run(context -> { + .run((context) -> { Supplier>> sftpSupplier = context.getBean("sftpSupplier", Supplier.class); SftpSupplierProperties properties = context.getBean(SftpSupplierProperties.class); MetadataStore metadataStore = context.getBean(MetadataStore.class); @@ -174,13 +173,13 @@ void supplierForFileRef() { fileNames.add(properties.getLocalDir() + File.separator + "sftpSource1.txt"); fileNames.add(properties.getLocalDir() + File.separator + "sftpSource2.txt"); final AtomicReference> expectedFileNames = new AtomicReference<>(fileNames); - StepVerifier.create(sftpSupplier.get()).assertNext(message -> { + StepVerifier.create(sftpSupplier.get()).assertNext((message) -> { File file = message.getPayload(); assertThat(expectedFileNames.get()).contains(file.getAbsolutePath()); expectedFileNames.get().remove(file.getAbsolutePath()); }) .expectNextMatches( - message -> expectedFileNames.get().contains(message.getPayload().getAbsolutePath())) + (message) -> expectedFileNames.get().contains(message.getPayload().getAbsolutePath())) .thenCancel() .verify(Duration.ofSeconds(30)); @@ -198,11 +197,11 @@ void supplierForFileRef() { void deleteRemoteFiles() { defaultApplicationContextRunner .withPropertyValues("sftp.supplier.stream=true", "sftp.supplier.delete-remote-files=true") - .run(context -> { + .run((context) -> { Supplier>> sftpSupplier = context.getBean("sftpSupplier", Supplier.class); StepVerifier.create(sftpSupplier.get()) - .expectNextMatches(message -> message.getPayload().length > 0) - .expectNextMatches(message -> message.getPayload().length > 0) + .expectNextMatches((message) -> message.getPayload().length > 0) + .expectNextMatches((message) -> message.getPayload().length > 0) .thenCancel() .verify(Duration.ofSeconds(30)); await().atMost(Duration.ofSeconds(30)).until(() -> getSourceRemoteDirectory().list().length == 0); @@ -234,8 +233,8 @@ private void doTestRenameRemoteFiles(AssertableApplicationContext context) { .collect(Collectors.toSet()); StepVerifier.create(sftpSupplier.get()) - .expectNextMatches(message -> message.getPayload().length > 0) - .expectNextMatches(message -> message.getPayload().length > 0) + .expectNextMatches((message) -> message.getPayload().length > 0) + .expectNextMatches((message) -> message.getPayload().length > 0) .thenCancel() .verify(Duration.ofSeconds(30)); await().atMost(Duration.ofSeconds(30)) @@ -247,33 +246,30 @@ private void doTestRenameRemoteFiles(AssertableApplicationContext context) { @SuppressWarnings("unchecked") public void streamSourceFilesInLineMode() { defaultApplicationContextRunner + // ensure public key was used .withPropertyValues("sftp.supplier.stream=true", "sftp.supplier.factory.private-key = classpath:id_rsa_pp", - "sftp.supplier.factory.passphrase = secret", "sftp.supplier.factory.password = badPassword", // ensure - // public - // key - // was - // used + "sftp.supplier.factory.passphrase = secret", "sftp.supplier.factory.password = badPassword", "sftp.supplier.delete-remote-files=true", "file.consumer.mode=lines", "file.consumer.with-markers=true", "file.consumer.markers-json=true") - .run(context -> { + .run((context) -> { Supplier>> sftpSupplier = context.getBean("sftpSupplier", Supplier.class); - StepVerifier.create(sftpSupplier.get()).assertNext(message -> { + StepVerifier.create(sftpSupplier.get()).assertNext((message) -> { final Object evaluate; try { evaluate = JsonPathUtils.evaluate(message.getPayload(), "$.mark"); assertThat(evaluate).isEqualTo(FileSplitter.FileMarker.Mark.START.name()); } - catch (IOException e) { - fail(e.getMessage()); + catch (IOException ex) { + fail(ex.getMessage()); } - }).expectNextMatches(message -> message.getPayload().startsWith("source")).assertNext(message -> { + }).expectNextMatches((message) -> message.getPayload().startsWith("source")).assertNext((message) -> { final Object evaluate; try { evaluate = JsonPathUtils.evaluate(message.getPayload(), "$.mark"); assertThat(evaluate).isEqualTo(FileSplitter.FileMarker.Mark.END.name()); } - catch (IOException e) { - fail(e.getMessage()); + catch (IOException ex) { + fail(ex.getMessage()); } }).thenCancel().verify(Duration.ofSeconds(30)); }); @@ -298,7 +294,7 @@ void supplierWithMultiSourceAndStreamContentsSource3ComesSecond() throws Excepti "sftp.supplier.factories.two.allowUnknownKeys = true", "sftp.supplier.directories=one.sftpSource,two.sftpSecondSource", "sftp.supplier.max-fetch=1", "sftp.supplier.fair=true") - .run(context -> { + .run((context) -> { Supplier>> sftpSupplier = context.getBean("sftpSupplier", Supplier.class); HashSet contents = new HashSet<>(); @@ -306,14 +302,14 @@ void supplierWithMultiSourceAndStreamContentsSource3ComesSecond() throws Excepti contents.add("source2"); final AtomicReference> expectedContentsOfAllFiles = new AtomicReference<>(contents); - StepVerifier.create(sftpSupplier.get()).assertNext(message -> { + StepVerifier.create(sftpSupplier.get()).assertNext((message) -> { String payload = new String(message.getPayload()); assertThat(expectedContentsOfAllFiles.get()).contains(payload); expectedContentsOfAllFiles.get().remove(payload); }) - .expectNextMatches(message -> new String(message.getPayload()).equals("source3")) - .expectNextMatches( - message -> expectedContentsOfAllFiles.get().contains(new String(message.getPayload()))) + .expectNextMatches((message) -> new String(message.getPayload()).equals("source3")) + .expectNextMatches((message) -> expectedContentsOfAllFiles.get() + .contains(new String(message.getPayload()))) .thenCancel() .verify(Duration.ofSeconds(30)); }); @@ -348,7 +344,7 @@ void supplierMultiSourceRefTestsFor200Alex() throws Exception { "sftp.supplier.factories.empty.allowUnknownKeys = true", "sftp.supplier.directories=one.sftpSource,two.sftpSecondSource,empty.sftpSource", "sftp.supplier.max-fetch=1", "sftp.supplier.fair=true") - .run(context -> { + .run((context) -> { Supplier>> sftpSupplier = context.getBean("sftpSupplier", Supplier.class); SftpSupplierProperties properties = context.getBean(SftpSupplierProperties.class); String localDir = properties.getLocalDir().getPath(); @@ -358,15 +354,15 @@ void supplierMultiSourceRefTestsFor200Alex() throws Exception { final AtomicReference> expectedFirstSourcePaths = new AtomicReference<>( firstSourceFiles); - StepVerifier.create(sftpSupplier.get()).assertNext(message -> { + StepVerifier.create(sftpSupplier.get()).assertNext((message) -> { assertThat(expectedFirstSourcePaths.get()).contains(message.getPayload().getPath()); expectedFirstSourcePaths.get().remove(message.getPayload().getPath()); }) - .expectNextMatches(message -> message.getPayload() + .expectNextMatches((message) -> message.getPayload() .getPath() .equals(Paths.get(localDir, "sftpSource3.txt").toString())) .expectNextMatches( - message -> expectedFirstSourcePaths.get().contains(message.getPayload().getPath())) + (message) -> expectedFirstSourcePaths.get().contains(message.getPayload().getPath())) .thenCancel() .verify(Duration.ofSeconds(10)); });