diff --git a/Cargo.toml b/Cargo.toml index 10cf920b..c9d8f271 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ filetime = "^0.2.1" libc = "^0.2.4" serde = { version = "1.0.89", features = ["derive"], optional = true } walkdir = "^2.0.1" +chashmap = "2.2.2" [target.'cfg(target_os="linux")'.dependencies] inotify = { version = "^0.7", default-features = false } diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index ff26bf6e..70a2ad50 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -9,7 +9,7 @@ use crossbeam_channel::Sender; use std::collections::HashMap; use std::path::PathBuf; use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; +use std::time::Duration; pub type OperationsBuffer = Arc, Option, Option)>>>; @@ -45,7 +45,10 @@ impl EventTx { } pub fn new_debounced(tx: Sender, debounce: Debounce) -> Self { - EventTx::Debounced { tx, debounce: Arc::new(Mutex::new(debounce)) } + EventTx::Debounced { + tx, + debounce: Arc::new(Mutex::new(debounce)), + } } pub fn debounced_tx(&self) -> Self { @@ -59,7 +62,7 @@ impl EventTx { match self { EventTx::Debounced { ref debounce, .. } => { debounce.lock().unwrap().configure(config, tx); - }, + } _ => {} } } @@ -135,7 +138,7 @@ impl Debounce { pub fn configure(&mut self, config: Config, tx: Sender>) { tx.send(match config { - Config::OngoingWrites(c) => self.timer.set_ongoing_write_duration(c), + Config::OngoingWrites(c) => self.timer.set_ongoing_writes(c), _ => Ok(false), }) .expect("configuration channel disconnected"); @@ -294,7 +297,7 @@ impl Debounce { // it already was a write event Some(op::Op::WRITE) => { restart_timer(timer_id, path.clone(), &mut self.timer); - handle_ongoing_write_event(&self.timer, path.clone(), &self.tx); + self.timer.handle_ongoing_write(&path, &self.tx); } // upgrade to write event @@ -552,24 +555,3 @@ fn restart_timer(timer_id: &mut Option, path: PathBuf, timer: &mut WatchTim } *timer_id = Some(timer.schedule(path)); } - -fn handle_ongoing_write_event(timer: &WatchTimer, path: PathBuf, tx: &Sender) { - let mut ongoing_write_event = timer.ongoing_write_event.lock().unwrap(); - let mut event_details = Option::None; - if let Some(ref i) = *ongoing_write_event { - let now = Instant::now(); - if i.0 <= now { - //fire event - let _ = tx.send(DebouncedEvent::OngoingWrite((i.1).clone())); - } else { - event_details = Some((i.0, i.1.clone())); - } - } else { - //schedule event - if let Some(d) = timer.ongoing_write_duration { - let fire_at = Instant::now() + d; - event_details = Some((fire_at, path)); - } - } - *ongoing_write_event = event_details; -} diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index 557b7d3f..22cfe796 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -1,4 +1,5 @@ use super::super::{op, DebouncedEvent, Error, Result}; +use chashmap::CHashMap; use crossbeam_channel::Sender; use std::collections::VecDeque; use std::path::PathBuf; @@ -26,7 +27,7 @@ struct ScheduleWorker { tx: Sender, operations_buffer: OperationsBuffer, stopped: Arc, - worker_ongoing_write_event: Arc>>, + ongoing_writes: Arc>, } impl ScheduleWorker { @@ -58,10 +59,7 @@ impl ScheduleWorker { let message = match op { Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)), Some(op::Op::WRITE) => { - //disable ongoing_write - let mut ongoing_write_event = - self.worker_ongoing_write_event.lock().unwrap(); - *ongoing_write_event = None; + self.ongoing_writes.remove(&path); Some(DebouncedEvent::Write(path)) } Some(op::Op::METADATA) => Some(DebouncedEvent::Metadata(path)), @@ -124,8 +122,8 @@ pub struct WatchTimer { delay: Duration, events: Arc>>, stopped: Arc, - pub ongoing_write_event: Arc>>, - pub ongoing_write_duration: Option, + ongoing_writes: Arc>, + ongoing_delay: Option, } impl WatchTimer { @@ -138,13 +136,13 @@ impl WatchTimer { let new_event_trigger = Arc::new(Condvar::new()); let stop_trigger = Arc::new(Condvar::new()); let stopped = Arc::new(AtomicBool::new(false)); + let ongoing_writes = Arc::new(CHashMap::new()); let worker_new_event_trigger = new_event_trigger.clone(); let worker_stop_trigger = stop_trigger.clone(); let worker_events = events.clone(); let worker_stopped = stopped.clone(); - let ongoing_write_event = Arc::new(Mutex::new(None)); - let worker_ongoing_write_event = ongoing_write_event.clone(); + let worker_ongoing_writes = ongoing_writes.clone(); thread::spawn(move || { ScheduleWorker { new_event_trigger: worker_new_event_trigger, @@ -153,7 +151,7 @@ impl WatchTimer { tx, operations_buffer, stopped: worker_stopped, - worker_ongoing_write_event, + ongoing_writes: worker_ongoing_writes, } .run(); }); @@ -165,22 +163,39 @@ impl WatchTimer { delay, events, stopped, - ongoing_write_event, - ongoing_write_duration: None, + ongoing_writes, + ongoing_delay: None, } } - pub fn set_ongoing_write_duration(&mut self, duration: Option) -> Result { - if let Some(duration) = duration { - if duration > self.delay { + pub fn set_ongoing_writes(&mut self, delay: Option) -> Result { + if let Some(delay) = delay { + if delay > self.delay { return Err(Error::InvalidConfigValue); } + } else if self.ongoing_delay.is_some() { + // Reset the current ongoing state when disabling + self.ongoing_writes.clear(); } - self.ongoing_write_duration = duration; + self.ongoing_delay = delay; Ok(true) } + pub fn handle_ongoing_write(&self, path: &PathBuf, tx: &Sender) { + if let Some(delay) = self.ongoing_delay { + self.ongoing_writes.upsert( + path.clone(), + || Instant::now() + delay, + |fire_at| { + if fire_at <= &mut Instant::now() { + tx.send(DebouncedEvent::OngoingWrite(path.clone())).ok(); + } + }, + ); + } + } + pub fn schedule(&mut self, path: PathBuf) -> u64 { self.counter = self.counter.wrapping_add(1); diff --git a/src/fsevent.rs b/src/fsevent.rs index d66c7dcd..ecb1f3f2 100644 --- a/src/fsevent.rs +++ b/src/fsevent.rs @@ -373,10 +373,7 @@ impl Watcher for FsEventWatcher { since_when: fs::kFSEventStreamEventIdSinceNow, latency: 0.0, flags: fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer, - event_tx: Arc::new(EventTx::new_debounced( - tx.clone(), - Debounce::new(delay, tx), - )), + event_tx: Arc::new(EventTx::new_debounced(tx.clone(), Debounce::new(delay, tx))), runloop: None, context: None, recursive_info: HashMap::new(), diff --git a/src/inotify.rs b/src/inotify.rs index c4d4e044..03fef4a4 100644 --- a/src/inotify.rs +++ b/src/inotify.rs @@ -289,10 +289,7 @@ impl EventLoop { } if !o.is_empty() { - send_pending_rename_event( - &mut self.rename_event, - &self.event_tx, - ); + send_pending_rename_event(&mut self.rename_event, &self.event_tx); self.event_tx.send(RawEvent { path: path, @@ -460,10 +457,7 @@ impl Watcher for INotifyWatcher { fn new(tx: Sender, delay: Duration) -> Result { let inotify = Inotify::init()?; - let event_tx = EventTx::new_debounced( - tx.clone(), - Debounce::new(delay, tx), - ); + let event_tx = EventTx::new_debounced(tx.clone(), Debounce::new(delay, tx)); let event_loop = EventLoop::new(inotify, event_tx)?; let channel = event_loop.channel(); event_loop.run(); diff --git a/src/lib.rs b/src/lib.rs index 9298c189..2e380364 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -119,6 +119,7 @@ extern crate anymap; #[macro_use] extern crate bitflags; +extern crate chashmap; extern crate crossbeam_channel; extern crate filetime; #[cfg(target_os = "macos")] diff --git a/src/poll.rs b/src/poll.rs index 33cc8cf4..8e507b2e 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -11,7 +11,10 @@ use filetime::FileTime; use std::collections::HashMap; use std::fs; use std::path::{Path, PathBuf}; -use std::sync::{Arc, atomic::{AtomicBool, Ordering}, Mutex}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, +}; use std::thread; use std::time::{Duration, Instant}; @@ -197,10 +200,7 @@ impl Watcher for PollWatcher { } fn new(tx: Sender, delay: Duration) -> Result { - let event_tx = EventTx::new_debounced( - tx.clone(), - Debounce::new(delay, tx), - ); + let event_tx = EventTx::new_debounced(tx.clone(), Debounce::new(delay, tx)); let mut p = PollWatcher { event_tx: event_tx.debounced_tx(), watches: Arc::new(Mutex::new(HashMap::new())), diff --git a/tests/watcher.rs b/tests/watcher.rs index 47ee212c..f7615ce4 100644 --- a/tests/watcher.rs +++ b/tests/watcher.rs @@ -700,7 +700,8 @@ fn poll_watch_recursive_create_directory() { let tdir = TempDir::new("temp_dir").expect("failed to create temporary directory"); let (tx, rx) = unbounded(); - let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)).expect("failed to create poll watcher"); + let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)) + .expect("failed to create poll watcher"); watcher .watch(tdir.mkpath("."), RecursiveMode::Recursive) .expect("failed to watch directory"); @@ -738,7 +739,8 @@ fn poll_watch_recursive_move() { tdir.create_all(vec!["dir1a"]); let (tx, rx) = unbounded(); - let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)).expect("failed to create poll watcher"); + let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)) + .expect("failed to create poll watcher"); watcher .watch(tdir.mkpath("."), RecursiveMode::Recursive) .expect("failed to watch directory"); @@ -802,7 +804,8 @@ fn poll_watch_recursive_move_in() { tdir.create_all(vec!["watch_dir", "dir1a/dir1"]); let (tx, rx) = unbounded(); - let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)).expect("failed to create poll watcher"); + let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)) + .expect("failed to create poll watcher"); watcher .watch(tdir.mkpath("watch_dir"), RecursiveMode::Recursive) .expect("failed to watch directory"); @@ -855,7 +858,8 @@ fn poll_watch_recursive_move_out() { tdir.create_all(vec!["watch_dir/dir1a/dir1"]); let (tx, rx) = unbounded(); - let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)).expect("failed to create poll watcher"); + let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)) + .expect("failed to create poll watcher"); watcher .watch(tdir.mkpath("watch_dir"), RecursiveMode::Recursive) .expect("failed to watch directory"); @@ -909,7 +913,8 @@ fn poll_watch_nonrecursive() { tdir.create_all(vec!["dir1"]); let (tx, rx) = unbounded(); - let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)).expect("failed to create poll watcher"); + let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)) + .expect("failed to create poll watcher"); watcher .watch(tdir.mkpath("."), RecursiveMode::NonRecursive) .expect("failed to watch directory"); @@ -939,7 +944,8 @@ fn poll_watch_file() { tdir.create_all(vec!["file1"]); let (tx, rx) = unbounded(); - let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)).expect("failed to create poll watcher"); + let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)) + .expect("failed to create poll watcher"); watcher .watch(tdir.mkpath("file1"), RecursiveMode::Recursive) .expect("failed to watch directory");