diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy index 4bf66f9860..3f011dedea 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy @@ -456,21 +456,35 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { // if there are no tasks checks the job status return checkJobStatus() } + + final elapsed = System.currentTimeMillis() - submissionTime + if (elapsed > executor.config.maxStatusDuration.toMillis()) { + throw new ProcessUnrecoverableException("[GOOGLE BATCH] Task status check timed out after ${executor.config.maxStatusDuration} - job=$jobId task=$taskId not found") + } + final now = System.currentTimeMillis() final delta = now - timestamp; if( !taskState || delta >= 1_000) { - final status = client.getTaskStatus(jobId, taskId) - final newState = status?.state as String - if( newState ) { - log.trace "[GOOGLE BATCH] Get job=$jobId task=$taskId state=$newState" - taskState = newState - timestamp = now + try { + final status = client.getTaskStatus(jobId, taskId) + final newState = status?.state as String + if( newState ) { + log.trace "[GOOGLE BATCH] Get job=$jobId task=$taskId state=$newState" + taskState = newState + timestamp = now + } + if( newState == 'PENDING' ) { + final eventsCount = status.getStatusEventsCount() + final lastEvent = eventsCount > 0 ? status.getStatusEvents(eventsCount - 1) : null + if( lastEvent?.getDescription()?.contains('CODE_GCE_QUOTA_EXCEEDED') ) + log.warn1 "Batch job cannot be run: ${lastEvent.getDescription()}" + } } - if( newState == 'PENDING' ) { - final eventsCount = status.getStatusEventsCount() - final lastEvent = eventsCount > 0 ? status.getStatusEvents(eventsCount - 1) : null - if( lastEvent?.getDescription()?.contains('CODE_GCE_QUOTA_EXCEEDED') ) - log.warn1 "Batch job cannot be run: ${lastEvent.getDescription()}" + catch (NotFoundException e) { + log.trace "[GOOGLE BATCH] Task status not found yet for job=$jobId task=$taskId - returning PENDING state" + taskState = 'PENDING' + timestamp = now + return taskState } } return taskState diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy index de9b6e2d38..95b4ac1799 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchConfig.groovy @@ -68,7 +68,8 @@ class BatchConfig { String getServiceAccountEmail() { serviceAccountEmail } BatchRetryConfig getRetryConfig() { retryConfig } List getAutoRetryExitCodes() { autoRetryExitCodes } - + Duration getMaxStatusDuration() { maxStatusDuration } + static BatchConfig create(Session session) { final result = new BatchConfig() result.googleOpts = GoogleOpts.create(session) @@ -87,6 +88,7 @@ class BatchConfig { result.serviceAccountEmail = session.config.navigate('google.batch.serviceAccountEmail') result.retryConfig = new BatchRetryConfig( session.config.navigate('google.batch.retryPolicy') as Map ?: Map.of() ) result.autoRetryExitCodes = session.config.navigate('google.batch.autoRetryExitCodes', DEFAULT_RETRY_LIST) as List + result.maxStatusDuration = session.config.navigate('google.batch.maxStatusDuration', Duration.ofMinutes(5)) as Duration return result }