diff --git a/graph-builder/src/config.rs b/graph-builder/src/config.rs index fa12f5daf..438b25d46 100644 --- a/graph-builder/src/config.rs +++ b/graph-builder/src/config.rs @@ -14,6 +14,7 @@ use std::net::IpAddr; use std::num::ParseIntError; +use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; @@ -46,6 +47,10 @@ pub struct Options { /// Port to which the server will bind #[structopt(long = "port", default_value = "8080")] pub port: u16, + + /// Credentials file for authentication against the image registry + #[structopt(long = "credentials-file", parse(from_os_str))] + pub credentials_path: Option, } fn parse_duration(src: &str) -> Result { diff --git a/graph-builder/src/graph.rs b/graph-builder/src/graph.rs index eacac512a..f9da140cf 100644 --- a/graph-builder/src/graph.rs +++ b/graph-builder/src/graph.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +extern crate dkregistry; + use actix_web::http::header::{self, HeaderValue}; use actix_web::{HttpMessage, HttpRequest, HttpResponse}; use cincinnati::{AbstractRelease, Graph, Release, CONTENT_TYPE}; @@ -51,9 +53,18 @@ impl State { } pub fn run(opts: &config::Options, state: &State) -> ! { + // Read the credentials outside the loop to avoid re-reading the file + let (username, password) = + registry::read_credentials(opts.credentials_path.as_ref(), &opts.registry) + .expect("could not read credentials"); + loop { debug!("Updating graph..."); - match create_graph(&opts) { + match create_graph( + &opts, + username.as_ref().map(String::as_ref), + password.as_ref().map(String::as_ref), + ) { Ok(graph) => match serde_json::to_string(&graph) { Ok(json) => *state.json.write().expect("json lock has been poisoned") = json, Err(err) => error!("Failed to serialize graph: {}", err), @@ -64,10 +75,14 @@ pub fn run(opts: &config::Options, state: &State) -> ! { } } -fn create_graph(opts: &config::Options) -> Result { +fn create_graph( + opts: &config::Options, + username: Option<&str>, + password: Option<&str>, +) -> Result { let mut graph = Graph::default(); - registry::fetch_releases(&opts.registry, &opts.repository) + registry::fetch_releases(&opts.registry, &opts.repository, username, password) .context("failed to fetch all release metadata")? .into_iter() .try_for_each(|release| { diff --git a/graph-builder/src/main.rs b/graph-builder/src/main.rs index 39600c4c4..390455b20 100644 --- a/graph-builder/src/main.rs +++ b/graph-builder/src/main.rs @@ -14,6 +14,7 @@ extern crate actix_web; extern crate cincinnati; +extern crate dkregistry; extern crate env_logger; extern crate itertools; #[macro_use] diff --git a/graph-builder/src/registry.rs b/graph-builder/src/registry.rs index 988cb4e91..6be65738b 100644 --- a/graph-builder/src/registry.rs +++ b/graph-builder/src/registry.rs @@ -12,16 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +extern crate dkregistry; +extern crate failure; +extern crate futures; +extern crate tokio_core; + use cincinnati; -use failure::{Error, ResultExt}; +use failure::Error; use flate2::read::GzDecoder; +use registry::futures::future::Either; +use registry::futures::prelude::*; +use registry::tokio_core::reactor::Core; use release; -use reqwest::{self, Url}; use serde_json; -use std::io::Read; -use std::path::Path; +use std::{self, fs::File, io::Read, path::Path, path::PathBuf}; use tar::Archive; +#[derive(Debug, Clone)] pub struct Release { pub source: String, pub metadata: release::Metadata, @@ -37,24 +44,217 @@ impl Into for Release { } } +fn trim_protocol(src: &str) -> &str { + src.trim_left_matches("https://") + .trim_left_matches("http://") +} + +pub fn read_credentials( + credentials_path: Option<&PathBuf>, + registry: &str, +) -> Result<(Option, Option), Error> { + credentials_path.clone().map_or(Ok((None, None)), |path| { + Ok( + dkregistry::get_credentials(File::open(&path)?, trim_protocol(registry)) + .map_err(|e| format_err!("{}", e))?, + ) + }) +} + +pub fn authenticate_client( + client: &mut dkregistry::v2::Client, + login_scope: std::string::String, +) -> impl futures::future::Future +{ + client + .is_v2_supported() + .and_then(move |v2_supported| { + if !v2_supported { + Err("API v2 not supported".into()) + } else { + Ok(client) + } + }) + .and_then(move |dclient| { + dclient.login(&[&login_scope]).and_then(|token| { + dclient + .is_auth(Some(token.token())) + .and_then(move |is_auth| { + if !is_auth { + Err("login failed".into()) + } else { + Ok(dclient.set_token(Some(token.token()))) + } + }) + }) + }) +} + /// Fetches a vector of all release metadata from the given repository, hosted on the given /// registry. -pub fn fetch_releases(registry: &str, repo: &str) -> Result, Error> { - let mut metadata = Vec::new(); - for tag in fetch_tags(registry, repo)? { - metadata.push(Release { - source: format!( - "{}/{}:{}", - registry - .trim_left_matches("https://") - .trim_left_matches("http://"), - repo, - tag - ), - metadata: fetch_metadata(registry, repo, &tag)?, +pub fn fetch_releases( + registry: &str, + repo: &str, + username: Option<&str>, + password: Option<&str>, +) -> Result, Error> { + let mut tcore = Core::new()?; + + let registry_host = trim_protocol(®istry); + + let mut client = dkregistry::v2::Client::configure(&tcore.handle()) + .registry(registry_host) + .insecure_registry(false) + .username(username.map(|s| s.to_string())) + .password(password.map(|s| s.to_string())) + .build() + .map_err(|e| format_err!("{}", e))?; + + let authenticated_client = tcore + .run(authenticate_client( + &mut client, + format!("repository:{}:pull", &repo), + )) + .map_err(|e| format_err!("{}", e))?; + + let tags = tcore + .run( + authenticated_client + // According to https://docs.docker.com/registry/spec/api/#listing-image-tags + // the tags should be ordered lexically but they aren't + .get_tags(&repo, Some(20)) + .map_err(|e| format_err!("{}", e)) + .collect(), + ) + .map(|mut tags| { + if tags.is_empty() { + warn!("{}/{} has no tags", registry_host, repo) + }; + tags.sort(); + tags + })?; + + let releases = futures::stream::iter_ok(tags) + .and_then(|tag| { + trace!("processing: {}:{}", &repo, &tag); + authenticated_client + .has_manifest(&repo, &tag, None) + .join(authenticated_client.get_manifest(&repo, &tag)) + .map_err(|e| format_err!("{}", e)) + .and_then(|(manifest_kind, manifest)| { + Ok((tag, get_layer_digests(&manifest_kind, &manifest)?)) + }) + }) + .and_then(|(tag, layer_digests)| { + find_first_release( + layer_digests, + authenticated_client.clone(), + registry_host.into(), + repo.into(), + tag, + ) }) + .collect(); + + tcore.run(releases) +} + +fn get_layer_digests( + manifest_kind: &Option, + manifest: &[u8], +) -> Result, failure::Error> { + use registry::dkregistry::mediatypes::MediaTypes::{ManifestV2S1Signed, ManifestV2S2}; + use registry::dkregistry::v2::manifest::{ManifestSchema1Signed, ManifestSchema2}; + + match manifest_kind { + Some(ManifestV2S1Signed) => serde_json::from_slice::(manifest) + .and_then(|m| { + let mut l = m.get_layers(); + l.reverse(); + Ok(l) + }), + Some(ManifestV2S2) => serde_json::from_slice::(manifest).and_then(|m| { + let mut l = m.get_layers(); + l.reverse(); + Ok(l) + }), + _ => bail!("unknown manifest_kind '{:?}'", manifest_kind), } - Ok(metadata) + .map_err(Into::into) +} + +fn find_first_release( + layer_digests: Vec, + authenticated_client: dkregistry::v2::Client, + registry_host: String, + repo: String, + tag: String, +) -> impl futures::future::Future { + futures::future::loop_fn( + layer_digests.into_iter().peekable(), + move |mut layer_digests_iter| { + let layer_digest = { + if let Some(layer_digest) = layer_digests_iter.next() { + layer_digest + } else { + let no_release_found = futures::future::ok(continue_or_break_loop( + layer_digests_iter, + None, + Some(format_err!( + "no release found for tag '{}' and no more layers to examine", + tag + )), + )); + return Either::A(no_release_found); + } + }; + + // FIXME: it would be nice to avoid this + let (registry_host, repo, tag, layer_digest) = ( + registry_host.clone(), + repo.clone(), + tag.clone(), + layer_digest.clone(), + ); + + trace!("Downloading layer {}...", &layer_digest); + let examine_blobs = authenticated_client + .get_blob(&repo, &layer_digest) + .map_err(|e| format_err!("could not download blob: {}", e)) + .and_then(move |blob| { + let metadata_filename = "cincinnati.json"; + trace!( + "{}: Looking for {} in archive {} with {} bytes", + tag, + metadata_filename, + layer_digest, + blob.len(), + ); + + match assemble_metadata(&blob, metadata_filename) { + Ok(metadata) => futures::future::ok(continue_or_break_loop( + layer_digests_iter, + Some(Release { + source: format!("{}/{}:{}", ®istry_host, &repo, &tag), + metadata, + }), + None, + )), + Err(e) => futures::future::ok(continue_or_break_loop( + layer_digests_iter, + None, + Some(format_err!( + "could not assemble metadata from blob '{:?}': {}", + String::from_utf8_lossy(&blob), + e, + )), + )), + } + }); + Either::B(examine_blobs) + }, + ) + .flatten() } #[derive(Debug, Deserialize)] @@ -63,23 +263,6 @@ struct Tags { tags: Vec, } -fn fetch_tags(registry: &str, repo: &str) -> Result, Error> { - let base = Url::parse(registry)?; - let tags: Tags = { - let mut response = reqwest::get(base.join(&format!("v2/{}/tags/list", repo))?) - .context("failed to fetch image tags")?; - ensure!( - response.status().is_success(), - "failed to fetch image tags: {}", - response.status() - ); - - serde_json::from_str(&response.text()?)? - }; - - Ok(tags.tags) -} - #[derive(Debug, Deserialize)] struct Manifest { #[serde(rename = "schemaVersion")] @@ -97,49 +280,8 @@ struct Layer { blob_sum: String, } -fn fetch_metadata(registry: &str, repo: &str, tag: &str) -> Result { - trace!("fetching metadata from {}/{}:{}", registry, repo, tag); - - let base = Url::parse(registry)?; - let manifest: Manifest = { - let mut response = reqwest::get(base.join(&format!("v2/{}/manifests/{}", repo, tag))?) - .context("failed to fetch image manifest")?; - ensure!( - response.status().is_success(), - "failed to fetch image manifest: {}", - response.status() - ); - - serde_json::from_str(&response.text()?).context("failed to parse image manifest")? - }; - - for layer in manifest.fs_layers { - match fetch_metadata_from_layer(&base, repo, &layer) { - Ok(metadata) => return Ok(metadata), - Err(err) => debug!("metadata document not found in layer: {}", err), - } - } - - bail!("metadata document not found in image") -} - -fn fetch_metadata_from_layer( - base: &Url, - repo: &str, - layer: &Layer, -) -> Result { - trace!("fetching metadata from {}", layer.blob_sum); - - let response = reqwest::get(base.join(&format!("v2/{}/blobs/{}", repo, layer.blob_sum))?) - .context("failed to fetch image blob")?; - - ensure!( - response.status().is_success(), - "failed to fetch metadata document: {}", - response.status() - ); - - let mut archive = Archive::new(GzDecoder::new(response)); +fn assemble_metadata(blob: &[u8], metadata_filename: &str) -> Result { + let mut archive = Archive::new(GzDecoder::new(blob)); match archive .entries()? .filter_map(|entry| match entry { @@ -150,7 +292,7 @@ fn fetch_metadata_from_layer( } }) .find(|file| match file.header().path() { - Ok(path) => path == Path::new("cincinnati.json"), + Ok(path) => path == Path::new(metadata_filename), Err(err) => { debug!("failed to read file header: {}", err); false @@ -159,9 +301,42 @@ fn fetch_metadata_from_layer( Some(mut file) => { let mut contents = String::new(); file.read_to_string(&mut contents)?; - serde_json::from_str(&contents).context("failed to parse cincinnati.json") + match serde_json::from_str::(&contents) { + Ok(m) => Ok::(m), + Err(e) => bail!(format!("couldn't parse '{}': {}", metadata_filename, e)), + } } - None => bail!("cincinnati.json not found"), + None => bail!(format!("'{}' not found", metadata_filename)), } .map_err(Into::into) } + +fn continue_or_break_loop( + mut layer_digests_iter: std::iter::Peekable, + r: Option, + e: Option, +) -> futures::future::Loop, std::iter::Peekable> +where + I: std::iter::Iterator, +{ + match (r, e) { + (Some(r), _) => { + trace!("Found release '{:?}'", r); + futures::future::Loop::Break(Ok(r)) + } + (_, Some(e)) => { + warn!("{}", e); + match layer_digests_iter.peek() { + Some(_) => futures::future::Loop::Continue(layer_digests_iter), + None => futures::future::Loop::Break(Err(e)), + } + } + _ => continue_or_break_loop( + layer_digests_iter, + None, + Some(format_err!( + "continue_or_break called with unexpected condition" + )), + ), + } +} diff --git a/graph-builder/src/release.rs b/graph-builder/src/release.rs index 618b76b32..1e2bbb795 100644 --- a/graph-builder/src/release.rs +++ b/graph-builder/src/release.rs @@ -17,7 +17,7 @@ use semver::Version; use std::collections::HashMap; use std::fmt; -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] pub struct Metadata { kind: MetadataKind, pub version: Version, @@ -43,7 +43,7 @@ impl fmt::Display for Metadata { } } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] pub enum MetadataKind { #[serde(rename = "cincinnati-metadata-v0")] V0,