diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index a0f1448d72..67e1fb813d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -19,11 +19,14 @@ package nextflow.processor import java.nio.file.FileAlreadyExistsException import java.nio.file.FileSystem import java.nio.file.FileSystems +import java.nio.file.FileVisitResult import java.nio.file.Files import java.nio.file.LinkOption import java.nio.file.NoSuchFileException import java.nio.file.Path import java.nio.file.PathMatcher +import java.nio.file.SimpleFileVisitor +import java.nio.file.attribute.BasicFileAttributes import java.time.temporal.ChronoUnit import java.util.concurrent.ExecutorService @@ -337,10 +340,10 @@ class PublishDir { } if( inProcess ) { - safeProcessFile(source, destination) + safeProcessPath(source, destination) } else { - threadPool.submit({ safeProcessFile(source, destination) } as Runnable) + threadPool.submit({ safeProcessPath(source, destination) } as Runnable) } } @@ -363,9 +366,23 @@ class PublishDir { throw new IllegalArgumentException("Not a valid publish target path: `$target` [${target?.class?.name}]") } - protected void safeProcessFile(Path source, Path target) { + protected void safeProcessPath(Path source, Path target) { try { - retryableProcessFile(source, target) + // publish each file in the directory tree + if( Files.isDirectory(source) ) { + Files.walkFileTree(source, new SimpleFileVisitor() { + FileVisitResult visitFile(Path sourceFile, BasicFileAttributes attrs) { + final targetFile = target.resolve(source.relativize(sourceFile).toString()) + retryableProcessFile(sourceFile, targetFile) + FileVisitResult.CONTINUE + } + }) + } + + // otherwise publish file directly + else { + retryableProcessFile(source, target) + } } catch( Throwable e ) { final msg = "Failed 
to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- See log file for details" @@ -395,7 +412,7 @@ class PublishDir { .build() Failsafe .with( retryPolicy ) - .get({it-> processFile(source, target)}) + .get { it -> processFile(source, target) } } protected void processFile( Path source, Path destination ) { @@ -413,9 +430,10 @@ class PublishDir { // make sure destination and source does not overlap // see https://github.com/nextflow-io/nextflow/issues/2177 - if( !sameRealPath && checkSourcePathConflicts(destination)) + if( !sameRealPath && checkSourcePathConflicts(destination) ) return + // overwrite only if explicitly enabled or destination is stale if( !sameRealPath && shouldOverwrite(source, destination) ) { FileHelper.deletePath(destination) processFileImpl(source, destination) diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy index 60f183fb23..60c484e402 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/PublishDirTest.groovy @@ -218,6 +218,48 @@ class PublishDirTest extends Specification { } + def 'should overwrite published files only if they are stale' () { + + given: + def session = new Session() + def folder = Files.createTempDirectory('nxf') + def sourceDir = folder.resolve('work-dir') + def targetDir = folder.resolve('pub-dir') + sourceDir.mkdir() + sourceDir.resolve('file1.txt').text = 'aaa' + sourceDir.resolve('file2.bam').text = 'bbb' + targetDir.mkdir() + targetDir.resolve('file1.txt').text = 'aaa' + targetDir.resolve('file2.bam').text = 'bbb (old)' + + def task = new TaskRun(workDir: sourceDir, config: new TaskConfig(), name: 'foo') + + when: + def outputs = [ + sourceDir.resolve('file1.txt'), + sourceDir.resolve('file2.bam') + ] as Set + def publisher = new PublishDir(path: targetDir, mode: 
'copy', overwrite: 'deep') + and: + def timestamp1 = targetDir.resolve('file1.txt').lastModified() + def timestamp2 = targetDir.resolve('file2.bam').lastModified() + and: + publisher.apply( outputs, task ) + and: + session.@publishPoolManager.shutdown(false) + + then: + timestamp1 == targetDir.resolve('file1.txt').lastModified() + timestamp2 != targetDir.resolve('file2.bam').lastModified() + + targetDir.resolve('file1.txt').text == 'aaa' + targetDir.resolve('file2.bam').text == 'bbb' + + cleanup: + folder?.deleteDir() + + } + def 'should apply saveAs closure' () { given: diff --git a/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy b/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy new file mode 100644 index 0000000000..f1c40073b3 --- /dev/null +++ b/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy @@ -0,0 +1,29 @@ +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package nextflow.file + +/** + * Defines the interface for files that have an ETag + * + * @author Ben Sherman + */ +interface ETagAwareFile { + + String getETag() + +} diff --git a/modules/nf-commons/src/main/nextflow/util/HashBuilder.java b/modules/nf-commons/src/main/nextflow/util/HashBuilder.java index 46c3fedf84..6d14c64142 100644 --- a/modules/nf-commons/src/main/nextflow/util/HashBuilder.java +++ b/modules/nf-commons/src/main/nextflow/util/HashBuilder.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; @@ -44,6 +45,7 @@ import nextflow.ISession; import nextflow.extension.Bolts; import nextflow.extension.FilesEx; +import nextflow.file.ETagAwareFile; import nextflow.file.FileHolder; import nextflow.io.SerializableMarker; import org.slf4j.Logger; @@ -413,6 +415,14 @@ static private Hasher hashFileMetadata( Hasher hasher, Path file, BasicFileAttri */ static private Hasher hashFileContent( Hasher hasher, Path path ) { + // use the ETag as the content fingerprint when the path exposes one; return early so the file content is not read as well + if( path instanceof ETagAwareFile ) { + final String etag = ((ETagAwareFile)path).getETag(); + if( etag != null ) + return hasher.putBytes(etag.getBytes(StandardCharsets.UTF_8)); + } + + // otherwise (no ETag available) compute the checksum from the file content OutputStream output = Funnels.asOutputStream(hasher); try { Files.copy(path, output); diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/AmazonPlugin.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/AmazonPlugin.groovy index 51533a26e4..949b270ac9 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/AmazonPlugin.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/AmazonPlugin.groovy @@ -15,8 +15,8 @@ */ package nextflow.cloud.aws -import nextflow.cloud.aws.nio.S3FileSystemProvider import groovy.transform.CompileStatic +import nextflow.cloud.aws.nio.S3FileSystemProvider import nextflow.file.FileHelper import nextflow.plugin.BasePlugin import 
org.pf4j.PluginWrapper diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java index 2a5e193b8c..a86a884869 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java @@ -42,13 +42,14 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import nextflow.file.ETagAwareFile; import nextflow.file.TagAwareFile; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.transform; import static java.lang.String.format; -public class S3Path implements Path, TagAwareFile { +public class S3Path implements Path, ETagAwareFile, TagAwareFile { public static final String PATH_SEPARATOR = "/"; /** @@ -566,6 +567,14 @@ public String getStorageClass() { return storageClass; } + @Override + public String getETag() { + return fileSystem + .getClient() + .getObjectMetadata(getBucket(), getKey()) + .getETag(); + } + // ~ helpers methods private static Function strip(final String ... 
strs) { diff --git a/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy b/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy index adeb76cd43..fe9083d314 100644 --- a/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy +++ b/plugins/nf-amazon/src/test/nextflow/processor/PublishDirS3Test.groovy @@ -63,7 +63,7 @@ class PublishDirS3Test extends Specification { when: spy.apply1(source, true) then: - 1 * spy.safeProcessFile(source, _) >> { sourceFile, s3File -> + 1 * spy.safeProcessPath(source, _) >> { sourceFile, s3File -> assert s3File instanceof S3Path assert (s3File as S3Path).getTagsList().find{ it.getKey()=='FOO'}.value == 'this' assert (s3File as S3Path).getTagsList().find{ it.getKey()=='BAR'}.value == 'that' diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy index a9960aff56..047a4e0ce3 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy @@ -46,6 +46,8 @@ class AzFileAttributes implements BasicFileAttributes { private String objectId + private String etag + static AzFileAttributes root() { new AzFileAttributes(size: 0, objectId: '/', directory: true) } @@ -60,6 +62,7 @@ class AzFileAttributes implements BasicFileAttributes { updateTime = time(props.getLastModified()) directory = client.blobName.endsWith('/') size = props.getBlobSize() + etag = props.getETag() // Support for Azure Data Lake Storage Gen2 with hierarchical namespace enabled final meta = props.getMetadata() @@ -75,6 +78,7 @@ class AzFileAttributes implements BasicFileAttributes { creationTime = time(item.properties.getCreationTime()) updateTime = time(item.properties.getLastModified()) size = item.properties.getContentLength() + etag = item.properties.getETag() } } @@ -150,6 +154,10 @@ class AzFileAttributes 
implements BasicFileAttributes { return objectId } + String getETag() { + return etag + } + @Override boolean equals( Object obj ) { if( this.class != obj?.class ) return false diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy index 2f654b4ad8..596ab38260 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy @@ -29,6 +29,7 @@ import com.azure.storage.blob.models.BlobItem import groovy.transform.CompileStatic import groovy.transform.EqualsAndHashCode import groovy.transform.PackageScope +import nextflow.file.ETagAwareFile /** * Implements Azure path object @@ -37,7 +38,7 @@ import groovy.transform.PackageScope */ @CompileStatic @EqualsAndHashCode(includes = 'fs,path,directory', includeFields = true) -class AzPath implements Path { +class AzPath implements Path, ETagAwareFile { private AzFileSystem fs @@ -333,4 +334,9 @@ class AzPath implements Path { return result } + @Override + String getETag() { + return attributes.getETag() + } + }