From aeb3d5afab8d4e6a40d8eeb6c473f058a5920f47 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?=
Date: Sun, 22 Oct 2023 18:25:56 +0200
Subject: [PATCH] Report cache flusher errors to the caller

By reporting cache flushing problems to the caller, we can log those
errors through Log instead of using eprintln. Additionally, flush
errors at exit are also logged now.
---
 Cargo.lock            | 11 +++++++++++
 fclones/Cargo.toml    |  1 +
 fclones/src/cache.rs  | 40 +++++++++++++++++++++++++++-------------
 fclones/src/hasher.rs | 10 ++++++++++
 4 files changed, 49 insertions(+), 13 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index cc58708..107fb62 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -314,6 +314,16 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "crossbeam-channel"
+version = "0.5.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
+dependencies = [
+ "cfg-if",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-deque"
 version = "0.8.3"
@@ -493,6 +503,7 @@ dependencies = [
  "chrono",
  "clap",
  "console",
+ "crossbeam-channel",
  "crossbeam-utils",
  "csv",
  "dashmap",
diff --git a/fclones/Cargo.toml b/fclones/Cargo.toml
index 9407fee..19a38b7 100644
--- a/fclones/Cargo.toml
+++ b/fclones/Cargo.toml
@@ -25,6 +25,7 @@ byte-unit = "4.0"
 chrono = { version = "0.4", default-features = false, features = ["serde", "clock", "std"] }
 clap = { version = "4.4", features = ["derive", "cargo", "wrap_help"] }
 console = "0.15"
+crossbeam-channel = "0.5"
 crossbeam-utils = "0.8"
 csv = "1.1"
 dashmap = "5.2"
diff --git a/fclones/src/cache.rs b/fclones/src/cache.rs
index daf701b..30e1eb5 100644
--- a/fclones/src/cache.rs
+++ b/fclones/src/cache.rs
@@ -1,9 +1,9 @@
 //! Persistent caching of file hashes
 
+use crossbeam_channel::RecvTimeoutError;
 use std::fmt::{Display, Formatter};
 use std::fs::create_dir_all;
-use std::sync::mpsc::{channel, RecvTimeoutError, Sender};
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::thread;
 use std::thread::JoinHandle;
 use std::time::{Duration, UNIX_EPOCH};
@@ -47,7 +47,7 @@ const FLUSH_INTERVAL: Duration = Duration::from_millis(1000);
 /// them from file data.
 pub struct HashCache {
     cache: Arc<InnerCache>,
-    _flusher: HashCacheFlusher,
+    flusher: HashCacheFlusher,
 }
 
 impl HashCache {
@@ -76,10 +76,7 @@ impl HashCache {
         let tree_id = format!("hash_db:{:?}:{}", algorithm, transform.unwrap_or(""));
         let cache = Arc::new(typed_sled::Tree::open(&db, tree_id));
         let flusher = HashCacheFlusher::start(&cache);
-        Ok(HashCache {
-            cache,
-            _flusher: flusher,
-        })
+        Ok(HashCache { cache, flusher })
     }
 
     /// Opens the file hash database located in `fclones` subdir of user cache directory.
@@ -115,7 +112,11 @@ impl HashCache {
             .insert(key, &value)
             .map_err(|e| format!("Failed to write entry to cache: {e}"))?;
 
-        Ok(())
+        // Check for cache flush errors. If there were errors, report them to the caller.
+        match self.flusher.err_channel.try_recv() {
+            Ok(err) => Err(err),
+            Err(_) => Ok(()),
+        }
     }
 
     /// Retrieves the cached hash of a file.
@@ -163,24 +164,36 @@ impl HashCache {
         };
         Ok(key)
     }
+
+    /// Flushes all unwritten data and closes the cache.
+    pub fn close(self) -> Result<(), Error> {
+        self.cache
+            .flush()
+            .map_err(|e| format!("Failed to flush cache: {e}"))?;
+        Ok(())
+    }
 }
 
 /// Periodically flushes the cache in a background thread
 struct HashCacheFlusher {
     thread_handle: Option<JoinHandle<()>>,
-    control_channel: Option<Mutex<Sender<()>>>, // wrapped in Mutex because Sender is not Send in older versions of Rust
+    control_channel: Option<crossbeam_channel::Sender<()>>,
+    err_channel: crossbeam_channel::Receiver<Error>,
 }
 
 impl HashCacheFlusher {
     fn start(cache: &Arc<InnerCache>) -> HashCacheFlusher {
         let cache = Arc::downgrade(cache);
-        let (tx, rx) = channel::<()>();
+        let (ctrl_tx, ctrl_rx) = crossbeam_channel::bounded::<()>(1);
+        let (err_tx, err_rx) = crossbeam_channel::bounded(1);
 
         let thread_handle = thread::spawn(move || {
-            while let Err(RecvTimeoutError::Timeout) = rx.recv_timeout(FLUSH_INTERVAL) {
+            while let Err(RecvTimeoutError::Timeout) = ctrl_rx.recv_timeout(FLUSH_INTERVAL) {
                 if let Some(cache) = cache.upgrade() {
                     if let Err(e) = cache.flush() {
-                        eprintln!("Failed to flush hash cache: {e}");
+                        err_tx
+                            .send(format!("Failed to flush the hash cache: {e}").into())
+                            .unwrap_or_default();
                         return;
                     }
                 }
@@ -189,7 +202,8 @@ impl HashCacheFlusher {
 
         HashCacheFlusher {
             thread_handle: Some(thread_handle),
-            control_channel: Some(Mutex::new(tx)),
+            control_channel: Some(ctrl_tx),
+            err_channel: err_rx,
         }
     }
 }
diff --git a/fclones/src/hasher.rs b/fclones/src/hasher.rs
index 415545c..7c8d2a4 100644
--- a/fclones/src/hasher.rs
+++ b/fclones/src/hasher.rs
@@ -437,6 +437,16 @@ impl FileHasher<'_> {
     }
 }
 
+impl<'a> Drop for FileHasher<'a> {
+    fn drop(&mut self) {
+        if let Some(cache) = self.cache.take() {
+            if let Err(e) = cache.close() {
+                self.log.warn(e);
+            }
+        }
+    }
+}
+
 fn format_output_stream(output: &str) -> String {
     let output = output.trim().to_string();
     if output.is_empty() {
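
Note (not part of the patch): below is a minimal, self-contained sketch of
the error-propagation pattern the patch introduces. A background flusher
thread reports failures to the caller over a bounded crossbeam channel
instead of printing them, and the caller polls that channel on its normal
code path. The `flush` stub, the channel payload type, and the use of
`eprintln!` in place of fclones' `Log` are placeholders, not fclones APIs.

    use std::thread;
    use std::time::Duration;

    use crossbeam_channel::{bounded, RecvTimeoutError};

    // Placeholder for the real flush operation performed by the cache.
    fn flush() -> Result<(), String> {
        Ok(())
    }

    fn main() {
        // Control channel: dropping the sender stops the flusher loop.
        let (ctrl_tx, ctrl_rx) = bounded::<()>(1);
        // Error channel: the flusher reports at most one error to the caller.
        let (err_tx, err_rx) = bounded::<String>(1);

        let flusher = thread::spawn(move || {
            // Wake up periodically until the control channel is closed.
            while let Err(RecvTimeoutError::Timeout) =
                ctrl_rx.recv_timeout(Duration::from_millis(100))
            {
                if let Err(e) = flush() {
                    // Report the error to the caller instead of printing it.
                    err_tx
                        .send(format!("Failed to flush: {e}"))
                        .unwrap_or_default();
                    return;
                }
            }
        });

        // The caller checks for flusher errors on its own code path.
        if let Ok(err) = err_rx.try_recv() {
            eprintln!("{err}"); // fclones routes this through Log instead
        }

        drop(ctrl_tx); // close the control channel to stop the flusher
        flusher.join().unwrap();
    }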