// Patch to src/main.rs. Summary of what the hunks below change:
//
// * CLI: `Cli` now flattens a new `Options` struct and takes `revisions` as
//   arguments parsed by `RevisionRef::from_str`; the field has no `long`
//   attribute. The former `--revision` and `--project` (default `try`) options
//   are removed. `RevisionRef::from_str` uses `split_once(':')`, so the text
//   before the first colon becomes `project` and the rest becomes `hash`; input
//   without a colon is rejected with a static error string.
// * Logging: `env_logger::init()` is replaced by a builder that calls
//   `filter_level(log::LevelFilter::Info)` and then `parse_default_env()`.
// * `main` creates one `Client` and awaits the new
//   `get_artifacts_for_revision(&client, &options, &rev_ref)` for each parsed
//   revision in turn; the former body of `main` moves into that function.
// * Output layout: `job_path` gains a leading `{revision}` component.
// * `out_dir` is no longer removed via `fs::remove_dir_all` before downloading.
//   Instead, `local_artifact_path` is computed before the request and the
//   download is skipped, with an info log, when `local_artifact_path.is_file()`.
// * Progress: `progress_bar.wrap_stream(artifacts).for_each_concurrent(..)` is
//   replaced by `artifacts.then(..)` followed by `for_each_concurrent` whose
//   closure only calls `progress_bar.inc(1)`.
//
// NOTE(review): `.then` is presumably `futures::StreamExt::then`, which drives
//   each returned future to completion before taking the next stream item. If
//   so, the downloads now run one at a time and `--max-parallel` only bounds the
//   trivial `inc` closure. Confirm; `map(..)` plus `buffer_unordered(n)` would
//   keep the downloads concurrent.
// NOTE(review): the skip check assumes a given job receives the same
//   `this_task_idx` on every run (the index is handed out in arrival order per
//   `job_path`) - verify that the job ordering is stable across invocations.
// NOTE(review): this copy of the patch looks whitespace-collapsed and seems to
//   have lost some `<...>` spans (see `Vec,`, `Option,`, `Result {` and the
//   error text ending in `of the form :`) - TODO compare against the original
//   commit before applying.
diff --git a/src/main.rs b/src/main.rs index 747c2fa..90f1816 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,10 @@ use std::{ collections::{BTreeMap, HashSet}, - fs, io, + fs, num::NonZeroU8, path::PathBuf, process::exit, + str::FromStr, sync::{Arc, Mutex}, }; @@ -132,42 +133,85 @@ struct Job { #[derive(Debug, Parser)] struct Cli { + #[clap(flatten)] + options: Options, + #[clap(value_parser = RevisionRef::from_str)] + revisions: Vec, +} + +#[derive(Debug, Parser)] +struct Options { #[clap(long)] out_dir: PathBuf, - #[clap(long)] - revision: String, #[clap(long = "job-type-re")] job_type_name_regex: Option, #[clap(long = "artifact")] artifact_names: Vec, #[clap(long = "max-parallel", default_value = "10")] max_parallel_artifact_downloads: NonZeroU8, - #[clap(long = "project", default_value = "try")] - project_name: String, #[clap(long, default_value = "https://treeherder.mozilla.org")] treeherder_host: Url, #[clap(long, default_value = "https://firefox-ci-tc.services.mozilla.com")] taskcluster_host: Url, } +#[derive(Clone, Debug)] +struct RevisionRef { + project: String, + hash: String, +} + +impl FromStr for RevisionRef { + type Err = &'static str; + + fn from_str(s: &str) -> Result { + s.split_once(':') + .map(|(project, hash)| Self { + project: project.to_owned(), + hash: hash.to_owned(), + }) + .ok_or_else(|| { + "no dividing colon found; expected revision ref. 
of the form :" + }) + } +} + #[tokio::main] async fn main() { - env_logger::init(); + env_logger::builder() + .filter_level(log::LevelFilter::Info) + .parse_default_env() + .init(); + + let Cli { options, revisions } = Cli::parse(); - let Cli { + let client = Client::new(); + + for rev_ref in revisions { + get_artifacts_for_revision(&client, &options, &rev_ref).await + } +} + +async fn get_artifacts_for_revision(client: &Client, options: &Options, revision: &RevisionRef) { + let Options { out_dir, - revision, job_type_name_regex, artifact_names, max_parallel_artifact_downloads, - project_name, treeherder_host, taskcluster_host, - } = Cli::parse(); + } = options; + let RevisionRef { + project: project_name, + hash: revision, + } = revision; - let client = Client::new(); + log::info!("fetching for revision(s): {:?}", [&revision]); - let revision = client + let Revision { + meta: RevisionMeta { count }, + mut results, + } = client .get(format!( "{treeherder_host}api/project/{project_name}/push/?revision={revision}" )) @@ -178,10 +222,6 @@ async fn main() { .await .unwrap(); - log::info!("fetching for revision(s): {:?}", [&revision]); - - let Revision { meta, mut results } = revision; - let RevisionMeta { count } = meta; assert!(results.len() == usize::try_from(count).unwrap()); if count > 1 { log::warn!("more than one `result` found for specified push"); @@ -263,80 +303,85 @@ async fn main() { progress_bar.tick(); // Force the progress bar to show now, rather than waiting until first // completion of a download. 
- fs::remove_dir_all(&out_dir) - .or_else(|e| match e.kind() { - io::ErrorKind::NotFound => Ok(()), - e => Err(e), - }) - .unwrap(); - let task_counts = Arc::new(Mutex::new(BTreeMap::new())); - let max_parallel_artifact_downloads = usize::from(max_parallel_artifact_downloads.get()); - progress_bar - .wrap_stream(artifacts) - .for_each_concurrent(max_parallel_artifact_downloads, |(job, artifact_name)| { - let client = &client; - let out_dir = &out_dir; - let task_counts = &task_counts; - let taskcluster_host = &taskcluster_host; - let progress_bar = progress_bar.clone(); - async move { - let Job { - job_group_symbol, - job_type_symbol, - job_type_name, - platform, - task_id, - platform_option, - .. - } = job; - - let job_path = - format!("{platform}/{platform_option}/{job_group_symbol}/{job_type_symbol}"); - - let this_task_idx: u32; - { - let mut task_counts = task_counts.lock().unwrap(); - let task_count = task_counts.entry(job_path.clone()).or_insert(0); - this_task_idx = *task_count; - *task_count += 1; - } + let artifacts = artifacts.then(|(job, artifact_name)| { + let client = &client; + let out_dir = &out_dir; + let task_counts = &task_counts; + let taskcluster_host = &taskcluster_host; + let progress_bar = progress_bar.clone(); + async move { + let Job { + job_group_symbol, + job_type_symbol, + job_type_name, + platform, + task_id, + platform_option, + .. 
+ } = job; + + let job_path = format!( + "{revision}/{platform}/{platform_option}/{job_group_symbol}/{job_type_symbol}" + ); + + let this_task_idx: u32; + { + let mut task_counts = task_counts.lock().unwrap(); + let task_count = task_counts.entry(job_path.clone()).or_insert(0); + this_task_idx = *task_count; + *task_count += 1; + } + + let local_artifact_path = { + let mut path = out_dir.join(job_path); + path.push(&this_task_idx.to_string()); + path.push(artifact_name); + path + }; + + if local_artifact_path.is_file() { + progress_bar.suspend(|| { + log::info!( + "skipping file that already appears to be downloaded: {}", + local_artifact_path.display() + ); + }); + return; + } - let artifact = - match get_artifact(client, taskcluster_host, task_id, artifact_name).await { - Ok(bytes) => bytes, - Err(code) => { - progress_bar.suspend(|| { - log::error!( - "got unexpected response {code} with request for task \ + let artifact = + match get_artifact(client, taskcluster_host, task_id, artifact_name).await { + Ok(bytes) => bytes, + Err(code) => { + progress_bar.suspend(|| { + log::error!( + "got unexpected response {code} with request for task \ {task_id:?} ({job_type_name:?}, index {this_task_idx}), \ artifact {artifact_name:?}; skipping download", - ); - }); - return; - } - }; - - let local_artifact_path = { - let mut path = out_dir.join(job_path); - path.push(&this_task_idx.to_string()); - path.push(artifact_name); - path + ); + }); + return; + } }; - { - let parent_dir = local_artifact_path.parent().unwrap(); - fs::create_dir_all(parent_dir).unwrap_or_else(|e| { - panic!("failed to create `{}`: {e}", parent_dir.display()) - }); - } - fs::write(&local_artifact_path, artifact).unwrap_or_else(|e| { - panic!( - "failed to write artifact `{}`: {e}", - local_artifact_path.display() - ) - }); + { + let parent_dir = local_artifact_path.parent().unwrap(); + fs::create_dir_all(parent_dir) + .unwrap_or_else(|e| panic!("failed to create `{}`: {e}", parent_dir.display())); } + 
+ fs::write(&local_artifact_path, artifact).unwrap_or_else(|e| { + panic!( + "failed to write artifact `{}`: {e}", + local_artifact_path.display() + ) + }); + } + }); + let max_parallel_artifact_downloads = usize::from(max_parallel_artifact_downloads.get()); + artifacts + .for_each_concurrent(max_parallel_artifact_downloads, |()| async { + progress_bar.inc(1) }) .await; }