diff --git a/src/cli/sync.rs b/src/cli/sync.rs
index 6c0a966..0ca793c 100644
--- a/src/cli/sync.rs
+++ b/src/cli/sync.rs
@@ -43,21 +43,29 @@ struct Task {
     url: Url,
 }
 
+fn worker_add_task(worker: &Worker, wake: &AtomicUsize, task: Task) {
+    worker.push(task);
+    wake.fetch_add(1, Ordering::SeqCst);
+}
+
 fn extension_push_task(worker: &Worker, wake: &AtomicUsize, package: &ExtensionPackage) {
-    worker.push(Task {
-        task: TaskType::Download(ListItem {
+    worker_add_task(
+        worker,
+        wake,
+        Task {
+            task: TaskType::Download(ListItem {
+                url: package.url.clone(),
+                name: package.filename.clone(),
+                type_: listing::FileType::File,
+                // size and mtime would be ignored as skip_check is set
+                size: None,
+                mtime: NaiveDateTime::default(),
+                skip_check: true,
+            }),
+            relative: package.relative.clone(),
             url: package.url.clone(),
-            name: package.filename.clone(),
-            type_: listing::FileType::File,
-            // size and mtime would be ignored as skip_check is set
-            size: None,
-            mtime: NaiveDateTime::default(),
-            skip_check: true,
-        }),
-        relative: package.relative.clone(),
-        url: package.url.clone(),
-    });
-    wake.fetch_add(1, Ordering::SeqCst);
+        },
+    );
 }
 
 fn determinate_timezone(
@@ -183,7 +191,7 @@ async fn download_file(
     Ok(())
 }
 
-struct ThreadsOptions<'a> {
+struct ThreadsContext<'a> {
     bind_address: Option,
     download_dir: &'a Path,
     remote_list: &'a Arc>>,
@@ -193,18 +201,268 @@
     failure_downloading: &'a AtomicBool,
 }
 
