Skip to content

Commit

Permalink
graph-builder/registry: switch to using dkregistry
Browse files Browse the repository at this point in the history
Switch to using dkregistry for all registry requests which supports
asynchronous authenticated calls towards image registries.
This commit also adds the parameter `--credentials-file` which takes a
path to a JSON file as produced by `docker login`[1].

[1]: https://docs.docker.com/engine/reference/commandline/login/
  • Loading branch information
steveej committed Nov 6, 2018
1 parent 0761da9 commit 9232924
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 91 deletions.
13 changes: 12 additions & 1 deletion graph-builder/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

extern crate dkregistry;

use std::net::IpAddr;
use std::num::ParseIntError;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;

Expand All @@ -32,7 +35,11 @@ pub struct Options {
pub repository: String,

/// Duration of the pause (in seconds) between scans of the registry
#[structopt(long = "period", default_value = "30", parse(try_from_str = "parse_duration"))]
#[structopt(
long = "period",
default_value = "30",
parse(try_from_str = "parse_duration")
)]
pub period: Duration,

/// Address on which the server will listen
Expand All @@ -42,6 +49,10 @@ pub struct Options {
/// Port to which the server will bind
#[structopt(long = "port", default_value = "8080")]
pub port: u16,

/// Credentials file for authentication against the image registry
#[structopt(long = "credentials-file", parse(from_os_str))]
pub credentials_path: Option<PathBuf>,
}

fn parse_duration(src: &str) -> Result<Duration, ParseIntError> {
Expand Down
4 changes: 2 additions & 2 deletions graph-builder/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use actix_web::http::header::{self, HeaderValue};
use actix_web::{HttpMessage, HttpRequest, HttpResponse};
use cincinnati::{AbstractRelease, CONTENT_TYPE, Graph, Release};
use cincinnati::{AbstractRelease, Graph, Release, CONTENT_TYPE};
use config;
use failure::{Error, ResultExt};
use registry;
Expand Down Expand Up @@ -67,7 +67,7 @@ pub fn run(opts: &config::Options, state: &State) -> ! {
fn create_graph(opts: &config::Options) -> Result<Graph, Error> {
let mut graph = Graph::default();

registry::fetch_releases(&opts.registry, &opts.repository)
registry::fetch_releases(&opts.registry, &opts.repository, &opts.credentials_path)
.context("failed to fetch all release metadata")?
.into_iter()
.try_for_each(|release| {
Expand Down
5 changes: 3 additions & 2 deletions graph-builder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ fn main() -> Result<(), Error> {
App::with_state(state.clone())
.middleware(Logger::default())
.route("/v1/graph", Method::GET, graph::index)
}).bind(addr)?
.run();
})
.bind(addr)?
.run();
Ok(())
}
256 changes: 175 additions & 81 deletions graph-builder/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.

extern crate dkregistry;
extern crate failure;
extern crate futures;
extern crate tokio_core;

use cincinnati;
use failure::{Error, ResultExt};
use failure::ResultExt;
use flate2::read::GzDecoder;
use release;
use reqwest::{self, Url};
use serde_json;
use std::io::Read;
use std::path::Path;
use std;
use std::{fs::File, io::Read, path::Path, path::PathBuf};
use tar::Archive;

use failure::Error;
use registry::futures::prelude::*;
use registry::tokio_core::reactor::Core;

#[derive(Debug, Clone)]
pub struct Release {
pub source: String,
pub metadata: release::Metadata,
Expand All @@ -37,24 +46,160 @@ impl Into<cincinnati::Release> for Release {
}
}

fn trim_protocol(src: &str) -> &str {
src.trim_left_matches("https://")
.trim_left_matches("http://")
}

pub fn authenticate_client<'a>(
client: &'a mut dkregistry::v2::Client,
login_scope: std::string::String,
) -> impl futures::future::Future<Item = &'a dkregistry::v2::Client, Error = dkregistry::errors::Error>
{
client
.is_v2_supported()
.and_then(move |v2_supported| {
if !v2_supported {
Err("API v2 not supported".into())
} else {
Ok(client)
}
})
.and_then(|dclient| {
dclient.is_auth(None).and_then(move |is_auth| {
if is_auth {
Err("no login performed, but already authenticated".into())
} else {
Ok(dclient)
}
})
})
.and_then(move |dclient| {
dclient.login(&[&login_scope]).and_then(move |token| {
dclient
.is_auth(Some(token.token()))
.and_then(move |is_auth| {
if !is_auth {
Err("login failed".into())
} else {
println!("logged in!");
Ok(dclient.set_token(Some(token.token())))
}
})
})
})
}

