Skip to content

Commit

Permalink
GH-2: Expose a @MessagingGateway for sftpConsumer
Browse files Browse the repository at this point in the history
Fixes: #2

* To avoid `@Lazy` and other possible injection problems
when an `IntegrationFlow` is not ready yet, provide a `@MessagingGateway(name = "sftpConsumer")`
explicitly for the `MessageConsumer` in the `SftpConsumerConfiguration`

* Make SFTP modules as auto-configuration
* Fix Checkstyle violations in those modules
  • Loading branch information
artembilan committed Jan 8, 2024
1 parent 985a753 commit 56f75fb
Show file tree
Hide file tree
Showing 15 changed files with 180 additions and 232 deletions.
2 changes: 1 addition & 1 deletion consumer/spring-sftp-consumer/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ A consumer that allows you to SFTP files.

## Beans for injection

You can import `SftpConsumerConfiguration` in the application and then inject the following bean.
The `SftpConsumerConfiguration` auto-configuration provides the following bean:

`Consumer<Message<?>> sftpConsumer`

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2023 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,13 +20,13 @@

import org.apache.sshd.sftp.client.SftpClient;

import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.integration.sftp.dsl.SftpMessageHandlerSpec;
Expand All @@ -35,12 +35,12 @@
import org.springframework.messaging.Message;

