Skip to content

Commit

Permalink
Make S3 modules as auto-configuration
Browse files Browse the repository at this point in the history
* Fix all the Checkstyle violations and compiler warnings
* Fix HTTP Supplier props prefix in the README
  • Loading branch information
artembilan committed Jan 8, 2024
1 parent 090ca28 commit 373fbdd
Show file tree
Hide file tree
Showing 17 changed files with 93 additions and 70 deletions.
2 changes: 1 addition & 1 deletion consumer/spring-s3-consumer/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ The consumer uses the AWS S3 support from Spring Integration and Spring Cloud AW

== Beans for injection

You can import `AwsS3ConsumerConfiguration` in the application and then inject the following bean.
The `AwsS3ConsumerConfiguration` auto-configuration provides the following bean:

`Consumer<Message<?>> s3Consumer`

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,14 +19,16 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import io.awspring.cloud.autoconfigure.s3.S3TransferManagerAutoConfiguration;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.progress.TransferListener;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.integration.aws.outbound.S3MessageHandler;
Expand All @@ -39,7 +41,12 @@
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;

@Configuration(proxyBeanMethods = false)
/**
* Auto-configuration for S3 consumer.
*
* @author Artem Bilan
*/
@AutoConfiguration(after = S3TransferManagerAutoConfiguration.class)
@EnableConfigurationProperties(AwsS3ConsumerProperties.class)
public class AwsS3ConsumerConfiguration {

Expand All @@ -52,7 +59,7 @@ public Consumer<Message<?>> s3Consumer(IntegrationFlow s3ConsumerFlow) {
public IntegrationFlow s3ConsumerFlow(@Nullable TransferListener transferListener,
MessageHandler amazonS3MessageHandler) {

return flow -> flow.enrichHeaders(headers -> headers.header(AwsHeaders.TRANSFER_LISTENER, transferListener))
return (flow) -> flow.enrichHeaders((headers) -> headers.header(AwsHeaders.TRANSFER_LISTENER, transferListener))
.handle(amazonS3MessageHandler);
}

Expand All @@ -62,17 +69,19 @@ public MessageHandler amazonS3MessageHandler(S3TransferManager s3TransferManager
@Nullable BiConsumer<PutObjectRequest.Builder, Message<?>> uploadMetadataProvider) {

Expression bucketExpression = s3ConsumerProperties.getBucketExpression();
if (s3ConsumerProperties.getBucket() != null) {
bucketExpression = new ValueExpression<>(s3ConsumerProperties.getBucket());
String bucket = s3ConsumerProperties.getBucket();
if (bucket != null) {
bucketExpression = new ValueExpression<>(bucket);
}

S3MessageHandler s3MessageHandler = new S3MessageHandler(s3TransferManager, bucketExpression);
s3MessageHandler.setKeyExpression(s3ConsumerProperties.getKeyExpression());

Expression aclExpression;

if (s3ConsumerProperties.getAcl() != null) {
aclExpression = new ValueExpression<>(s3ConsumerProperties.getAcl());
ObjectCannedACL acl = s3ConsumerProperties.getAcl();
if (acl != null) {
aclExpression = new ValueExpression<>(acl);
}
else {
aclExpression = s3ConsumerProperties.getAclExpression();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,8 @@
import org.springframework.validation.annotation.Validated;

/**
* Configuration properties for S3 consumer.
*
* @author Artem Bilan
*/
@ConfigurationProperties("s3.consumer")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* The S3 consumer auto-configuration support.
*/
package org.springframework.cloud.fn.consumer.s3;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.springframework.cloud.fn.consumer.s3.AwsS3ConsumerConfiguration
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,10 +51,8 @@ public void test() throws Exception {

this.s3Consumer.accept(message);

ArgumentCaptor<PutObjectRequest> putObjectRequestArgumentCaptor = ArgumentCaptor
.forClass(PutObjectRequest.class);
ArgumentCaptor<AsyncRequestBody> asyncRequestBodyArgumentCaptor = ArgumentCaptor
.forClass(AsyncRequestBody.class);
ArgumentCaptor<PutObjectRequest> putObjectRequestArgumentCaptor = ArgumentCaptor.captor();
ArgumentCaptor<AsyncRequestBody> asyncRequestBodyArgumentCaptor = ArgumentCaptor.captor();
verify(amazonS3Client, atLeastOnce()).putObject(putObjectRequestArgumentCaptor.capture(),
asyncRequestBodyArgumentCaptor.capture());

Expand All @@ -68,7 +66,7 @@ public void test() throws Exception {

AsyncRequestBody asyncRequestBody = asyncRequestBodyArgumentCaptor.getValue();
StepVerifier.create(asyncRequestBody)
.assertNext(buffer -> assertThat(TestUtils.getPropertyValue(buffer, "hb", byte[].class)).isEmpty())
.assertNext((buffer) -> assertThat(TestUtils.getPropertyValue(buffer, "hb", byte[].class)).isEmpty())
.expectComplete()
.verify();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,10 +51,8 @@ public void test() throws Exception {

this.s3Consumer.accept(message);

ArgumentCaptor<PutObjectRequest> putObjectRequestArgumentCaptor = ArgumentCaptor
.forClass(PutObjectRequest.class);
ArgumentCaptor<AsyncRequestBody> asyncRequestBodyArgumentCaptor = ArgumentCaptor
.forClass(AsyncRequestBody.class);
ArgumentCaptor<PutObjectRequest> putObjectRequestArgumentCaptor = ArgumentCaptor.captor();
ArgumentCaptor<AsyncRequestBody> asyncRequestBodyArgumentCaptor = ArgumentCaptor.captor();
verify(amazonS3Client, atLeastOnce()).putObject(putObjectRequestArgumentCaptor.capture(),
asyncRequestBodyArgumentCaptor.capture());

Expand All @@ -67,7 +65,7 @@ public void test() throws Exception {
assertThat(putObjectRequest.contentDisposition()).isEqualTo("test.json");

AsyncRequestBody asyncRequestBody = asyncRequestBodyArgumentCaptor.getValue();
StepVerifier.create(asyncRequestBody.map(buffer -> StandardCharsets.UTF_8.decode(buffer).toString()))
StepVerifier.create(asyncRequestBody.map((buffer) -> StandardCharsets.UTF_8.decode(buffer).toString()))
.expectNext("a")
.expectComplete()
.verify();
Expand Down
2 changes: 1 addition & 1 deletion supplier/spring-http-supplier/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Once injected, you can use the `get` method of the `Supplier` to invoke it and t

## Configuration Options

All configuration properties are prefixed with `http`.
All configuration properties are prefixed with `http.supplier`.

For more information on the various options available, please see link:src/main/java/org/springframework/cloud/fn/supplier/http/HttpSupplierProperties.java[HttpSupplierProperties].

Expand Down
2 changes: 1 addition & 1 deletion supplier/spring-s3-supplier/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Users have to subscribe to this `Flux` and receive the data.

== Beans for injection

You can import the `AwsS3SupplierConfiguration` in the application and then inject the following bean.
The `AwsS3SupplierConfiguration` auto-configuration provides the following bean:

`s3Supplier`

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,8 +30,10 @@
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.S3Object;

import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.aws.s3.AmazonS3Configuration;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.cloud.fn.common.file.FileConsumerProperties;
import org.springframework.cloud.fn.common.file.FileUtils;
Expand All @@ -55,10 +57,12 @@
import org.springframework.util.StringUtils;

/**
* Auto-configuration for S3 supplier.
*
* @author Artem Bilan
* @author David Turanski
*/
@Configuration(proxyBeanMethods = false)
@AutoConfiguration(after = AmazonS3Configuration.class)
@EnableConfigurationProperties({ AwsS3SupplierProperties.class, FileConsumerProperties.class })
public class AwsS3SupplierConfiguration {

Expand Down Expand Up @@ -87,12 +91,12 @@ public AwsS3SupplierConfiguration(AwsS3SupplierProperties awsS3SupplierPropertie
static class SynchronizingConfiguration extends AwsS3SupplierConfiguration {

@Bean
public Supplier<Flux<Message<?>>> s3Supplier(Publisher<Message<?>> s3SupplierFlow) {
Supplier<Flux<Message<?>>> s3Supplier(Publisher<Message<?>> s3SupplierFlow) {
return () -> Flux.from(s3SupplierFlow);
}

@Bean
public ChainFileListFilter<S3Object> filter(ConcurrentMetadataStore metadataStore) {
ChainFileListFilter<S3Object> filter(ConcurrentMetadataStore metadataStore) {
ChainFileListFilter<S3Object> chainFilter = new ChainFileListFilter<>();
if (StringUtils.hasText(this.awsS3SupplierProperties.getFilenamePattern())) {
chainFilter
Expand All @@ -115,19 +119,19 @@ else if (this.awsS3SupplierProperties.getFilenameRegex() != null) {
}

@Bean
public Publisher<Message<Object>> s3SupplierFlow(S3InboundFileSynchronizingMessageSource s3MessageSource) {
Publisher<Message<Object>> s3SupplierFlow(S3InboundFileSynchronizingMessageSource s3MessageSource) {
return FileUtils
.enhanceFlowForReadingMode(
IntegrationFlow.from(IntegrationReactiveUtils.messageSourceToFlux(s3MessageSource)
.doOnSubscribe((s) -> s3MessageSource.start())),
fileConsumerProperties)
this.fileConsumerProperties)
.toReactivePublisher(true);
}

@Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer(ChainFileListFilter<S3Object> filter) {
S3InboundFileSynchronizer s3InboundFileSynchronizer(ChainFileListFilter<S3Object> filter) {

S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(s3SessionFactory);
S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(this.s3SessionFactory);
synchronizer.setDeleteRemoteFiles(this.awsS3SupplierProperties.isDeleteRemoteFiles());
synchronizer.setPreserveTimestamp(this.awsS3SupplierProperties.isPreserveTimestamp());
String remoteDir = this.awsS3SupplierProperties.getRemoteDir();
Expand All @@ -140,8 +144,7 @@ public S3InboundFileSynchronizer s3InboundFileSynchronizer(ChainFileListFilter<S
}

@Bean
public S3InboundFileSynchronizingMessageSource s3MessageSource(
S3InboundFileSynchronizer s3InboundFileSynchronizer,
S3InboundFileSynchronizingMessageSource s3MessageSource(S3InboundFileSynchronizer s3InboundFileSynchronizer,
@Nullable ComponentCustomizer<S3InboundFileSynchronizingMessageSource> s3MessageSourceCustomizer) {

S3InboundFileSynchronizingMessageSource s3MessageSource = new S3InboundFileSynchronizingMessageSource(
Expand Down Expand Up @@ -170,18 +173,18 @@ static class ListOnlyConfiguration extends AwsS3SupplierConfiguration {
}

@Bean
public Supplier<Flux<Message<Object>>> s3Supplier(Publisher<Message<Object>> s3SupplierFlow) {
Supplier<Flux<Message<Object>>> s3Supplier(Publisher<Message<Object>> s3SupplierFlow) {
return () -> Flux.from(s3SupplierFlow);
}

@Bean
public Publisher<Message<Object>> s3SupplierFlow(ReactiveMessageSourceProducer s3ListingProducer) {
Publisher<Message<Object>> s3SupplierFlow(ReactiveMessageSourceProducer s3ListingProducer) {
return IntegrationFlow.from(s3ListingProducer).split().toReactivePublisher(true);
}

@Bean
Predicate<S3Object> listOnlyFilter(AwsS3SupplierProperties awsS3SupplierProperties) {
Predicate<S3Object> predicate = s -> true;
Predicate<S3Object> predicate = (s) -> true;
if (StringUtils.hasText(this.awsS3SupplierProperties.getFilenamePattern())) {
Pattern pattern = Pattern.compile(this.awsS3SupplierProperties.getFilenamePattern());
predicate = (S3Object summary) -> pattern.matcher(summary.key()).matches();
Expand All @@ -197,7 +200,7 @@ else if (this.awsS3SupplierProperties.getFilenameRegex() != null) {
final String storedLastModified = this.metadataStore.get(key);
boolean result = !lastModified.equals(storedLastModified);
if (result) {
metadataStore.put(key, lastModified);
this.metadataStore.put(key, lastModified);
}
return result;
});
Expand All @@ -214,7 +217,7 @@ ReactiveMessageSourceProducer s3ListingMessageProducer(S3Client amazonS3, Object
.contents()
.stream()
.filter(filter)
.map(s3Object -> {
.map((s3Object) -> {
try {
return objectMapper.writeValueAsString(s3Object.toBuilder());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,8 @@
import org.springframework.validation.annotation.Validated;

/**
* The configuration properties for S3 supplier.
*
* @author Artem Bilan
*/
@ConfigurationProperties("s3.supplier")
Expand Down Expand Up @@ -95,7 +97,7 @@ public final void setRemoteDir(String remoteDir) {

@NotBlank
public String getTmpFileSuffix() {
return tmpFileSuffix;
return this.tmpFileSuffix;
}

public void setTmpFileSuffix(String tmpFileSuffix) {
Expand All @@ -104,23 +106,23 @@ public void setTmpFileSuffix(String tmpFileSuffix) {

@NotBlank
public String getRemoteFileSeparator() {
return remoteFileSeparator;
return this.remoteFileSeparator;
}

public void setRemoteFileSeparator(String remoteFileSeparator) {
this.remoteFileSeparator = remoteFileSeparator;
}

public boolean isAutoCreateLocalDir() {
return autoCreateLocalDir;
return this.autoCreateLocalDir;
}

public void setAutoCreateLocalDir(boolean autoCreateLocalDir) {
this.autoCreateLocalDir = autoCreateLocalDir;
}

public boolean isDeleteRemoteFiles() {
return deleteRemoteFiles;
return this.deleteRemoteFiles;
}

public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {
Expand All @@ -129,31 +131,31 @@ public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {

@NotNull
public File getLocalDir() {
return localDir;
return this.localDir;
}

public final void setLocalDir(File localDir) {
this.localDir = localDir;
}

public String getFilenamePattern() {
return filenamePattern;
return this.filenamePattern;
}

public void setFilenamePattern(String filenamePattern) {
this.filenamePattern = filenamePattern;
}

public Pattern getFilenameRegex() {
return filenameRegex;
return this.filenameRegex;
}

public void setFilenameRegex(Pattern filenameRegex) {
this.filenameRegex = filenameRegex;
}

public boolean isPreserveTimestamp() {
return preserveTimestamp;
return this.preserveTimestamp;
}

public void setPreserveTimestamp(boolean preserveTimestamp) {
Expand All @@ -166,7 +168,7 @@ public boolean isExclusivePatterns() {
}

public boolean isListOnly() {
return listOnly;
return this.listOnly;
}

public void setListOnly(boolean listOnly) {
Expand Down
Loading

0 comments on commit 373fbdd

Please sign in to comment.