Add support for temporary output paths #3818

Closed
wants to merge 17 commits into from
21 changes: 21 additions & 0 deletions docs/process.md
@@ -930,6 +930,9 @@ Available options:
`maxDepth`
: Maximum number of directory levels to visit (default: no limit)

`temporary`
: When `true`, the target files will be deleted once they are no longer needed by downstream tasks.

`type`
: Type of paths returned, either `file`, `dir` or `any` (default: `any`, or `file` if the specified file name pattern contains a double star (`**`))

@@ -1009,6 +1012,24 @@ One example in which you'd need to manage the naming of output files is when you
To sum up, prefer output files with static names over dynamic ones whenever possible, as this results in simpler and more portable code.
:::

### Temporary output files

:::{warning}
This feature is experimental and may change in a future release.
:::

When a `path` output is declared with `temporary: true`, the target files for this output will be automatically deleted during pipeline execution, as soon as they are no longer needed by downstream tasks. This feature is useful for cleaning up large intermediate files in order to free up disk storage.

The lifetime of a temporary file is determined by the downstream tasks that take the file as an input. When all of these tasks finish, the temporary file can be deleted.
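For example, a pipeline might declare a large intermediate alignment file as temporary so that it is removed as soon as the downstream statistics step has consumed it. The following is a minimal sketch assuming the `temporary: true` option syntax introduced by this PR; the process names and commands are illustrative:

```groovy
process ALIGN {
    input:
    path reads

    output:
    // large intermediate file, deleted once downstream tasks finish
    path 'aligned.bam', temporary: true

    script:
    """
    bwa mem ref.fa ${reads} | samtools sort -o aligned.bam
    """
}

process STATS {
    input:
    path bam

    output:
    path 'stats.txt'

    script:
    """
    samtools flagstat ${bam} > stats.txt
    """
}

workflow {
    STATS( ALIGN( channel.fromPath('reads/*.fq') ) )
}
```

Here each `aligned.bam` becomes eligible for deletion once every `STATS` task that consumes it has completed and the `STATS` process has been closed.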

The following caveats apply when using temporary outputs:

- Resumability is not currently supported for tasks with temporary outputs. If you try to resume a run with temporary outputs, any tasks whose outputs were deleted will have to be re-run.

- A temporary output should not be forwarded by a downstream process using the `includeInputs` option. Otherwise, the temporary output may be deleted prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect results. To avoid this problem, declare only the most downstream output as temporary.

- If a file captured by a temporary output path is also captured by a regular output path, it will still be treated as a temporary file. If the regular output path is also published, some outputs may be deleted before they can be published.
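To illustrate the `includeInputs` caveat, the sketch below (hypothetical process names, assuming the option syntax from this PR) marks the forwarded file as temporary only at the most downstream output where it appears:

```groovy
process FIRST {
    output:
    // do NOT mark this output as temporary -- SECOND forwards it,
    // so it must survive until SECOND's consumers are done
    path 'data.txt'

    script:
    """
    echo hello > data.txt
    """
}

process SECOND {
    input:
    path 'data.txt'

    output:
    // safe to mark the forwarded file temporary here, at the most
    // downstream point where it is declared as an output
    path 'data.txt', includeInputs: true, temporary: true

    script:
    """
    cat data.txt
    """
}
```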

(process-env)=

### Output type `env`
11 changes: 11 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
@@ -935,6 +935,17 @@ class Session implements ISession {
}
}

void notifyProcessClose(TaskProcessor process) {
observers.each { observer ->
try {
observer.onProcessClose(process)
}
catch( Exception e ) {
log.debug(e.getMessage(), e)
}
}
}

void notifyProcessTerminate(TaskProcessor process) {
for( int i=0; i<observers.size(); i++ ) {
final observer = observers.get(i)
@@ -2298,6 +2298,10 @@ class TaskProcessor {
state.update { StateObj it -> it.incCompleted() }
}

protected void closeProcess() {
session.notifyProcessClose(this)
}

protected void terminateProcess() {
log.trace "<${name}> Sending poison pills and terminating process"
sendPoisonPill()
@@ -2440,6 +2444,7 @@ class TaskProcessor {
@Override
void afterStop(final DataflowProcessor processor) {
log.trace "<${name}> After stop"
closeProcess()
}

/**
@@ -82,6 +82,11 @@ class FileOutParam extends BaseOutParam implements OutParam, OptionalParam, PathQualifier

boolean glob = true

/**
* When true the target files will be deleted once they are no longer needed by downstream tasks.
*/
boolean temporary = false

private GString gstring
private Closure<String> dynamicObj
private String filePattern
@@ -26,6 +26,7 @@ class DefaultObserverFactory implements TraceObserverFactory {
createDagObserver(result)
createWebLogObserver(result)
createAnsiLogObserver(result)
createTemporaryFileObserver(result)
return result
}

@@ -116,4 +117,8 @@ class DefaultObserverFactory implements TraceObserverFactory {
result << observer
}

protected void createTemporaryFileObserver(Collection<TraceObserver> result) {
result << new TemporaryFileObserver()
}

}
@@ -0,0 +1,219 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.trace

import java.nio.file.Path
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.dag.DAG
import nextflow.file.FileHelper
import nextflow.processor.TaskHandler
import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
import nextflow.script.params.FileOutParam
/**
* Track temporary output files and delete them once they
* are no longer needed.
*
* @author Ben Sherman <[email protected]>
*/
@Slf4j
@CompileStatic
class TemporaryFileObserver implements TraceObserver {

private DAG dag

private Map<Path,Status> pathStatuses = new HashMap<>()

private Set<TaskRun> completedTasks = new HashSet<>()

private Map<String,Set<String>> processConsumers = new HashMap<>()

private Set<String> closedProcesses = new HashSet<>()

private Lock sync = new ReentrantLock()

@Override
void onFlowCreate(Session session) {
this.dag = session.dag
}

/**
* When a task is created, add it to the set of consumers for any temporary
* file that it takes as input.
*
* @param handler
* @param trace
*/
@Override
void onProcessPending(TaskHandler handler, TraceRecord trace) {
sync.lock()
try {
final task = handler.task
for( def entry : task.getInputFilesMap() ) {
final storePath = entry.value
if( storePath in pathStatuses )
pathStatuses[storePath].consumers.add(task)
}
}
finally {
sync.unlock()
}
}

/**
* When a task is completed, track any temporary output files
* for automatic cleanup.
*
* @param handler
* @param trace
*/
@Override
void onProcessComplete(TaskHandler handler, TraceRecord trace) {
// query all temporary files produced by task
final task = handler.task
final tempOutputs = task
.getOutputsByType(FileOutParam)
.findAll { param, paths -> param.temporary }
.values()
.flatten() as Set<Path>

sync.lock()
try {
// mark task as completed
completedTasks.add(task)

// scan temporary files for cleanup
cleanup0()

// add new temporary outputs to status map
for( Path path : tempOutputs )
pathStatuses[path] = new Status(task)
}
finally {
sync.unlock()
}
}

/**
* Get the consumers of a process.
*
* @param processName
*/
private Set<String> getProcessConsumers(String processName) {
if( processName !in processConsumers )
processConsumers[processName] = getProcessConsumers0(processName)

return processConsumers[processName]
}

private Set<String> getProcessConsumers0(String processName) {

// find the task's process node in the abstract dag
final processNode = dag.vertices
.find { node -> node.process?.name == processName }

// find all downstream processes in the abstract dag
def consumers = [] as Set
def queue = [ processNode ]

while( !queue.isEmpty() ) {
// remove a node from the search queue
final sourceNode = queue.remove(0)

// search each outgoing edge from the source node
for( def edge : dag.edges ) {
if( edge.from != sourceNode )
continue

def node = edge.to

// add process nodes to the list of consumers
if( node.process != null )
consumers.add(node.process.name)
// add operator nodes to the queue to keep searching
else
queue.add(node)
}
}

log.trace "Process ${processName} has the following consumers: ${consumers.join(', ')}"

return consumers
}

/**
* When a process is closed (all tasks of the process have been created),
* mark the process as closed and scan for automatic cleanup.
*
* @param process
*/
@Override
void onProcessClose(TaskProcessor process) {
sync.lock()
try {
closedProcesses.add(process.name)
cleanup0()
}
finally {
sync.unlock()
}
}

/**
* Delete any temporary file that has no more barriers.
*/
private void cleanup0() {
for( Path path : pathStatuses.keySet() ) {
final status = pathStatuses[path]
if( !status.deleted && canDelete(path) ) {
log.trace "Deleting temporary file: ${path}"
FileHelper.deletePath(path)
status.deleted = true
}
}
}

/**
* Determine whether a path can be deleted.
*
* A path can be deleted if all of its process consumers
* are closed and all of its task consumers are completed.
*/
private boolean canDelete(Path path) {
final status = pathStatuses[path]
final processConsumers = getProcessConsumers(status.task.processor.name)
final taskConsumers = status.consumers
processConsumers.every { p -> p in closedProcesses } && taskConsumers.every { t -> t in completedTasks }
}

static private class Status {
TaskRun task
Set<TaskRun> consumers = [] as Set
boolean deleted = false

Status(TaskRun task) {
this.task = task
}
}

}
@@ -50,6 +50,11 @@ trait TraceObserver {
*/
void onProcessCreate( TaskProcessor process ){}

/**
* Invoked when the process is closed (all tasks have been created).
*/
void onProcessClose( TaskProcessor process ){}

/*
* Invoked when all tasks have been executed and the process ends.
*/
15 changes: 7 additions & 8 deletions modules/nextflow/src/test/groovy/nextflow/SessionTest.groovy
@@ -25,9 +25,10 @@ import nextflow.container.ContainerConfig
import nextflow.exception.AbortOperationException
import nextflow.script.ScriptFile
import nextflow.script.WorkflowMetadata
import nextflow.trace.TemporaryFileObserver
import nextflow.trace.TraceFileObserver
import nextflow.trace.TraceHelper
import nextflow.trace.WorkflowStatsObserver
import nextflow.util.Duration
import nextflow.util.VersionNumber
import spock.lang.Specification
@@ -248,26 +249,25 @@ class SessionTest extends Specification {
session = [:] as Session
result = session.createObservers()
then:
result.size() == 2
result.any { it instanceof WorkflowStatsObserver }
result.any { it instanceof TemporaryFileObserver }

when:
session = [:] as Session
session.config = [trace: [enabled: true, file:'name.txt']]
result = session.createObservers()
observer = result.find { it instanceof TraceFileObserver }
then:
observer.tracePath == Paths.get('name.txt').complete()
observer.separator == '\t'

when:
session = [:] as Session
session.config = [trace: [enabled: true, sep: 'x', fields: 'task_id,name,exit', file: 'alpha.txt']]
result = session.createObservers()
observer = result.find { it instanceof TraceFileObserver }
then:
observer.tracePath == Paths.get('alpha.txt').complete()
observer.separator == 'x'
observer.fields == ['task_id','name','exit']
@@ -283,9 +283
session = [:] as Session
session.config = [trace: [enabled: true, fields: 'task_id,name,exit,vmem']]
result = session.createObservers()
observer = result.find { it instanceof TraceFileObserver }
then:
observer.tracePath == Paths.get('trace-20221001.txt').complete()
observer.separator == '\t'
observer.fields == ['task_id','name','exit','vmem']