Skip to content

Commit

Permalink
Add deleteTasksOnCompletion to Azure Batch configuration (nextflow-io…
Browse files Browse the repository at this point in the history
…#4114)


Deleting Azure Tasks was checking the configuration object
deleteJobsOnCompletion which was incorrect since a task belongs to a
job. This adds the equivalent configuration for tasks which is checked
before deleting the tasks.



Signed-off-by: Adam Talbot <[email protected]>
Signed-off-by: Ben Sherman <[email protected]>
Signed-off-by: Adam Talbot <[email protected]>
Co-authored-by: Ben Sherman <[email protected]>
  • Loading branch information
2 people authored and abhi18av committed Oct 28, 2023
1 parent 4762e82 commit dfcfcbb
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 35 deletions.
21 changes: 16 additions & 5 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,21 +330,32 @@ The following settings are available:
`azure.batch.copyToolInstallMode`
: Specify where the `azcopy` tool used by Nextflow. When `node` is specified it's copied once during the pool creation. When `task` is provider, it's installed for each task execution (default: `node`).

`azure.batch.terminateJobsOnCompletion`
: Enables the Batch Job to automatically terminate a job once all tasks have completed (default: `true`).

`azure.batch.deleteJobsOnCompletion`
: Enable the automatic deletion of jobs created by the pipeline execution (default: `true`).
: Delete all jobs when the workflow completes (default: `false`).
: :::{versionchanged} 23.08.0-edge
Default value was changed from `true` to `false`.
:::

`azure.batch.deletePoolsOnCompletion`
: Enable the automatic deletion of compute node pools upon pipeline completion (default: `false`).
: Delete all compute node pools when the workflow completes (default: `false`).

`azure.batch.deleteTasksOnCompletion`
: :::{versionadded} 23.08.0-edge
:::
: Delete each task when it completes (default: `true`).
: Although this setting is enabled by default, failed tasks will not be deleted unless it is explicitly enabled. This way, the default behavior is that successful tasks are deleted while failed tasks are preserved for debugging purposes.

`azure.batch.endpoint`
: The batch service endpoint e.g. `https://nfbatch1.westeurope.batch.azure.com`.

`azure.batch.location`
: The name of the batch service region, e.g. `westeurope` or `eastus2`. This is not needed when the endpoint is specified.

`azure.batch.terminateJobsOnCompletion`
: :::{versionadded} 23.05.0-edge
:::
: When the workflow completes, set all jobs to terminate on task completion. (default: `true`).

`azure.batch.pools.<name>.autoScale`
: Enable autoscaling feature for the pool identified with `<name>`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,17 +785,13 @@ class AzBatchService implements Closeable {
apply(() -> client.taskOperations().deleteTask(key.jobId, key.taskId))
}

/**
* Set all jobs to terminate on completion.
*/
protected void terminateJobs() {
/*
We set the job to terminate when all tasks are complete rather than directly terminating, this allows Azure Batch to handle the termination for us.
*/

for( Map.Entry<TaskProcessor,String> entry : allJobIds ) {
final proc = entry.key
final jobId = entry.value

for( String jobId : allJobIds.values() ) {
try {
log.trace "Terminating Azure job ${jobId}"
log.trace "Setting Azure job ${jobId} to terminate on completion"

CloudJob job = apply(() -> client.jobOperations().getJob(jobId))
final poolInfo = job.poolInfo()
Expand All @@ -813,10 +809,7 @@ class AzBatchService implements Closeable {
}

protected void cleanupJobs() {
for( Map.Entry<TaskProcessor,String> entry : allJobIds ) {
final proc = entry.key
final jobId = entry.value

for( String jobId : allJobIds.values() ) {
try {
log.trace "Deleting Azure job ${jobId}"
apply(() -> client.jobOperations().deleteJob(jobId))
Expand All @@ -828,7 +821,7 @@ class AzBatchService implements Closeable {
}

protected void cleanupPools() {
for( String poolId : allPools.keySet()) {
for( String poolId : allPools.keySet() ) {
try {
apply(() -> client.poolOperations().deletePool(poolId))
}
Expand All @@ -849,17 +842,20 @@ class AzBatchService implements Closeable {
}
return identity
}

@Override
void close() {
// Terminate existing jobs to prevent them occupying quota
if( config.batch().terminateJobsOnCompletion!=Boolean.FALSE ) {
// terminate all jobs to prevent them from occupying quota
if( config.batch().terminateJobsOnCompletion ) {
terminateJobs()
}

// cleanup app successful jobs
if( config.batch().deleteJobsOnCompletion!=Boolean.FALSE ) {
// delete all jobs
if( config.batch().deleteJobsOnCompletion ) {
cleanupJobs()
}

// delete all autopools
if( config.batch().canCreatePool() && config.batch().deletePoolsOnCompletion ) {
cleanupPools()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,15 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask {
}

private Boolean shouldDelete() {
executor.config.batch().deleteJobsOnCompletion
executor.config.batch().deleteTasksOnCompletion
}

protected void deleteTask(AzTaskKey taskKey, TaskRun task) {
if( !taskKey || shouldDelete()==Boolean.FALSE )
return

if( !task.isSuccess() && shouldDelete()==null ) {
// do not delete successfully executed pods for debugging purpose
// preserve failed tasks for debugging purposes, unless deletion is explicitly enabled
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class AzBatchOpts implements CloudTransferOptions {
Boolean terminateJobsOnCompletion
Boolean deleteJobsOnCompletion
Boolean deletePoolsOnCompletion
Boolean deleteTasksOnCompletion
CopyToolInstallMode copyToolInstallMode

Map<String,AzPoolOpts> pools
Expand All @@ -63,9 +64,10 @@ class AzBatchOpts implements CloudTransferOptions {
location = config.location
autoPoolMode = config.autoPoolMode
allowPoolCreation = config.allowPoolCreation
terminateJobsOnCompletion = config.terminateJobsOnCompletion
terminateJobsOnCompletion = config.terminateJobsOnCompletion != Boolean.FALSE
deleteJobsOnCompletion = config.deleteJobsOnCompletion
deletePoolsOnCompletion = config.deletePoolsOnCompletion
deleteTasksOnCompletion = config.deleteTasksOnCompletion
pools = parsePools(config.pools instanceof Map ? config.pools as Map<String,Map> : Collections.<String,Map>emptyMap())
maxParallelTransfers = config.maxParallelTransfers ? config.maxParallelTransfers as int : MAX_TRANSFER
maxTransferAttempts = config.maxTransferAttempts ? config.maxTransferAttempts as int : MAX_TRANSFER_ATTEMPTS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,21 +350,20 @@ class AzBatchServiceTest extends Specification {

}


def 'should cleanup jobs by default' () {
def 'should set jobs to automatically terminate by default' () {
given:
def CONFIG = [:]
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) }
AzBatchService svc = Spy(AzBatchService, constructorArgs:[exec])
when:
svc.close()
then:
1 * svc.cleanupJobs() >> null
1 * svc.terminateJobs() >> null
}

def 'should cleanup jobs no cleanup jobs' () {
def 'should not cleanup jobs by default' () {
given:
def CONFIG = [batch:[deleteJobsOnCompletion: false]]
def CONFIG = [:]
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) }
AzBatchService svc = Spy(AzBatchService, constructorArgs:[exec])
when:
Expand All @@ -373,7 +372,18 @@ class AzBatchServiceTest extends Specification {
0 * svc.cleanupJobs() >> null
}

def 'should cleanup not cleanup pools by default' () {
def 'should cleanup jobs if specified' () {
given:
def CONFIG = [batch:[deleteJobsOnCompletion: true]]
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) }
AzBatchService svc = Spy(AzBatchService, constructorArgs:[exec])
when:
svc.close()
then:
1 * svc.cleanupJobs() >> null
}

def 'should not cleanup pools by default' () {
given:
def CONFIG = [:]
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) }
Expand All @@ -395,7 +405,7 @@ class AzBatchServiceTest extends Specification {
1 * svc.cleanupPools() >> null
}

def 'should cleanup cleanup pools with allowPoolCreation' () {
def 'should cleanup pools with allowPoolCreation' () {
given:
def CONFIG = [batch:[allowPoolCreation: true, deletePoolsOnCompletion: true]]
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ class AzureConfigTest extends Specification {

and:
cfg.batch().endpoint == null
cfg.batch().terminateJobsOnCompletion == true
cfg.batch().deleteJobsOnCompletion == null
cfg.batch().deletePoolsOnCompletion == null
cfg.batch().deleteTasksOnCompletion == null
cfg.batch().location == null
cfg.batch().autoPoolMode == null
cfg.batch().allowPoolCreation == null
Expand Down Expand Up @@ -99,8 +101,11 @@ class AzureConfigTest extends Specification {
endpoint: ENDPOINT,
location: LOCATION,
autoPoolMode: true,
allowPoolCreation: true, deleteJobsOnCompletion: false,
allowPoolCreation: true,
terminateJobsOnCompletion: false,
deleteJobsOnCompletion: true,
deletePoolsOnCompletion: true,
deleteTasksOnCompletion: false,
pools: [ myPool: [
vmType: 'Foo_A1',
autoScale: true,
Expand All @@ -124,8 +129,10 @@ class AzureConfigTest extends Specification {
cfg.batch().location == LOCATION
cfg.batch().autoPoolMode == true
cfg.batch().allowPoolCreation == true
cfg.batch().deleteJobsOnCompletion == false
cfg.batch().terminateJobsOnCompletion == false
cfg.batch().deleteJobsOnCompletion == true
cfg.batch().deletePoolsOnCompletion == true
cfg.batch().deleteTasksOnCompletion == false
cfg.batch().canCreatePool()
and:
cfg.batch().pool('myPool').vmType == 'Foo_A1'
Expand Down

0 comments on commit dfcfcbb

Please sign in to comment.