-fn sync_threads(args: &SyncArgs, parser: &dyn crate::parser::Parser, options: &ThreadsOptions) {
+struct TaskContext<'a> {
+    task: &'a Task,
+    cwd: &'a Path,
+    relative: &'a str,
+    worker: &'a Worker,
+    wake: &'a AtomicUsize,
+    blocking_client: &'a reqwest::blocking::Client,
+    // async_client: &'a reqwest::Client,
+    exclusion_result: regex_process::Comparison,
+    exclusion_manager: &'a ExclusionManager,
+    timezone: Option,
+}
+
+struct AsyncDownloadContext<'a> {
+    async_client: &'a reqwest::Client,
+    mprogress: &'a MultiProgress,
+    runtime: &'a tokio::runtime::Runtime,
+}
+
+fn list_handler(
+    args: &SyncArgs,
+    parser: &dyn crate::parser::Parser,
+    thr_context: &ThreadsContext,
+    task_context: &TaskContext,
+) {
+    let task = task_context.task;
+    let cwd = task_context.cwd;
+    info!("Listing {}", task.url);
+    {
+        thr_context
+            .remote_list
+            .lock()
+            .unwrap()
+            .insert(cwd.to_path_buf());
+    }
+
+    if is_symlink(cwd) && !task_context.relative.is_empty() {
+        info!("{:?} is a symlink, ignored", cwd);
+        return;
+    }
+
+    let items = match again(
+        || parser.get_list(task_context.blocking_client, &task.url),
+        args.retry,
+    ) {
+        Ok(items) => items,
+        Err(e) => {
+            error!("Failed to list {}: {:?}", task.url, e);
+            thr_context.failure_listing.store(true, Ordering::SeqCst);
+            return;
+        }
+    };
+    match items {
+        ListResult::List(items) => {
+            for item in items {
+                if item.type_ == listing::FileType::Directory {
+                    let mut relative = task.relative.clone();
+                    relative.push(item.name);
+                    worker_add_task(
+                        task_context.worker,
+                        task_context.wake,
+                        Task {
+                            task: TaskType::Listing,
+                            relative,
+                            url: item.url,
+                        },
+                    );
+                } else {
+                    if task_context.exclusion_result == regex_process::Comparison::ListOnly {
+                        info!("Skipping (by list only) {}", item.url);
+                        continue;
+                    }
+                    worker_add_task(
+                        task_context.worker,
+                        task_context.wake,
+                        Task {
+                            task: TaskType::Download(item.clone()),
+                            relative: task.relative.clone(),
+                            url: item.url,
+                        },
+                    );
+                    thr_context.stat_size.fetch_add(
+                        match item.size {
+                            Some(size) => size.get_estimated(),
+                            None => 0,
+                        },
+                        Ordering::SeqCst,
+                    );
+                }
+                thr_context.stat_objects.fetch_add(1, Ordering::SeqCst);
+            }
+        }
+        ListResult::Redirect(target_url) => {
+            // This "Redirect" only supports creating symlink of current directory
+            info!(
+                "Redirected {} -> {}. Try to create a symlink",
+                task.url, target_url
+            );
+            if cwd.exists() {
+                warn!("Skipping symlink creation because {:?} already exists, but it is not a symlink", cwd);
+                return;
+            }
+            // get last segment of target_url
+            let target_name = match target_url.split('/').nth_back(1) {
+                Some(name) => name,
+                None => {
+                    error!("Failed to get last segment of target_url: {}", target_url);
+                    return;
+                }
+            };
+            info!("Try symlink {:?} -> {}", cwd, target_name);
+            if let Err(e) = symlink(target_name, cwd.clone()) {
+                error!(
+                    "Failed to create symlink {:?} -> {}: {:?}",
+                    cwd, target_name, e
+                );
+            }
+        }
+    }
+}
+
+fn download_handler(
+    item: &ListItem,
+    args: &SyncArgs,
+    thr_context: &ThreadsContext,
+    task_context: &TaskContext,
+    async_context: &AsyncDownloadContext,
+) {
+    let task = task_context.task;
+    let cwd = task_context.cwd;
+    // create path in case for first sync
+    if !args.dry_run {
+        std::fs::create_dir_all(cwd).unwrap();
+    }
+    // Absolute filesystem path of expected file
+    let expected_path = cwd.join(&item.name);
+    // Here relative filepath is only used to check exclusion
+    let relative_filepath = PathBuf::from(&task_context.relative).join(&item.name);
+    let relative_filepath = relative_filepath.to_string_lossy();
+    debug!(
+        "expected_path: {:?}, relative: {:?}",
+        expected_path, relative_filepath
+    );
+    {
+        if !thr_context
+            .remote_list
+            .lock()
+            .unwrap()
+            .insert(expected_path.clone())
+        {
+            // It is possible that multiple tasks might download the same file
+            // (generated by apt/yum parser, etc.)
+            // skip when we find that some thread has already downloaded it
+            info!("Skipping already handled {:?}", &expected_path);
+            return;
+        }
+    }
+
+    // We should put relative filepath into exclusion manager here
+    if task_context.exclusion_manager.match_str(&relative_filepath)
+        == regex_process::Comparison::Stop
+    {
+        info!("Skipping excluded {:?}", &relative_filepath);
+        return;
+    }
+
+    let mut should_download = true;
+    let mut skip_if_exists = false;
+    for i in &args.skip_if_exists {
+        if i.is_match(&relative_filepath) {
+            skip_if_exists = true;
+            break;
+        }
+    }
+
+    // Following code requires real filesystem path (expected_path) to work
+    if !should_download_by_list(
+        &expected_path,
+        item,
+        task_context.timezone,
+        skip_if_exists,
+        false,
+    ) {
+        info!("Skipping {}", task.url);
+        should_download = false;
+    }
+
+    let mut compare_size_only = false;
+    for i in &args.compare_size_only {
+        if i.is_match(&expected_path.to_string_lossy()) {
+            compare_size_only = true;
+            break;
+        }
+    }
+
+    if should_download && args.head_before_get {
+        match again(
+            || head(task_context.blocking_client, item.url.clone()),
+            args.retry,
+        ) {
+            Ok(resp) => {
+                if !should_download_by_head(&expected_path, &resp, compare_size_only) {
+                    info!("Skipping (by HEAD) {}", task.url);
+                    should_download = false;
+                }
+            }
+            Err(e) => {
+                error!("Failed to HEAD {}: {:?}", task.url, e);
+                thr_context
+                    .failure_downloading
+                    .store(true, Ordering::SeqCst);
+                should_download = false;
+            }
+        };
+    }
+
+    if should_download && !args.dry_run {
+        let future = async {
+            if (download_file(
+                async_context.async_client,
+                item,
+                &expected_path,
+                args,
+                async_context.mprogress,
+                task_context.timezone,
+                cwd,
+            )
+            .await)
+            .is_err()
+            {
+                thr_context
+                    .failure_downloading
+                    .store(true, Ordering::SeqCst);
+            }
+        };
+        async_context.runtime.block_on(future);
+    } else if should_download {
+        info!("Dry run, not downloading {}", task.url);
+    }
+
+    extension_handler(args, &expected_path, &task.relative, &item.url, |package| {
+        extension_push_task(task_context.worker, task_context.wake, package);
+    });
+}
+
+fn sync_threads(args: &SyncArgs, parser: &dyn crate::parser::Parser, thr_context: &ThreadsContext) {
     let exclusion_manager = ExclusionManager::new(&args.exclude, &args.include);
     let client = build_client!(
         reqwest::blocking::Client,
         args,
         parser,
-        options.bind_address.as_ref()
+        thr_context.bind_address.as_ref()
     );
 
     // async support
     let runtime = tokio::runtime::Runtime::new().unwrap();
-    let async_client = build_client!(reqwest::Client, args, parser, options.bind_address.as_ref());
+    let async_client = build_client!(
+        reqwest::Client,
+        args,
+        parser,
+        thr_context.bind_address.as_ref()
+    );
 
     let mprogress = MultiProgress::with_draw_target(ProgressDrawTarget::term_like_with_hz(
         Box::new(AlternativeTerm::buffered_stdout()),
@@ -214,7 +472,7 @@ fn sync_threads(args: &SyncArgs, parser: &dyn crate::parser::Parser, options: &T
     let timezone = determinate_timezone(args, parser, &client);
 
     if !args.dry_run {
-        std::fs::create_dir_all(options.download_dir).unwrap();
+        std::fs::create_dir_all(thr_context.download_dir).unwrap();
    }
 
     let workers: Vec<_> = (0..args.threads)
@@ -247,7 +505,7 @@
                 .and_then(|s| s.success())
         }) {
             let relative = task.relative.join("/");
-            let cwd = options.download_dir.join(&relative);
+            let cwd = thr_context.download_dir.join(&relative);
             debug!("cwd: {:?}, relative: {:?}", cwd, relative);
             // exclude this?
             // note that it only checks the relative folder!
@@ -259,159 +517,34 @@
             } else if exclusion_result == regex_process::Comparison::ListOnly {
                 info!("List only in {:?}", &relative);
             }
-            match task.task {
+            let task_context = TaskContext {
+                task: &task,
+                cwd: &cwd,
+                relative: &relative,
+                worker: &worker,
+                wake: &wake,
+                blocking_client: &client,
+                exclusion_result,
+                exclusion_manager: &exclusion_manager,
+                timezone,
+            };
+            match &task.task {
                 TaskType::Listing => {
-                    info!("Listing {}", task.url);
-                    {
-                        options.remote_list.lock().unwrap().insert(cwd.clone());
-                    }
-
-                    if is_symlink(&cwd) && !relative.is_empty() {
-                        info!("{:?} is a symlink, ignored", cwd);
-                        continue;
-                    }
-
-                    let items = match again(|| parser.get_list(&client, &task.url), args.retry) {
-                        Ok(items) => items,
-                        Err(e) => {
-                            error!("Failed to list {}: {:?}", task.url, e);
-                            options.failure_listing.store(true, Ordering::SeqCst);
-                            continue;
-                        }
-                    };
-                    match items {
-                        ListResult::List(items) => {
-                            for item in items {
-                                if item.type_ == listing::FileType::Directory {
-                                    let mut relative = task.relative.clone();
-                                    relative.push(item.name);
-                                    worker.push(Task {
-                                        task: TaskType::Listing,
-                                        relative,
-                                        url: item.url,
-                                    });
-                                    wake.fetch_add(1, Ordering::SeqCst);
-                                } else {
-                                    if exclusion_result == regex_process::Comparison::ListOnly {
-                                        info!("Skipping (by list only) {}", item.url);
-                                        continue;
-                                    }
-                                    worker.push(Task {
-                                        task: TaskType::Download(item.clone()),
-                                        relative: task.relative.clone(),
-                                        url: item.url,
-                                    });
-                                    wake.fetch_add(1, Ordering::SeqCst);
-                                    options.stat_size.fetch_add(match item.size {
-                                        Some(size) => size.get_estimated(),
-                                        None => 0,
-                                    }, Ordering::SeqCst);
-                                }
-                                options.stat_objects.fetch_add(1, Ordering::SeqCst);
-                            }
-                        }
-                        ListResult::Redirect(target_url) => {
-                            // This "Redirect" only supports creating symlink of current directory
-                            info!("Redirected {} -> {}. Try to create a symlink", task.url, target_url);
-                            if cwd.exists() {
-                                warn!("Skipping symlink creation because {:?} already exists, but it is not a symlink", cwd);
-                                continue;
-                            }
-                            // get last segment of target_url
-                            let target_name = match target_url.split('/').nth_back(1) {
-                                Some(name) => name,
-                                None => {
-                                    error!("Failed to get last segment of target_url: {}", target_url);
-                                    continue;
-                                }
-                            };
-                            info!("Try symlink {:?} -> {}", cwd, target_name);
-                            if let Err(e) = symlink(target_name, cwd.clone()) {
-                                error!("Failed to create symlink {:?} -> {}: {:?}", cwd, target_name, e);
-                            }
-                        }
-                    }
+                    list_handler(args, parser, thr_context, &task_context);
                 }
                 TaskType::Download(item) => {
-                    // create path in case for first sync
-                    if !args.dry_run {
-                        std::fs::create_dir_all(&cwd).unwrap();
-                    }
-                    // Absolute filesystem path of expected file
-                    let expected_path = cwd.join(&item.name);
-                    // Here relative filepath is only used to check exclusion
-                    let relative_filepath = PathBuf::from(&relative).join(&item.name);
-                    let relative_filepath = relative_filepath.to_string_lossy();
-                    debug!("expected_path: {:?}, relative: {:?}", expected_path, relative_filepath);
-                    {
-                        if !options.remote_list.lock().unwrap().insert(expected_path.clone()) {
-                            // It is possible that multiple tasks might download the same file
-                            // (generated by apt/yum parser, etc.)
-                            // skip when we find that some thread has already downloaded it
-                            info!("Skipping already handled {:?}", &expected_path);
-                            continue;
-                        }
-                    }
-
-                    // We should put relative filepath into exclusion manager here
-                    if exclusion_manager.match_str(&relative_filepath) == regex_process::Comparison::Stop {
-                        info!("Skipping excluded {:?}", &relative_filepath);
-                        continue;
-                    }
-
-                    let mut should_download = true;
-                    let mut skip_if_exists = false;
-                    for i in &args.skip_if_exists {
-                        if i.is_match(&relative_filepath) {
-                            skip_if_exists = true;
-                            break;
-                        }
-                    }
-
-                    // Following code requires real filesystem path (expected_path) to work
-                    if !should_download_by_list(&expected_path, &item, timezone, skip_if_exists, false) {
-                        info!("Skipping {}", task.url);
-                        should_download = false;
-                    }
-
-                    let mut compare_size_only = false;
-                    for i in &args.compare_size_only {
-                        if i.is_match(&expected_path.to_string_lossy()) {
-                            compare_size_only = true;
-                            break;
-                        }
-                    }
-
-                    if should_download && args.head_before_get {
-                        match again(|| head(&client, item.url.clone()), args.retry) {
-                            Ok(resp) => {
-                                if !should_download_by_head(&expected_path, &resp, compare_size_only) {
-                                    info!("Skipping (by HEAD) {}", task.url);
-                                    should_download = false;
-                                }
-                            },
-                            Err(e) => {
-                                error!("Failed to HEAD {}: {:?}", task.url, e);
-                                options.failure_downloading.store(true, Ordering::SeqCst);
-                                should_download = false;
-                            }
-                        };
-                    }
-
-                    if should_download && !args.dry_run {
-                        let future = async {
-                            if (download_file(&async_client, &item, &expected_path, args, &mprogress, timezone, &cwd).await).is_err() {
-                                options.failure_downloading.store(true, Ordering::SeqCst);
-                            }
-                        };
-                        runtime.block_on(future);
-                    } else if should_download {
-                        info!("Dry run, not downloading {}", task.url);
-                    }
-
-                    extension_handler(args, &expected_path, &task.relative, &item.url, |package| {
-                        extension_push_task(&worker, &wake, package);
-                    });
+                    let async_context = AsyncDownloadContext {
+                        async_client: &async_client,
+                        mprogress: &mprogress,
+                        runtime: &runtime,
+                    };
+                    download_handler(
+                        item,
+                        args,
+                        thr_context,
+                        &task_context,
+                        &async_context,
+                    );
                 }
             }
         }
@@ -467,7 +600,7 @@ pub fn sync(args: &SyncArgs, bind_address: Option) -> ! {
     sync_threads(
         args,
         &*parser,
-        &ThreadsOptions {
+        &ThreadsContext {
            bind_address,
            download_dir,
            remote_list: &remote_list,
diff --git a/src/main.rs b/src/main.rs
index 690e8ce..e72a17b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,4 @@
 #![warn(clippy::cognitive_complexity)]
-#![warn(clippy::too_many_lines)]
 
 use std::path::PathBuf;
 use clap::{Parser, Subcommand};