From f46f5065a79f723615bcb36a93666b1d6b08cb05 Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 28 Apr 2023 12:08:37 -0500 Subject: [PATCH] Improve tracking of temporary files Signed-off-by: Ben Sherman --- docs/process.md | 6 +- .../src/main/groovy/nextflow/Session.groovy | 11 ++ .../nextflow/processor/TaskProcessor.groovy | 5 + .../trace/TemporaryFileObserver.groovy | 134 ++++++++++++------ .../nextflow/trace/TraceObserver.groovy | 5 + 5 files changed, 111 insertions(+), 50 deletions(-) diff --git a/docs/process.md b/docs/process.md index 06ef28dfaa..81fa754b82 100644 --- a/docs/process.md +++ b/docs/process.md @@ -1020,13 +1020,13 @@ This feature is experimental and may change in a future release. When a `path` output is declared with `temporary: true`, the target files for this output will be automatically deleted during pipeline execution, as soon as they are no longer needed by downstream tasks. This feature is useful for cleaning up large intermediate files in order to free up disk storage. -The lifetime of a temporary file is determined by the processes that are downstream of the file's originating process, either directly or indirectly through channel operators. When all of these processes finish (i.e. all of their tasks finish), all temporary files produced by the original process can be deleted. +The lifetime of a temporary file is determined by the downstream tasks that take the file as an input. When all of these tasks finish, the temporary file can be deleted. The following caveats apply when using temporary outputs: -- This feature will break the resumability of your pipeline. If you try to resume a run with temporary outputs, any tasks whose outputs were deleted will have to be re-run. +- Resumability is not currently supported for tasks with temporary outputs. If you try to resume a run with temporary outputs, any tasks whose outputs were deleted will have to be re-run. 
-- A temporary output should not be forwarded by a downstream process using the `includeInputs` option. In this case, the temporary output will be deleted prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect output. +- A temporary output should not be forwarded by a downstream process using the `includeInputs` option. In this case, the temporary output will be deleted prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect output. To avoid this problem, only the most downstream output should be declared as temporary. - If a file captured by a temporary output path is also captured by a regular output path, it will still be treated as a temporary file. If the regular output path is also published, some outputs may be deleted before they can be published. diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 87660008ac..836d2f50e7 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -941,6 +941,17 @@ class Session implements ISession { } } + void notifyProcessClose(TaskProcessor process) { + observers.each { observer -> + try { + observer.onProcessClose(process) + } + catch( Exception e ) { + log.debug(e.getMessage(), e) + } + } + } + void notifyProcessTerminate(TaskProcessor process) { for( int i=0; i it.incCompleted() } } + protected void closeProcess() { + session.notifyProcessClose(this) + } + protected void terminateProcess() { log.trace "<${name}> Sending poison pills and terminating process" sendPoisonPill() @@ -2440,6 +2444,7 @@ class TaskProcessor { @Override void afterStop(final DataflowProcessor processor) { log.trace "<${name}> After stop" + closeProcess() } /** diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy 
b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy index eafd6ebd07..e487d8746d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.Lock import java.util.concurrent.locks.ReentrantLock +import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.Session import nextflow.dag.DAG @@ -28,6 +29,7 @@ import nextflow.dag.ConcreteDAG import nextflow.file.FileHelper import nextflow.processor.TaskHandler import nextflow.processor.TaskProcessor +import nextflow.processor.TaskRun import nextflow.script.params.FileOutParam /** * Track temporary output files and delete them once they @@ -36,11 +38,18 @@ import nextflow.script.params.FileOutParam * @author Ben Sherman */ @Slf4j +@CompileStatic class TemporaryFileObserver implements TraceObserver { private DAG dag - private Map statusMap = new ConcurrentHashMap<>() + private Map pathStatuses = new HashMap<>() + + private Set completedTasks = new HashSet<>() + + private Map> processConsumers = new HashMap<>() + + private Set closedProcesses = new HashSet<>() private Lock sync = new ReentrantLock() @@ -49,6 +58,29 @@ class TemporaryFileObserver implements TraceObserver { this.dag = session.dag } + /** + * When a task is created, add it to the set of consumers for any temporary + * file that it takes as input. 
+ * + * @param handler + * @param trace + */ + @Override + void onProcessPending(TaskHandler handler, TraceRecord trace) { + sync.lock() + try { + final task = handler.task + for( def entry : task.getInputFilesMap() ) { + final storePath = entry.value + if( storePath in pathStatuses ) + pathStatuses[storePath].consumers.add(task) + } + } + finally { + sync.unlock() + } + } + /** * When a task is completed, track any temporary output files * for automatic cleanup. @@ -58,27 +90,25 @@ class TemporaryFileObserver implements TraceObserver { */ @Override void onProcessComplete(TaskHandler handler, TraceRecord trace) { - // find all temporary output files + // query all temporary files produced by task final task = handler.task final tempOutputs = task .getOutputsByType(FileOutParam) .findAll { param, paths -> param.temporary } .values() - .flatten() - - if( tempOutputs.isEmpty() ) - return - - // update status tracker for the task's process - final processName = task.processor.name + .flatten() as Set sync.lock() try { - if( processName !in statusMap ) { - log.trace "Process ${processName} has temporary output files, tracking for automatic cleanup" - statusMap[processName] = new Status(findAllConsumers(processName)) - } - statusMap[processName].paths.addAll(tempOutputs) + // mark task as completed + completedTasks.add(task) + + // scan temporary files for cleanup + cleanup0() + + // add new temporary outputs to status map + for( Path path : tempOutputs ) + pathStatuses[path] = new Status(task) } finally { sync.unlock() @@ -86,11 +116,18 @@ class TemporaryFileObserver implements TraceObserver { } /** - * Find all processes which are consumers of a given process. + * Get the consumers of a process. 
* * @param processName */ - private Set findAllConsumers(String processName) { + private Set getProcessConsumers(String processName) { + /* containsKey, not `in`: Groovy's `in` on a Map tests the truthiness of the stored value, so a cached empty consumer set would be recomputed on every call */ + if( !processConsumers.containsKey(processName) ) + processConsumers[processName] = getProcessConsumers0(processName) + return processConsumers[processName] + } + + private Set getProcessConsumers0(String processName) { // find the task's process node in the abstract dag final processNode = dag.vertices @@ -126,54 +163,57 @@ class TemporaryFileObserver implements TraceObserver { } /** - * When a process is completed, update the status of any processes - * that are waiting on it in order to cleanup temporary outputs. - * - * If, after this update is removed, a process has no more barriers, - * then clean all temporary files for that process. + * When a process is closed (all tasks of the process have been created), + * mark the process as closed and scan for automatic cleanup. * * @param process */ @Override - void onProcessTerminate(TaskProcessor process) { + void onProcessClose(TaskProcessor process) { sync.lock() try { - for( def entry : statusMap ) { - // remove barrier from each upstream process - final producer = entry.key - final status = entry.value - final consumers = status.processBarriers - - consumers.remove(process.name) - - // if a process has no more barriers, trigger the cleanup - if( consumers.isEmpty() ) { - log.trace "All consumers of process ${producer} are complete, deleting temporary files" - - deleteTemporaryFiles(status.paths) - statusMap.remove(producer) - } - } + closedProcesses.add(process.name) + cleanup0() } finally { sync.unlock() } } - private void deleteTemporaryFiles(Collection paths) { - for( Path path : paths ) { - log.trace "Deleting temporary file: ${path}" - FileHelper.deletePath(path) + /** + * Delete any temporary file that has no more barriers. 
+ */ + private void cleanup0() { + for( Path path : pathStatuses.keySet() ) { + final status = pathStatuses[path] + if( !status.deleted && canDelete(path) ) { + log.trace "Deleting temporary file: ${path}" + FileHelper.deletePath(path) + status.deleted = true + } } } + /** + * Determine whether a path can be deleted. + * + * A path can be deleted if all of its process consumers + * are closed and all of its task consumers are completed. + */ + private boolean canDelete(Path path) { + final status = pathStatuses[path] + final processConsumers = getProcessConsumers(status.task.processor.name) + final taskConsumers = status.consumers + processConsumers.every { p -> p in closedProcesses } && taskConsumers.every { t -> t in completedTasks } + } + static private class Status { - Set paths - Set processBarriers + TaskRun task + Set consumers = [] as Set + boolean deleted = false - Status(Set processes) { - this.paths = [] as Set - this.processBarriers = processes + Status(TaskRun task) { + this.task = task } } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy index 45e414764e..aacdbe95de 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy @@ -50,6 +50,11 @@ trait TraceObserver { */ void onProcessCreate( TaskProcessor process ){} + /** + * Invoked when the process is closed (all tasks have been created). + */ + void onProcessClose( TaskProcessor process ){} + /* * Invoked when all tak have been executed and process ends. */