diff --git a/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy b/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy index 8eb624393..934fa9937 100644 --- a/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy +++ b/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy @@ -44,9 +44,9 @@ interface K8sService { V1Pod getPod(String name) - String logsPod(String podName) + String logsPod(String name) - String logsPod(String podName, String containerName) + String logsPod(String name, String containerName) void deletePod(String name) diff --git a/src/test/groovy/io/seqera/wave/service/blob/impl/KubeTransferStrategyTest.groovy b/src/test/groovy/io/seqera/wave/service/blob/impl/KubeTransferStrategyTest.groovy new file mode 100644 index 000000000..59f478d9c --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/blob/impl/KubeTransferStrategyTest.groovy @@ -0,0 +1,106 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.blob.impl + +import spock.lang.Specification + +import java.time.Duration + +import io.kubernetes.client.openapi.models.V1ContainerStateTerminated +import io.kubernetes.client.openapi.models.V1Job +import io.kubernetes.client.openapi.models.V1Pod +import io.kubernetes.client.openapi.models.V1PodList +import io.kubernetes.client.openapi.models.V1PodStatus +import io.seqera.wave.configuration.BlobCacheConfig +import io.seqera.wave.service.blob.BlobCacheInfo +import io.seqera.wave.service.k8s.K8sService +/** + * + * @author Munish Chouhan + */ +class KubeTransferStrategyTest extends Specification { + + def "transfer should return completed info when job is terminated"() { + given: + def config = Mock(BlobCacheConfig) { + getTransferTimeout() >> Duration.ofSeconds(20) + getBackoffLimit() >> 3 + } + + List command = ["command1", "command2"] + String podName = "pod-123" + def pod = new V1Pod(metadata: [name: podName]) + pod.status = new V1PodStatus(phase: "Succeeded") + def podList = new V1PodList(items: [pod]) + String stdout = "success" + + def k8sService = Mock(K8sService) + k8sService.transferJob(_, _, _, _) >> new V1Job(metadata: [name: "job-123"]) + k8sService.waitJob(_, _) >> podList + k8sService.getPod(_) >> pod + k8sService.waitPod(_, _, _) >> new V1ContainerStateTerminated().exitCode(0) + k8sService.logsPod(_, _) >> stdout + + KubeTransferStrategy strategy = new KubeTransferStrategy(blobConfig: config, k8sService: k8sService) + BlobCacheInfo info = BlobCacheInfo.create("https://test.com/blobs", null) + + when: + BlobCacheInfo result = strategy.transfer(info, command) + + then: + result.exitStatus == 0 + result.logs == stdout + result.done() + result.succeeded() + } + + def "transfer should return failed info when job is not terminated"() { + given: + def config = Mock(BlobCacheConfig) { + getTransferTimeout() >> Duration.ofSeconds(20) + getBackoffLimit() >> 3 + } + + List command = ["command1", "command2"] + String podName = "pod-123" + def pod = new V1Pod(metadata: [name: podName]) + pod.status = new V1PodStatus(phase: "Succeeded") + def podList = new V1PodList(items: [pod]) + String stdout = "failed" + + def k8sService = Mock(K8sService) + k8sService.transferJob(_, _, _, _) >> new V1Job(metadata: [name: "job-123"]) + k8sService.waitJob(_, _) >> podList + k8sService.getPod(_) >> pod + k8sService.waitPod(_, _, _) >> new V1ContainerStateTerminated().exitCode(1) + k8sService.logsPod(_, _) >> stdout + + KubeTransferStrategy strategy = new KubeTransferStrategy(blobConfig: config, k8sService: k8sService) + BlobCacheInfo info = BlobCacheInfo.create("https://test.com/blobs", null) + + when: + BlobCacheInfo result = strategy.transfer(info, command) + + then: + result.exitStatus == 1 + result.logs == stdout + result.done() + !result.succeeded() + } +}