diff --git a/supplier/spring-file-supplier/README.adoc b/supplier/spring-file-supplier/README.adoc index 83d2584a..35d0d81b 100644 --- a/supplier/spring-file-supplier/README.adoc +++ b/supplier/spring-file-supplier/README.adoc @@ -1,4 +1,4 @@ -# File Supplier += File Supplier This module provides a File supplier that can be reused and composed in other applications. The `Supplier` uses the `FileInboundChannelAdapter` from Spring Integration. @@ -6,7 +6,11 @@ The `Supplier` uses the `FileInboundChannelAdapter` from Spring Integration. This supplier gives you a reactive stream of files from the provided directory as the supplier has a signature of `Supplier>>`. Users have to subscribe to this `Flux` and receive the data. -## Beans for injection +This module also can be configured for file tailing functionality. +The implementation and logic is fully based on respective Spring Integration https://docs.spring.io/spring-integration/reference/file/reading.html#file-tailing[Tailing Message Producer]. +The `Supplier>>` emits messages for each tailed line from the file. + +== Beans for injection The `FileSupplierConfiguration` auto-configuration provides the following bean: @@ -18,7 +22,7 @@ You can use `fileSupplier` as a qualifier when injecting. Once injected, you can use the `get` method of the `Supplier` to invoke it and then subscribe to the returned `Flux`. -## Configuration Options +== Configuration Options All configuration properties are prefixed with `file.supplier`. There are also properties that need to be used with the prefix `file.consumer`. @@ -28,10 +32,14 @@ See `FileConsumerProperties` also. A `ComponentCustomizer` bean can be added in the target project to provide any custom options for the `FileInboundChannelAdapterSpec` configuration used by the `fileSupplier`. -## Tests +If `file.supplier.tail` option is provided, this supplier works in a tailing file mode. +See `FileSupplierProperties.Tailer` container for more information. +In `tail` mode, all other options for directory polling are ignored. + +== Tests See this link:src/test/java/org/springframework/cloud/fn/supplier/file[test suite] for the various ways, this supplier is used. -## Other usage +== Other usage See this https://github.com/spring-cloud/stream-applications/blob/master/applications/source/file-source/README.adoc[README] where this supplier is used to create a Spring Cloud Stream application where it makes a File Source. diff --git a/supplier/spring-file-supplier/src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierConfiguration.java b/supplier/spring-file-supplier/src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierConfiguration.java index fff34fc8..d926d58f 100644 --- a/supplier/spring-file-supplier/src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierConfiguration.java +++ b/supplier/spring-file-supplier/src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierConfiguration.java @@ -26,17 +26,21 @@ import org.springframework.beans.factory.BeanInitializationException; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.fn.common.config.ComponentCustomizer; import org.springframework.cloud.fn.common.file.FileConsumerProperties; import org.springframework.cloud.fn.common.file.FileReadingMode; import org.springframework.cloud.fn.common.file.FileUtils; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.JavaUtils; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlowBuilder; import org.springframework.integration.file.FileReadingMessageSource; import org.springframework.integration.file.dsl.FileInboundChannelAdapterSpec; import org.springframework.integration.file.dsl.Files; +import org.springframework.integration.file.dsl.TailAdapterSpec; import org.springframework.integration.file.filters.ChainFileListFilter; import org.springframework.integration.file.filters.FileListFilter; import org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter; @@ -58,78 +62,113 @@ @EnableConfigurationProperties({ FileSupplierProperties.class, FileConsumerProperties.class }) public class FileSupplierConfiguration { - private static final String METADATA_STORE_PREFIX = "local-file-system-metadata-"; - - private final FileSupplierProperties fileSupplierProperties; - - private final FileConsumerProperties fileConsumerProperties; - - public FileSupplierConfiguration(FileSupplierProperties fileSupplierProperties, - FileConsumerProperties fileConsumerProperties) { - - this.fileSupplierProperties = fileSupplierProperties; - this.fileConsumerProperties = fileConsumerProperties; + @Bean + @ConditionalOnProperty(prefix = "file.supplier", name = "tail") + public Publisher> fileTailingFlow(FileSupplierProperties fileSupplierProperties) { + FileSupplierProperties.Tailer tail = fileSupplierProperties.getTailer(); + TailAdapterSpec tailAdapterSpec = Files.tailAdapter(fileSupplierProperties.getTail()) + // TODO until Spring Integration 6.2.3 + .autoStartup(false) + .fileDelay(tail.getAttemptsDelay().toMillis()) + // OS native tail command + .nativeOptions(tail.getNativeOptions()) + .enableStatusReader(tail.isStatusReader()); + + JavaUtils.INSTANCE + .acceptIfNotNull(tail.getIdleEventInterval(), + (duration) -> tailAdapterSpec.idleEventInterval(duration.toMillis())) + // Apache Commons Tailer + .acceptIfNotNull(tail.isEnd(), tailAdapterSpec::end) + .acceptIfNotNull(tail.isReopen(), tailAdapterSpec::reopen) + .acceptIfNotNull(tail.getPollingDelay(), (duration) -> tailAdapterSpec.delay(duration.toMillis())); + + return IntegrationFlow.from(tailAdapterSpec).toReactivePublisher(true); } @Bean - public ChainFileListFilter filter(ConcurrentMetadataStore metadataStore) { - ChainFileListFilter chainFilter = new ChainFileListFilter<>(); - if (StringUtils.hasText(this.fileSupplierProperties.getFilenamePattern())) { - chainFilter.addFilter(new SimplePatternFileListFilter(this.fileSupplierProperties.getFilenamePattern())); + public Supplier>> fileSupplier(FileConsumerProperties fileConsumerProperties, + @Nullable Flux> fileMessageFlux, @Nullable Publisher> fileReadingFlow, + @Nullable Publisher> fileTailingFlow) { + + if (fileConsumerProperties.getMode() == FileReadingMode.ref) { + return () -> fileMessageFlux; } - else if (this.fileSupplierProperties.getFilenameRegex() != null) { - chainFilter.addFilter(new RegexPatternFileListFilter(this.fileSupplierProperties.getFilenameRegex())); + else if (fileReadingFlow != null) { + return () -> Flux.from(fileReadingFlow); } - - if (this.fileSupplierProperties.isPreventDuplicates()) { - chainFilter - .addFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore, METADATA_STORE_PREFIX)); + else if (fileTailingFlow != null) { + return () -> Flux.from(fileTailingFlow); + } + else { + throw new BeanInitializationException("Cannot creat 'fileSupplier' bean: no 'fileReadingFlow', " + + "or 'fileTailingFlow' dependency, and is not 'FileReadingMode.ref'."); } - - return chainFilter; } - @Bean - public FileInboundChannelAdapterSpec fileMessageSource(FileListFilter fileListFilter, - @Nullable ComponentCustomizer fileInboundChannelAdapterSpecCustomizer) { + @Configuration(proxyBeanMethods = false) + @ConditionalOnExpression("environment['file.supplier.tail'] == null") + protected static class FileMessageSourceConfiguration { - FileInboundChannelAdapterSpec adapterSpec = Files.inboundAdapter(this.fileSupplierProperties.getDirectory()) - .filter(fileListFilter); - if (fileInboundChannelAdapterSpecCustomizer != null) { - fileInboundChannelAdapterSpecCustomizer.customize(adapterSpec); - } - return adapterSpec; - } + private static final String METADATA_STORE_PREFIX = "local-file-system-metadata-"; - @Bean - public Flux> fileMessageFlux(FileReadingMessageSource fileReadingMessageSource) { - return IntegrationReactiveUtils.messageSourceToFlux(fileReadingMessageSource) - .contextWrite(Context.of(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY, - this.fileSupplierProperties.getDelayWhenEmpty())) - .doOnRequest((r) -> fileReadingMessageSource.start()); - } + private final FileSupplierProperties fileSupplierProperties; - @Bean - @ConditionalOnExpression("environment['file.consumer.mode'] != 'ref'") - public Publisher> fileReadingFlow(Flux> fileMessageFlux) { - IntegrationFlowBuilder flowBuilder = IntegrationFlow.from(fileMessageFlux); - return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties).toReactivePublisher(true); - } + private final FileConsumerProperties fileConsumerProperties; - @Bean - public Supplier>> fileSupplier(Flux> fileMessageFlux, - @Nullable Publisher> fileReadingFlow) { + FileMessageSourceConfiguration(FileSupplierProperties fileSupplierProperties, + FileConsumerProperties fileConsumerProperties) { - if (this.fileConsumerProperties.getMode() == FileReadingMode.ref) { - return () -> fileMessageFlux; + this.fileSupplierProperties = fileSupplierProperties; + this.fileConsumerProperties = fileConsumerProperties; } - else if (fileReadingFlow != null) { - return () -> Flux.from(fileReadingFlow); + + @Bean + public ChainFileListFilter filter(ConcurrentMetadataStore metadataStore) { + ChainFileListFilter chainFilter = new ChainFileListFilter<>(); + if (StringUtils.hasText(this.fileSupplierProperties.getFilenamePattern())) { + chainFilter + .addFilter(new SimplePatternFileListFilter(this.fileSupplierProperties.getFilenamePattern())); + } + else if (this.fileSupplierProperties.getFilenameRegex() != null) { + chainFilter.addFilter(new RegexPatternFileListFilter(this.fileSupplierProperties.getFilenameRegex())); + } + + if (this.fileSupplierProperties.isPreventDuplicates()) { + chainFilter + .addFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore, METADATA_STORE_PREFIX)); + } + + return chainFilter; } - else { - throw new BeanInitializationException( - "Cannot creat 'fileSupplier' bean: no 'fileReadingFlow' dependency and is not 'FileReadingMode.ref'."); + + @Bean + public FileInboundChannelAdapterSpec fileMessageSource(FileListFilter fileListFilter, + @Nullable ComponentCustomizer fileInboundChannelAdapterSpecCustomizer) { + + FileInboundChannelAdapterSpec adapterSpec = Files.inboundAdapter(this.fileSupplierProperties.getDirectory()) + .filter(fileListFilter); + if (fileInboundChannelAdapterSpecCustomizer != null) { + fileInboundChannelAdapterSpecCustomizer.customize(adapterSpec); + } + return adapterSpec; + } + + @Bean + public Flux> fileMessageFlux(FileReadingMessageSource fileReadingMessageSource) { + return IntegrationReactiveUtils.messageSourceToFlux(fileReadingMessageSource) + .contextWrite(Context.of(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY, + this.fileSupplierProperties.getDelayWhenEmpty())) + .doOnRequest((r) -> fileReadingMessageSource.start()); + } + + @Bean + @ConditionalOnExpression("environment['file.consumer.mode'] != 'ref'") + public Publisher> fileReadingFlow(Flux> fileMessageFlux) { + IntegrationFlowBuilder flowBuilder = IntegrationFlow.from(fileMessageFlux); + return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties) + .toReactivePublisher(true); } + } } diff --git a/supplier/spring-file-supplier/src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierProperties.java b/supplier/spring-file-supplier/src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierProperties.java index e2c997f5..cf2cdbc1 100644 --- a/supplier/spring-file-supplier/src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierProperties.java +++ b/supplier/spring-file-supplier/src/main/java/org/springframework/cloud/fn/supplier/file/FileSupplierProperties.java @@ -21,6 +21,7 @@ import java.util.regex.Pattern; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.integration.file.tail.FileTailingMessageProducerSupport; import org.springframework.validation.annotation.Validated; /** @@ -41,6 +42,11 @@ public class FileSupplierProperties { */ private File directory = new File(DEFAULT_DIR); + /** + * The file to tail. + */ + private File tail; + /** * Set to true to include an AcceptOnceFileListFilter which prevents duplicates. */ @@ -61,6 +67,11 @@ public class FileSupplierProperties { */ private Duration delayWhenEmpty = Duration.ofSeconds(1); + /** + * File tailing options. + */ + private final Tailer tailer = new Tailer(); + public File getDirectory() { return this.directory; } @@ -107,4 +118,113 @@ public void setDelayWhenEmpty(Duration delayWhenEmpty) { this.delayWhenEmpty = delayWhenEmpty; } + public File getTail() { + return this.tail; + } + + public void setTail(File tail) { + this.tail = tail; + } + + public Tailer getTailer() { + return this.tailer; + } + + public static class Tailer { + + /** + * Options string for native tail command. + */ + private String nativeOptions; + + /** + * Whether to capture stderr output in the separate thread. + */ + private boolean statusReader; + + /** + * Delay between attempts to tail a non-existent file, or between attempts to + * execute a process if it fails for any reason. + */ + private Duration attemptsDelay = Duration + .ofMillis(FileTailingMessageProducerSupport.DEFAULT_TAIL_ATTEMPTS_DELAY); + + /** + * How often to emit FileTailingIdleEvents. + */ + private Duration idleEventInterval; + + /** + * Delay between checks of the file for new content. + */ + private Duration pollingDelay; + + /** + * Whether to tail from the end of the file. + */ + private Boolean end; + + /** + * Whether to close and reopen the file between reading chunks. + */ + private Boolean reopen; + + public String getNativeOptions() { + return this.nativeOptions; + } + + public void setNativeOptions(String nativeOptions) { + this.nativeOptions = nativeOptions; + } + + public boolean isStatusReader() { + return this.statusReader; + } + + public void setStatusReader(boolean statusReader) { + this.statusReader = statusReader; + } + + public Duration getAttemptsDelay() { + return this.attemptsDelay; + } + + public void setAttemptsDelay(Duration attemptsDelay) { + this.attemptsDelay = attemptsDelay; + } + + public Duration getIdleEventInterval() { + return this.idleEventInterval; + } + + public void setIdleEventInterval(Duration idleEventInterval) { + this.idleEventInterval = idleEventInterval; + } + + public Duration getPollingDelay() { + return this.pollingDelay; + } + + public void setPollingDelay(Duration pollingDelay) { + this.pollingDelay = pollingDelay; + } + + public Boolean isEnd() { + return this.end; + } + + public void setEnd(Boolean end) { + this.end = end; + } + + public Boolean isReopen() { + return this.reopen; + } + + public void setReopen(Boolean reopen) { + this.reopen = reopen; + } + + } + } diff --git a/supplier/spring-file-supplier/src/test/java/org/springframework/cloud/fn/supplier/file/AbstractFileSupplierTests.java b/supplier/spring-file-supplier/src/test/java/org/springframework/cloud/fn/supplier/file/AbstractFileSupplierTests.java index aa3c1e5b..547b687f 100644 --- a/supplier/spring-file-supplier/src/test/java/org/springframework/cloud/fn/supplier/file/AbstractFileSupplierTests.java +++ b/supplier/spring-file-supplier/src/test/java/org/springframework/cloud/fn/supplier/file/AbstractFileSupplierTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2022 the original author or authors. + * Copyright 2020-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ */ @SpringBootTest @DirtiesContext -public class AbstractFileSupplierTests { +public abstract class AbstractFileSupplierTests { @TempDir static Path tempDir; diff --git a/supplier/spring-file-supplier/src/test/java/org/springframework/cloud/fn/supplier/file/TailModeTests.java b/supplier/spring-file-supplier/src/test/java/org/springframework/cloud/fn/supplier/file/TailModeTests.java new file mode 100644 index 00000000..5b29bed9 --- /dev/null +++ b/supplier/spring-file-supplier/src/test/java/org/springframework/cloud/fn/supplier/file/TailModeTests.java @@ -0,0 +1,83 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.fn.supplier.file; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.integration.file.tail.FileTailingMessageProducerSupport; +import org.springframework.integration.test.util.TestUtils; +import org.springframework.messaging.Message; + +/** + * @author Artem Bilan + */ +public class TailModeTests extends AbstractFileSupplierTests { + + static Path tempFile; + + @BeforeAll + static void init() throws IOException { + tempFile = Files.createFile(tempDir.resolve("test.txt")); + System.setProperty("file.supplier.tail", tempFile.toAbsolutePath().toString()); + System.setProperty("file.supplier.tailer.end", "false"); + System.setProperty("file.supplier.tailer.polling-delay", "100"); + } + + @Test + void tailSupplierEmitsData(@Autowired FileTailingMessageProducerSupport fileTailingMessageProducer) + throws IOException { + + SimpleAsyncTaskExecutor taskExecutor = TestUtils.getPropertyValue(fileTailingMessageProducer, "taskExecutor", + SimpleAsyncTaskExecutor.class); + // We have to interrupt org.apache.commons.io.input.Tailer.run() loop to close + // reader to the file + taskExecutor.setTaskTerminationTimeout(10_000); + + Flux tailFlux = fileSupplier.get().map(Message::getPayload).cast(String.class); + + StepVerifier stepVerifier = StepVerifier.create(tailFlux) + .expectNext("one", "two", "three") + .thenCancel() + .verifyLater(); + + FileOutputStream fileOutputStream = new FileOutputStream(tempFile.toFile()); + fileOutputStream.write("one\n".getBytes()); + fileOutputStream.write("two\n".getBytes()); + fileOutputStream.write("three\n".getBytes()); + + fileOutputStream.flush(); + fileOutputStream.close(); + + stepVerifier.verify(Duration.ofSeconds(30)); + + // Ensure that Tailer.run() loop is interrupted, so reader to the file si closed + // before it is deleted by JUnit + taskExecutor.close(); + } + +}