
Improve tracking of temporary files
Signed-off-by: Ben Sherman <[email protected]>
bentsherman committed Apr 28, 2023
1 parent 6fa9e92 commit f46f506
Showing 5 changed files with 111 additions and 50 deletions.
6 changes: 3 additions & 3 deletions docs/process.md
@@ -1020,13 +1020,13 @@ This feature is experimental and may change in a future release.

When a `path` output is declared with `temporary: true`, the target files for this output will be automatically deleted during pipeline execution, as soon as they are no longer needed by downstream tasks. This feature is useful for cleaning up large intermediate files in order to free up disk storage.

-The lifetime of a temporary file is determined by the processes that are downstream of the file's originating process, either directly or indirectly through channel operators. When all of these processes finish (i.e. all of their tasks finish), all temporary files produced by the original process can be deleted.
+The lifetime of a temporary file is determined by the downstream tasks that take the file as an input. When all of these tasks finish, the temporary file can be deleted.

The following caveats apply when using temporary outputs:

-- This feature will break the resumability of your pipeline. If you try to resume a run with temporary outputs, any tasks whose outputs were deleted will have to be re-run.
+- Resumability is not currently supported for tasks with temporary outputs. If you try to resume a run with temporary outputs, any tasks whose outputs were deleted will have to be re-run.

-- A temporary output should not be forwarded by a downstream process using the `includeInputs` option. In this case, the temporary output will be deleted prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect output.
+- A temporary output should not be forwarded by a downstream process using the `includeInputs` option. In this case, the temporary output will be deleted prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect output. To avoid this problem, only the most downstream output should be declared as temporary.

- If a file captured by a temporary output path is also captured by a regular output path, it will still be treated as a temporary file. If the regular output path is also published, some outputs may be deleted before they can be published.

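For illustration, here is a minimal sketch of how a pipeline might use this option, following the `temporary: true` syntax described in the hunk above (the process names and commands are hypothetical):

```groovy
process ALIGN {
    input:
    path reads

    output:
    // intermediate file, deleted once every downstream task
    // that consumes it has completed
    path 'aligned.bam', temporary: true

    script:
    """
    align.sh $reads > aligned.bam
    """
}

process STATS {
    input:
    path bam

    output:
    path 'stats.txt'

    script:
    """
    stats.sh $bam > stats.txt
    """
}

workflow {
    ALIGN( Channel.fromPath(params.reads) ) | STATS
}
```

Here `aligned.bam` is needed only by `STATS`, so once the `STATS` process is closed and all of its tasks have completed, the file can be removed to free up disk storage.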
11 changes: 11 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
@@ -941,6 +941,17 @@ class Session implements ISession {
        }
    }

+    void notifyProcessClose(TaskProcessor process) {
+        observers.each { observer ->
+            try {
+                observer.onProcessClose(process)
+            }
+            catch( Exception e ) {
+                log.debug(e.getMessage(), e)
+            }
+        }
+    }

    void notifyProcessTerminate(TaskProcessor process) {
        for( int i=0; i<observers.size(); i++ ) {
            final observer = observers.get(i)
modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy
@@ -2298,6 +2298,10 @@ class TaskProcessor {
        state.update { StateObj it -> it.incCompleted() }
    }

+    protected void closeProcess() {
+        session.notifyProcessClose(this)
+    }

    protected void terminateProcess() {
        log.trace "<${name}> Sending poison pills and terminating process"
        sendPoisonPill()
@@ -2440,6 +2444,7 @@
        @Override
        void afterStop(final DataflowProcessor processor) {
            log.trace "<${name}> After stop"
+            closeProcess()
        }

/**
modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy
@@ -21,13 +21,15 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

+import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.dag.DAG
import nextflow.dag.ConcreteDAG
import nextflow.file.FileHelper
import nextflow.processor.TaskHandler
import nextflow.processor.TaskProcessor
+import nextflow.processor.TaskRun
import nextflow.script.params.FileOutParam
/**
* Track temporary output files and delete them once they
@@ -36,11 +38,18 @@ import nextflow.script.params.FileOutParam
* @author Ben Sherman <[email protected]>
*/
@Slf4j
+@CompileStatic
class TemporaryFileObserver implements TraceObserver {

private DAG dag

-    private Map<String,Status> statusMap = new ConcurrentHashMap<>()
+    private Map<Path,Status> pathStatuses = new HashMap<>()
+
+    private Set<TaskRun> completedTasks = new HashSet<>()
+
+    private Map<String,Set<String>> processConsumers = new HashMap<>()
+
+    private Set<String> closedProcesses = new HashSet<>()

    private Lock sync = new ReentrantLock()

@@ -49,6 +58,29 @@ class TemporaryFileObserver implements TraceObserver {
        this.dag = session.dag
    }

+    /**
+     * When a task is created, add it to the set of consumers for any temporary
+     * file that it takes as input.
+     *
+     * @param handler
+     * @param trace
+     */
+    @Override
+    void onProcessPending(TaskHandler handler, TraceRecord trace) {
+        sync.lock()
+        try {
+            final task = handler.task
+            for( def entry : task.getInputFilesMap() ) {
+                final storePath = entry.value
+                if( storePath in pathStatuses )
+                    pathStatuses[storePath].consumers.add(task)
+            }
+        }
+        finally {
+            sync.unlock()
+        }
+    }

    /**
     * When a task is completed, track any temporary output files
     * for automatic cleanup.
@@ -58,39 +90,44 @@
     */
    @Override
    void onProcessComplete(TaskHandler handler, TraceRecord trace) {
-        // find all temporary output files
+        // query all temporary files produced by task
        final task = handler.task
        final tempOutputs = task
            .getOutputsByType(FileOutParam)
            .findAll { param, paths -> param.temporary }
            .values()
-            .flatten()
-
-        if( tempOutputs.isEmpty() )
-            return
-
-        // update status tracker for the task's process
-        final processName = task.processor.name
+            .flatten() as Set<Path>

        sync.lock()
        try {
-            if( processName !in statusMap ) {
-                log.trace "Process ${processName} has temporary output files, tracking for automatic cleanup"
-                statusMap[processName] = new Status(findAllConsumers(processName))
-            }
-            statusMap[processName].paths.addAll(tempOutputs)
+            // mark task as completed
+            completedTasks.add(task)
+
+            // scan temporary files for cleanup
+            cleanup0()
+
+            // add new temporary outputs to status map
+            for( Path path : tempOutputs )
+                pathStatuses[path] = new Status(task)
        }
        finally {
            sync.unlock()
        }
    }

    /**
-     * Find all processes which are consumers of a given process.
+     * Get the consumers of a process.
     *
     * @param processName
     */
-    private Set<String> findAllConsumers(String processName) {
+    private Set<String> getProcessConsumers(String processName) {
+        if( processName !in processConsumers )
+            processConsumers[processName] = getProcessConsumers0(processName)
+
+        return processConsumers[processName]
+    }
+
+    private Set<String> getProcessConsumers0(String processName) {

        // find the task's process node in the abstract dag
        final processNode = dag.vertices
@@ -126,54 +163,57 @@
    }

    /**
-     * When a process is completed, update the status of any processes
-     * that are waiting on it in order to cleanup temporary outputs.
-     *
-     * If, after this update is removed, a process has no more barriers,
-     * then clean all temporary files for that process.
+     * When a process is closed (all tasks of the process have been created),
+     * mark the process as closed and scan for automatic cleanup.
     *
     * @param process
     */
    @Override
-    void onProcessTerminate(TaskProcessor process) {
+    void onProcessClose(TaskProcessor process) {
        sync.lock()
        try {
-            for( def entry : statusMap ) {
-                // remove barrier from each upstream process
-                final producer = entry.key
-                final status = entry.value
-                final consumers = status.processBarriers
-
-                consumers.remove(process.name)
-
-                // if a process has no more barriers, trigger the cleanup
-                if( consumers.isEmpty() ) {
-                    log.trace "All consumers of process ${producer} are complete, deleting temporary files"
-
-                    deleteTemporaryFiles(status.paths)
-                    statusMap.remove(producer)
-                }
-            }
+            closedProcesses.add(process.name)
+            cleanup0()
        }
        finally {
            sync.unlock()
        }
    }

-    private void deleteTemporaryFiles(Collection<Path> paths) {
-        for( Path path : paths ) {
-            log.trace "Deleting temporary file: ${path}"
-            FileHelper.deletePath(path)
+    /**
+     * Delete any temporary file that has no more barriers.
+     */
+    private void cleanup0() {
+        for( Path path : pathStatuses.keySet() ) {
+            final status = pathStatuses[path]
+            if( !status.deleted && canDelete(path) ) {
+                log.trace "Deleting temporary file: ${path}"
+                FileHelper.deletePath(path)
+                status.deleted = true
+            }
        }
    }

+    /**
+     * Determine whether a path can be deleted.
+     *
+     * A path can be deleted if all of its process consumers
+     * are closed and all of its task consumers are completed.
+     */
+    private boolean canDelete(Path path) {
+        final status = pathStatuses[path]
+        final processConsumers = getProcessConsumers(status.task.processor.name)
+        final taskConsumers = status.consumers
+        processConsumers.every { p -> p in closedProcesses } && taskConsumers.every { t -> t in completedTasks }
+    }

    static private class Status {
-        Set<Path> paths
-        Set<String> processBarriers
+        TaskRun task
+        Set<TaskRun> consumers = [] as Set
+        boolean deleted = false

-        Status(Set<String> processes) {
-            this.paths = [] as Set
-            this.processBarriers = processes
+        Status(TaskRun task) {
+            this.task = task
        }
    }

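To make the new deletion rule concrete, here is a self-contained Groovy sketch of the same barrier check, with plain strings standing in for `TaskProcessor` and `TaskRun` objects (all names are hypothetical):

```groovy
// state maintained by the observer, simplified to strings
Set<String> closedProcesses = ['ALIGN', 'STATS'] as Set       // filled by onProcessClose
Set<String> completedTasks  = ['STATS_1', 'STATS_2'] as Set   // filled by onProcessComplete

// barriers recorded for one temporary file
Set<String> processConsumers = ['STATS'] as Set               // derived from the abstract DAG
Set<String> taskConsumers    = ['STATS_1', 'STATS_2'] as Set  // recorded by onProcessPending

// mirrors canDelete(): every consuming process must be closed
// (no more tasks can be created) and every consuming task completed
boolean canDelete =
    processConsumers.every { it in closedProcesses } &&
    taskConsumers.every { it in completedTasks }

assert canDelete
```

The process-level barrier is what makes the task-level tracking safe: even when every known task consumer has completed, the file is kept until the consuming processes are closed, since an open process may still create new tasks that read it.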
modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy
@@ -50,6 +50,11 @@
     */
    void onProcessCreate( TaskProcessor process ){}

+    /**
+     * Invoked when the process is closed (all tasks have been created).
+     */
+    void onProcessClose( TaskProcessor process ){}

    /*
     * Invoked when all tasks have been executed and the process ends.
     */
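Finally, a minimal sketch of a custom observer that hooks the new event, assuming the `TraceObserver` trait as extended above (the class name is hypothetical, and registration of the observer with the session is not shown):

```groovy
import groovy.util.logging.Slf4j
import nextflow.processor.TaskProcessor
import nextflow.trace.TraceObserver

@Slf4j
class ProcessCloseLogger implements TraceObserver {

    @Override
    void onProcessClose(TaskProcessor process) {
        // fires once per process, after its last task has been created
        log.debug "process ${process.name} is closed"
    }
}
```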
