Skip to content

Commit

Permalink
GH-3: Add tail support for spring-file-supplier (#42)
Browse files Browse the repository at this point in the history
This commit adds `tail` support to the `spring-file-supplier`.

Additionally, move `FileMessageSource` specific beans into separate inner 
`FileMessageSourceConfiguration` class for easier conditional config 

Fixes: #3
  • Loading branch information
artembilan authored Mar 19, 2024
1 parent 6e4d756 commit 1611f94
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 63 deletions.
18 changes: 13 additions & 5 deletions supplier/spring-file-supplier/README.adoc
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# File Supplier
= File Supplier

This module provides a File supplier that can be reused and composed in other applications.
The `Supplier` uses the `FileInboundChannelAdapter` from Spring Integration.
`FileSupplier` is implemented as a `java.util.function.Supplier`.
This supplier gives you a reactive stream of files from the provided directory as the supplier has a signature of `Supplier<Flux<Message<?>>>`.
Users have to subscribe to this `Flux` and receive the data.

## Beans for injection
This module also can be configured for file tailing functionality.
The implementation and logic is fully based on respective Spring Integration https://docs.spring.io/spring-integration/reference/file/reading.html#file-tailing[Tailing Message Producer].
The `Supplier<Flux<Message<?>>>` emits messages for each tailed line from the file.

== Beans for injection

The `FileSupplierConfiguration` auto-configuration provides the following bean:

Expand All @@ -18,7 +22,7 @@ You can use `fileSupplier` as a qualifier when injecting.

Once injected, you can use the `get` method of the `Supplier` to invoke it and then subscribe to the returned `Flux`.

## Configuration Options
== Configuration Options

All configuration properties are prefixed with `file.supplier`.
There are also properties that need to be used with the prefix `file.consumer`.
Expand All @@ -28,10 +32,14 @@ See `FileConsumerProperties` also.

A `ComponentCustomizer<FileInboundChannelAdapterSpec>` bean can be added in the target project to provide any custom options for the `FileInboundChannelAdapterSpec` configuration used by the `fileSupplier`.

## Tests
If `file.supplier.tail` option is provided, this supplier works in a tailing file mode.
See `FileSupplierProperties.Tailer` container for more information.
In `tail` mode, all other options for directory polling are ignored.

== Tests

See this link:src/test/java/org/springframework/cloud/fn/supplier/file[test suite] for the various ways, this supplier is used.

## Other usage
== Other usage

See this https://github.com/spring-cloud/stream-applications/blob/master/applications/source/file-source/README.adoc[README] where this supplier is used to create a Spring Cloud Stream application where it makes a File Source.
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,21 @@
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.fn.common.config.ComponentCustomizer;
import org.springframework.cloud.fn.common.file.FileConsumerProperties;
import org.springframework.cloud.fn.common.file.FileReadingMode;
import org.springframework.cloud.fn.common.file.FileUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.JavaUtils;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.dsl.FileInboundChannelAdapterSpec;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.dsl.TailAdapterSpec;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.file.filters.FileListFilter;
import org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter;
Expand All @@ -58,78 +62,113 @@
@EnableConfigurationProperties({ FileSupplierProperties.class, FileConsumerProperties.class })
public class FileSupplierConfiguration {

private static final String METADATA_STORE_PREFIX = "local-file-system-metadata-";

private final FileSupplierProperties fileSupplierProperties;

private final FileConsumerProperties fileConsumerProperties;

public FileSupplierConfiguration(FileSupplierProperties fileSupplierProperties,
FileConsumerProperties fileConsumerProperties) {

this.fileSupplierProperties = fileSupplierProperties;
this.fileConsumerProperties = fileConsumerProperties;
@Bean
@ConditionalOnProperty(prefix = "file.supplier", name = "tail")
public Publisher<Message<String>> fileTailingFlow(FileSupplierProperties fileSupplierProperties) {
FileSupplierProperties.Tailer tail = fileSupplierProperties.getTailer();
TailAdapterSpec tailAdapterSpec = Files.tailAdapter(fileSupplierProperties.getTail())
// TODO until Spring Integration 6.2.3
.autoStartup(false)
.fileDelay(tail.getAttemptsDelay().toMillis())
// OS native tail command
.nativeOptions(tail.getNativeOptions())
.enableStatusReader(tail.isStatusReader());

JavaUtils.INSTANCE
.acceptIfNotNull(tail.getIdleEventInterval(),
(duration) -> tailAdapterSpec.idleEventInterval(duration.toMillis()))
// Apache Commons Tailer
.acceptIfNotNull(tail.isEnd(), tailAdapterSpec::end)
.acceptIfNotNull(tail.isReopen(), tailAdapterSpec::reopen)
.acceptIfNotNull(tail.getPollingDelay(), (duration) -> tailAdapterSpec.delay(duration.toMillis()));

return IntegrationFlow.from(tailAdapterSpec).toReactivePublisher(true);
}

@Bean
public ChainFileListFilter<File> filter(ConcurrentMetadataStore metadataStore) {
ChainFileListFilter<File> chainFilter = new ChainFileListFilter<>();
if (StringUtils.hasText(this.fileSupplierProperties.getFilenamePattern())) {
chainFilter.addFilter(new SimplePatternFileListFilter(this.fileSupplierProperties.getFilenamePattern()));
public Supplier<Flux<Message<?>>> fileSupplier(FileConsumerProperties fileConsumerProperties,
@Nullable Flux<Message<?>> fileMessageFlux, @Nullable Publisher<Message<Object>> fileReadingFlow,
@Nullable Publisher<Message<String>> fileTailingFlow) {

if (fileConsumerProperties.getMode() == FileReadingMode.ref) {
return () -> fileMessageFlux;
}
else if (this.fileSupplierProperties.getFilenameRegex() != null) {
chainFilter.addFilter(new RegexPatternFileListFilter(this.fileSupplierProperties.getFilenameRegex()));
else if (fileReadingFlow != null) {
return () -> Flux.from(fileReadingFlow);
}

if (this.fileSupplierProperties.isPreventDuplicates()) {
chainFilter
.addFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore, METADATA_STORE_PREFIX));
else if (fileTailingFlow != null) {
return () -> Flux.from(fileTailingFlow);
}
else {
throw new BeanInitializationException("Cannot creat 'fileSupplier' bean: no 'fileReadingFlow', "
+ "or 'fileTailingFlow' dependency, and is not 'FileReadingMode.ref'.");
}

return chainFilter;
}

@Bean
public FileInboundChannelAdapterSpec fileMessageSource(FileListFilter<File> fileListFilter,
@Nullable ComponentCustomizer<FileInboundChannelAdapterSpec> fileInboundChannelAdapterSpecCustomizer) {
@Configuration(proxyBeanMethods = false)
@ConditionalOnExpression("environment['file.supplier.tail'] == null")
protected static class FileMessageSourceConfiguration {

FileInboundChannelAdapterSpec adapterSpec = Files.inboundAdapter(this.fileSupplierProperties.getDirectory())
.filter(fileListFilter);
if (fileInboundChannelAdapterSpecCustomizer != null) {
fileInboundChannelAdapterSpecCustomizer.customize(adapterSpec);
}
return adapterSpec;
}
private static final String METADATA_STORE_PREFIX = "local-file-system-metadata-";

@Bean
public Flux<Message<File>> fileMessageFlux(FileReadingMessageSource fileReadingMessageSource) {
return IntegrationReactiveUtils.messageSourceToFlux(fileReadingMessageSource)
.contextWrite(Context.of(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY,
this.fileSupplierProperties.getDelayWhenEmpty()))
.doOnRequest((r) -> fileReadingMessageSource.start());
}
private final FileSupplierProperties fileSupplierProperties;

@Bean
@ConditionalOnExpression("environment['file.consumer.mode'] != 'ref'")
public Publisher<Message<Object>> fileReadingFlow(Flux<Message<?>> fileMessageFlux) {
IntegrationFlowBuilder flowBuilder = IntegrationFlow.from(fileMessageFlux);
return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties).toReactivePublisher(true);
}
private final FileConsumerProperties fileConsumerProperties;

@Bean
public Supplier<Flux<Message<?>>> fileSupplier(Flux<Message<?>> fileMessageFlux,
@Nullable Publisher<Message<Object>> fileReadingFlow) {
FileMessageSourceConfiguration(FileSupplierProperties fileSupplierProperties,
FileConsumerProperties fileConsumerProperties) {

if (this.fileConsumerProperties.getMode() == FileReadingMode.ref) {
return () -> fileMessageFlux;
this.fileSupplierProperties = fileSupplierProperties;
this.fileConsumerProperties = fileConsumerProperties;
}
else if (fileReadingFlow != null) {
return () -> Flux.from(fileReadingFlow);

@Bean
public ChainFileListFilter<File> filter(ConcurrentMetadataStore metadataStore) {
ChainFileListFilter<File> chainFilter = new ChainFileListFilter<>();
if (StringUtils.hasText(this.fileSupplierProperties.getFilenamePattern())) {
chainFilter
.addFilter(new SimplePatternFileListFilter(this.fileSupplierProperties.getFilenamePattern()));
}
else if (this.fileSupplierProperties.getFilenameRegex() != null) {
chainFilter.addFilter(new RegexPatternFileListFilter(this.fileSupplierProperties.getFilenameRegex()));
}

if (this.fileSupplierProperties.isPreventDuplicates()) {
chainFilter
.addFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore, METADATA_STORE_PREFIX));
}

return chainFilter;
}
else {
throw new BeanInitializationException(
"Cannot creat 'fileSupplier' bean: no 'fileReadingFlow' dependency and is not 'FileReadingMode.ref'.");

@Bean
public FileInboundChannelAdapterSpec fileMessageSource(FileListFilter<File> fileListFilter,
@Nullable ComponentCustomizer<FileInboundChannelAdapterSpec> fileInboundChannelAdapterSpecCustomizer) {

FileInboundChannelAdapterSpec adapterSpec = Files.inboundAdapter(this.fileSupplierProperties.getDirectory())
.filter(fileListFilter);
if (fileInboundChannelAdapterSpecCustomizer != null) {
fileInboundChannelAdapterSpecCustomizer.customize(adapterSpec);
}
return adapterSpec;
}

@Bean
public Flux<Message<File>> fileMessageFlux(FileReadingMessageSource fileReadingMessageSource) {
return IntegrationReactiveUtils.messageSourceToFlux(fileReadingMessageSource)
.contextWrite(Context.of(IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY,
this.fileSupplierProperties.getDelayWhenEmpty()))
.doOnRequest((r) -> fileReadingMessageSource.start());
}

@Bean
@ConditionalOnExpression("environment['file.consumer.mode'] != 'ref'")
public Publisher<Message<Object>> fileReadingFlow(Flux<Message<?>> fileMessageFlux) {
IntegrationFlowBuilder flowBuilder = IntegrationFlow.from(fileMessageFlux);
return FileUtils.enhanceFlowForReadingMode(flowBuilder, this.fileConsumerProperties)
.toReactivePublisher(true);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.regex.Pattern;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.integration.file.tail.FileTailingMessageProducerSupport;
import org.springframework.validation.annotation.Validated;

/**
Expand All @@ -41,6 +42,11 @@ public class FileSupplierProperties {
*/
private File directory = new File(DEFAULT_DIR);

/**
* The file to tail.
*/
private File tail;

/**
* Set to true to include an AcceptOnceFileListFilter which prevents duplicates.
*/
Expand All @@ -61,6 +67,11 @@ public class FileSupplierProperties {
*/
private Duration delayWhenEmpty = Duration.ofSeconds(1);

/**
* File tailing options.
*/
private final Tailer tailer = new Tailer();

public File getDirectory() {
return this.directory;
}
Expand Down Expand Up @@ -107,4 +118,113 @@ public void setDelayWhenEmpty(Duration delayWhenEmpty) {
this.delayWhenEmpty = delayWhenEmpty;
}

public File getTail() {
return this.tail;
}

public void setTail(File tail) {
this.tail = tail;
}

public Tailer getTailer() {
return this.tailer;
}

public static class Tailer {

/**
* Options string for native tail command.
*/
private String nativeOptions;

/**
* Whether to capture stderr output in the separate thread.
*/
private boolean statusReader;

/**
* Delay between attempts to tail a non-existent file, or between attempts to
* execute a process if it fails for any reason.
*/
private Duration attemptsDelay = Duration
.ofMillis(FileTailingMessageProducerSupport.DEFAULT_TAIL_ATTEMPTS_DELAY);

/**
* How often to emit FileTailingIdleEvents.
*/
private Duration idleEventInterval;

/**
* Delay between checks of the file for new content.
*/
private Duration pollingDelay;

/**
* Whether to tail from the end of the file.
*/
private Boolean end;

/**
* Whether to close and reopen the file between reading chunks.
*/
private Boolean reopen;

public String getNativeOptions() {
return this.nativeOptions;
}

public void setNativeOptions(String nativeOptions) {
this.nativeOptions = nativeOptions;
}

public boolean isStatusReader() {
return this.statusReader;
}

public void setStatusReader(boolean statusReader) {
this.statusReader = statusReader;
}

public Duration getAttemptsDelay() {
return this.attemptsDelay;
}

public void setAttemptsDelay(Duration attemptsDelay) {
this.attemptsDelay = attemptsDelay;
}

public Duration getIdleEventInterval() {
return this.idleEventInterval;
}

public void setIdleEventInterval(Duration idleEventInterval) {
this.idleEventInterval = idleEventInterval;
}

public Duration getPollingDelay() {
return this.pollingDelay;
}

public void setPollingDelay(Duration pollingDelay) {
this.pollingDelay = pollingDelay;
}

public Boolean isEnd() {
return this.end;
}

public void setEnd(Boolean end) {
this.end = end;
}

public Boolean isReopen() {
return this.reopen;
}

public void setReopen(Boolean reopen) {
this.reopen = reopen;
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,7 +42,7 @@
*/
@SpringBootTest
@DirtiesContext
public class AbstractFileSupplierTests {
public abstract class AbstractFileSupplierTests {

@TempDir
static Path tempDir;
Expand Down
Loading

0 comments on commit 1611f94

Please sign in to comment.