Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add deleteTasksOnCompletion to Azure Batch configuration #4114

21 changes: 16 additions & 5 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,21 +330,32 @@ The following settings are available:
`azure.batch.copyToolInstallMode`
: Specify where the `azcopy` tool used by Nextflow is installed. When `node` is specified, it's copied once during the pool creation. When `task` is specified, it's installed for each task execution (default: `node`).

`azure.batch.terminateJobsOnCompletion`
: Enables the Batch Job to automatically terminate a job once all tasks have completed (default: `true`).

`azure.batch.deleteJobsOnCompletion`
: Enable the automatic deletion of jobs created by the pipeline execution (default: `true`).
: Delete all jobs when the workflow completes (default: `false`).
: :::{versionchanged} 23.08.0-edge
Default value was changed from `true` to `false`.
:::

`azure.batch.deletePoolsOnCompletion`
: Enable the automatic deletion of compute node pools upon pipeline completion (default: `false`).
: Delete all compute node pools when the workflow completes (default: `false`).

`azure.batch.deleteTasksOnCompletion`
: :::{versionadded} 23.08.0-edge
:::
: Delete each task when it completes (default: `true`).
: Although this setting is enabled by default, failed tasks are not deleted unless the setting is explicitly set to `true`. This way, the default behavior is that successful tasks are deleted while failed tasks are preserved for debugging purposes.

`azure.batch.endpoint`
: The batch service endpoint e.g. `https://nfbatch1.westeurope.batch.azure.com`.

`azure.batch.location`
: The name of the batch service region, e.g. `westeurope` or `eastus2`. This is not needed when the endpoint is specified.

`azure.batch.terminateJobsOnCompletion`
: :::{versionadded} 23.05.0-edge
:::
: When the workflow completes, set all jobs to terminate on task completion (default: `true`).

`azure.batch.pools.<name>.autoScale`
: Enable autoscaling feature for the pool identified with `<name>`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,17 +785,13 @@ class AzBatchService implements Closeable {
apply(() -> client.taskOperations().deleteTask(key.jobId, key.taskId))
}

