Skip to content

Commit

Permalink
added waitJob
Browse files Browse the repository at this point in the history
  • Loading branch information
munishchouhan committed Apr 29, 2024
1 parent df96716 commit 8d357bc
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class BlobCacheConfig {
@Value('${wave.blobCache.s5cmdImage}')
String s5Image

@Value('${wave.blobCache.BackoffLimit:3}')
Integer BackoffLimit

@Nullable
@Value('${wave.blobCache.requestsCpu}')
String requestsCpu
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.micronaut.context.annotation.Requires
import io.seqera.wave.configuration.BlobCacheConfig
import io.seqera.wave.service.blob.BlobCacheInfo
import io.seqera.wave.service.blob.TransferStrategy
import io.seqera.wave.service.blob.TransferTimeoutException
import io.seqera.wave.service.k8s.K8sService
import jakarta.inject.Inject
/**
Expand All @@ -48,9 +49,14 @@ class KubeTransferStrategy implements TransferStrategy {

@Override
BlobCacheInfo transfer(BlobCacheInfo info, List<String> command) {
final podName = getName(info, "pod")
final jobName = getName(info, "job")
final job = k8sService.transferJob(jobName, podName, blobConfig.s5Image, command, blobConfig)
final name = getName(info)
log.info("command-> ${command.join(' ')}")
final job = k8sService.transferJob(name, name, blobConfig.s5Image, command, blobConfig)
final podList = k8sService.waitJob(job, blobConfig.transferTimeout.toMillis())
if ( !podList || podList.items.size() < 1 ) {
throw new TransferTimeoutException("Blob transfer job timeout")
}
final podName = podList.items[0].metadata.name
final pod = k8sService.getPod(podName)
final terminated = k8sService.waitPod(pod, blobConfig.transferTimeout.toMillis())
final stdout = k8sService.logsPod(podName)
Expand All @@ -59,8 +65,8 @@ class KubeTransferStrategy implements TransferStrategy {
: info.failed(stdout)
}

protected static String getName(BlobCacheInfo info, String type) {
return "transfer-$type-" + Hashing
protected static String getName(BlobCacheInfo info) {
return "transfer-" + Hashing
.sipHash24()
.newHasher()
.putUnencodedChars(info.locationUri)
Expand Down
3 changes: 3 additions & 0 deletions src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.nio.file.Path
import io.kubernetes.client.openapi.models.V1ContainerStateTerminated
import io.kubernetes.client.openapi.models.V1Job
import io.kubernetes.client.openapi.models.V1Pod
import io.kubernetes.client.openapi.models.V1PodList
import io.seqera.wave.configuration.BlobCacheConfig
import io.seqera.wave.configuration.ScanConfig
import io.seqera.wave.configuration.SpackConfig
Expand Down Expand Up @@ -57,4 +58,6 @@ interface K8sService {

V1Job transferJob(String name, String podName, String containerImage, List<String> args, BlobCacheConfig blobConfig)

V1PodList waitJob(V1Job job, Long timeout)

}
27 changes: 26 additions & 1 deletion src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import io.kubernetes.client.openapi.models.V1JobBuilder
import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimVolumeSource
import io.kubernetes.client.openapi.models.V1Pod
import io.kubernetes.client.openapi.models.V1PodBuilder
import io.kubernetes.client.openapi.models.V1PodList
import io.kubernetes.client.openapi.models.V1ResourceRequirements
import io.kubernetes.client.openapi.models.V1Volume
import io.kubernetes.client.openapi.models.V1VolumeMount
Expand Down Expand Up @@ -602,7 +603,7 @@ class K8sServiceImpl implements K8sService {
resources.putRequestsItem('memory', new Quantity(requestsMemory))

def spec = builder.withNewSpec()
.withBackoffLimit(0)
.withBackoffLimit(blobConfig.BackoffLimit)
.withNewTemplate()
.editOrNewSpec()
.withServiceAccount(serviceAccount)
Expand All @@ -629,4 +630,28 @@ class K8sServiceImpl implements K8sService {
return result
}

/**
* Wait for a job to start a pod
*
* @param k8s job
* @param timeout
* Max wait time in milliseconds
* @return list of pods created by this job
*/
V1PodList waitJob(V1Job job, Long timeout) {
final startTime = System.currentTimeMillis()
// wait for termination
while (System.currentTimeMillis() - startTime < timeout) {
final name = job.metadata.name
final active = job.status?.getActive()
if (active && active > 0) {
return k8sClient.
coreV1Api().
listNamespacedPod(namespace, null, null, null, null, "job-name=$name", null, null, null, null, null, null)
}
sleep 5_000
job = getJob(name)
}
return null
}
}

0 comments on commit 8d357bc

Please sign in to comment.