/// Fetches a vector of all release metadata from the given repository, hosted on the given
/// registry.
pub fn fetch_releases(registry: &str, repo: &str) -> Result<Vec<Release>, Error> {
let mut metadata = Vec::new();
for tag in fetch_tags(registry, repo)? {
metadata.push(Release {
source: format!(
"{}/{}:{}",
registry
.trim_left_matches("https://")
.trim_left_matches("http://"),
repo,
tag
),
metadata: fetch_metadata(registry, repo, &tag)?,
pub fn fetch_releases(
registry: &str,
repo: &str,
credentials_path: &Option<PathBuf>,
) -> Result<Vec<Release>, Error> {
let mut tcore = Core::new()?;

let registry_host = trim_protocol(registry);

let mut client = dkregistry::v2::Client::configure(&tcore.handle())
.registry(registry_host)
.insecure_registry(false)
.read_credentials(|| -> Result<Box<dyn std::io::Read>, Error> {
match credentials_path {
Some(path) => match File::open(path) {
Ok(file) => Ok(Box::new(file)),
Err(e) => Err(e.into()),
},
None => Ok(Box::new(std::io::empty())),
}
}()?)
.build()
.map_err(|e| format_err!("{}", e))?;

let authenticated_client = tcore
.run(authenticate_client(
&mut client,
format!("repository:{}:pull", &repo),
))
.map_err(|e| format_err!("{}", e))?;

let releases = authenticated_client
.get_tags(&repo, None)
.and_then(|tag| {
trace!("processing: {}:{}", &repo, &tag);

let tag_clone0 = tag.clone(); // TODO(steveeJ): is there a way to avoid this?
let manifest_kind_future = authenticated_client
.has_manifest(&repo, &tag, None)
.and_then(move |manifest_kind| match manifest_kind {
Some(manifest_kind) => Ok(manifest_kind),
None => {
Err(format!("{}:{} doesn't have a manifest", &repo, &tag_clone0).into())
}
})
.inspect(|manifest_kind| {
trace!("manifest_kind: {:?}", manifest_kind);
});

let manifest_future = authenticated_client.get_manifest(&repo, &tag);

let layer_digests_future =
manifest_kind_future
.join(manifest_future)
.map(|(manifest_kind, manifest)| match manifest_kind {
dkregistry::mediatypes::MediaTypes::ManifestV2S1Signed => {
let m: dkregistry::v2::manifest::ManifestSchema1Signed =
serde_json::from_slice(manifest.as_slice())?;
Ok((m.get_layers(), tag))
}
dkregistry::mediatypes::MediaTypes::ManifestV2S2 => {
let m: dkregistry::v2::manifest::ManifestSchema2 =
serde_json::from_slice(manifest.as_slice())?;
Ok((m.get_layers(), tag))
}
_ => Err(format_err!("unknown manifest_kind '{:?}'", manifest_kind)),
});

layer_digests_future
})
}
Ok(metadata)
.map_err(|e| format_err!("{}", e))
.and_then(|layer_digests_tag| {
let (layer_digests, tag) = layer_digests_tag?;
trace!("tag: {:?} layer_digests: {:?}", &tag, &layer_digests);

let layer_digests_mapped_to_releases = layer_digests.iter().map(|layer_digest| {
let (registry_host, repo, tag) = (registry_host.clone(), repo.clone(), tag.clone());

trace!("Downloading layer {}...", &layer_digest);
authenticated_client
.get_blob(&repo, &layer_digest)
.map_err(|e| format_err!("{}", e))
.and_then(move |blob| {
trace!("Layer has {} bytes.", blob.len());
let metadata = extract_metadata_from_layer_blob(&blob, "cincinnati.json")?;
let release = Release {
source: format!("{}/{}:{}", &registry_host, &repo, &tag),
metadata: metadata,
};
trace!("Found release '{:?}'", release);
Ok(release)
})
});

Ok(layer_digests_mapped_to_releases.collect::<Vec<_>>())
})
.and_then(|futures| {
// Select the first Ok, resembling the first Release found
futures::future::select_ok(futures)
})
.map(|(release, _)| {
// Drop the remainder futures after the first Ok
release
})
.collect()
.map_err(|e| format_err!("{}", e));

Ok(tcore.run(releases)?)
}

#[derive(Debug, Deserialize)]
Expand All @@ -63,23 +208,6 @@ struct Tags {
tags: Vec<String>,
}

fn fetch_tags(registry: &str, repo: &str) -> Result<Vec<String>, Error> {
let base = Url::parse(registry)?;
let tags: Tags = {
let mut response = reqwest::get(base.join(&format!("v2/{}/tags/list", repo))?)
.context("failed to fetch image tags")?;
ensure!(
response.status().is_success(),
"failed to fetch image tags: {}",
response.status()
);

serde_json::from_str(&response.text()?)?
};

Ok(tags.tags)
}

#[derive(Debug, Deserialize)]
struct Manifest {
#[serde(rename = "schemaVersion")]
Expand All @@ -97,49 +225,13 @@ struct Layer {
blob_sum: String,
}

fn fetch_metadata(registry: &str, repo: &str, tag: &str) -> Result<release::Metadata, Error> {
trace!("fetching metadata from {}/{}:{}", registry, repo, tag);

let base = Url::parse(registry)?;
let manifest: Manifest = {
let mut response = reqwest::get(base.join(&format!("v2/{}/manifests/{}", repo, tag))?)
.context("failed to fetch image manifest")?;
ensure!(
response.status().is_success(),
"failed to fetch image manifest: {}",
response.status()
);

serde_json::from_str(&response.text()?).context("failed to parse image manifest")?
};

for layer in manifest.fs_layers {
match fetch_metadata_from_layer(&base, repo, &layer) {
Ok(metadata) => return Ok(metadata),
Err(err) => debug!("metadata document not found in layer: {}", err),
}
}

bail!("metadata document not found in image")
}

fn fetch_metadata_from_layer(
base: &Url,
repo: &str,
layer: &Layer,
fn extract_metadata_from_layer_blob(
blob: &[u8],
metadata_filename: &str,
) -> Result<release::Metadata, Error> {
trace!("fetching metadata from {}", layer.blob_sum);

let response = reqwest::get(base.join(&format!("v2/{}/blobs/{}", repo, layer.blob_sum))?)
.context("failed to fetch image blob")?;
trace!("Looking for {} in archive", metadata_filename);

ensure!(
response.status().is_success(),
"failed to fetch metadata document: {}",
response.status()
);

let mut archive = Archive::new(GzDecoder::new(response));
let mut archive = Archive::new(GzDecoder::new(blob.as_ref()));
match archive
.entries()?
.filter_map(|entry| match entry {
Expand All @@ -150,7 +242,7 @@ fn fetch_metadata_from_layer(
}
})
.find(|file| match file.header().path() {
Ok(path) => path == Path::new("cincinnati.json"),
Ok(path) => path == Path::new(metadata_filename),
Err(err) => {
debug!("failed to read file header: {}", err);
false
Expand All @@ -159,8 +251,10 @@ fn fetch_metadata_from_layer(
Some(mut file) => {
let mut contents = String::new();
file.read_to_string(&mut contents)?;
serde_json::from_str(&contents).context("failed to parse cincinnati.json")
serde_json::from_str(&contents)
.context(format!("failed to parse {}", metadata_filename))
}
None => bail!("cincinnati.json not found"),
}.map_err(Into::into)
None => bail!(format!("'{}' not found", metadata_filename)),
}
.map_err(Into::into)
}
4 changes: 2 additions & 2 deletions graph-builder/src/release.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use semver::Version;
use std::collections::HashMap;
use std::fmt;

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct Metadata {
kind: MetadataKind,
pub version: Version,
Expand All @@ -43,7 +43,7 @@ impl fmt::Display for Metadata {
}
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub enum MetadataKind {
#[serde(rename = "cincinnati-metadata-v0")]
V0,
Expand Down
2 changes: 1 addition & 1 deletion policy-engine/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use actix_web::http::header::{self, HeaderValue};
use actix_web::{HttpMessage, HttpRequest, HttpResponse};
use cincinnati::{CONTENT_TYPE, Graph};
use cincinnati::{Graph, CONTENT_TYPE};
use failure::Error;
use futures::{future, Future, Stream};
use hyper::{Body, Client, Request, Uri};
Expand Down
Loading

0 comments on commit 9232924

Please sign in to comment.