diff --git a/docs/config.md b/docs/config.md
index 37e6761776..d74e79aac5 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -1579,11 +1579,32 @@ Read the {ref}`trace-report` page to learn more about the execution report that
 There are additional variables that can be defined within a configuration file that do not have a dedicated scope.
 
 `cleanup`
-: If `true`, on a successful completion of a run all files in *work* directory are automatically deleted.
-
-  :::{warning}
-  The use of the `cleanup` option will prevent the use of the *resume* feature on subsequent executions of that pipeline run. Also, be aware that deleting all scratch files can take a lot of time, especially when using a shared file system or remote cloud storage.
+: :::{versionchanged} 23.10.0
+  Added `'lazy'`, `'eager'`, and `'aggressive'` strategies.
   :::
+: Automatically delete task directories using one of the following strategies:
+
+  `false` (default)
+  : Disable automatic cleanup.
+
+  `true`
+  : Equivalent to `'lazy'`.
+
+  `'lazy'`
+  : If a workflow completes successfully, delete all task directories.
+  : This strategy supports resumability for both successful and failed runs.
+  : Note that deleting all work directories at once can take a lot of time, especially when using a shared file system or remote cloud storage.
+
+  `'eager'`
+  : Delete each task directory as soon as it is no longer needed by downstream tasks.
+  : A task can be deleted once all of the tasks that use any of its output files have completed.
+  : This strategy supports resumability for both successful and failed runs.
+  : Output files that are published via symlink will be invalidated when the original task directory is deleted. Avoid using the following publish modes: `copyNoFollow`, `rellink`, `symlink`.
+
+  `'aggressive'`
+  : Equivalent to `'eager'`, but also deletes individual output files as soon as they are no longer needed.
+  : An output file can be deleted once the tasks that use it have completed. In some cases, an output file can be deleted sooner than its originating task.
+  : This strategy supports resumability for successful runs, but not necessarily for failed runs. It is therefore recommended only when you want to minimize disk usage during a run and do not need to resume failed runs.
 
 `dumpHashes`
 : If `true`, dump task hash keys in the log file, for debugging purposes.
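For reference, here is a minimal `nextflow.config` sketch using only the values documented above (illustrative, not part of the diff):

```groovy
// nextflow.config -- pick exactly one of the documented cleanup settings

cleanup = 'eager'           // delete each task directory once no downstream task needs it;
                            // avoid the 'copyNoFollow', 'rellink' and 'symlink' publish modes

// cleanup = false          // default: no automatic cleanup
// cleanup = true           // same as 'lazy'
// cleanup = 'lazy'         // delete all task directories after a successful run
// cleanup = 'aggressive'   // like 'eager', but also deletes individual output files early
```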
diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy
index 48713ea603..d038227455 100644
--- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy
@@ -372,6 +372,9 @@ class Session implements ISession {
         // -- file porter config
         this.filePorter = new FilePorter(this)
+        // -- normalize cleanup config
+        if( config.cleanup == true )
+            config.cleanup = 'lazy'
     }
 
     protected Path cloudCachePath(Map cloudcache, Path workDir) {
@@ -964,6 +967,17 @@ class Session implements ISession {
         }
     }
 
+    void notifyProcessClose(TaskProcessor process) {
+        observers.each { observer ->
+            try {
+                observer.onProcessClose(process)
+            }
+            catch( Exception e ) {
+                log.debug(e.getMessage(), e)
+            }
+        }
+    }
+
     void notifyProcessTerminate(TaskProcessor process) {
         for( int i=0; i<observers.size(); i++ ) {
@@ ... @@ class Session implements ISession {
-    void cleanup() {
-        ...
-        try {
-            db.eachRecord { HashCode hash, TraceRecord record ->
-                def deleted = db.removeTaskEntry(hash)
-                if( deleted ) {
-                    // delete folder
-                    FileHelper.deletePath(FileHelper.asPath(record.workDir))
-                }
-            }
-            log.trace "Clean workdir complete"
-        }
-        catch( Exception e ) {
-            log.warn("Failed to cleanup work dir: ${workDir.toUriString()}")
-        }
-        finally {
-            db.close()
-        }
-    }
-
     @Memoized
     CondaConfig getCondaConfig() {
         final cfg = config.conda as Map ?: Collections.emptyMap()
diff --git a/modules/nextflow/src/main/groovy/nextflow/cache/CacheDB.groovy b/modules/nextflow/src/main/groovy/nextflow/cache/CacheDB.groovy
index 214e3235e3..07302166d5 100644
--- a/modules/nextflow/src/main/groovy/nextflow/cache/CacheDB.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/cache/CacheDB.groovy
@@ -77,7 +77,7 @@ class CacheDB implements Closeable {
      * @param processor The {@link TaskProcessor} instance to be assigned to the retrieved task
      * @return A {@link TaskEntry} instance or {@code null} if a task for the given hash does not exist
      */
-    TaskEntry getTaskEntry(HashCode taskHash, TaskProcessor processor) {
+    TaskEntry getTaskEntry(HashCode taskHash, Map<String,TaskProcessor> processorLookup=[:]) {
 
         final payload = store.getEntry(taskHash)
         if( !payload )
             return null
 
         final record = (List)KryoHelper.deserialize(payload)
         TraceRecord trace = TraceRecord.deserialize( (byte[])record[0] )
+        final processor = processorLookup[trace.get('process')]
         TaskContext ctx = record[1]!=null && processor!=null ? TaskContext.deserialize(processor, (byte[])record[1]) : null
+        final consumers = record[3]!=null ?
+            ((List<String>)record[3]).collect( s -> HashCode.fromString(s) ) : null
 
-        return new TaskEntry(trace,ctx)
+        return new TaskEntry(processor, trace, ctx, consumers)
     }
 
     void incTaskEntry( HashCode hash ) {
         final payload = store.getEntry(hash)
@@ -99,7 +101,7 @@
         final record = (List)KryoHelper.deserialize(payload)
 
         // third record contains the reference count for this record
-        record[2] = ((Integer)record[2]) +1
+        record[2] = ((Integer)record[2]) + 1
 
         // save it again
         store.putEntry(hash, KryoHelper.serialize(record))
 
@@ -114,7 +116,7 @@
         final record = (List)KryoHelper.deserialize(payload)
 
         // third record contains the reference count for this record
-        def count = record[2] = ((Integer)record[2]) -1
+        def count = record[2] = ((Integer)record[2]) - 1
 
         // save or delete
         if( count > 0 ) {
             store.putEntry(hash, KryoHelper.serialize(record))
 
@@ -128,9 +130,10 @@
 
     /**
-     * Save task runtime information to th cache DB
+     * Save task runtime information to the cache DB
      *
-     * @param handler A {@link TaskHandler} instance
+     * @param handler
+     * @param trace
      */
     @PackageScope
     void writeTaskEntry0( TaskHandler handler, TraceRecord trace ) {
 
@@ -143,10 +146,11 @@
         // only the 'cache' is active and
         TaskContext ctx = proc.isCacheable() && task.hasCacheableValues() ? task.context : null
 
-        def record = new ArrayList(3)
+        final record = new ArrayList(4)
         record[0] = trace.serialize()
         record[1] = ctx != null ? ctx.serialize() : null
         record[2] = 1
+        record[3] = null
 
         // -- save in the db
         store.putEntry( key, KryoHelper.serialize(record) )
 
@@ -157,6 +161,31 @@
         writer.send { writeTaskEntry0(handler, trace) }
     }
 
+    /**
+     * Finalize task entry in the cache DB with the list of
+     * consumer tasks.
+     *
+     * @param hash
+     * @param consumers
+     */
+    @PackageScope
+    void finalizeTaskEntry0( HashCode hash, List<HashCode> consumers ) {
+
+        final payload = store.getEntry(hash)
+        if( !payload ) {
+            log.debug "Unable to finalize task with key: $hash"
+            return
+        }
+
+        final record = (List)KryoHelper.deserialize(payload)
+        record[3] = consumers.collect( h -> h.toString() )
+        store.putEntry(hash, KryoHelper.serialize(record))
+    }
+
+    void finalizeTaskAsync( HashCode hash, List<HashCode> consumers ) {
+        writer.send { finalizeTaskEntry0(hash, consumers) }
+    }
+
     void cacheTaskAsync( TaskHandler handler ) {
         writer.send {
             writeTaskIndex0(handler,true)
 
@@ -224,7 +253,7 @@
     }
 
     TraceRecord getTraceRecord( HashCode hashCode ) {
-        final result = getTaskEntry(hashCode, null)
+        final result = getTaskEntry(hashCode)
         return result ?
result.trace : null } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy index 775fb68f9e..f037a1adaa 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/PublishDir.groovy @@ -183,7 +183,7 @@ class PublishDir { def result = new PublishDir() if( params.path ) - result.path = params.path + result.path = params.path if( params.mode ) result.mode = params.mode @@ -217,6 +217,18 @@ class PublishDir { return result } + boolean canPublish(Path source, TaskRun task) { + if( !sourceDir ) { + this.sourceDir = task.targetDir + this.sourceFileSystem = sourceDir.fileSystem + this.stageInMode = task.config.stageInMode + this.task = task + validatePublishMode() + } + + return getPublishTarget(source) != null + } + protected void apply0(Set files) { assert path @@ -301,13 +313,8 @@ class PublishDir { protected void apply1(Path source, boolean inProcess ) { - def target = sourceDir ? sourceDir.relativize(source) : source.getFileName() - if( matcher && !matcher.matches(target) ) { - // skip not matching file - return - } - - if( saveAs && !(target=saveAs.call(target.toString()))) { + final target = getPublishTarget(source) + if( !target ) { // skip this file return } @@ -339,6 +346,17 @@ class PublishDir { } + protected def getPublishTarget(Path source) { + def target = sourceDir ? sourceDir.relativize(source) : source.getFileName() + if( matcher && !matcher.matches(target) ) + return null + + if( saveAs && !(target=saveAs.call(target.toString()))) + return null + + return target + } + protected Path resolveDestination(target) { if( target instanceof Path ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskEntry.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskEntry.groovy index ed4ac32362..7738dfd016 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskEntry.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskEntry.groovy @@ -16,9 +16,11 @@ package nextflow.processor +import com.google.common.hash.HashCode import groovy.transform.EqualsAndHashCode import groovy.transform.ToString import groovy.transform.TupleConstructor +import nextflow.processor.TaskProcessor import nextflow.trace.TraceRecord /** * Model a task entry persisted in the {@link nextflow.cache.CacheDB} @@ -30,8 +32,12 @@ import nextflow.trace.TraceRecord @TupleConstructor class TaskEntry { + TaskProcessor processor + TraceRecord trace TaskContext context + List consumers + } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 134d6b28b8..dc9cd46564 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -23,6 +23,7 @@ import java.nio.file.LinkOption import java.nio.file.NoSuchFileException import java.nio.file.Path import java.nio.file.Paths +import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicIntegerArray @@ -126,11 +127,11 @@ class TaskProcessor { RunType(String str) { message=str }; } - static final public String TASK_CONTEXT_PROPERTY_NAME = 'task' + static final String TASK_CONTEXT_PROPERTY_NAME = 'task' - final private static 
Pattern ENV_VAR_NAME = ~/[a-zA-Z_]+[a-zA-Z0-9_]*/
+    static private final Pattern ENV_VAR_NAME = ~/[a-zA-Z_]+[a-zA-Z0-9_]*/
 
-    final private static Pattern QUESTION_MARK = ~/(\?+)/
+    static private final Pattern QUESTION_MARK = ~/(\?+)/
 
     @TestOnly
     private static volatile TaskProcessor currentProcessor0
@@ -144,7 +145,7 @@
     /**
      * Unique task index number (run)
      */
-    final protected AtomicInteger indexCount = new AtomicInteger()
+    protected final AtomicInteger indexCount = new AtomicInteger()
 
     /**
      * The current workflow execution session
@@ -197,7 +198,7 @@
      * Note: it is declared static because the error must be shown only the
      * very first time for all processes
      */
-    private static final AtomicBoolean errorShown = new AtomicBoolean()
+    static private final AtomicBoolean errorShown = new AtomicBoolean()
 
     /**
      * Flag set {@code true} when the processor termination has been invoked
@@ -240,9 +241,9 @@
 
     private int maxForks
 
-    private static int processCount
+    static private int processCount
 
-    private static LockManager lockManager = new LockManager()
+    static private LockManager lockManager = new LockManager()
 
     private List<Map<String,?>> fairBuffers = new ArrayList<>()
 
@@ -250,6 +251,17 @@
 
     private Boolean isFair0
 
+    /**
+     * Map of all task processors by name.
+     */
+    static private Map<String,TaskProcessor> processorLookup = [:]
+
+    /**
+     * Set of tasks (across all processors) that were deleted in a
+     * previous run and successfully restored from the cache db.
+     */
+    static private Map<HashCode,Boolean> restoredTasks = new ConcurrentHashMap<>()
+
     private CompilerConfiguration compilerConfig() {
         final config = new CompilerConfiguration()
         config.addCompilationCustomizers( new ASTTransformationCustomizer(TaskTemplateVarsXform) )
@@ -304,6 +316,8 @@
         this.maxForks = config.maxForks ? config.maxForks as int : 0
         this.forksCount = maxForks ? new LongAdder() : null
         this.isFair0 = config.getFair()
+
+        processorLookup[name] = this
     }
 
     /**
@@ -714,6 +728,10 @@
         return null
     }
 
+    synchronized protected TaskStartParams createTaskStartParams() {
+        return new TaskStartParams(TaskId.next(), indexCount.incrementAndGet())
+    }
+
     /**
      * Create a new {@code TaskRun} instance, initializing the following properties :
      *
  • {@code TaskRun#id} @@ -785,23 +803,17 @@ class TaskProcessor { while( true ) { hash = CacheHelper.defaultHasher().newHasher().putBytes(hash.asBytes()).putInt(tries).hash() - Path resumeDir = null - boolean exists = false try { - final entry = session.cache.getTaskEntry(hash, this) - resumeDir = entry ? FileHelper.asPath(entry.trace.getWorkDir()) : null - if( resumeDir ) - exists = resumeDir.exists() - - log.trace "[${safeTaskName(task)}] Cacheable folder=${resumeDir?.toUriString()} -- exists=$exists; try=$tries; shouldTryCache=$shouldTryCache; entry=$entry" - final cached = shouldTryCache && exists && entry.trace.isCompleted() && checkCachedOutput(task.clone(), resumeDir, hash, entry) - if( cached ) + if( shouldTryCache && checkCachedOutput(task.clone(), hash) ) break } catch (Throwable t) { log.warn1("[${safeTaskName(task)}] Unable to resume cached task -- See log file for details", causedBy: t) } + final entry = session.cache.getTaskEntry(hash, processorLookup) + Path resumeDir = entry ? FileHelper.asPath(entry.trace.getWorkDir()) : null + boolean exists = resumeDir?.exists() if( exists ) { tries++ continue @@ -887,88 +899,113 @@ class TaskProcessor { } /** - * Check whenever the outputs for the specified task already exist + * Attempt to restore a cached task by verifying either its outputs + * or the outputs of its consumers. * - * @param task The task instance - * @param folder The folder where the outputs are stored (eventually) - * @return {@code true} when all outputs are available, {@code false} otherwise + * @param seedTask + * @param seedHash + * @return {@code true} if all outputs are available, {@code false} otherwise */ - final boolean checkCachedOutput(TaskRun task, Path folder, HashCode hash, TaskEntry entry) { + final boolean checkCachedOutput(TaskRun seedTask, HashCode seedHash) { - // check if exists the task exit code file - def exitCode = null - def exitFile = folder.resolve(TaskRun.CMD_EXIT) - if( task.type == ScriptType.SCRIPTLET ) { - def str - try { - str = exitFile.text?.trim() + // -- recursively check for cached outputs + List queue = [ seedHash ] + Map handlers = [:] + + while( !queue.isEmpty() ) { + final hash = queue.pop() + + // -- skip tasks that have already been restored + if( hash in handlers ) + continue + + // -- get cache entry + final entry = session.cache.getTaskEntry(hash, processorLookup) + if( !entry || !entry.trace.isCompleted() ) + return false + + // -- get or create task run + def task + if( hash == seedHash ) { + task = seedTask + log.trace "[${safeTaskName(task)}] Cacheable task hash=${hash} entry=${entry}" + } + else if( entry.processor ) { + final params = entry.processor.createTaskStartParams() + task = entry.processor.createTaskRun(params) + log.trace "[${safeTaskName(task)}] Restoring deleted task hash=${hash} context=${entry.context}" } - catch( IOException e ) { - log.trace "[${safeTaskName(task)}] Exit file can't be read > $exitFile -- return false -- Cause: ${e.message}" + else { + log.trace "[${safeTaskName(seedTask)}] Missing processor for downstream hash=${hash} entry=${entry} -- return false" return false } - exitCode = str.isInteger() ? 
str.toInteger() : null - if( !task.isSuccess(exitCode) ) { - log.trace "[${safeTaskName(task)}] Exit code is not valid > $str -- return false" + // -- verify the task context map + if( task.hasCacheableValues() && !entry.context ) { + log.trace "[${safeTaskName(task)}] Missing cache context -- return false" return false } - } - /* - * verify cached context map - */ - if( !entry ) { - log.trace "[${safeTaskName(task)}] Missing cache entry -- return false" - return false - } + if( entry.context != null ) { + task.context = entry.context + task.config.context = entry.context + task.code?.delegate = entry.context + } - if( task.hasCacheableValues() && !entry.context ) { - log.trace "[${safeTaskName(task)}] Missing cache context -- return false" - return false - } + // -- verify the task exit code + final exitCode = entry.trace.get('exit') as Integer + if( task.type == ScriptType.SCRIPTLET && !task.isSuccess(exitCode) ) { + log.trace "[${safeTaskName(task)}] Exit code is not valid > ${exitCode} -- return false" + return false + } - /* - * verify stdout file - */ - final stdoutFile = folder.resolve( TaskRun.CMD_OUTFILE ) + // -- set the remaining task properties + task.cached = true + task.hash = hash + task.workDir = FileHelper.asPath(entry.trace.getWorkDir()) + task.stdout = task.workDir.resolve(TaskRun.CMD_OUTFILE) + task.exitStatus = exitCode + task.config.exitStatus = exitCode + + // -- check if all downstream outputs are available + if( entry.consumers != null ) { + queue.addAll( entry.consumers ) + } - if( entry.context != null ) { - task.context = entry.context - task.config.context = entry.context - task.code?.delegate = entry.context + // -- otherwise check if all task outputs are available + else { + try { + collectOutputs(task) + } + catch( MissingFileException | MissingValueException e ) { + log.trace "[${safeTaskName(task)}] Missed cache > ${e.getMessage()} -- folder: ${task.workDir}" + return false + } + } + + // -- create task handler + handlers[hash] = new CachedTaskHandler(task, entry.trace) } - try { - // -- expose task exit status to make accessible as output value - task.config.exitStatus = exitCode - // -- check if all output resources are available - collectOutputs(task, folder, stdoutFile, task.context) + // -- finalize all cached tasks + handlers.each { hash, handler -> + if( hash in restoredTasks ) + return - // set the exit code in to the task object - task.cached = true - task.hash = hash - task.workDir = folder - task.stdout = stdoutFile - if( exitCode != null ) { - task.exitStatus = exitCode - } + final task = handler.task log.info "[${task.hashLog}] Cached process > ${task.name}" + // -- update the set of restored tasks + if( task.processor != this ) + task.processor.state.update { StateObj it -> it.incSubmitted() } + restoredTasks[hash] = true // -- notify cached event - if( entry ) - session.notifyTaskCached(new CachedTaskHandler(task,entry.trace)) - - // -- now bind the results - finalizeTask0(task) - return true - } - catch( MissingFileException | MissingValueException e ) { - log.trace "[${safeTaskName(task)}] Missed cache > ${e.getMessage()} -- folder: $folder" - task.exitStatus = Integer.MAX_VALUE - task.workDir = null - return false + session.notifyTaskCached(handler) + // -- bind the results + task.processor.finalizeTask0(task) } + + return true } /** @@ -2341,6 +2378,10 @@ class TaskProcessor { state.update { StateObj it -> it.incCompleted() } } + protected void closeProcess() { + session.notifyProcessClose(this) + } + protected void terminateProcess() { 
log.trace "<${name}> Sending poison pills and terminating process" sendPoisonPill() @@ -2438,7 +2479,7 @@ class TaskProcessor { state.update { StateObj it -> it.incSubmitted() } // task index must be created here to guarantee consistent ordering // with the sequence of messages arrival since this method is executed in a thread safe manner - final params = new TaskStartParams(TaskId.next(), indexCount.incrementAndGet()) + final params = createTaskStartParams() final result = new ArrayList(2) result[0] = params result[1] = messages @@ -2493,6 +2534,7 @@ class TaskProcessor { // apparently auto if-guard instrumented by @Slf4j is not honoured in inner classes - add it explicitly if( log.isTraceEnabled() ) log.trace "<${name}> After stop" + closeProcess() } /** diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ScriptRunner.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ScriptRunner.groovy index ea9bae557f..1163cc3371 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ScriptRunner.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ScriptRunner.groovy @@ -257,7 +257,6 @@ class ScriptRunner { protected shutdown() { session.destroy() - session.cleanup() Global.cleanUp() log.debug "> Execution complete -- Goodbye" } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/CleanupStrategy.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/CleanupStrategy.groovy new file mode 100644 index 0000000000..2ed75b0a7b --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/trace/CleanupStrategy.groovy @@ -0,0 +1,49 @@ +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.trace + +import groovy.transform.CompileStatic + +/** + * Strategies to automatically cleanup task directories. 
+ * + * @author Ben Sherman + */ +@CompileStatic +enum CleanupStrategy { + LAZY(1), + EAGER(2), + AGGRESSIVE(3) + + final int level + + CleanupStrategy(int level) { + this.level = level + } + + static boolean isValid(CharSequence name) { + if( !name ) + return false + try { + valueOf(name.toString().toUpperCase()) + return true + } + catch( IllegalArgumentException e ) { + return false + } + } +} diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy index 6c391625c9..cf394a839d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy @@ -25,6 +25,7 @@ class DefaultObserverFactory implements TraceObserverFactory { createTimelineObserver(result) createDagObserver(result) createAnsiLogObserver(result) + createTaskCleanupObserver(result) return result } @@ -101,4 +102,12 @@ class DefaultObserverFactory implements TraceObserverFactory { result << observer } + protected void createTaskCleanupObserver(Collection result) { + final strategy = session.config.cleanup + if( strategy instanceof CharSequence && !CleanupStrategy.isValid(strategy) ) + throw new IllegalArgumentException("Invalid cleanup strategy '${strategy}' -- available strategies are ${CleanupStrategy.values().join(',').toLowerCase()}") + if( strategy ) + result << new TaskCleanupObserver(strategy.toString().toUpperCase() as CleanupStrategy) + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TaskCleanupObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TaskCleanupObserver.groovy new file mode 100644 index 0000000000..c4e4abe459 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TaskCleanupObserver.groovy @@ -0,0 +1,460 @@ +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.trace + +import java.nio.file.Path +import java.util.concurrent.ExecutorService +import java.util.concurrent.locks.Lock +import java.util.concurrent.locks.ReentrantLock + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.Session +import nextflow.cache.CacheDB +import nextflow.dag.DAG +import nextflow.file.FileHelper +import nextflow.processor.PublishDir.Mode +import nextflow.processor.TaskHandler +import nextflow.processor.TaskProcessor +import nextflow.processor.TaskRun +import nextflow.script.params.FileOutParam +import nextflow.util.ThreadPoolManager +/** + * Delete task directories once they are no longer needed. 
+ *
+ * @author Ben Sherman
+ */
+@Slf4j
+@CompileStatic
+class TaskCleanupObserver implements TraceObserver {
+
+    private CleanupStrategy strategy
+
+    private Session session
+
+    private Map<String,ProcessState> processes = [:]
+
+    private Map<TaskRun,TaskState> tasks = [:]
+
+    private Map<Path,PathState> paths = [:]
+
+    private Set<Path> publishedOutputs = []
+
+    private Lock sync = new ReentrantLock()
+
+    private ExecutorService threadPool
+
+    TaskCleanupObserver(CleanupStrategy strategy) {
+        this.strategy = strategy
+    }
+
+    @Override
+    void onFlowCreate(Session session) {
+        this.session = session
+        this.threadPool = new ThreadPoolManager('TaskCleanup')
+            .withConfig(session.config)
+            .create()
+    }
+
+    /**
+     * When the workflow begins, determine the consumers of each process
+     * in the DAG.
+     */
+    @Override
+    void onFlowBegin() {
+
+        // construct process lookup
+        final dag = session.dag
+        final withIncludeInputs = [] as Set
+
+        for( def processNode : dag.vertices ) {
+            // skip nodes that are not processes
+            if( !processNode.process )
+                continue
+
+            // find all downstream processes in the abstract dag
+            def processName = processNode.process.name
+            def consumers = [] as Set
+            def queue = [ processNode ]
+
+            while( !queue.isEmpty() ) {
+                // remove a node from the search queue
+                final sourceNode = queue.remove(0)
+
+                // search each outgoing edge from the source node
+                for( def edge : dag.edges ) {
+                    if( edge.from != sourceNode )
+                        continue
+
+                    def node = edge.to
+
+                    // skip if process is terminal
+                    if( !node )
+                        continue
+
+                    // add process nodes to the list of consumers
+                    if( node.process != null )
+                        consumers << node.process.name
+                    // add operator nodes to the queue to keep searching
+                    else
+                        queue << node
+                }
+            }
+
+            processes[processName] = new ProcessState(consumers ?: [processName] as Set)
+
+            // check if process uses includeInputs
+            final hasIncludeInputs = processNode.process
+                .config.getOutputs()
+                .any( p -> p instanceof FileOutParam && p.includeInputs )
+
+            if( hasIncludeInputs )
+                withIncludeInputs << processName
+        }
+
+        // update producers of processes that use includeInputs
+        processes.each { processName, processState ->
+            final consumers = processState.consumers
+            for( def consumer : consumers.intersect(withIncludeInputs) ) {
+                log.trace "Process `${consumer}` uses includeInputs, adding its consumers to `${processName}`"
+                final consumerState = processes[consumer]
+                consumers.addAll(consumerState.consumers)
+            }
+
+            log.trace "Process `${processName}` is consumed by the following processes: ${consumers}"
+        }
+    }
+
+    static private final Set<Mode> INVALID_PUBLISH_MODES = [Mode.COPY_NO_FOLLOW, Mode.RELLINK, Mode.SYMLINK]
+
+    /**
+     * Log warning for any process that uses any incompatible features.
+     *
+     * @param process
+     */
+    void onProcessCreate( TaskProcessor process ) {
+        // check for incompatible publish modes
+        final task = process.createTaskPreview()
+        final publishDirs = task.config.getPublishDir()
+
+        if( publishDirs.any( p -> p.mode in INVALID_PUBLISH_MODES ) )
+            log.warn "Process `${process.name}` is publishing files as symlinks, which may be invalidated by eager cleanup -- consider using 'copy' or 'link' instead"
+    }
+
+    /**
+     * When a task is created, add it to the state map and add it as a consumer
+     * of any upstream tasks and output files.
+     *
+     * @param handler
+     * @param trace
+     */
+    @Override
+    void onProcessPending(TaskHandler handler, TraceRecord trace) {
+        // query task input files
+        final task = handler.task
+        final inputs = task.getInputFilesMap().values()
+
+        sync.withLock {
+            // add task to the task state map
+            tasks[task] = new TaskState()
+
+            // add task as consumer of each upstream task and output file
+            for( Path path : inputs ) {
+                if( path in paths ) {
+                    final pathState = paths[path]
+                    final taskState = tasks[pathState.task]
+                    taskState.consumers << task
+                    pathState.consumers << task
+                }
+            }
+        }
+    }
+
+    /**
+     * When a task is completed, track the task and its output files
+     * for automatic cleanup.
+     *
+     * @param handler
+     * @param trace
+     */
+    @Override
+    void onProcessComplete(TaskHandler handler, TraceRecord trace) {
+        final task = handler.task
+
+        // handle failed tasks separately
+        if( !task.isSuccess() ) {
+            handleTaskFailure(task)
+            return
+        }
+
+        // query task output files
+        final outputs = task
+            .getOutputsByType(FileOutParam)
+            .values()
+            .flatten() as List<Path>
+
+        // get publish outputs
+        final publishDirs = task.config.getPublishDir()
+        final publishOutputs = publishDirs
+            ? outputs.findAll( p -> publishDirs.any( publishDir -> publishDir.canPublish(p, task) ) )
+            : []
+
+        log.trace "[${task.name}] will publish the following files: ${publishOutputs*.toUriString()}"
+
+        sync.withLock {
+            // mark task as completed
+            tasks[task].completed = true
+
+            // remove any outputs that have already been published
+            final alreadyPublished = publishedOutputs.intersect(publishOutputs)
+            publishedOutputs.removeAll(alreadyPublished)
+            publishOutputs.removeAll(alreadyPublished)
+
+            // add publish outputs to wait on
+            tasks[task].publishOutputs = publishOutputs as Set
+
+            // scan tasks for cleanup
+            cleanup0()
+
+            // add each output file to the path state map
+            for( Path path : outputs ) {
+                final pathState = new PathState(task)
+                if( path !in publishOutputs )
+                    pathState.published = true
+
+                paths[path] = pathState
+            }
+        }
+    }
+
+    /**
+     * When a task fails, mark it as completed without tracking its
+     * output files. Failed tasks are not included as consumers of
+     * upstream tasks in the cache.
+     *
+     * @param task
+     */
+    void handleTaskFailure(TaskRun task) {
+        sync.withLock {
+            // mark task as completed
+            tasks[task].completed = true
+        }
+    }
+
+    /**
+     * When a file is published, mark it as published and check
+     * the corresponding task for cleanup.
+     *
+     * If the file is published before the corresponding task is
+     * marked as completed, save it for later.
+     *
+     * @param destination
+     * @param source
+     */
+    @Override
+    void onFilePublish(Path destination, Path source) {
+        sync.withLock {
+            // get the corresponding task
+            final pathState = paths[source]
+            if( pathState ) {
+                final task = pathState.task
+
+                log.trace "File ${source.toUriString()} was published by task <${task.name}>"
+
+                // mark file as published
+                tasks[task].publishOutputs.remove(source)
+                pathState.published = true
+
+                // delete task if it can be deleted
+                if( strategy >= CleanupStrategy.EAGER && canDeleteTask(task) )
+                    deleteTask(task)
+                else if( strategy >= CleanupStrategy.AGGRESSIVE && canDeleteFile(source) )
+                    deleteFile(source)
+            }
+            else {
+                log.trace "File ${source.toUriString()} was published before task was marked as completed"
+
+                // save file to be processed when task completes
+                publishedOutputs << source
+            }
+        }
+    }
+
+    /**
+     * When a process is closed (all tasks of the process have been created),
+     * mark the process as closed and scan tasks for cleanup.
+     *
+     * @param process
+     */
+    @Override
+    void onProcessClose(TaskProcessor process) {
+        sync.withLock {
+            processes[process.name].closed = true
+            cleanup0()
+        }
+    }
+
+    /**
+     * When the workflow completes, delete all task directories (only
+     * when using the 'lazy' strategy).
+     */
+    @Override
+    void onFlowComplete() {
+        if( strategy == CleanupStrategy.LAZY && session.isSuccess() ) {
+            log.info 'Deleting task directories (this might take a moment)...'
+
+            for( TaskRun task : tasks.keySet() )
+                deleteTask(task)
+        }
+
+        threadPool.shutdown()
+    }
+
+    /**
+     * Delete any task directories and output files that can be deleted.
+     */
+    private void cleanup0() {
+        if( strategy < CleanupStrategy.EAGER )
+            return
+
+        for( TaskRun task : tasks.keySet() )
+            if( canDeleteTask(task) )
+                deleteTask(task)
+
+        if( strategy < CleanupStrategy.AGGRESSIVE )
+            return
+
+        for( Path path : paths.keySet() )
+            if( canDeleteFile(path) )
+                deleteFile(path)
+    }
+
+    /**
+     * Determine whether a task directory can be deleted.
+     *
+     * A task directory can be deleted if:
+     * - the task has completed
+     * - the task directory hasn't already been deleted
+     * - all of its publish outputs have been published
+     * - all of its process consumers are closed
+     * - all of its task consumers are completed
+     *
+     * @param task
+     */
+    private boolean canDeleteTask(TaskRun task) {
+        final taskState = tasks[task]
+        final processState = processes[task.processor.name]
+
+        taskState.completed
+            && !taskState.deleted
+            && taskState.publishOutputs.isEmpty()
+            && processState.consumers.every( p -> processes[p].closed )
+            && taskState.consumers.every( t -> tasks[t].completed )
+    }
+
+    /**
+     * Delete a task directory.
+     *
+     * @param task
+     */
+    private void deleteTask(TaskRun task) {
+        log.trace "[${task.name}] Deleting task directory: ${task.workDir.toUriString()}"
+
+        // delete the task directory asynchronously
+        threadPool.submit({
+            try {
+                FileHelper.deletePath(task.workDir)
+            }
+            catch( Exception e ) {
+                // ignore failures -- the directory may have been deleted already
+            }
+        } as Runnable)
+
+        // mark task as deleted
+        final taskState = tasks[task]
+        taskState.deleted = true
+
+        // finalize task in the cache db
+        final consumers = taskState.consumers
+            .findAll( t -> t.isSuccess() )
+            .collect( t -> t.hash )
+        session.cache.finalizeTaskAsync(task.hash, consumers)
+    }
+
+    /**
+     * Determine whether a file can be deleted.
+     *
+     * A file can be deleted if:
+     * - the file has been published (or doesn't need to be published)
+     * - the file hasn't already been deleted
+     * - all of its process consumers are closed
+     * - all of its task consumers are completed
+     *
+     * @param path
+     */
+    private boolean canDeleteFile(Path path) {
+        final pathState = paths[path]
+        final processState = processes[pathState.task.processor.name]
+
+        pathState.published
+            && !pathState.deleted
+            && processState.consumers.every( p -> processes[p].closed )
+            && pathState.consumers.every( t -> tasks[t].completed )
+    }
+
+    /**
+     * Delete a file.
+     *
+     * @param path
+     */
+    private void deleteFile(Path path) {
+        final pathState = paths[path]
+        final task = pathState.task
+
+        if( !tasks[task].deleted ) {
+            log.trace "[${task.name}] Deleting file: ${path.toUriString()}"
+            FileHelper.deletePath(path)
+        }
+        pathState.deleted = true
+    }
+
+    static private class ProcessState {
+        Set<String> consumers
+        boolean closed = false
+
+        ProcessState(Set<String> consumers) {
+            this.consumers = consumers
+        }
+    }
+
+    static private class TaskState {
+        Set<TaskRun> consumers = []
+        Set<Path> publishOutputs = []
+        boolean completed = false
+        boolean deleted = false
+    }
+
+    static private class PathState {
+        TaskRun task
+        Set<TaskRun> consumers = []
+        boolean deleted = false
+        boolean published = false
+
+        PathState(TaskRun task) {
+            this.task = task
+        }
+    }
+
+}
diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy
index c5fac42d00..36cc0e9a89 100644
--- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy
@@ -50,6 +50,11 @@ trait TraceObserver {
      */
     void onProcessCreate( TaskProcessor process ){}
 
+    /**
+     * Invoked when the process is closed (all tasks have been created).
+     */
+    void onProcessClose( TaskProcessor process ){}
+
     /*
      * Invoked when all tasks have been executed and process ends.
*/ diff --git a/modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy b/modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy index 71ef367d07..e44975e2bb 100644 --- a/modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy @@ -25,9 +25,10 @@ import nextflow.container.ContainerConfig import nextflow.exception.AbortOperationException import nextflow.script.ScriptFile import nextflow.script.WorkflowMetadata +import nextflow.trace.TaskCleanupObserver +import nextflow.trace.TraceFileObserver import nextflow.trace.TraceHelper import nextflow.trace.WorkflowStatsObserver -import nextflow.trace.TraceFileObserver import nextflow.util.Duration import nextflow.util.VersionNumber import spock.lang.Specification @@ -248,16 +249,23 @@ class SessionTest extends Specification { session = [:] as Session result = session.createObservers() then: - result.size()==1 + result.size() == 1 result.any { it instanceof WorkflowStatsObserver } when: session = [:] as Session - session.config = [trace: [enabled: true, file:'name.txt']] + session.config = [cleanup: 'eager'] result = session.createObservers() - observer = result[1] as TraceFileObserver then: result.size() == 2 + result.any { it instanceof TaskCleanupObserver } + + when: + session = [:] as Session + session.config = [trace: [enabled: true, file:'name.txt']] + result = session.createObservers() + observer = result.find { it instanceof TraceFileObserver } + then: observer.tracePath == Paths.get('name.txt').complete() observer.separator == '\t' @@ -265,9 +273,8 @@ class SessionTest extends Specification { session = [:] as Session session.config = [trace: [enabled: true, sep: 'x', fields: 'task_id,name,exit', file: 'alpha.txt']] result = session.createObservers() - observer = result[1] as TraceFileObserver + observer = result.find { it instanceof TraceFileObserver } then: - result.size() == 2 observer.tracePath == Paths.get('alpha.txt').complete() observer.separator == 'x' observer.fields == ['task_id','name','exit'] @@ -283,9 +290,8 @@ class SessionTest extends Specification { session = [:] as Session session.config = [trace: [enabled: true, fields: 'task_id,name,exit,vmem']] result = session.createObservers() - observer = result[1] as TraceFileObserver + observer = result.find { it instanceof TraceFileObserver } then: - result.size() == 2 observer.tracePath == Paths.get('trace-20221001.txt').complete() observer.separator == '\t' observer.fields == ['task_id','name','exit','vmem'] diff --git a/modules/nextflow/src/test/groovy/nextflow/cache/CacheDBTest.groovy b/modules/nextflow/src/test/groovy/nextflow/cache/CacheDBTest.groovy index 8191c29695..0276a9ae79 100644 --- a/modules/nextflow/src/test/groovy/nextflow/cache/CacheDBTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/cache/CacheDBTest.groovy @@ -83,7 +83,7 @@ class CacheDBTest extends Specification { 1 * task.getContext() >> ctx when: - def entry = cache.getTaskEntry(hash, proc) + def entry = cache.getTaskEntry(hash, ['foo': proc]) then: entry instanceof TaskEntry entry.trace instanceof TraceRecord diff --git a/modules/nextflow/src/test/groovy/nextflow/trace/CleanupStrategyTest.groovy b/modules/nextflow/src/test/groovy/nextflow/trace/CleanupStrategyTest.groovy new file mode 100644 index 0000000000..f25a0439b3 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/trace/CleanupStrategyTest.groovy @@ -0,0 +1,48 @@ +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, Version 
2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package nextflow.trace + +import spock.lang.Specification + +/** + * + * @author Ben Sherman + */ +class CleanupStrategyTest extends Specification { + + def "should check if it's a valid cleanup strategy" () { + + expect: + CleanupStrategy.isValid(NAME) == EXPECTED + + where: + NAME | EXPECTED + null | false + '' | false + 'foo' | false + and: + 'lazy' | true + 'eager' | true + 'aggressive' | true + and: + 'LAZY' | true + 'EAGER' | true + 'AGGRESSIVE' | true + } + +}
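A closing note on the enum: `TaskCleanupObserver` gates its behaviour with ordinal comparisons such as `strategy >= CleanupStrategy.EAGER`, which works because the constants are declared in increasing order of aggressiveness. A small sketch of the behaviour this relies on (assumed Groovy enum-comparison semantics, exercising only the API added in this diff):

```groovy
import nextflow.trace.CleanupStrategy

// Groovy maps `<` and `>=` on enums to compareTo(), which follows declaration
// order, so LAZY < EAGER < AGGRESSIVE -- mirroring the `level` field (1, 2, 3)
assert CleanupStrategy.LAZY < CleanupStrategy.EAGER
assert CleanupStrategy.EAGER < CleanupStrategy.AGGRESSIVE

// isValid() is case-insensitive and rejects unknown names, as tested above
assert CleanupStrategy.isValid('Eager')
assert !CleanupStrategy.isValid('foo')

// the String-to-enum coercion used in DefaultObserverFactory
assert ('eager'.toUpperCase() as CleanupStrategy) == CleanupStrategy.EAGER
```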