From c3bd08a7ec4685fbc3de029310b4e8380a564227 Mon Sep 17 00:00:00 2001 From: Sujit Joshi Date: Sun, 24 Feb 2019 12:28:25 -0500 Subject: [PATCH 01/16] WPI: First attempt at on_going_write --- src/debounce/mod.rs | 4 ++++ src/debounce/timer.rs | 23 ++++++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index 53bed5b8..830e15cf 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -249,6 +249,7 @@ impl Debounce { // keep write event / not need to emit NoticeWrite because // it already was a write event Some(op::Op::WRITE) => { + println!("Delaying write..."); restart_timer(timer_id, path.clone(), &mut self.timer); } @@ -260,8 +261,11 @@ impl Debounce { // operations_buffer entry didn't exist None => { + println!("Sending notice write..."); *operation = Some(op::Op::WRITE); let _ = self.tx.send(DebouncedEvent::NoticeWrite(path.clone())); + //sj_todo + //schedule on_going_write restart_timer(timer_id, path.clone(), &mut self.timer); } diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index 2ec918c4..1d4f14cc 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -26,11 +26,23 @@ struct ScheduleWorker { tx: mpsc::Sender, operations_buffer: OperationsBuffer, stopped: Arc, + worker_on_going_write_event: Arc>>, } impl ScheduleWorker { fn fire_due_events(&self, now: Instant) -> Option { let mut events = self.events.lock().unwrap(); + let on_going_write_event = self.worker_on_going_write_event.lock().unwrap(); + match on_going_write_event { + Some(i) => { + if i <= now { + self.tx.send(op::Op::WRITE); + //sj_todo + //reschedule on_going_write + } + }, + None => {} + } while let Some(event) = events.pop_front() { if event.when <= now { self.fire_event(event) @@ -56,7 +68,11 @@ impl ScheduleWorker { } let message = match op { Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)), - Some(op::Op::WRITE) => Some(DebouncedEvent::Write(path)), + Some(op::Op::WRITE) => { + Some(DebouncedEvent::Write(path)) + //sj_todo + //reschedule on_going_write + }, Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)), Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)), Some(op::Op::RENAME) if is_partial_rename => { @@ -116,6 +132,7 @@ pub struct WatchTimer { delay: Duration, events: Arc>>, stopped: Arc, + on_going_write_event: Arc>>, } impl WatchTimer { @@ -133,6 +150,8 @@ impl WatchTimer { let worker_stop_trigger = stop_trigger.clone(); let worker_events = events.clone(); let worker_stopped = stopped.clone(); + let on_going_write_event = Arc::new(Mutex::new(None)); + let worker_on_going_write_event = on_going_write_event.clone(); thread::spawn(move || { ScheduleWorker { new_event_trigger: worker_new_event_trigger, @@ -141,6 +160,7 @@ impl WatchTimer { tx, operations_buffer, stopped: worker_stopped, + worker_on_going_write_event: worker_on_going_write_event, } .run(); }); @@ -152,6 +172,7 @@ impl WatchTimer { delay, events, stopped, + on_going_write_event, } } From 36462419b046a6b6d28e6d77e3497cd7c1615349 Mon Sep 17 00:00:00 2001 From: Sugs Date: Mon, 25 Feb 2019 21:13:41 -0500 Subject: [PATCH 02/16] WPI: Able to emit on_going_write. Needs lot of refactoring. --- src/debounce/mod.rs | 17 ++++++++++++++--- src/debounce/timer.rs | 36 ++++++++++++++++++++++-------------- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index 830e15cf..dea7ab0e 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -10,7 +10,7 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::mpsc; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, Instant}; pub type OperationsBuffer = Arc, Option, Option)>>>; @@ -264,9 +264,9 @@ impl Debounce { println!("Sending notice write..."); *operation = Some(op::Op::WRITE); let _ = self.tx.send(DebouncedEvent::NoticeWrite(path.clone())); - //sj_todo - //schedule on_going_write restart_timer(timer_id, path.clone(), &mut self.timer); + //sj_todo schedule on_going_write + set_on_going_write_timer(path.clone(), &mut self.timer); } // writing to a deleted file is impossible, @@ -511,3 +511,14 @@ fn restart_timer(timer_id: &mut Option, path: PathBuf, timer: &mut WatchTim } *timer_id = Some(timer.schedule(path)); } + +fn set_on_going_write_timer(path: PathBuf, timer: &mut WatchTimer) { + let tt = Instant::now() + Duration::from_secs(1); + let mut sslkjsdf = timer.on_going_write_event.lock().unwrap(); + if *sslkjsdf == None { + println!("Set on_going_write"); + *sslkjsdf = Some((tt, path)); + } else { + println!("Ignore if set.."); + } +} diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index 1d4f14cc..15803c0a 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -26,22 +26,29 @@ struct ScheduleWorker { tx: mpsc::Sender, operations_buffer: OperationsBuffer, stopped: Arc, - worker_on_going_write_event: Arc>>, + worker_on_going_write_event: Arc>>, } impl ScheduleWorker { fn fire_due_events(&self, now: Instant) -> Option { let mut events = self.events.lock().unwrap(); - let on_going_write_event = self.worker_on_going_write_event.lock().unwrap(); - match on_going_write_event { - Some(i) => { - if i <= now { - self.tx.send(op::Op::WRITE); - //sj_todo - //reschedule on_going_write - } - }, - None => {} + let mut write_options = self.worker_on_going_write_event.lock().unwrap(); + let mut emitted = false; + if let Some(ref i) = *write_options { + if i.0 <= now { + println!("Sending on going write..."); + //self.tx.send(DebouncedEvent::Write((i.1).clone())); + //sj_todo + //reschedule on_going_write + emitted = true; + + } + } + if emitted { + if let Some(ref mut i) = *write_options { + let tt = Instant::now() + Duration::from_secs(1); + i.0 = tt; + } } while let Some(event) = events.pop_front() { if event.when <= now { @@ -69,9 +76,10 @@ impl ScheduleWorker { let message = match op { Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)), Some(op::Op::WRITE) => { + //sj_todo stop on_going_write + //let (mut on_going_write_event, _) = self.worker_on_going_write_event.lock().unwrap(); + //*on_going_write_event = None; Some(DebouncedEvent::Write(path)) - //sj_todo - //reschedule on_going_write }, Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)), Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)), @@ -132,7 +140,7 @@ pub struct WatchTimer { delay: Duration, events: Arc>>, stopped: Arc, - on_going_write_event: Arc>>, + pub on_going_write_event: Arc>>, } impl WatchTimer { From 3c35d62a07f102660df329c60749ca9233ab7213 Mon Sep 17 00:00:00 2001 From: Sugs Date: Sat, 2 Mar 2019 11:40:21 -0500 Subject: [PATCH 03/16] WPI: Able to emit on_going_write. Needs lot of refactoring. --- src/debounce/mod.rs | 8 +++++--- src/debounce/timer.rs | 19 ++++++------------- src/lib.rs | 23 +++++++++++++++-------- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index dea7ab0e..086cda3f 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -510,15 +510,17 @@ fn restart_timer(timer_id: &mut Option, path: PathBuf, timer: &mut WatchTim timer.ignore(timer_id); } *timer_id = Some(timer.schedule(path)); + let ww = timer.on_going_write_event.lock().unwrap(); + if *ww == None { + + } } fn set_on_going_write_timer(path: PathBuf, timer: &mut WatchTimer) { - let tt = Instant::now() + Duration::from_secs(1); + let tt = Instant::now() + Duration::from_secs(3); let mut sslkjsdf = timer.on_going_write_event.lock().unwrap(); if *sslkjsdf == None { println!("Set on_going_write"); *sslkjsdf = Some((tt, path)); - } else { - println!("Ignore if set.."); } } diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index 15803c0a..4afd463c 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -34,21 +34,14 @@ impl ScheduleWorker { let mut events = self.events.lock().unwrap(); let mut write_options = self.worker_on_going_write_event.lock().unwrap(); let mut emitted = false; - if let Some(ref i) = *write_options { + if let Some(ref mut i) = *write_options { if i.0 <= now { - println!("Sending on going write..."); - //self.tx.send(DebouncedEvent::Write((i.1).clone())); - //sj_todo - //reschedule on_going_write + self.tx.send(DebouncedEvent::OnGoingWrite((i.1).clone())); emitted = true; - } } if emitted { - if let Some(ref mut i) = *write_options { - let tt = Instant::now() + Duration::from_secs(1); - i.0 = tt; - } + *write_options = None; } while let Some(event) = events.pop_front() { if event.when <= now { @@ -76,9 +69,9 @@ impl ScheduleWorker { let message = match op { Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)), Some(op::Op::WRITE) => { - //sj_todo stop on_going_write - //let (mut on_going_write_event, _) = self.worker_on_going_write_event.lock().unwrap(); - //*on_going_write_event = None; + println!("Stopping on_going_write"); + let mut alkjsh = self.worker_on_going_write_event.lock().unwrap(); + *alkjsh = None; Some(DebouncedEvent::Write(path)) }, Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)), diff --git a/src/lib.rs b/src/lib.rs index 6f77e6d6..b5f220fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -310,19 +310,20 @@ pub mod op { /// Multiple actions may be delivered in a single event. pub struct Op: u32 { /// Attributes changed - const CHMOD = 0b0000001; + const CHMOD = 0b00000001; /// Created - const CREATE = 0b0000010; + const CREATE = 0b00000010; /// Removed - const REMOVE = 0b0000100; + const REMOVE = 0b00000100; /// Renamed - const RENAME = 0b0001000; + const RENAME = 0b00001000; /// Written - const WRITE = 0b0010000; + const WRITE = 0b00010000; /// File opened for writing was closed - const CLOSE_WRITE = 0b0100000; + const CLOSE_WRITE = 0b00100000; /// Directories need to be rescanned - const RESCAN = 0b1000000; + const RESCAN = 0b01000000; + const ON_GOING_WRITE = 0b10000000; } } @@ -333,6 +334,7 @@ pub mod op { pub const WRITE: Op = Op::WRITE; pub const CLOSE_WRITE: Op = Op::CLOSE_WRITE; pub const RESCAN: Op = Op::RESCAN; + pub const ON_GOING_WRITE: Op = Op::ON_GOING_WRITE; } #[cfg(test)] @@ -348,12 +350,14 @@ mod op_test { fn new_bitflags_form() { let op = super::op::Op::CHMOD | super::op::Op::WRITE; assert!(op.contains(super::op::Op::WRITE)); + assert!(op.contains(super::op::Op::CHMOD)); } #[test] fn old_bitflags_form() { - let op = super::op::CHMOD | super::op::WRITE; + let op = super::op::CHMOD | super::op::WRITE | super::op::ON_GOING_WRITE; assert!(op.contains(super::op::WRITE)); + assert!(op.contains(super::op::ON_GOING_WRITE)); } } @@ -424,6 +428,9 @@ pub enum DebouncedEvent { /// watched directory. Write(PathBuf), + /// some doc + OnGoingWrite(PathBuf), + /// `Chmod` is emitted when attributes have been changed and no events were detected for the /// path within the specified time frame. Chmod(PathBuf), From 8b6037a92d578ec69377bea17e142c8ed9733e83 Mon Sep 17 00:00:00 2001 From: Sujit Joshi Date: Sat, 2 Mar 2019 15:10:09 -0500 Subject: [PATCH 04/16] 1) Set onGoingWrite event only when Write is delayed. 2) When Write is emitted, we disable onGoingWrite. --- src/debounce/mod.rs | 19 ++++++------------- src/debounce/timer.rs | 12 ++++++------ 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index 086cda3f..591e3e36 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -249,8 +249,8 @@ impl Debounce { // keep write event / not need to emit NoticeWrite because // it already was a write event Some(op::Op::WRITE) => { - println!("Delaying write..."); restart_timer(timer_id, path.clone(), &mut self.timer); + set_on_going_write_timer(path.clone(), &mut self.timer); } // upgrade to write event @@ -261,12 +261,10 @@ impl Debounce { // operations_buffer entry didn't exist None => { - println!("Sending notice write..."); *operation = Some(op::Op::WRITE); let _ = self.tx.send(DebouncedEvent::NoticeWrite(path.clone())); restart_timer(timer_id, path.clone(), &mut self.timer); - //sj_todo schedule on_going_write - set_on_going_write_timer(path.clone(), &mut self.timer); + //set_on_going_write_timer(path.clone(), &mut self.timer, self.on_going_write_duration); } // writing to a deleted file is impossible, @@ -510,17 +508,12 @@ fn restart_timer(timer_id: &mut Option, path: PathBuf, timer: &mut WatchTim timer.ignore(timer_id); } *timer_id = Some(timer.schedule(path)); - let ww = timer.on_going_write_event.lock().unwrap(); - if *ww == None { - - } } fn set_on_going_write_timer(path: PathBuf, timer: &mut WatchTimer) { - let tt = Instant::now() + Duration::from_secs(3); - let mut sslkjsdf = timer.on_going_write_event.lock().unwrap(); - if *sslkjsdf == None { - println!("Set on_going_write"); - *sslkjsdf = Some((tt, path)); + let tt = Instant::now() + Duration::from_secs(2); + let mut on_going_write_event = timer.on_going_write_event.lock().unwrap(); + if *on_going_write_event == None { + *on_going_write_event = Some((tt, path)); } } diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index 4afd463c..4ce1de98 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -32,16 +32,16 @@ struct ScheduleWorker { impl ScheduleWorker { fn fire_due_events(&self, now: Instant) -> Option { let mut events = self.events.lock().unwrap(); - let mut write_options = self.worker_on_going_write_event.lock().unwrap(); + let mut on_going_write_event = self.worker_on_going_write_event.lock().unwrap(); let mut emitted = false; - if let Some(ref mut i) = *write_options { + if let Some(ref i) = *on_going_write_event { if i.0 <= now { - self.tx.send(DebouncedEvent::OnGoingWrite((i.1).clone())); + let _ = self.tx.send(DebouncedEvent::OnGoingWrite((i.1).clone())); emitted = true; } } if emitted { - *write_options = None; + *on_going_write_event = None; } while let Some(event) = events.pop_front() { if event.when <= now { @@ -70,8 +70,8 @@ impl ScheduleWorker { Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)), Some(op::Op::WRITE) => { println!("Stopping on_going_write"); - let mut alkjsh = self.worker_on_going_write_event.lock().unwrap(); - *alkjsh = None; + let mut on_going_write_event = self.worker_on_going_write_event.lock().unwrap(); + *on_going_write_event = None; Some(DebouncedEvent::Write(path)) }, Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)), From 339b0fce2342047e88b98890e788a2738535446a Mon Sep 17 00:00:00 2001 From: Sugs Date: Sat, 2 Mar 2019 15:42:02 -0500 Subject: [PATCH 05/16] Scheduling on_going_write in WatchTimer --- src/debounce/mod.rs | 10 +--------- src/debounce/timer.rs | 8 ++++++++ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index 591e3e36..6c1acfbe 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -250,7 +250,7 @@ impl Debounce { // it already was a write event Some(op::Op::WRITE) => { restart_timer(timer_id, path.clone(), &mut self.timer); - set_on_going_write_timer(path.clone(), &mut self.timer); + self.timer.set_on_going_write_timer(path.clone()); } // upgrade to write event @@ -509,11 +509,3 @@ fn restart_timer(timer_id: &mut Option, path: PathBuf, timer: &mut WatchTim } *timer_id = Some(timer.schedule(path)); } - -fn set_on_going_write_timer(path: PathBuf, timer: &mut WatchTimer) { - let tt = Instant::now() + Duration::from_secs(2); - let mut on_going_write_event = timer.on_going_write_event.lock().unwrap(); - if *on_going_write_event == None { - *on_going_write_event = Some((tt, path)); - } -} diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index 4ce1de98..e68cbff4 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -177,6 +177,14 @@ impl WatchTimer { } } + pub fn set_on_going_write_timer(&self, path: PathBuf) { + let tt = Instant::now() + Duration::from_secs(2); + let mut on_going_write_event = self.on_going_write_event.lock().unwrap(); + if *on_going_write_event == None { + *on_going_write_event = Some((tt, path)); + } + } + pub fn schedule(&mut self, path: PathBuf) -> u64 { self.counter = self.counter.wrapping_add(1); From c5d86c5574f8d6b8800384099e5276df3eb0407b Mon Sep 17 00:00:00 2001 From: Sugs Date: Sat, 2 Mar 2019 17:49:08 -0500 Subject: [PATCH 06/16] 1) Created new EventLoopMsg, OnGoingWrite which will be shoved down EventLoop's channel 2) This will then set on_going_write's delay in WatchTimer. --- src/debounce/mod.rs | 7 ++++++- src/debounce/timer.rs | 47 +++++++++++++++++++++++++++---------------- src/inotify.rs | 11 ++++++++++ src/lib.rs | 3 +++ src/null.rs | 4 ++++ src/poll.rs | 4 ++++ 6 files changed, 58 insertions(+), 18 deletions(-) diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index 6c1acfbe..b523a568 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -97,6 +97,11 @@ impl Debounce { } } + pub fn set_on_going_write_duration(&mut self, duration: Duration) { + self.timer.set_on_going_write_duration(duration); + } + + fn check_partial_rename(&mut self, path: PathBuf, op: op::Op, cookie: Option) { if let Ok(mut op_buf) = self.operations_buffer.lock() { // the previous event was a rename event, but this one isn't; something went wrong @@ -250,7 +255,7 @@ impl Debounce { // it already was a write event Some(op::Op::WRITE) => { restart_timer(timer_id, path.clone(), &mut self.timer); - self.timer.set_on_going_write_timer(path.clone()); + self.timer.schedule_on_going_write_event(path.clone()); } // upgrade to write event diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index e68cbff4..6497afc0 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -31,18 +31,8 @@ struct ScheduleWorker { impl ScheduleWorker { fn fire_due_events(&self, now: Instant) -> Option { + self.fire_on_going_write_event(now); let mut events = self.events.lock().unwrap(); - let mut on_going_write_event = self.worker_on_going_write_event.lock().unwrap(); - let mut emitted = false; - if let Some(ref i) = *on_going_write_event { - if i.0 <= now { - let _ = self.tx.send(DebouncedEvent::OnGoingWrite((i.1).clone())); - emitted = true; - } - } - if emitted { - *on_going_write_event = None; - } while let Some(event) = events.pop_front() { if event.when <= now { self.fire_event(event) @@ -56,6 +46,20 @@ impl ScheduleWorker { None } + fn fire_on_going_write_event(&self, now: Instant) { + let mut on_going_write_event = self.worker_on_going_write_event.lock().unwrap(); + let mut emitted = false; + if let Some(ref i) = *on_going_write_event { + if i.0 <= now { + let _ = self.tx.send(DebouncedEvent::OnGoingWrite((i.1).clone())); + emitted = true; + } + } + if emitted { + *on_going_write_event = None; + } + } + fn fire_event(&self, ev: ScheduledEvent) { let ScheduledEvent { path, .. } = ev; if let Ok(ref mut op_buf) = self.operations_buffer.lock() { @@ -69,7 +73,7 @@ impl ScheduleWorker { let message = match op { Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)), Some(op::Op::WRITE) => { - println!("Stopping on_going_write"); + //disable on_going_write let mut on_going_write_event = self.worker_on_going_write_event.lock().unwrap(); *on_going_write_event = None; Some(DebouncedEvent::Write(path)) @@ -134,6 +138,7 @@ pub struct WatchTimer { events: Arc>>, stopped: Arc, pub on_going_write_event: Arc>>, + pub on_going_write_duration: Option, } impl WatchTimer { @@ -174,14 +179,22 @@ impl WatchTimer { events, stopped, on_going_write_event, + on_going_write_duration: None, } } - pub fn set_on_going_write_timer(&self, path: PathBuf) { - let tt = Instant::now() + Duration::from_secs(2); - let mut on_going_write_event = self.on_going_write_event.lock().unwrap(); - if *on_going_write_event == None { - *on_going_write_event = Some((tt, path)); + pub fn set_on_going_write_duration(&mut self, duration: Duration) { + println!("set_on_going_write_duration in WatchTimer {:?}", duration); + self.on_going_write_duration = Some(duration); + } + + pub fn schedule_on_going_write_event(&self, path: PathBuf) { + if let Some(duration) = self.on_going_write_duration { + let tt = Instant::now() + duration; + let mut on_going_write_event = self.on_going_write_event.lock().unwrap(); + if *on_going_write_event == None { + *on_going_write_event = Some((tt, path)); + } } } diff --git a/src/inotify.rs b/src/inotify.rs index 084fecfb..f801bce4 100644 --- a/src/inotify.rs +++ b/src/inotify.rs @@ -54,6 +54,7 @@ enum EventLoopMsg { RemoveWatch(PathBuf, Sender>), Shutdown, RenameTimeout(u32), + OnGoingWriteDelay(Duration), } #[inline] @@ -201,6 +202,11 @@ impl EventLoop { send_pending_rename_event(&mut self.rename_event, &mut self.event_tx); } } + EventLoopMsg::OnGoingWriteDelay(duration) => { + if let EventTx::Debounced {ref tx,ref mut debounce} = self.event_tx { + debounce.set_on_going_write_duration(duration); + } + } } } } @@ -457,6 +463,11 @@ impl Watcher for INotifyWatcher { Ok(INotifyWatcher(Mutex::new(channel))) } + fn set_on_going_write_duration(&self, duration: Duration) { + let msg = EventLoopMsg::OnGoingWriteDelay(duration); + self.0.lock().unwrap().send(msg).unwrap(); + } + fn watch>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()> { let pb = if path.as_ref().is_absolute() { path.as_ref().to_owned() diff --git a/src/lib.rs b/src/lib.rs index b5f220fc..cedcdfe8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -594,6 +594,9 @@ pub trait Watcher: Sized { /// still being written to. fn new(tx: Sender, delay: Duration) -> Result; + ///dummy doc + fn set_on_going_write_duration(&self, duration: Duration); + /// Begin watching a new path. /// /// If the `path` is a directory, `recursive_mode` will be evaluated. If `recursive_mode` is diff --git a/src/null.rs b/src/null.rs index 3c23a0e1..1a0cc284 100644 --- a/src/null.rs +++ b/src/null.rs @@ -28,4 +28,8 @@ impl Watcher for NullWatcher { fn unwatch>(&mut self, path: P) -> Result<()> { Ok(()) } + + fn set_on_going_write_duration(&self, duration: Duration) { + + } } diff --git a/src/poll.rs b/src/poll.rs index f4344b7f..66f69c52 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -300,6 +300,10 @@ impl Watcher for PollWatcher { Err(Error::WatchNotFound) } } + + fn set_on_going_write_duration(&self, duration: Duration) { + + } } impl Drop for PollWatcher { From 7052a854ad4faffc9ec084c7a8b2de4a229a6485 Mon Sep 17 00:00:00 2001 From: Sugs Date: Mon, 4 Mar 2019 18:20:38 -0500 Subject: [PATCH 07/16] Firing on_going_write from Debounce itself. --- src/debounce/mod.rs | 32 +++++++++++++++++++++++++++++++- src/debounce/timer.rs | 28 +++++++++++++++++++++++----- 2 files changed, 54 insertions(+), 6 deletions(-) diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index b523a568..31c3d4f8 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -255,7 +255,9 @@ impl Debounce { // it already was a write event Some(op::Op::WRITE) => { restart_timer(timer_id, path.clone(), &mut self.timer); - self.timer.schedule_on_going_write_event(path.clone()); + //self.timer.schedule_on_going_write_event(path.clone()); + + handle_on_going_write_event(&self.timer, path.clone(), &self.tx); } // upgrade to write event @@ -514,3 +516,31 @@ fn restart_timer(timer_id: &mut Option, path: PathBuf, timer: &mut WatchTim } *timer_id = Some(timer.schedule(path)); } + +fn handle_on_going_write_event(timer: &WatchTimer, path: PathBuf, tx: &mpsc::Sender) { + let mut on_going_write_event = timer.on_going_write_event.lock().unwrap(); + let mut emitted = false; + let mut to_be_scheduled = false; + if let Some(ref i) = *on_going_write_event { + let now = Instant::now(); + if i.0 <= now { + //fire event + let _ = tx.send(DebouncedEvent::OnGoingWrite((i.1).clone())); + emitted = true; + } + } else { + //schedule event + if let Some(duration) = timer.on_going_write_duration { + to_be_scheduled = true; + } + } + + if to_be_scheduled { + let duration = timer.on_going_write_duration.unwrap(); + let tt = Instant::now() + duration; + *on_going_write_event = Some((tt, path)); + } + if emitted { + *on_going_write_event = None; + } +} diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index 6497afc0..bc791cbc 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -31,7 +31,7 @@ struct ScheduleWorker { impl ScheduleWorker { fn fire_due_events(&self, now: Instant) -> Option { - self.fire_on_going_write_event(now); + //self.fire_on_going_write_event(now); let mut events = self.events.lock().unwrap(); while let Some(event) = events.pop_front() { if event.when <= now { @@ -46,7 +46,7 @@ impl ScheduleWorker { None } - fn fire_on_going_write_event(&self, now: Instant) { + /*fn fire_on_going_write_event(&self, now: Instant) { let mut on_going_write_event = self.worker_on_going_write_event.lock().unwrap(); let mut emitted = false; if let Some(ref i) = *on_going_write_event { @@ -58,7 +58,7 @@ impl ScheduleWorker { if emitted { *on_going_write_event = None; } - } + }*/ fn fire_event(&self, ev: ScheduledEvent) { let ScheduledEvent { path, .. } = ev; @@ -183,12 +183,30 @@ impl WatchTimer { } } + /*fn fire_on_going_write_event(&self, now: Instant) { + let mut on_going_write_event = self.worker_on_going_write_event.lock().unwrap(); + let mut emitted = false; + if let Some(ref i) = *on_going_write_event { + if i.0 <= now { + let _ = self.tx.send(DebouncedEvent::OnGoingWrite((i.1).clone())); + emitted = true; + } + } + if emitted { + *on_going_write_event = None; + } + }*/ + + pub fn get_on_going_write_duration(&self) -> Option { + self.on_going_write_duration + } + pub fn set_on_going_write_duration(&mut self, duration: Duration) { println!("set_on_going_write_duration in WatchTimer {:?}", duration); self.on_going_write_duration = Some(duration); } - pub fn schedule_on_going_write_event(&self, path: PathBuf) { + /*pub fn schedule_on_going_write_event(&self, path: PathBuf) { if let Some(duration) = self.on_going_write_duration { let tt = Instant::now() + duration; let mut on_going_write_event = self.on_going_write_event.lock().unwrap(); @@ -196,7 +214,7 @@ impl WatchTimer { *on_going_write_event = Some((tt, path)); } } - } + }*/ pub fn schedule(&mut self, path: PathBuf) -> u64 { self.counter = self.counter.wrapping_add(1); From 4e6beb389d71801c307130d4a7fc892c9b408423 Mon Sep 17 00:00:00 2001 From: Sujit Joshi Date: Mon, 4 Mar 2019 18:47:16 -0500 Subject: [PATCH 08/16] 1) Adding macos changes. 2) Refactoring. --- src/debounce/mod.rs | 2 +- src/debounce/timer.rs | 4 ---- src/fsevent.rs | 7 +++++++ src/inotify.rs | 10 +++++----- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index 31c3d4f8..086c2a61 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -530,7 +530,7 @@ fn handle_on_going_write_event(timer: &WatchTimer, path: PathBuf, tx: &mpsc::Sen } } else { //schedule event - if let Some(duration) = timer.on_going_write_duration { + if let Some(_) = timer.on_going_write_duration { to_be_scheduled = true; } } diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index bc791cbc..eb75cb66 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -197,10 +197,6 @@ impl WatchTimer { } }*/ - pub fn get_on_going_write_duration(&self) -> Option { - self.on_going_write_duration - } - pub fn set_on_going_write_duration(&mut self, duration: Duration) { println!("set_on_going_write_duration in WatchTimer {:?}", duration); self.on_going_write_duration = Some(duration); diff --git a/src/fsevent.rs b/src/fsevent.rs index 9dfd2585..537494a2 100644 --- a/src/fsevent.rs +++ b/src/fsevent.rs @@ -394,6 +394,13 @@ impl Watcher for FsEventWatcher { let _ = self.run(); result } + + fn set_on_going_write_duration(&self, duration: Duration) { + let mut debounced_event = self.event_tx.lock().unwrap(); + if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event { + debounce.set_on_going_write_duration(duration); + } + } } impl Drop for FsEventWatcher { diff --git a/src/inotify.rs b/src/inotify.rs index f801bce4..a6f00d1c 100644 --- a/src/inotify.rs +++ b/src/inotify.rs @@ -463,11 +463,6 @@ impl Watcher for INotifyWatcher { Ok(INotifyWatcher(Mutex::new(channel))) } - fn set_on_going_write_duration(&self, duration: Duration) { - let msg = EventLoopMsg::OnGoingWriteDelay(duration); - self.0.lock().unwrap().send(msg).unwrap(); - } - fn watch>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()> { let pb = if path.as_ref().is_absolute() { path.as_ref().to_owned() @@ -497,6 +492,11 @@ impl Watcher for INotifyWatcher { self.0.lock().unwrap().send(msg).unwrap(); rx.recv().unwrap() } + + fn set_on_going_write_duration(&self, duration: Duration) { + let msg = EventLoopMsg::OnGoingWriteDelay(duration); + self.0.lock().unwrap().send(msg).unwrap(); + } } impl Drop for INotifyWatcher { From f659050b3cecd1e53ea70476028459d23bc63d22 Mon Sep 17 00:00:00 2001 From: Sujit Joshi Date: Mon, 4 Mar 2019 18:57:41 -0500 Subject: [PATCH 09/16] 1) Refactoring and docs. --- src/lib.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index cedcdfe8..824bd6b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -355,9 +355,8 @@ mod op_test { #[test] fn old_bitflags_form() { - let op = super::op::CHMOD | super::op::WRITE | super::op::ON_GOING_WRITE; + let op = super::op::CHMOD | super::op::WRITE; assert!(op.contains(super::op::WRITE)); - assert!(op.contains(super::op::ON_GOING_WRITE)); } } @@ -428,9 +427,6 @@ pub enum DebouncedEvent { /// watched directory. Write(PathBuf), - /// some doc - OnGoingWrite(PathBuf), - /// `Chmod` is emitted when attributes have been changed and no events were detected for the /// path within the specified time frame. Chmod(PathBuf), @@ -453,6 +449,9 @@ pub enum DebouncedEvent { /// /// This event may contain a path for which the error was detected. Error(Error, Option), + + /// Event emitted when a file being watched is to be tailed. + OnGoingWrite(PathBuf), } impl PartialEq for DebouncedEvent { @@ -594,9 +593,6 @@ pub trait Watcher: Sized { /// still being written to. fn new(tx: Sender, delay: Duration) -> Result; - ///dummy doc - fn set_on_going_write_duration(&self, duration: Duration); - /// Begin watching a new path. /// /// If the `path` is a directory, `recursive_mode` will be evaluated. If `recursive_mode` is @@ -621,6 +617,10 @@ pub trait Watcher: Sized { /// Returns an error in the case that `path` has not been watched or if removing the watch /// fails. fn unwatch>(&mut self, path: P) -> Result<()>; + + ///Sets the duration for DebouncedEvent::OnGoingWrite. When set, OnGoingWrite event will be + /// fired every "duration" units. + fn set_on_going_write_duration(&self, duration: Duration); } /// The recommended `Watcher` implementation for the current platform From c531f72ead9f8c48e41a1917afe1b655c4dba7cc Mon Sep 17 00:00:00 2001 From: Sujit Joshi Date: Mon, 4 Mar 2019 18:58:49 -0500 Subject: [PATCH 10/16] 1) Removing println. --- src/debounce/timer.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index eb75cb66..2bb5fb59 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -198,7 +198,6 @@ impl WatchTimer { }*/ pub fn set_on_going_write_duration(&mut self, duration: Duration) { - println!("set_on_going_write_duration in WatchTimer {:?}", duration); self.on_going_write_duration = Some(duration); } From d3e2ad177c079e56a46ebcb25a099b9db77997ac Mon Sep 17 00:00:00 2001 From: Sujit Joshi Date: Tue, 5 Mar 2019 21:04:58 -0500 Subject: [PATCH 11/16] - Implementing on_going_write event in Windows. --- src/windows.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/windows.rs b/src/windows.rs index 4b28333f..f4750594 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -54,6 +54,7 @@ enum Action { Watch(PathBuf, RecursiveMode), Unwatch(PathBuf), Stop, + SetOnGoingWriteEventDuration(Duration), } pub enum MetaEvent { @@ -118,6 +119,12 @@ impl ReadDirectoryChangesServer { stop_watch(ws, &self.meta_tx); } break; + }, + Action::SetOnGoingWriteEventDuration(duration) => { + let mut debounced_event = self.event_tx.lock().unwrap(); + if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event { + debounce.set_on_going_write_duration(duration); + } } } } @@ -562,6 +569,10 @@ impl Watcher for ReadDirectoryChangesWatcher { self.wakeup_server(); res } + + fn set_on_going_write_duration(&self, duration: Duration) { + self.tx.send(Action::SetOnGoingWriteEventDuration(duration)); + } } impl Drop for ReadDirectoryChangesWatcher { From ccd1a15bb33c5f229aa5c4d086ee6f029283d3e1 Mon Sep 17 00:00:00 2001 From: "joshi.sujit020386@gmail.com" Date: Thu, 7 Mar 2019 20:42:02 -0500 Subject: [PATCH 12/16] Removing comments. --- src/debounce/mod.rs | 3 --- src/debounce/timer.rs | 39 --------------------------------------- 2 files changed, 42 deletions(-) diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index 086c2a61..7015eb69 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -255,8 +255,6 @@ impl Debounce { // it already was a write event Some(op::Op::WRITE) => { restart_timer(timer_id, path.clone(), &mut self.timer); - //self.timer.schedule_on_going_write_event(path.clone()); - handle_on_going_write_event(&self.timer, path.clone(), &self.tx); } @@ -271,7 +269,6 @@ impl Debounce { *operation = Some(op::Op::WRITE); let _ = self.tx.send(DebouncedEvent::NoticeWrite(path.clone())); restart_timer(timer_id, path.clone(), &mut self.timer); - //set_on_going_write_timer(path.clone(), &mut self.timer, self.on_going_write_duration); } // writing to a deleted file is impossible, diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index 2bb5fb59..1aa2cc99 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -31,7 +31,6 @@ struct ScheduleWorker { impl ScheduleWorker { fn fire_due_events(&self, now: Instant) -> Option { - //self.fire_on_going_write_event(now); let mut events = self.events.lock().unwrap(); while let Some(event) = events.pop_front() { if event.when <= now { @@ -46,20 +45,6 @@ impl ScheduleWorker { None } - /*fn fire_on_going_write_event(&self, now: Instant) { - let mut on_going_write_event = self.worker_on_going_write_event.lock().unwrap(); - let mut emitted = false; - if let Some(ref i) = *on_going_write_event { - if i.0 <= now { - let _ = self.tx.send(DebouncedEvent::OnGoingWrite((i.1).clone())); - emitted = true; - } - } - if emitted { - *on_going_write_event = None; - } - }*/ - fn fire_event(&self, ev: ScheduledEvent) { let ScheduledEvent { path, .. } = ev; if let Ok(ref mut op_buf) = self.operations_buffer.lock() { @@ -183,34 +168,10 @@ impl WatchTimer { } } - /*fn fire_on_going_write_event(&self, now: Instant) { - let mut on_going_write_event = self.worker_on_going_write_event.lock().unwrap(); - let mut emitted = false; - if let Some(ref i) = *on_going_write_event { - if i.0 <= now { - let _ = self.tx.send(DebouncedEvent::OnGoingWrite((i.1).clone())); - emitted = true; - } - } - if emitted { - *on_going_write_event = None; - } - }*/ - pub fn set_on_going_write_duration(&mut self, duration: Duration) { self.on_going_write_duration = Some(duration); } - /*pub fn schedule_on_going_write_event(&self, path: PathBuf) { - if let Some(duration) = self.on_going_write_duration { - let tt = Instant::now() + duration; - let mut on_going_write_event = self.on_going_write_event.lock().unwrap(); - if *on_going_write_event == None { - *on_going_write_event = Some((tt, path)); - } - } - }*/ - pub fn schedule(&mut self, path: PathBuf) -> u64 { self.counter = self.counter.wrapping_add(1); From a2ebcfd7002de64e3b3f6c0a2a932a9fa23dcb52 Mon Sep 17 00:00:00 2001 From: Sugs Date: Fri, 22 Mar 2019 17:30:19 -0400 Subject: [PATCH 13/16] Changes as per code review. --- src/lib.rs | 6 ++++-- src/null.rs | 4 ---- src/poll.rs | 4 ---- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 824bd6b6..4e06bc9b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -618,9 +618,11 @@ pub trait Watcher: Sized { /// fails. fn unwatch>(&mut self, path: P) -> Result<()>; - ///Sets the duration for DebouncedEvent::OnGoingWrite. When set, OnGoingWrite event will be + /// Sets the duration for DebouncedEvent::OnGoingWrite. When set, OnGoingWrite event will be /// fired every "duration" units. - fn set_on_going_write_duration(&self, duration: Duration); + fn set_on_going_write_duration(&self, duration: Duration) { + // null and poll watchers are not required to implement this. + } } /// The recommended `Watcher` implementation for the current platform diff --git a/src/null.rs b/src/null.rs index 1a0cc284..3c23a0e1 100644 --- a/src/null.rs +++ b/src/null.rs @@ -28,8 +28,4 @@ impl Watcher for NullWatcher { fn unwatch>(&mut self, path: P) -> Result<()> { Ok(()) } - - fn set_on_going_write_duration(&self, duration: Duration) { - - } } diff --git a/src/poll.rs b/src/poll.rs index 66f69c52..f4344b7f 100644 --- a/src/poll.rs +++ b/src/poll.rs @@ -300,10 +300,6 @@ impl Watcher for PollWatcher { Err(Error::WatchNotFound) } } - - fn set_on_going_write_duration(&self, duration: Duration) { - - } } impl Drop for PollWatcher { From a6f342d5ce3faf7864ede4b5021c0ebb034db500 Mon Sep 17 00:00:00 2001 From: Sujit Joshi Date: Fri, 22 Mar 2019 19:08:52 -0400 Subject: [PATCH 14/16] Refactoring as per code review. --- src/debounce/mod.rs | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index 7015eb69..bd9f3bf0 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -516,28 +516,19 @@ fn restart_timer(timer_id: &mut Option, path: PathBuf, timer: &mut WatchTim fn handle_on_going_write_event(timer: &WatchTimer, path: PathBuf, tx: &mpsc::Sender) { let mut on_going_write_event = timer.on_going_write_event.lock().unwrap(); - let mut emitted = false; - let mut to_be_scheduled = false; + let mut event_details = Option::None; if let Some(ref i) = *on_going_write_event { let now = Instant::now(); if i.0 <= now { //fire event let _ = tx.send(DebouncedEvent::OnGoingWrite((i.1).clone())); - emitted = true; } } else { //schedule event - if let Some(_) = timer.on_going_write_duration { - to_be_scheduled = true; + if let Some(d) = timer.on_going_write_duration { + let fire_at = Instant::now() + d; + event_details = Some((fire_at, path)); } } - - if to_be_scheduled { - let duration = timer.on_going_write_duration.unwrap(); - let tt = Instant::now() + duration; - *on_going_write_event = Some((tt, path)); - } - if emitted { - *on_going_write_event = None; - } + *on_going_write_event = event_details; } From 4a69c4be0af47cb5da6a30dd58099a777b05c233 Mon Sep 17 00:00:00 2001 From: Sujit Joshi Date: Sat, 23 Mar 2019 14:07:04 -0400 Subject: [PATCH 15/16] Renaming "on_going" to "ongoing" --- src/debounce/mod.rs | 16 ++++++++-------- src/debounce/timer.rs | 26 +++++++++++++------------- src/fsevent.rs | 4 ++-- src/inotify.rs | 4 ++-- src/lib.rs | 6 +++--- src/windows.rs | 4 ++-- 6 files changed, 30 insertions(+), 30 deletions(-) diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index bd9f3bf0..f035e253 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -97,8 +97,8 @@ impl Debounce { } } - pub fn set_on_going_write_duration(&mut self, duration: Duration) { - self.timer.set_on_going_write_duration(duration); + pub fn set_ongoing_write_duration(&mut self, duration: Duration) { + self.timer.set_ongoing_write_duration(duration); } @@ -255,7 +255,7 @@ impl Debounce { // it already was a write event Some(op::Op::WRITE) => { restart_timer(timer_id, path.clone(), &mut self.timer); - handle_on_going_write_event(&self.timer, path.clone(), &self.tx); + handle_ongoing_write_event(&self.timer, path.clone(), &self.tx); } // upgrade to write event @@ -514,10 +514,10 @@ fn restart_timer(timer_id: &mut Option, path: PathBuf, timer: &mut WatchTim *timer_id = Some(timer.schedule(path)); } -fn handle_on_going_write_event(timer: &WatchTimer, path: PathBuf, tx: &mpsc::Sender) { - let mut on_going_write_event = timer.on_going_write_event.lock().unwrap(); +fn handle_ongoing_write_event(timer: &WatchTimer, path: PathBuf, tx: &mpsc::Sender) { + let mut ongoing_write_event = timer.ongoing_write_event.lock().unwrap(); let mut event_details = Option::None; - if let Some(ref i) = *on_going_write_event { + if let Some(ref i) = *ongoing_write_event { let now = Instant::now(); if i.0 <= now { //fire event @@ -525,10 +525,10 @@ fn handle_on_going_write_event(timer: &WatchTimer, path: PathBuf, tx: &mpsc::Sen } } else { //schedule event - if let Some(d) = timer.on_going_write_duration { + if let Some(d) = timer.ongoing_write_duration { let fire_at = Instant::now() + d; event_details = Some((fire_at, path)); } } - *on_going_write_event = event_details; + *ongoing_write_event = event_details; } diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index 1aa2cc99..d4c0fb41 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -26,7 +26,7 @@ struct ScheduleWorker { tx: mpsc::Sender, operations_buffer: OperationsBuffer, stopped: Arc, - worker_on_going_write_event: Arc>>, + worker_ongoing_write_event: Arc>>, } impl ScheduleWorker { @@ -58,9 +58,9 @@ impl ScheduleWorker { let message = match op { Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)), Some(op::Op::WRITE) => { - //disable on_going_write - let mut on_going_write_event = self.worker_on_going_write_event.lock().unwrap(); - *on_going_write_event = None; + //disable ongoing_write + let mut ongoing_write_event = self.worker_ongoing_write_event.lock().unwrap(); + *ongoing_write_event = None; Some(DebouncedEvent::Write(path)) }, Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)), @@ -122,8 +122,8 @@ pub struct WatchTimer { delay: Duration, events: Arc>>, stopped: Arc, - pub on_going_write_event: Arc>>, - pub on_going_write_duration: Option, + pub ongoing_write_event: Arc>>, + pub ongoing_write_duration: Option, } impl WatchTimer { @@ -141,8 +141,8 @@ impl WatchTimer { let worker_stop_trigger = stop_trigger.clone(); let worker_events = events.clone(); let worker_stopped = stopped.clone(); - let on_going_write_event = Arc::new(Mutex::new(None)); - let worker_on_going_write_event = on_going_write_event.clone(); + let ongoing_write_event = Arc::new(Mutex::new(None)); + let worker_ongoing_write_event = ongoing_write_event.clone(); thread::spawn(move || { ScheduleWorker { new_event_trigger: worker_new_event_trigger, @@ -151,7 +151,7 @@ impl WatchTimer { tx, operations_buffer, stopped: worker_stopped, - worker_on_going_write_event: worker_on_going_write_event, + worker_ongoing_write_event, } .run(); }); @@ -163,13 +163,13 @@ impl WatchTimer { delay, events, stopped, - on_going_write_event, - on_going_write_duration: None, + ongoing_write_event, + ongoing_write_duration: None, } } - pub fn set_on_going_write_duration(&mut self, duration: Duration) { - self.on_going_write_duration = Some(duration); + pub fn set_ongoing_write_duration(&mut self, duration: Duration) { + self.ongoing_write_duration = Some(duration); } pub fn schedule(&mut self, path: PathBuf) -> u64 { diff --git a/src/fsevent.rs b/src/fsevent.rs index 537494a2..de369724 100644 --- a/src/fsevent.rs +++ b/src/fsevent.rs @@ -395,10 +395,10 @@ impl Watcher for FsEventWatcher { result } - fn set_on_going_write_duration(&self, duration: Duration) { + fn set_ongoing_write_duration(&self, duration: Duration) { let mut debounced_event = self.event_tx.lock().unwrap(); if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event { - debounce.set_on_going_write_duration(duration); + debounce.set_ongoing_write_duration(duration); } } } diff --git a/src/inotify.rs b/src/inotify.rs index a6f00d1c..3e2b7d35 100644 --- a/src/inotify.rs +++ b/src/inotify.rs @@ -204,7 +204,7 @@ impl EventLoop { } EventLoopMsg::OnGoingWriteDelay(duration) => { if let EventTx::Debounced {ref tx,ref mut debounce} = self.event_tx { - debounce.set_on_going_write_duration(duration); + debounce.set_ongoing_write_duration(duration); } } } @@ -493,7 +493,7 @@ impl Watcher for INotifyWatcher { rx.recv().unwrap() } - fn set_on_going_write_duration(&self, duration: Duration) { + fn set_ongoing_write_duration(&self, duration: Duration) { let msg = EventLoopMsg::OnGoingWriteDelay(duration); self.0.lock().unwrap().send(msg).unwrap(); } diff --git a/src/lib.rs b/src/lib.rs index 4e06bc9b..26cf14db 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -323,7 +323,7 @@ pub mod op { const CLOSE_WRITE = 0b00100000; /// Directories need to be rescanned const RESCAN = 0b01000000; - const ON_GOING_WRITE = 0b10000000; + const ONGOING_WRITE = 0b10000000; } } @@ -334,7 +334,7 @@ pub mod op { pub const WRITE: Op = Op::WRITE; pub const CLOSE_WRITE: Op = Op::CLOSE_WRITE; pub const RESCAN: Op = Op::RESCAN; - pub const ON_GOING_WRITE: Op = Op::ON_GOING_WRITE; + pub const ONGOING_WRITE: Op = Op::ONGOING_WRITE; } #[cfg(test)] @@ -620,7 +620,7 @@ pub trait Watcher: Sized { /// Sets the duration for DebouncedEvent::OnGoingWrite. When set, OnGoingWrite event will be /// fired every "duration" units. - fn set_on_going_write_duration(&self, duration: Duration) { + fn set_ongoing_write_duration(&self, duration: Duration) { // null and poll watchers are not required to implement this. } } diff --git a/src/windows.rs b/src/windows.rs index f4750594..242f6dbd 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -123,7 +123,7 @@ impl ReadDirectoryChangesServer { Action::SetOnGoingWriteEventDuration(duration) => { let mut debounced_event = self.event_tx.lock().unwrap(); if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event { - debounce.set_on_going_write_duration(duration); + debounce.set_ongoing_write_duration(duration); } } } @@ -570,7 +570,7 @@ impl Watcher for ReadDirectoryChangesWatcher { res } - fn set_on_going_write_duration(&self, duration: Duration) { + fn set_ongoing_write_duration(&self, duration: Duration) { self.tx.send(Action::SetOnGoingWriteEventDuration(duration)); } } From 8c482a0db6aad50f6f9e80475295278d04577b39 Mon Sep 17 00:00:00 2001 From: Sujit Joshi Date: Sat, 23 Mar 2019 19:02:37 -0400 Subject: [PATCH 16/16] Documentation and some more refactoring. --- src/debounce/mod.rs | 12 +++++++++--- src/debounce/timer.rs | 4 ++-- src/fsevent.rs | 7 ++++--- src/inotify.rs | 13 +++++++------ src/lib.rs | 37 ++++++++++++++++++++++++------------- src/windows.rs | 13 +++++++------ 6 files changed, 53 insertions(+), 33 deletions(-) diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index f035e253..25e7abbb 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -2,7 +2,7 @@ mod timer; -use super::{op, DebouncedEvent, RawEvent}; +use super::{op, DebouncedEvent, RawEvent, Config}; use self::timer::WatchTimer; @@ -97,8 +97,12 @@ impl Debounce { } } - pub fn set_ongoing_write_duration(&mut self, duration: Duration) { - self.timer.set_ongoing_write_duration(duration); + pub fn configure_debounced_mode(&mut self, config: Config) { + match config { + Config::OngoingWrites(c) => { + self.timer.set_ongoing_write_duration(c); + } + } } @@ -522,6 +526,8 @@ fn handle_ongoing_write_event(timer: &WatchTimer, path: PathBuf, tx: &mpsc::Send if i.0 <= now { //fire event let _ = tx.send(DebouncedEvent::OnGoingWrite((i.1).clone())); + } else { + event_details = Some((i.0, i.1.clone())); } } else { //schedule event diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index d4c0fb41..dbe60d05 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -168,8 +168,8 @@ impl WatchTimer { } } - pub fn set_ongoing_write_duration(&mut self, duration: Duration) { - self.ongoing_write_duration = Some(duration); + pub fn set_ongoing_write_duration(&mut self, duration: Option) { + self.ongoing_write_duration = duration; } pub fn schedule(&mut self, path: PathBuf) -> u64 { diff --git a/src/fsevent.rs b/src/fsevent.rs index de369724..e0de51e2 100644 --- a/src/fsevent.rs +++ b/src/fsevent.rs @@ -14,7 +14,7 @@ extern crate fsevent as fse; use super::debounce::{Debounce, EventTx}; -use super::{op, DebouncedEvent, Error, RawEvent, RecursiveMode, Result, Watcher}; +use super::{op, DebouncedEvent, Error, RawEvent, RecursiveMode, Result, Watcher, Config}; use fsevent_sys::core_foundation as cf; use fsevent_sys::fsevent as fs; use libc; @@ -395,11 +395,12 @@ impl Watcher for FsEventWatcher { result } - fn set_ongoing_write_duration(&self, duration: Duration) { + fn configure(&self, config: Config) -> Result<()> { let mut debounced_event = self.event_tx.lock().unwrap(); if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event { - debounce.set_ongoing_write_duration(duration); + debounce.configure_debounced_mode(config); } + Ok(()) } } diff --git a/src/inotify.rs b/src/inotify.rs index 3e2b7d35..a8b96f51 100644 --- a/src/inotify.rs +++ b/src/inotify.rs @@ -11,7 +11,7 @@ extern crate walkdir; use self::inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask}; use self::walkdir::WalkDir; use super::debounce::{Debounce, EventTx}; -use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher}; +use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher, Config}; use mio; use mio_extras; use std::collections::HashMap; @@ -54,7 +54,7 @@ enum EventLoopMsg { RemoveWatch(PathBuf, Sender>), Shutdown, RenameTimeout(u32), - OnGoingWriteDelay(Duration), + Configure(Config), } #[inline] @@ -202,9 +202,9 @@ impl EventLoop { send_pending_rename_event(&mut self.rename_event, &mut self.event_tx); } } - EventLoopMsg::OnGoingWriteDelay(duration) => { + EventLoopMsg::Configure(config) => { if let EventTx::Debounced {ref tx,ref mut debounce} = self.event_tx { - debounce.set_ongoing_write_duration(duration); + debounce.configure_debounced_mode(config); } } } @@ -493,9 +493,10 @@ impl Watcher for INotifyWatcher { rx.recv().unwrap() } - fn set_ongoing_write_duration(&self, duration: Duration) { - let msg = EventLoopMsg::OnGoingWriteDelay(duration); + fn configure(&self, config: Config) -> Result<()> { + let msg = EventLoopMsg::Configure(config); self.0.lock().unwrap().send(msg).unwrap(); + Ok(()) } } diff --git a/src/lib.rs b/src/lib.rs index 26cf14db..feae9c2b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -310,20 +310,19 @@ pub mod op { /// Multiple actions may be delivered in a single event. pub struct Op: u32 { /// Attributes changed - const CHMOD = 0b00000001; + const CHMOD = 0b0000001; /// Created - const CREATE = 0b00000010; + const CREATE = 0b0000010; /// Removed - const REMOVE = 0b00000100; + const REMOVE = 0b0000100; /// Renamed - const RENAME = 0b00001000; + const RENAME = 0b0001000; /// Written - const WRITE = 0b00010000; + const WRITE = 0b0010000; /// File opened for writing was closed - const CLOSE_WRITE = 0b00100000; + const CLOSE_WRITE = 0b0100000; /// Directories need to be rescanned - const RESCAN = 0b01000000; - const ONGOING_WRITE = 0b10000000; + const RESCAN = 0b1000000; } } @@ -334,7 +333,6 @@ pub mod op { pub const WRITE: Op = Op::WRITE; pub const CLOSE_WRITE: Op = Op::CLOSE_WRITE; pub const RESCAN: Op = Op::RESCAN; - pub const ONGOING_WRITE: Op = Op::ONGOING_WRITE; } #[cfg(test)] @@ -618,13 +616,26 @@ pub trait Watcher: Sized { /// fails. fn unwatch>(&mut self, path: P) -> Result<()>; - /// Sets the duration for DebouncedEvent::OnGoingWrite. When set, OnGoingWrite event will be - /// fired every "duration" units. - fn set_ongoing_write_duration(&self, duration: Duration) { - // null and poll watchers are not required to implement this. + /// Configure notify with Configs. + fn configure(&self, option: Config) -> Result<()> { + // Default implementation because null and poll watcher are not configurable (but can be in future) + Ok(()) } } +/// Configurations that can be used when watching a file/directory. +pub enum Config { + /// In debounced mode a WRITE event is fired every X unit of time if no WRITE occurs before X. + /// But in some scenarios (like when tailing a file) we would never receive the WRITE event + /// because the watchee is being written to every Y unit of time where Y < X. + /// Use this config to let notify emit DebouncedEvent::OnGoingWrite event before emitting a + /// WRITE event. Once a WRITE event is emitted notify will cancel OnGoingWrite (but still emit + /// OnGoingWrite in the future) + /// Hence the Duration of this config should be less than watchers delay. + /// To stop emitting OnGoingWrite, pass this config with None. + OngoingWrites(Option), +} + /// The recommended `Watcher` implementation for the current platform #[cfg(target_os = "linux")] pub type RecommendedWatcher = INotifyWatcher; diff --git a/src/windows.rs b/src/windows.rs index 242f6dbd..b9f6d2cb 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -16,7 +16,7 @@ use winapi::um::winbase::{self, INFINITE, WAIT_OBJECT_0}; use winapi::um::winnt::{self, FILE_NOTIFY_INFORMATION, HANDLE}; use super::debounce::{Debounce, EventTx}; -use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher}; +use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher, Config}; use std::collections::HashMap; use std::env; use std::ffi::OsString; @@ -54,7 +54,7 @@ enum Action { Watch(PathBuf, RecursiveMode), Unwatch(PathBuf), Stop, - SetOnGoingWriteEventDuration(Duration), + Configure(Config), } pub enum MetaEvent { @@ -120,10 +120,10 @@ impl ReadDirectoryChangesServer { } break; }, - Action::SetOnGoingWriteEventDuration(duration) => { + Action::Configure(config) => { let mut debounced_event = self.event_tx.lock().unwrap(); if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event { - debounce.set_ongoing_write_duration(duration); + debounce.configure_debounced_mode(config); } } } @@ -570,8 +570,9 @@ impl Watcher for ReadDirectoryChangesWatcher { res } - fn set_ongoing_write_duration(&self, duration: Duration) { - self.tx.send(Action::SetOnGoingWriteEventDuration(duration)); + fn configure(&self, config: Config) -> Result<()> { + self.tx.send(Action::Configure(config)); + Ok(()) } }