Skip to content

Commit

Permalink
seperate async download_file() out
Browse files Browse the repository at this point in the history
  • Loading branch information
taoky committed Jan 2, 2024
1 parent 81fdb2a commit 98c88ef
Showing 1 changed file with 66 additions and 48 deletions.
114 changes: 66 additions & 48 deletions src/cli/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
},
};

use anyhow::Result;
use chrono::{FixedOffset, NaiveDateTime};
use crossbeam_deque::{Injector, Worker};
use futures_util::StreamExt;
Expand Down Expand Up @@ -119,6 +120,69 @@ fn determinate_timezone(
}
}

async fn download_file(
client: &reqwest::Client,
item: &ListItem,
path: &Path,
args: &SyncArgs,
mprogress: &MultiProgress,
timezone: Option<FixedOffset>,
cwd: &Path,
) -> Result<()> {
// Here we use async to allow streaming and progress bar
// Ref: https://gist.github.com/giuliano-oliveira/4d11d6b3bb003dba3a1b53f43d81b30d
let resp = match again_async(|| get_async(client, item.url.clone()), args.retry).await {
Ok(resp) => resp,
Err(e) => {
error!("Failed to GET {}: {:?}", item.url, e);
return Err(e);
}
};
let total_size = resp.content_length().unwrap();
let pb = mprogress.add(ProgressBar::new(total_size));
pb.set_style(
ProgressStyle::default_bar()
.template("{msg}\n[{elapsed_precise}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})")
.unwrap()
.progress_chars("#>-"),
);
pb.set_message(format!("Downloading {}", item.url));

let mtime = match utils::get_async_response_mtime(&resp) {
Ok(mtime) => mtime,
Err(e) => {
if args.allow_mtime_from_parser {
naive_to_utc(&item.mtime, timezone)
} else {
error!("Failed to get mtime of {}: {:?}", item.url, e);
return Err(e);
}
}
};

let tmp_path = cwd.join(format!(".tmp.{}", item.name));
{
let mut dest_file = File::create(&tmp_path).unwrap();
let mut stream = resp.bytes_stream();

while let Some(item) = stream.next().await {
let chunk = item.unwrap();
dest_file.write_all(&chunk).unwrap();
let new = std::cmp::min(pb.position() + (chunk.len() as u64), total_size);
pb.set_position(new);
}
filetime::set_file_handle_times(
&dest_file,
None,
Some(filetime::FileTime::from_system_time(mtime.into())),
)
.unwrap();
}
// move tmp file to expected path
std::fs::rename(&tmp_path, path).unwrap();
Ok(())
}

struct ThreadsOptions<'a> {
bind_address: Option<String>,
download_dir: &'a Path,
Expand Down Expand Up @@ -336,55 +400,9 @@ fn sync_threads(args: &SyncArgs, parser: &dyn crate::parser::Parser, options: &T

if should_download && !args.dry_run {
let future = async {
// Here we use async to allow streaming and progress bar
// Ref: https://gist.github.com/giuliano-oliveira/4d11d6b3bb003dba3a1b53f43d81b30d
let resp = match again_async(|| get_async(&async_client, item.url.clone()), args.retry).await {
Ok(resp) => resp,
Err(e) => {
error!("Failed to GET {}: {:?}", task.url, e);
options.failure_downloading.store(true, Ordering::SeqCst);
return;
}
};
let total_size = resp.content_length().unwrap();
let pb = mprogress.add(ProgressBar::new(total_size));
pb.set_style(ProgressStyle::default_bar()
.template("{msg}\n[{elapsed_precise}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})").unwrap()
.progress_chars("#>-"));
pb.set_message(format!("Downloading {}", item.url));

let mtime = match utils::get_async_response_mtime(&resp) {
Ok(mtime) => mtime,
Err(e) => {
if args.allow_mtime_from_parser {
naive_to_utc(&item.mtime, timezone)
} else {
error!("Failed to get mtime of {}: {:?}", task.url, e);
options.failure_downloading.store(true, Ordering::SeqCst);
return;
}}
};

let tmp_path = cwd.join(format!(".tmp.{}", item.name));
{
let mut dest_file = File::create(&tmp_path).unwrap();
let mut stream = resp.bytes_stream();

while let Some(item) = stream.next().await {
let chunk = item.unwrap();
dest_file.write_all(&chunk).unwrap();
let new = std::cmp::min(pb.position() + (chunk.len() as u64), total_size);
pb.set_position(new);
}
filetime::set_file_handle_times(
&dest_file,
None,
Some(filetime::FileTime::from_system_time(mtime.into())),
)
.unwrap();
if (download_file(&async_client, &item, &expected_path, args, &mprogress, timezone, &cwd).await).is_err() {
options.failure_downloading.store(true, Ordering::SeqCst);
}
// move tmp file to expected path
std::fs::rename(&tmp_path, &expected_path).unwrap();
};
runtime.block_on(future);
} else if should_download {
Expand Down

0 comments on commit 98c88ef

Please sign in to comment.