Skip to content

Commit

Permalink
graph-builder: call the futures in two steps to prevent a race
Browse files Browse the repository at this point in the history
  • Loading branch information
steveej committed Nov 6, 2018
1 parent 9232924 commit 0121a29
Showing 1 changed file with 39 additions and 32 deletions.
71 changes: 39 additions & 32 deletions graph-builder/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub fn fetch_releases(
))
.map_err(|e| format_err!("{}", e))?;

let releases = authenticated_client
let layer_digests_tag = authenticated_client
.get_tags(&repo, None)
.and_then(|tag| {
trace!("processing: {}:{}", &repo, &tag);
Expand Down Expand Up @@ -163,43 +163,50 @@ pub fn fetch_releases(
layer_digests_future
})
.map_err(|e| format_err!("{}", e))
.and_then(|layer_digests_tag| {
let (layer_digests, tag) = layer_digests_tag?;
trace!("tag: {:?} layer_digests: {:?}", &tag, &layer_digests);
.and_then(|f| f)
.collect();

let layer_digests_mapped_to_releases = layer_digests.iter().map(|layer_digest| {
let (registry_host, repo, tag) = (registry_host.clone(), repo.clone(), tag.clone());
let layer_digests_tag = tcore.run(layer_digests_tag)?;

let releases = layer_digests_tag
.into_iter()
.filter_map(|(layer_digests, tag)| {
let mut found = false;
let mut map = layer_digests.into_iter().filter_map(|layer_digest| {
trace!("Downloading layer {}...", &layer_digest);
authenticated_client
.get_blob(&repo, &layer_digest)
.map_err(|e| format_err!("{}", e))
.and_then(move |blob| {
trace!("Layer has {} bytes.", blob.len());
let metadata = extract_metadata_from_layer_blob(&blob, "cincinnati.json")?;
let release = Release {
source: format!("{}/{}:{}", &registry_host, &repo, &tag),
metadata: metadata,
};
trace!("Found release '{:?}'", release);
Ok(release)
})
});
if found {
return None;
}

Ok(layer_digests_mapped_to_releases.collect::<Vec<_>>())
})
.and_then(|futures| {
// Select the first Ok, resembling the first Release found
futures::future::select_ok(futures)
})
.map(|(release, _)| {
// Drop the remainder futures after the first Ok
release
match tcore.run(
authenticated_client
.get_blob(&repo, &layer_digest)
.map_err(|e| format_err!("{}", e))
.and_then(|blob| {
trace!("Layer has {} bytes.", blob.len());
let metadata =
extract_metadata_from_layer_blob(&blob, "cincinnati.json")?;
let release = Release {
source: format!("{}/{}:{}", &registry_host, &repo, &tag),
metadata: metadata,
};
trace!("Found release '{:?}'", release);
found = true;
Ok(release)
}),
) {
Ok(release) => Some(release),
Err(e) => {
debug!("No release in layer with digest {}: {}", &layer_digest, e);
None
}
}
});
map.nth(0)
})
.collect()
.map_err(|e| format_err!("{}", e));
.collect::<Vec<Release>>();

Ok(tcore.run(releases)?)
Ok(releases)
}

#[derive(Debug, Deserialize)]
Expand Down

0 comments on commit 0121a29

Please sign in to comment.