Skip to content

Commit

Permalink
Add support for Wave container freeze
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso committed Jul 18, 2023
1 parent 6307f9b commit 9a5903e
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,9 @@ class SubmitContainerTokenRequest {
*/
String fingerprint

/**
* Enable freeze container mode
*/
boolean freeze

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,15 @@ class SubmitContainerTokenResponse {
* The fully qualified wave container name to be used
*/
String targetImage

/**
* The source container image that originated this request
*/
String containerImage

/**
* The ID of the build associated with this request or null of the image already exists
*/
String buildId

}
99 changes: 36 additions & 63 deletions plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.nio.file.Path
import java.time.Instant
import java.time.Duration
import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit
import java.util.concurrent.Callable
Expand All @@ -35,7 +35,6 @@ import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import com.google.common.util.concurrent.UncheckedExecutionException
import com.google.gson.Gson
import com.google.gson.GsonBuilder
import com.google.gson.reflect.TypeToken
import dev.failsafe.Failsafe
import dev.failsafe.RetryPolicy
Expand All @@ -45,7 +44,6 @@ import dev.failsafe.function.CheckedSupplier
import groovy.json.JsonOutput
import groovy.transform.CompileStatic
import groovy.transform.Memoized
import io.seqera.wave.plugin.adapter.InstantAdapter
import io.seqera.wave.plugin.config.TowerConfig
import io.seqera.wave.plugin.config.WaveConfig
import io.seqera.wave.plugin.exception.BadResponseException
Expand Down Expand Up @@ -167,7 +165,8 @@ class WaveClient {
buildRepository: config().buildRepository(),
cacheRepository: config.cacheRepository(),
timestamp: OffsetDateTime.now().toString(),
fingerprint: assets.fingerprint()
fingerprint: assets.fingerprint(),
freeze: config.freezeMode()
)
}

Expand All @@ -189,7 +188,9 @@ class WaveClient {
towerAccessToken: tower.accessToken,
towerWorkspaceId: tower.workspaceId,
towerEndpoint: tower.endpoint,
workflowId: tower.workflowId)
workflowId: tower.workflowId,
freeze: config.freezeMode()
)
return sendRequest(request)
}

Expand Down Expand Up @@ -456,6 +457,13 @@ class WaveClient {
final key = assets.fingerprint()
// get from cache or submit a new request
final response = cache.get(key, { sendRequest(assets) } as Callable )
if( config.freezeMode() ) {
if( response.buildId ) {
// await the image to be available when a new image is being built
awaitImage(response.targetImage)
}
return new ContainerInfo(assets.containerImage, response.containerImage, key)
}
// assemble the container info response
return new ContainerInfo(assets.containerImage, response.targetImage, key)
}
Expand All @@ -464,7 +472,30 @@ class WaveClient {
}
}

protected URI imageToManifestUri(String image) {
final p = image.indexOf('/')
if( p==-1 ) throw new IllegalArgumentException("Invalid container name: $image")
final result = 'https://' + image.substring(0,p) + '/v2' + image.substring(p).replace(':','/manifests/')
return new URI(result)
}

protected void awaitImage(String image) {
final manifest = imageToManifestUri(image)
final req = HttpRequest.newBuilder()
.uri(manifest)
.headers('Content-Type','application/json')
.timeout(Duration.ofSeconds(15 * 60 + 10))
.GET()
.build()
final begin = System.currentTimeMillis()
final resp = httpClient.send(req, HttpResponse.BodyHandlers.ofString())
final code = resp.statusCode()
if( code>=200 && code<400 ) {
log.debug "Wave container available in ${nextflow.util.Duration.of(System.currentTimeMillis()-begin)}: [$code] ${resp.body()}"
}
else
throw new BadResponseException("Unexpected response for \'$manifest\': [${resp.statusCode()}] ${resp.body()}")
}

