diff --git a/src/main/groovy/io/seqera/wave/configuration/BlobCacheConfig.groovy b/src/main/groovy/io/seqera/wave/configuration/BlobCacheConfig.groovy index 13935e6c0..39e3f5ab2 100644 --- a/src/main/groovy/io/seqera/wave/configuration/BlobCacheConfig.groovy +++ b/src/main/groovy/io/seqera/wave/configuration/BlobCacheConfig.groovy @@ -72,6 +72,9 @@ class BlobCacheConfig { @Value('${wave.blobCache.s5cmdImage}') String s5Image + @Value('${wave.blobCache.BackoffLimit:3}') + Integer BackoffLimit + @Nullable @Value('${wave.blobCache.requestsCpu}') String requestsCpu diff --git a/src/main/groovy/io/seqera/wave/service/blob/impl/KubeTransferStrategy.groovy b/src/main/groovy/io/seqera/wave/service/blob/impl/KubeTransferStrategy.groovy index c73315fdc..00df69c0f 100644 --- a/src/main/groovy/io/seqera/wave/service/blob/impl/KubeTransferStrategy.groovy +++ b/src/main/groovy/io/seqera/wave/service/blob/impl/KubeTransferStrategy.groovy @@ -26,6 +26,7 @@ import io.micronaut.context.annotation.Requires import io.seqera.wave.configuration.BlobCacheConfig import io.seqera.wave.service.blob.BlobCacheInfo import io.seqera.wave.service.blob.TransferStrategy +import io.seqera.wave.service.blob.TransferTimeoutException import io.seqera.wave.service.k8s.K8sService import jakarta.inject.Inject /** @@ -48,9 +49,14 @@ class KubeTransferStrategy implements TransferStrategy { @Override BlobCacheInfo transfer(BlobCacheInfo info, List command) { - final podName = getName(info, "pod") - final jobName = getName(info, "job") - final job = k8sService.transferJob(jobName, podName, blobConfig.s5Image, command, blobConfig) + final name = getName(info) + log.info("command-> ${command.join(' ')}") + final job = k8sService.transferJob(name, name, blobConfig.s5Image, command, blobConfig) + final podList = k8sService.waitJob(job, blobConfig.transferTimeout.toMillis()) + if ( !podList || podList.items.size() < 1 ) { + throw new TransferTimeoutException("Blob transfer job timeout") + } + final podName = 
podList.items[0].metadata.name final pod = k8sService.getPod(podName) final terminated = k8sService.waitPod(pod, blobConfig.transferTimeout.toMillis()) final stdout = k8sService.logsPod(podName) @@ -59,8 +65,8 @@ class KubeTransferStrategy implements TransferStrategy { : info.failed(stdout) } - protected static String getName(BlobCacheInfo info, String type) { - return "transfer-$type-" + Hashing + protected static String getName(BlobCacheInfo info) { + return "transfer-" + Hashing .sipHash24() .newHasher() .putUnencodedChars(info.locationUri) diff --git a/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy b/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy index 3098e99a3..06019861a 100644 --- a/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy +++ b/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy @@ -23,6 +23,7 @@ import java.nio.file.Path import io.kubernetes.client.openapi.models.V1ContainerStateTerminated import io.kubernetes.client.openapi.models.V1Job import io.kubernetes.client.openapi.models.V1Pod +import io.kubernetes.client.openapi.models.V1PodList import io.seqera.wave.configuration.BlobCacheConfig import io.seqera.wave.configuration.ScanConfig import io.seqera.wave.configuration.SpackConfig @@ -57,4 +58,6 @@ interface K8sService { V1Job transferJob(String name, String podName, String containerImage, List args, BlobCacheConfig blobConfig) + V1PodList waitJob(V1Job job, Long timeout) + } diff --git a/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy index 9c7992b70..22d43bf0d 100644 --- a/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy @@ -35,6 +35,7 @@ import io.kubernetes.client.openapi.models.V1JobBuilder import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimVolumeSource import io.kubernetes.client.openapi.models.V1Pod import 
io.kubernetes.client.openapi.models.V1PodBuilder
+import io.kubernetes.client.openapi.models.V1PodList
 import io.kubernetes.client.openapi.models.V1ResourceRequirements
 import io.kubernetes.client.openapi.models.V1Volume
 import io.kubernetes.client.openapi.models.V1VolumeMount
@@ -602,7 +603,7 @@ class K8sServiceImpl implements K8sService {
         resources.putRequestsItem('memory', new Quantity(requestsMemory))
 
         def spec = builder.withNewSpec()
-                .withBackoffLimit(0)
+                .withBackoffLimit(blobConfig.BackoffLimit)
                 .withNewTemplate()
                 .editOrNewSpec()
                 .withServiceAccount(serviceAccount)
@@ -629,4 +630,39 @@ class K8sServiceImpl implements K8sService {
         return result
     }
 
+    /**
+     * Wait for a job to create its pod(s)
+     *
+     * @param job
+     *      The K8s job to watch
+     * @param timeout
+     *      Max wait time in milliseconds
+     * @return
+     *      The list of pods created by the job, or {@code null} when the job
+     *      did not report any pod within the timeout
+     */
+    @Override
+    V1PodList waitJob(V1Job job, Long timeout) {
+        final name = job.metadata.name
+        final startTime = System.currentTimeMillis()
+        V1Job current = job
+        // poll the job status until it reports that a pod has been created
+        while (System.currentTimeMillis() - startTime < timeout) {
+            final jobStatus = current?.status
+            final boolean running = (jobStatus?.getActive() ?: 0) > 0
+            // a short-lived job can complete between two polls, in which case
+            // `active` is back to zero and only `succeeded` tells a pod was run
+            final boolean succeeded = (jobStatus?.getSucceeded() ?: 0) > 0
+            // likewise, a job that exhausted its retries has no active pod left
+            final boolean failed = jobStatus?.conditions?.any { it.type == 'Failed' && it.status == 'True' } ?: false
+            if (running || succeeded || failed) {
+                return k8sClient
+                        .coreV1Api()
+                        .listNamespacedPod(namespace, null, null, null, null, "job-name=$name", null, null, null, null, null, null)
+            }
+            sleep 5_000
+            current = getJob(name)
+        }
+        return null
+    }
 }