Automatic task cleanup (without resumability) #4713

Closed
41 commits
47d0168
Add initial task graph and metadata json file
bentsherman Mar 27, 2023
ae67027
Add task inputs and outputs to concrete DAG
bentsherman Mar 28, 2023
8f95cd6
Fix failing tests
bentsherman Mar 28, 2023
9f11e4b
Use path-based APIs to get file metadata
bentsherman Mar 29, 2023
db6aed1
Merge branch 'master' into ben-task-graph
bentsherman Mar 29, 2023
8456892
Use buffer to compute checksum
bentsherman Mar 30, 2023
77f2cdc
Add support for temporary output paths
bentsherman Mar 31, 2023
3e55ad5
Fix failing test
bentsherman Mar 31, 2023
e307f75
Add caveat about overlapping output channels [ci skip]
bentsherman Mar 31, 2023
08881b0
Delete files instead of emptying them (now supports directories and r…
bentsherman Apr 1, 2023
0cf07ec
Add `eager` cleanup option
bentsherman Apr 10, 2023
73b2f3b
Fix concurrency issues [ci fast]
bentsherman Apr 10, 2023
0dd98d6
Merge branch 'master' into ben-task-graph-pull
bentsherman Apr 21, 2023
0f505d3
Merge branch 'master' into ben-task-graph-pull
bentsherman Apr 26, 2023
e81e584
Replace synchronized with lock
bentsherman Apr 26, 2023
ba2e7a6
Merge branch 'ben-task-graph-pull' into 452-eager-cleanup
bentsherman Apr 28, 2023
f7bcfa8
Remove dependency on task graph branch
bentsherman Apr 28, 2023
4d90e27
Use downstream tasks to determine lifetime for task cleanup
bentsherman Apr 28, 2023
dd23b2a
Rename TemporaryFileObserver to TaskCleanupObserver
bentsherman Jul 7, 2023
6a34be6
Merge branch 'master' into 452-eager-cleanup
bentsherman Jul 7, 2023
ff08984
Wait for output files to be published
bentsherman Jul 7, 2023
9b343b6
Log warning if eager cleanup is used with incompatible publish modes
bentsherman Jul 7, 2023
6b5a820
Add eager cleanup for individual output files
bentsherman Jul 7, 2023
c42f249
Add warning about includeInputs with eager cleanup
bentsherman Jul 10, 2023
8b66c39
Add resumability
bentsherman Jul 11, 2023
43c939d
Upgrade kryo to 5.4.0 (#3562)
bentsherman Jul 11, 2023
5a78cf9
Add serializer for HashCode
bentsherman Jul 11, 2023
7b02b8e
Fix failing tests
bentsherman Jul 11, 2023
c727c05
Revert Kryo upgrade
bentsherman Jul 12, 2023
d37ee1f
Add kryo instantiator for HashCode
bentsherman Jul 12, 2023
859e96b
Remove HashCode serializer
bentsherman Jul 12, 2023
c89811c
Add log messages, fix infinite hang
bentsherman Jul 12, 2023
a034f60
Add support for includeInputs
bentsherman Jul 12, 2023
9741b1c
Update log messages
bentsherman Jul 12, 2023
3aae1af
Add lazy, eager, aggressive cleanup strategies
bentsherman Jul 12, 2023
08342dd
Merge branch 'master' into 452-eager-cleanup
bentsherman Sep 13, 2023
3498c5d
Add thread pool for task cleanup
bentsherman Sep 14, 2023
c421617
Minor improvement to checkCachedOutput()
bentsherman Sep 14, 2023
5b05aad
Merge branch 'master' into 452-eager-cleanup
bentsherman Jan 31, 2024
9ce10dc
minor edits
bentsherman Feb 1, 2024
b73c6bb
Remove WIP resumability
bentsherman Feb 1, 2024
28 changes: 25 additions & 3 deletions docs/config.md
@@ -1579,10 +1579,32 @@ Read the {ref}`trace-report` page to learn more about the execution report that
There are additional variables that can be defined within a configuration file that do not have a dedicated scope.

`cleanup`
: If `true`, on a successful completion of a run all files in *work* directory are automatically deleted.
: :::{versionchanged} 24.04.0
Added `'lazy'`, `'eager'`, and `'aggressive'` strategies.
:::
: Automatically delete task directories using one of the following strategies:

`false` (default)
: Disable automatic cleanup.

`true`
: Equivalent to `'lazy'`.

`'lazy'`
: If a workflow completes successfully, delete all task directories.
: Note that deleting all work directories at once can take a lot of time, especially when using a shared file system or remote cloud storage.

:::{warning}
The use of the `cleanup` option will prevent the use of the *resume* feature on subsequent executions of that pipeline run. Also, be aware that deleting all scratch files can take a lot of time, especially when using a shared file system or remote cloud storage.
`'eager'`
: Delete each task directory as soon as it is no longer needed by downstream tasks.
: A task can be deleted once all of the tasks that use any of its output files have completed.
: Output files that are published via symlink will be invalidated when the original task directory is deleted. Avoid using the following publish modes: `copyNoFollow`, `rellink`, `symlink`.

`'aggressive'`
: Equivalent to `'eager'`, but also deletes individual output files as soon as they are no longer needed.
: An output file can be deleted once the tasks that use it have completed. In some cases, an output file can be deleted sooner than its originating task.

: :::{note}
Resumability is not currently supported, but will be supported in a future release for the `'lazy'` and `'eager'` cleanup strategies.
:::
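
As a rough illustration of the option described above, a `nextflow.config` might select a strategy as follows. This is only a sketch: the `results` path is a placeholder, and `mode: 'copy'` is chosen here solely because it is not one of the symlink-based publish modes that the `'eager'` entry warns about.

```groovy
// nextflow.config (sketch; 'results' is a placeholder path)

// one of: false, true, 'lazy', 'eager', 'aggressive'
cleanup = 'eager'

process {
    // copy published files so they do not point back into deleted task directories
    publishDir = [path: 'results', mode: 'copy']
}
```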

`dumpHashes`
45 changes: 14 additions & 31 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
@@ -372,6 +372,9 @@ class Session implements ISession {
// -- file porter config
this.filePorter = new FilePorter(this)

// -- normalize cleanup config
if( config.cleanup == true )
config.cleanup = 'lazy'
}

protected Path cloudCachePath(Map cloudcache, Path workDir) {
@@ -964,6 +967,17 @@ class Session implements ISession {
}
}

void notifyProcessClose(TaskProcessor process) {
observers.each { observer ->
try {
observer.onProcessClose(process)
}
catch( Exception e ) {
log.debug(e.getMessage(), e)
}
}
}

void notifyProcessTerminate(TaskProcessor process) {
for( int i=0; i<observers.size(); i++ ) {
final observer = observers.get(i)
@@ -1149,37 +1163,6 @@ class Session implements ISession {
errorAction = action
}

/**
* Delete the workflow work directory from tasks temporary files
*/
void cleanup() {
if( !workDir || !config.cleanup )
return

if( aborted || cancelled || error )
return

CacheDB db = null
try {
log.trace "Cleaning-up workdir"
db = CacheFactory.create(uniqueId, runName).openForRead()
db.eachRecord { HashCode hash, TraceRecord record ->
def deleted = db.removeTaskEntry(hash)
if( deleted ) {
// delete folder
FileHelper.deletePath(FileHelper.asPath(record.workDir))
}
}
log.trace "Clean workdir complete"
}
catch( Exception e ) {
log.warn("Failed to cleanup work dir: ${workDir.toUriString()}")
}
finally {
db.close()
}
}

@Memoized
CondaConfig getCondaConfig() {
final cfg = config.conda as Map ?: Collections.emptyMap()
@@ -183,7 +183,7 @@ class PublishDir {

def result = new PublishDir()
if( params.path )
result.path = params.path
result.path = params.path

if( params.mode )
result.mode = params.mode
@@ -217,6 +217,18 @@ class PublishDir {
return result
}

boolean canPublish(Path source, TaskRun task) {
if( !sourceDir ) {
this.sourceDir = task.targetDir
this.sourceFileSystem = sourceDir.fileSystem
this.stageInMode = task.config.stageInMode
this.task = task
validatePublishMode()
}

return getPublishTarget(source) != null
}

protected void apply0(Set<Path> files) {
assert path

@@ -301,13 +313,8 @@ class PublishDir {

protected void apply1(Path source, boolean inProcess ) {

def target = sourceDir ? sourceDir.relativize(source) : source.getFileName()
if( matcher && !matcher.matches(target) ) {
// skip not matching file
return
}

if( saveAs && !(target=saveAs.call(target.toString()))) {
final target = getPublishTarget(source)
if( !target ) {
// skip this file
return
}
@@ -339,6 +346,17 @@ class PublishDir {

}

protected def getPublishTarget(Path source) {
def target = sourceDir ? sourceDir.relativize(source) : source.getFileName()
if( matcher && !matcher.matches(target) )
return null

if( saveAs && !(target=saveAs.call(target.toString())))
return null

return target
}
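
The refactoring above lets `canPublish()` and `apply1()` share one decision: a file is published only if it matches the glob pattern and `saveAs` returns a non-null name. A free-standing sketch of that check follows. It assumes a plain glob string in place of PublishDir's `matcher` field, and `publishTarget`, the paths, and the `dropLogs` closure are made up for illustration.

```groovy
import java.nio.file.FileSystems
import java.nio.file.Path
import java.nio.file.Paths

// Hypothetical stand-alone version of the matcher/saveAs check (not the PR's method).
def publishTarget(Path sourceDir, Path source, String pattern, Closure saveAs) {
    // path relative to the task directory, or just the file name when no directory is known
    def target = sourceDir ? sourceDir.relativize(source) : source.getFileName()

    // a glob that does not match means "do not publish"
    def matcher = pattern ? FileSystems.getDefault().getPathMatcher('glob:' + pattern) : null
    if( matcher && !matcher.matches(target) )
        return null

    // saveAs may rename the file, or return null to skip it
    if( saveAs && !(target = saveAs.call(target.toString())) )
        return null

    return target
}

def workDir = Paths.get('/work/ab/123456')
def bam = workDir.resolve('sample.bam')
def logFile = workDir.resolve('run.log')
def dropLogs = { String name -> name.endsWith('.log') ? null : "renamed_${name}" }

println publishTarget(workDir, bam, '*.bam', null)
println publishTarget(workDir, logFile, null, dropLogs)
```

Returning `null` for "skip" keeps the caller's test to a single truthiness check, which is how both `canPublish()` and `apply1()` use the result in the diff.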

protected Path resolveDestination(target) {

if( target instanceof Path ) {
@@ -2341,6 +2341,10 @@ class TaskProcessor {
state.update { StateObj it -> it.incCompleted() }
}

protected void closeProcess() {
session.notifyProcessClose(this)
}

protected void terminateProcess() {
log.trace "<${name}> Sending poison pills and terminating process"
sendPoisonPill()
@@ -2493,6 +2497,7 @@ class TaskProcessor {
// apparently auto if-guard instrumented by @Slf4j is not honoured in inner classes - add it explicitly
if( log.isTraceEnabled() )
log.trace "<${name}> After stop"
closeProcess()
}

/**
@@ -257,7 +257,6 @@ class ScriptRunner {

protected shutdown() {
session.destroy()
session.cleanup()
Global.cleanUp()
log.debug "> Execution complete -- Goodbye"
}
@@ -0,0 +1,49 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.trace

import groovy.transform.CompileStatic

/**
* Strategies to automatically cleanup task directories.
*
* @author Ben Sherman <[email protected]>
*/
@CompileStatic
enum CleanupStrategy {
LAZY(1),
EAGER(2),
AGGRESSIVE(3)

final int level

CleanupStrategy(int level) {
this.level = level
}

static boolean isValid(CharSequence name) {
if( !name )
return false
try {
valueOf(name.toString().toUpperCase())
return true
}
catch( IllegalArgumentException e ) {
return false
}
}
}
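
To show how the enum might be used together with the `true` to `'lazy'` normalization added in Session, here is a small stand-alone sketch. The enum is copied from the diff (without `@CompileStatic`); `normalize` and `parse` are hypothetical helpers written for this example, and comparing `level` values is only an assumption about how the field could be used.

```groovy
enum CleanupStrategy {
    LAZY(1),
    EAGER(2),
    AGGRESSIVE(3)

    final int level

    CleanupStrategy(int level) {
        this.level = level
    }

    static boolean isValid(CharSequence name) {
        if( !name )
            return false
        try {
            valueOf(name.toString().toUpperCase())
            return true
        }
        catch( IllegalArgumentException e ) {
            return false
        }
    }
}

// Hypothetical helper: mirrors the Session change, where `true` means 'lazy'.
def normalize(value) {
    value == true ? 'lazy' : value
}

// Hypothetical helper: returns null when cleanup is disabled, throws on unknown names.
def parse(value) {
    def name = normalize(value)
    if( !name )
        return null
    if( !CleanupStrategy.isValid(name.toString()) )
        throw new IllegalArgumentException("Invalid cleanup strategy '${name}'")
    return CleanupStrategy.valueOf(name.toString().toUpperCase())
}

def strategy = parse('Eager')
// assumed use of `level`: anything at EAGER or above deletes task directories during the run
println strategy.level >= CleanupStrategy.EAGER.level
```

Because `isValid()` upper-cases its argument, the check is case-insensitive, so `'Eager'` and `'eager'` resolve to the same constant.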
@@ -25,6 +25,7 @@ class DefaultObserverFactory implements TraceObserverFactory {
createTimelineObserver(result)
createDagObserver(result)
createAnsiLogObserver(result)
createTaskCleanupObserver(result)
return result
}

@@ -101,4 +102,12 @@ class DefaultObserverFactory implements TraceObserverFactory {
result << observer
}

protected void createTaskCleanupObserver(Collection<TraceObserver> result) {
final strategy = session.config.cleanup
if( strategy instanceof CharSequence && !CleanupStrategy.isValid(strategy) )
throw new IllegalArgumentException("Invalid cleanup strategy '${strategy}' -- available strategies are ${CleanupStrategy.values().join(',').toLowerCase()}")
if( strategy )
result << new TaskCleanupObserver(strategy.toString().toUpperCase() as CleanupStrategy)
}

}
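
The diff shown here does not include `TaskCleanupObserver` itself. As a minimal sketch of the `'eager'` rule from docs/config.md (a task directory can be deleted once every task that uses its outputs has completed), the bookkeeping could look like the following. `TaskLifetimeTracker` and all of its method names are hypothetical and are not taken from the PR; task names stand in for task directories, and nothing is actually deleted.

```groovy
// Hypothetical bookkeeping for the 'eager' rule; not the PR's TaskCleanupObserver.
class TaskLifetimeTracker {
    // producer task -> consumer tasks that have not completed yet
    private final Map<String, Set<String>> waitingOn = [:]
    // producers that will not receive any further consumers
    private final Set<String> closed = new HashSet<>()
    // producers whose directories would have been deleted, in order
    final List<String> deleted = []

    void addConsumer(String producer, String consumer) {
        waitingOn.putIfAbsent(producer, new HashSet<String>())
        waitingOn[producer] << consumer
    }

    // called once the set of consumers for a producer is final
    void close(String producer) {
        waitingOn.putIfAbsent(producer, new HashSet<String>())
        closed << producer
        deleteIfUnused(producer)
    }

    void consumerCompleted(String consumer) {
        // copy the keys because deleteIfUnused() removes entries from the map
        for( String producer : new ArrayList<String>(waitingOn.keySet()) ) {
            if( waitingOn[producer].remove(consumer) )
                deleteIfUnused(producer)
        }
    }

    private void deleteIfUnused(String producer) {
        if( producer in closed && waitingOn[producer]?.isEmpty() ) {
            waitingOn.remove(producer)
            // a real implementation would delete the task directory here
            deleted << producer
        }
    }
}

def tracker = new TaskLifetimeTracker()
tracker.addConsumer('align', 'sort')
tracker.addConsumer('align', 'stats')
tracker.close('align')
tracker.consumerCompleted('sort')    // 'stats' is still pending, so nothing is deleted
tracker.consumerCompleted('stats')   // last consumer done, 'align' becomes deletable
println tracker.deleted
```

The explicit `close()` step reflects why a hook such as `onProcessClose` is useful: a producer cannot be cleaned up safely until it is known that no further downstream tasks will be created for it.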