static protected boolean isCondaLocalFile(String value) {
if( value.contains('\n') )
Expand Down Expand Up @@ -526,64 +557,6 @@ class WaveClient {
return null
}

String resolveSourceContainer(String container) {
final token = getWaveToken(container)
if( !token )
return container
final resp = fetchContainerInfo(token)
final describe = jsonToDescribeContainerResponse(resp)
return describe.source.digest==describe.wave.digest
? digestImage(describe.source) // when the digest are equals, return the source because it's a stable name
: digestImage(describe.wave) // otherwise returns the wave container name
}

protected String digestImage(DescribeContainerResponse.ContainerInfo info) {
if( !info.digest )
return info.image
final p = info.image.lastIndexOf(':')
return p!=-1
? info.image.substring(0,p) + '@' + info.digest
: info.image + '@' + info.digest
}

protected String getWaveToken(String name) {
if( !name )
return null
final matcher = CONTAINER_PATH.matcher(name)
if( !matcher.find() )
return null
return matcher.group(1)==waveRegistry
? matcher.group(2)
: null
}

@Memoized
protected String fetchContainerInfo(String token) {
final uri = new URI("$endpoint/container-token/$token")
log.trace "Wave request container info: $uri"
final req = HttpRequest.newBuilder()
.uri(uri)
.headers('Content-Type','application/json')
.GET()
.build()

final resp = httpClient.send(req, HttpResponse.BodyHandlers.ofString())
final code = resp.statusCode()
if( code>=200 && code<400 ) {
log.debug "Wave container config info: [$code] ${resp.body()}"
return resp.body()
}
throw new BadResponseException("Unexpected response for \'$uri\': [${resp.statusCode()}] ${resp.body()}")
}

protected DescribeContainerResponse jsonToDescribeContainerResponse(String json) {
final gson = new GsonBuilder()
.registerTypeAdapter(Instant.class, new InstantAdapter())
.create();
final type = new TypeToken<DescribeContainerResponse>(){}.getType()
return gson.fromJson(json, type)
}

protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond) {
final cfg = config.retryOpts()
final listener = new EventListener<ExecutionAttemptedEvent<T>>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ class WaveObserver implements TraceObserver {

protected void apply(TaskHandler handler) {
final process = handler.task.getProcessor().getName()
containers.computeIfAbsent(process, (String it) -> {
final container = handler.task.getContainer()
return client.resolveSourceContainer(container)
})
containers.computeIfAbsent(process, (String it) -> handler.task.getContainer())
}

void onProcessComplete(TaskHandler handler, TraceRecord trace){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,12 @@ class WaveConfig {
final private ReportOpts reportOpts
final private RetryOpts retryOpts
final private HttpOpts httpClientOpts
final private Boolean freezeMode

WaveConfig(Map opts, Map<String,String> env=System.getenv()) {
this.enabled = opts.enabled
this.endpoint = (opts.endpoint?.toString() ?: env.get('WAVE_API_ENDPOINT') ?: DEF_ENDPOINT)?.stripEnd('/')
this.freezeMode = opts.freeze as Boolean
this.containerConfigUrl = parseConfig(opts, env)
this.tokensCacheMaxDuration = opts.navigate('tokens.cache.maxDuration', '30m') as Duration
this.condaOpts = opts.navigate('build.conda', Collections.emptyMap()) as CondaOpts
Expand Down Expand Up @@ -80,6 +82,8 @@ class WaveConfig {

List<String> strategy() { this.strategy }

boolean freezeMode() { return this.freezeMode }

boolean bundleProjectResources() { bundleProjectResources }

String buildRepository() { buildRepository }
Expand Down
169 changes: 23 additions & 146 deletions plugins/nf-wave/src/test/io/seqera/wave/plugin/WaveClientTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,29 @@ class WaveClientTest extends Specification {
!req.condaFile
!req.spackFile
!req.containerConfig.layers
!req.freeze
and:
req.fingerprint == 'bd2cb4b32df41f2d290ce2366609f2ad'
req.timestamp instanceof String
}

def 'should create request object with freeze mode' () {
given:
def session = Mock(Session) { getConfig() >> [wave:[freeze:true]]}
def IMAGE = 'foo:latest'
def wave = new WaveClient(session)

when:
def req = wave.makeRequest(WaveAssets.fromImage(IMAGE))
then:
req.containerImage == IMAGE
!req.containerPlatform
!req.containerFile
!req.condaFile
!req.spackFile
!req.containerConfig.layers
and:
req.freeze
and:
req.fingerprint == 'bd2cb4b32df41f2d290ce2366609f2ad'
req.timestamp instanceof String
Expand Down Expand Up @@ -850,150 +873,4 @@ class WaveClientTest extends Specification {
'http://foo.com' | false
}

@Unroll
def 'should find wave token' () {
given:
def sess = Mock(Session) {getConfig() >> [wave:[endpoint: 'http://foo.com']] }
and:
def wave = Spy(new WaveClient(sess))

expect:
wave.getWaveToken(CONTAINER) == EXPECTED

where:
EXPECTED | CONTAINER
null | null
null | 'ubunutu:latest'
null | 'xyz.com/wt/3aec54700cff/wave-build-repo:rnaseq-nf_v1.0'
and:
'3aec54700cff' | 'foo.com/wt/3aec54700cff/wave-build-repo:rnaseq-nf_v1.0'
}

def 'should convert json to describe container response'() {
given:
def JSON = '''
{
"token": "3aec54700cff",
"expiration": "2023-03-02T18:07:50.488226285Z",
"request": {
"user": {
"id": 8083,
"userName": "pditommaso",
"email": "[email protected]"
},
"workspaceId": 88265370860066,
"containerImage": "1234567890.dkr.ecr.us-west-2.amazonaws.com/wave-build-repo:rnaseq-nf_v1.0",
"containerConfig": {
"entrypoint": [
"/opt/fusion/entrypoint.sh"
],
"layers": [
{
"location": "data:DATA+OMITTED",
"gzipDigest": "sha256:dc8dd4ebf839869abb81d35fe3f265de9f3ac7b9b285e274c6b92072b02a84ec",
"gzipSize": 202,
"tarDigest": "sha256:dc4d652cd223da5bca40d08890686c4198769fb7bfc09de2ed3c3c77dead4bf9"
},
{
"location": "https://fusionfs.seqera.io/releases/pkg/0/6/4/fusionfs-amd64.tar.gz",
"gzipDigest": "sha256:c55640ae3284715e5c7a1c1f6c6ec2de77a881a08f5a9c46f077ecd0379e8477",
"gzipSize": 6191418,
"tarDigest": "sha256:e24642d65d5b21987666cf1ce4ba007ecadedbcefae9601669ab43a566682aa6"
}
]
},
"towerEndpoint": "https://api.tower.nf",
"fingerprint": "779855a0ffc582ef3170f7dab8829465",
"timestamp": "2023-03-01T20:07:49.933811174Z",
"zoneId": "Z",
"ipAddress": "54.190.237.226"
},
"build": {
"buildRepository": "1234567890.dkr.ecr.us-west-2.amazonaws.com/wave-build-repo",
"cacheRepository": "1234567890.dkr.ecr.us-west-2.amazonaws.com/wave-cache-repo"
},
"source": {
"image": "1234567890.dkr.ecr.us-west-2.amazonaws.com/wave-build-repo:rnaseq-nf_v1.0",
"digest": "sha256:d6f56ed0eae171fabd324bf582dd5c49c6462662c80a7e69632c57043b6af143"
},
"wave": {
"image": "wave.seqera.io/wt/3aec54700cff/wave-build-repo:rnaseq-nf_v1.0",
"digest": "sha256:d8f4f9aa77b4d1941b50a050ed71473a0e04720f38a12d497557c39a25398830"
}
}
'''
and:
def sess = Mock(Session) {getConfig() >> [:] }
def wave = Spy(new WaveClient(sess))

when:
def resp = wave.jsonToDescribeContainerResponse(JSON)
then:
resp.token == '3aec54700cff'
and:
resp.wave.image == 'wave.seqera.io/wt/3aec54700cff/wave-build-repo:rnaseq-nf_v1.0'
resp.wave.digest == 'sha256:d8f4f9aa77b4d1941b50a050ed71473a0e04720f38a12d497557c39a25398830'
and:
resp.request.user.id == 8083
resp.request.user.userName == 'pditommaso'

}

def 'should resolve wave container' () {
given:
def RESP1 = '''
{
"token": "3aec54700cff",
"source": {
"image": "docker.io/library/ubuntu:latest",
"digest": "sha256:d6f56ed0eae171fabd324bf582dd5c49c6462662c80a7e69632c57043b6af143"
},
"wave": {
"image": "wave.seqera.io/wt/3aec54700cff/library/ubuntu:latest",
"digest": "sha256:d8f4f9aa77b4d1941b50a050ed71473a0e04720f38a12d497557c39a25398830"
}
}
'''
and:
def sess = Mock(Session) {getConfig() >> [:] }
def wave = Spy(new WaveClient(sess))

when:
def result = wave.resolveSourceContainer('ubuntu')
then:
0 * wave.fetchContainerInfo(_) >> null
result == 'ubuntu'

when:
result = wave.resolveSourceContainer('wave.seqera.io/wt/3aec54700cff/library/ubuntu:latest')
then:
1 * wave.fetchContainerInfo('3aec54700cff') >> RESP1
result == 'wave.seqera.io/wt/3aec54700cff/library/ubuntu@sha256:d8f4f9aa77b4d1941b50a050ed71473a0e04720f38a12d497557c39a25398830'
}

def 'should return source container' () {
given:
def RESP = '''
{
"token": "3aec54700cff",
"source": {
"image": "docker.io/library/ubuntu:latest",
"digest": "sha256:d6f56ed0eae171fabd324bf582dd5c49c6462662c80a7e69632c57043b6af143"
},
"wave": {
"image": "wave.seqera.io/wt/3aec54700cff/library/ubuntu:latest",
"digest": "sha256:d6f56ed0eae171fabd324bf582dd5c49c6462662c80a7e69632c57043b6af143"
}
}
'''
and:
def sess = Mock(Session) {getConfig() >> [:] }
def wave = Spy(new WaveClient(sess))

when:
def result = wave.resolveSourceContainer('wave.seqera.io/wt/3aec54700cff/library/ubuntu:latest')
then:
1 * wave.fetchContainerInfo('3aec54700cff') >> RESP
result == 'docker.io/library/ubuntu@sha256:d6f56ed0eae171fabd324bf582dd5c49c6462662c80a7e69632c57043b6af143'
}
}

0 comments on commit 9a5903e

Please sign in to comment.