Skip to content

Commit

Permalink
sync: Wrap all download file code inside again_async
Browse files Browse the repository at this point in the history
  • Loading branch information
taoky committed Aug 11, 2024
1 parent 4644634 commit 927e6bc
Showing 1 changed file with 68 additions and 60 deletions.
128 changes: 68 additions & 60 deletions src/cli/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,70 +139,78 @@ async fn download_file(
) -> Result<()> {
// Here we use async to allow streaming and progress bar
// Ref: https://gist.github.com/giuliano-oliveira/4d11d6b3bb003dba3a1b53f43d81b30d
let url = item.url.clone();
let resp = match again_async(|| get_async(client, url.clone()), args.retry).await {
Ok(resp) => resp,
Err(e) => {
error!("Failed to GET {}: {:?}", url, e);
return Err(e);
}
};
let total_size = match resp.content_length() {
Some(s) => s,
None => {
warn!("URL {} does not give a content length", url);
// This value would be used only for showing progress bar.
0
}
};
let pb = mprogress.add(ProgressBar::new(total_size));
pb.set_style(
ProgressStyle::default_bar()
.template("{msg}\n[{elapsed_precise}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})")
.unwrap()
.progress_chars("#>-"),
);
pb.set_message(format!("Downloading {}", url));

let mtime = match utils::get_async_response_mtime(&resp) {
Ok(mtime) => mtime,
Err(e) => {
if args.allow_mtime_from_parser {
naive_to_utc(&item.mtime, timezone)
} else {
error!("Failed to get mtime of {}: {:?}", url, e);
return Err(e);
}
}
};

let tmp_path = cwd.join(format!(".tmp.{}", item.name));
{
let mut dest_file = File::create(&tmp_path).unwrap();
let mut stream = resp.bytes_stream();
again_async(
|| async {
let url = item.url.clone();
let resp = match get_async(client, url.clone()).await {
Ok(resp) => resp,
Err(e) => {
error!("Failed to GET {}: {:?}", url, e);
return Err(e);
}
};
let total_size = match resp.content_length() {
Some(s) => s,
None => {
warn!("URL {} does not give a content length", url);
// This value would be used only for showing progress bar.
0
}
};
let pb = mprogress.add(ProgressBar::new(total_size));
pb.set_style(
ProgressStyle::default_bar()
.template(
"{msg}\n[{elapsed_precise}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})",
)
.unwrap()
.progress_chars("#>-"),
);
pb.set_message(format!("Downloading {}", url));

while let Some(item) = stream.next().await {
let chunk = match item {
Ok(i) => i,
let mtime = match utils::get_async_response_mtime(&resp) {
Ok(mtime) => mtime,
Err(e) => {
error!("Failed when downloading {}: {:?}", url, e);
return Err(e.into());
if args.allow_mtime_from_parser {
naive_to_utc(&item.mtime, timezone)
} else {
error!("Failed to get mtime of {}: {:?}", url, e);
return Err(e);
}
}
};
dest_file.write_all(&chunk).unwrap();
let new = std::cmp::min(pb.position() + (chunk.len() as u64), total_size);
pb.set_position(new);
}
filetime::set_file_handle_times(
&dest_file,
None,
Some(filetime::FileTime::from_system_time(mtime.into())),
)
.unwrap();
}
// move tmp file to expected path
std::fs::rename(&tmp_path, path).unwrap();
Ok(())

let tmp_path = cwd.join(format!(".tmp.{}", item.name));
{
let mut dest_file = File::create(&tmp_path).unwrap();
let mut stream = resp.bytes_stream();

while let Some(item) = stream.next().await {
let chunk = match item {
Ok(i) => i,
Err(e) => {
error!("Failed when downloading {}: {:?}", url, e);
return Err(e.into());
}
};
dest_file.write_all(&chunk).unwrap();
let new = std::cmp::min(pb.position() + (chunk.len() as u64), total_size);
pb.set_position(new);
}
filetime::set_file_handle_times(
&dest_file,
None,
Some(filetime::FileTime::from_system_time(mtime.into())),
)
.unwrap();
}
// move tmp file to expected path
std::fs::rename(&tmp_path, path).unwrap();
Ok(())
},
args.retry,
)
.await
}

struct ThreadsContext<'a> {
Expand Down

0 comments on commit 927e6bc

Please sign in to comment.