/**
* Configuration for SFTP Consumer.
* Auto-configuration for SFTP Consumer.
*
* @author Soby Chacko
* @author Corneil du Plessis
*/
@Configuration(proxyBeanMethods = false)
@AutoConfiguration
@EnableConfigurationProperties(SftpConsumerProperties.class)
@Import(SftpConsumerSessionFactoryConfiguration.class)
public class SftpConsumerConfiguration {
Expand All @@ -50,9 +50,6 @@ public IntegrationFlow ftpOutboundFlow(SftpConsumerProperties properties,
SessionFactory<SftpClient.DirEntry> ftpSessionFactory,
@Nullable ComponentCustomizer<SftpMessageHandlerSpec> sftpMessageHandlerSpecCustomizer) {

IntegrationFlowBuilder integrationFlowBuilder = IntegrationFlow.from(MessageConsumer.class,
(gateway) -> gateway.beanName("sftpConsumer"));

SftpMessageHandlerSpec handlerSpec = Sftp
.outboundAdapter(new SftpRemoteFileTemplate(ftpSessionFactory), properties.getMode())
.remoteDirectory(properties.getRemoteDir())
Expand All @@ -69,10 +66,11 @@ public IntegrationFlow ftpOutboundFlow(SftpConsumerProperties properties,
sftpMessageHandlerSpecCustomizer.customize(handlerSpec);
}

return integrationFlowBuilder.handle(handlerSpec).get();
return (flow) -> flow.handle(handlerSpec);
}

private interface MessageConsumer extends Consumer<Message<?>> {
@MessagingGateway(name = "sftpConsumer", defaultRequestChannel = "ftpOutboundFlow.input")
public interface MessageConsumer extends Consumer<Message<?>> {

}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,8 @@
import org.springframework.validation.annotation.Validated;

/**
* The configuration properties for SFTP consumer.
*
* @author Gary Russell
* @author Artem Bilan
* @author Corneil du Plessis
Expand Down Expand Up @@ -122,7 +124,7 @@ public void setFilenameExpression(String filenameExpression) {

@NotBlank
public String getRemoteDir() {
return remoteDir;
return this.remoteDir;
}

public final void setRemoteDir(String remoteDir) {
Expand All @@ -131,7 +133,7 @@ public final void setRemoteDir(String remoteDir) {

@NotBlank
public String getTmpFileSuffix() {
return tmpFileSuffix;
return this.tmpFileSuffix;
}

public void setTmpFileSuffix(String tmpFileSuffix) {
Expand All @@ -140,7 +142,7 @@ public void setTmpFileSuffix(String tmpFileSuffix) {

@NotBlank
public String getRemoteFileSeparator() {
return remoteFileSeparator;
return this.remoteFileSeparator;
}

public void setRemoteFileSeparator(String remoteFileSeparator) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,7 @@
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.expression.Expression;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
Expand Down Expand Up @@ -51,9 +51,9 @@ public SessionFactory<SftpClient.DirEntry> sftpSessionFactory(SftpConsumerProper
sftpSessionFactory.setPrivateKey(factory.getPrivateKey());
sftpSessionFactory.setPrivateKeyPassphrase(factory.getPassPhrase());
sftpSessionFactory.setAllowUnknownKeys(factory.isAllowUnknownKeys());
if (factory.getKnownHostsExpression() != null) {
String knownHostsLocation = factory.getKnownHostsExpression()
.getValue(IntegrationContextUtils.getEvaluationContext(applicationContext), String.class);
Expression knownHostsExpression = factory.getKnownHostsExpression();
if (knownHostsExpression != null) {
String knownHostsLocation = knownHostsExpression.getValue(String.class);
Resource knownHostsResource = applicationContext.getResource(knownHostsLocation);
sftpSessionFactory.setKnownHostsResource(knownHostsResource);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* The SFTP consumer auto-configuration support.
*/
package org.springframework.cloud.fn.consumer.sftp;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.springframework.cloud.fn.consumer.sftp.SftpConsumerConfiguration
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,26 +20,14 @@

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationPropertiesBinding;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.cloud.fn.common.config.SpelExpressionConverterConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.convert.converter.Converter;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ParseException;
import org.springframework.expression.spel.standard.SpelExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.IntegrationConverter;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.test.util.TestUtils;
Expand Down Expand Up @@ -160,50 +148,9 @@ static class Conf {
@Configuration
@EnableConfigurationProperties(SftpConsumerProperties.class)
@EnableIntegration
@Import(SftpConsumerSessionFactoryConfiguration.class)
@Import({ SpelExpressionConverterConfiguration.class, SftpConsumerSessionFactoryConfiguration.class })
static class Factory {

@Bean
@ConfigurationPropertiesBinding
@IntegrationConverter
public Converter<String, Expression> spelConverter() {
return new SpelConverter();
}

/**
* TODO: This needs to be refactored into a generic place for any functions to
* use.
*
* A simple converter from String to Expression.
*
* @author Eric Bottard
*/
public static class SpelConverter implements Converter<String, Expression> {

private SpelExpressionParser parser = new SpelExpressionParser();

@Autowired
@Qualifier(IntegrationContextUtils.INTEGRATION_EVALUATION_CONTEXT_BEAN_NAME)
@Lazy
private EvaluationContext evaluationContext;

@Override
public Expression convert(String source) {
try {
Expression expression = this.parser.parseExpression(source);
if (expression instanceof SpelExpression) {
((SpelExpression) expression).setEvaluationContext(this.evaluationContext);
}
return expression;
}
catch (ParseException e) {
throw new IllegalArgumentException(
String.format("Could not convert '%s' into a SpEL expression", source), e);
}
}

}

}

}
6 changes: 3 additions & 3 deletions supplier/spring-sftp-supplier/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ When configuring the `sftp.factory.known-hosts-expression` option, the root obje

## Idempotency

By default the supplier uses a https://docs.spring.io/spring-integration/api/org/springframework/integration/metadata/SimpleMetadataStore.html[SimpleMetadataStore], storing the last modified time to track files that have already been processed in memory.
By default, the supplier uses a https://docs.spring.io/spring-integration/api/org/springframework/integration/metadata/SimpleMetadataStore.html[SimpleMetadataStore], storing the last modified time to track files that have already been processed in memory.
If an application using this supplier is restarted, any existing files will be reprocessed. You can inject on of the persistent https://docs.spring.io/spring-integration/reference/html/meta-data-store.html[MetadataStore implementations] provided by Spring Integration, or your own of course, to maintain this state permanently.
See also link:../../common/metadata-store-common/README.adoc[`MetadataStore`] options for possible shared persistent store configuration for the `SftpPersistentAcceptOnceFileListFilter` used in the SFTP Source.

Expand Down Expand Up @@ -60,7 +60,7 @@ Users have to subscribe to this `Flux` and receive the data.

## Beans for injection

You can import the `SftpSupplierConfiguration` in the application and then inject the following bean.
The `SftpSupplierConfiguration` auto-configuration provides the following bean:

`sftpSupplier`

Expand All @@ -76,7 +76,7 @@ All configuration properties are prefixed with `sftp.supplier`.
There are also properties that need to be used with the prefix `file.consumer`.

For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/supplier/sftp/SftpSupplierProperties.java[SftpSupplierProperties].
Also see link:src/main/java/org/springframework/cloud/fn/supplier/file/FileConsumerProperties.java[FileConsumerProperties].
Also see `FileConsumerProperties`.

## Examples

Expand Down
Loading

0 comments on commit 56f75fb

Please sign in to comment.