/**
* Set all jobs to terminate on completion.
*/
protected void terminateJobs() {
/*
We set the job to terminate when all tasks are complete rather than directly terminating, this allows Azure Batch to handle the termination for us.
*/

for( Map.Entry<TaskProcessor,String> entry : allJobIds ) {
final proc = entry.key
final jobId = entry.value

for( String jobId : allJobIds.values() ) {
try {
log.trace "Terminating Azure job ${jobId}"
log.trace "Setting Azure job ${jobId} to terminate on completion"

CloudJob job = apply(() -> client.jobOperations().getJob(jobId))
final poolInfo = job.poolInfo()
Expand All @@ -813,10 +809,7 @@ class AzBatchService implements Closeable {
}

protected void cleanupJobs() {
for( Map.Entry<TaskProcessor,String> entry : allJobIds ) {
final proc = entry.key
final jobId = entry.value

for( String jobId : allJobIds.values() ) {
try {
log.trace "Deleting Azure job ${jobId}"
apply(() -> client.jobOperations().deleteJob(jobId))
Expand All @@ -828,7 +821,7 @@ class AzBatchService implements Closeable {
}

protected void cleanupPools() {
for( String poolId : allPools.keySet()) {
for( String poolId : allPools.keySet() ) {
try {
apply(() -> client.poolOperations().deletePool(poolId))
}
Expand All @@ -849,17 +842,20 @@ class AzBatchService implements Closeable {
}
return identity
}

@Override
void close() {
// Terminate existing jobs to prevent them occupying quota
if( config.batch().terminateJobsOnCompletion!=Boolean.FALSE ) {
// terminate all jobs to prevent them from occupying quota
if( config.batch().terminateJobsOnCompletion ) {
terminateJobs()
}

// cleanup app successful jobs
if( config.batch().deleteJobsOnCompletion!=Boolean.FALSE ) {
// delete all jobs
if( config.batch().deleteJobsOnCompletion ) {
cleanupJobs()
}

// delete all autopools
if( config.batch().canCreatePool() && config.batch().deletePoolsOnCompletion ) {
cleanupPools()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,15 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask {
}

private Boolean shouldDelete() {
executor.config.batch().deleteJobsOnCompletion
bentsherman marked this conversation as resolved.
Show resolved Hide resolved
executor.config.batch().deleteTasksOnCompletion
}

protected void deleteTask(AzTaskKey taskKey, TaskRun task) {
if( !taskKey || shouldDelete()==Boolean.FALSE )
return

if( !task.isSuccess() && shouldDelete()==null ) {
// do not delete successfully executed pods for debugging purpose
// preserve failed tasks for debugging purposes, unless deletion is explicitly enabled
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class AzBatchOpts implements CloudTransferOptions {
Boolean terminateJobsOnCompletion
Boolean deleteJobsOnCompletion
Boolean deletePoolsOnCompletion
Boolean deleteTasksOnCompletion
CopyToolInstallMode copyToolInstallMode

Map<String,AzPoolOpts> pools
Expand All @@ -63,9 +64,10 @@ class AzBatchOpts implements CloudTransferOptions {
location = config.location
autoPoolMode = config.autoPoolMode
allowPoolCreation = config.allowPoolCreation
terminateJobsOnCompletion = config.terminateJobsOnCompletion
terminateJobsOnCompletion = config.terminateJobsOnCompletion != Boolean.FALSE
deleteJobsOnCompletion = config.deleteJobsOnCompletion
deletePoolsOnCompletion = config.deletePoolsOnCompletion
deleteTasksOnCompletion = config.deleteTasksOnCompletion
pools = parsePools(config.pools instanceof Map ? config.pools as Map<String,Map> : Collections.<String,Map>emptyMap())
maxParallelTransfers = config.maxParallelTransfers ? config.maxParallelTransfers as int : MAX_TRANSFER
maxTransferAttempts = config.maxTransferAttempts ? config.maxTransferAttempts as int : MAX_TRANSFER_ATTEMPTS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,21 +350,20 @@ class AzBatchServiceTest extends Specification {

}


def 'should cleanup jobs by default' () {
def 'should set jobs to automatically terminate by default' () {
given:
def CONFIG = [:]
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) }
AzBatchService svc = Spy(AzBatchService, constructorArgs:[exec])
when:
svc.close()
then:
1 * svc.cleanupJobs() >> null
1 * svc.terminateJobs() >> null
}

def 'should cleanup jobs no cleanup jobs' () {
def 'should not cleanup jobs by default' () {
given:
def CONFIG = [batch:[deleteJobsOnCompletion: false]]
def CONFIG = [:]
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) }
AzBatchService svc = Spy(AzBatchService, constructorArgs:[exec])
when:
Expand All @@ -373,7 +372,18 @@ class AzBatchServiceTest extends Specification {
0 * svc.cleanupJobs() >> null
}

def 'should cleanup not cleanup pools by default' () {
def 'should cleanup jobs if specified' () {
given:
def CONFIG = [batch:[deleteJobsOnCompletion: true]]
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) }
AzBatchService svc = Spy(AzBatchService, constructorArgs:[exec])
when:
svc.close()
then:
1 * svc.cleanupJobs() >> null
}

def 'should not cleanup pools by default' () {
given:
def CONFIG = [:]
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) }
Expand All @@ -395,7 +405,7 @@ class AzBatchServiceTest extends Specification {
1 * svc.cleanupPools() >> null
}

def 'should cleanup cleanup pools with allowPoolCreation' () {
def 'should cleanup pools with allowPoolCreation' () {
given:
def CONFIG = [batch:[allowPoolCreation: true, deletePoolsOnCompletion: true]]
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ class AzureConfigTest extends Specification {

and:
cfg.batch().endpoint == null
cfg.batch().terminateJobsOnCompletion == true
cfg.batch().deleteJobsOnCompletion == null
cfg.batch().deletePoolsOnCompletion == null
cfg.batch().deleteTasksOnCompletion == null
cfg.batch().location == null
cfg.batch().autoPoolMode == null
cfg.batch().allowPoolCreation == null
Expand Down Expand Up @@ -99,8 +101,11 @@ class AzureConfigTest extends Specification {
endpoint: ENDPOINT,
location: LOCATION,
autoPoolMode: true,
allowPoolCreation: true, deleteJobsOnCompletion: false,
allowPoolCreation: true,
terminateJobsOnCompletion: false,
deleteJobsOnCompletion: true,
deletePoolsOnCompletion: true,
deleteTasksOnCompletion: false,
pools: [ myPool: [
vmType: 'Foo_A1',
autoScale: true,
Expand All @@ -124,8 +129,10 @@ class AzureConfigTest extends Specification {
cfg.batch().location == LOCATION
cfg.batch().autoPoolMode == true
cfg.batch().allowPoolCreation == true
cfg.batch().deleteJobsOnCompletion == false
cfg.batch().terminateJobsOnCompletion == false
cfg.batch().deleteJobsOnCompletion == true
cfg.batch().deletePoolsOnCompletion == true
cfg.batch().deleteTasksOnCompletion == false
cfg.batch().canCreatePool()
and:
cfg.batch().pool('myPool').vmType == 'Foo_A1'
Expand Down