diff --git a/src/debounce/mod.rs b/src/debounce/mod.rs index 53bed5b8..25e7abbb 100644 --- a/src/debounce/mod.rs +++ b/src/debounce/mod.rs @@ -2,7 +2,7 @@ mod timer; -use super::{op, DebouncedEvent, RawEvent}; +use super::{op, DebouncedEvent, RawEvent, Config}; use self::timer::WatchTimer; @@ -10,7 +10,7 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::mpsc; use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::time::{Duration, Instant}; pub type OperationsBuffer = Arc, Option, Option)>>>; @@ -97,6 +97,15 @@ impl Debounce { } } + pub fn configure_debounced_mode(&mut self, config: Config) { + match config { + Config::OngoingWrites(c) => { + self.timer.set_ongoing_write_duration(c); + } + } + } + + fn check_partial_rename(&mut self, path: PathBuf, op: op::Op, cookie: Option) { if let Ok(mut op_buf) = self.operations_buffer.lock() { // the previous event was a rename event, but this one isn't; something went wrong @@ -250,6 +259,7 @@ impl Debounce { // it already was a write event Some(op::Op::WRITE) => { restart_timer(timer_id, path.clone(), &mut self.timer); + handle_ongoing_write_event(&self.timer, path.clone(), &self.tx); } // upgrade to write event @@ -507,3 +517,24 @@ fn restart_timer(timer_id: &mut Option, path: PathBuf, timer: &mut WatchTim } *timer_id = Some(timer.schedule(path)); } + +fn handle_ongoing_write_event(timer: &WatchTimer, path: PathBuf, tx: &mpsc::Sender) { + let mut ongoing_write_event = timer.ongoing_write_event.lock().unwrap(); + let mut event_details = Option::None; + if let Some(ref i) = *ongoing_write_event { + let now = Instant::now(); + if i.0 <= now { + //fire event + let _ = tx.send(DebouncedEvent::OnGoingWrite((i.1).clone())); + } else { + event_details = Some((i.0, i.1.clone())); + } + } else { + //schedule event + if let Some(d) = timer.ongoing_write_duration { + let fire_at = Instant::now() + d; + event_details = Some((fire_at, path)); + } + } + *ongoing_write_event = event_details; +} diff --git a/src/debounce/timer.rs b/src/debounce/timer.rs index 2ec918c4..dbe60d05 100644 --- a/src/debounce/timer.rs +++ b/src/debounce/timer.rs @@ -26,6 +26,7 @@ struct ScheduleWorker { tx: mpsc::Sender, operations_buffer: OperationsBuffer, stopped: Arc, + worker_ongoing_write_event: Arc>>, } impl ScheduleWorker { @@ -56,7 +57,12 @@ impl ScheduleWorker { } let message = match op { Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)), - Some(op::Op::WRITE) => Some(DebouncedEvent::Write(path)), + Some(op::Op::WRITE) => { + //disable ongoing_write + let mut ongoing_write_event = self.worker_ongoing_write_event.lock().unwrap(); + *ongoing_write_event = None; + Some(DebouncedEvent::Write(path)) + }, Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)), Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)), Some(op::Op::RENAME) if is_partial_rename => { @@ -116,6 +122,8 @@ pub struct WatchTimer { delay: Duration, events: Arc>>, stopped: Arc, + pub ongoing_write_event: Arc>>, + pub ongoing_write_duration: Option, } impl WatchTimer { @@ -133,6 +141,8 @@ impl WatchTimer { let worker_stop_trigger = stop_trigger.clone(); let worker_events = events.clone(); let worker_stopped = stopped.clone(); + let ongoing_write_event = Arc::new(Mutex::new(None)); + let worker_ongoing_write_event = ongoing_write_event.clone(); thread::spawn(move || { ScheduleWorker { new_event_trigger: worker_new_event_trigger, @@ -141,6 +151,7 @@ impl WatchTimer { tx, operations_buffer, stopped: worker_stopped, + worker_ongoing_write_event, } .run(); }); @@ -152,9 +163,15 @@ impl WatchTimer { delay, events, stopped, + ongoing_write_event, + ongoing_write_duration: None, } } + pub fn set_ongoing_write_duration(&mut self, duration: Option) { + self.ongoing_write_duration = duration; + } + pub fn schedule(&mut self, path: PathBuf) -> u64 { self.counter = self.counter.wrapping_add(1); diff --git a/src/fsevent.rs b/src/fsevent.rs index 9dfd2585..e0de51e2 100644 --- a/src/fsevent.rs +++ b/src/fsevent.rs @@ -14,7 +14,7 @@ extern crate fsevent as fse; use super::debounce::{Debounce, EventTx}; -use super::{op, DebouncedEvent, Error, RawEvent, RecursiveMode, Result, Watcher}; +use super::{op, DebouncedEvent, Error, RawEvent, RecursiveMode, Result, Watcher, Config}; use fsevent_sys::core_foundation as cf; use fsevent_sys::fsevent as fs; use libc; @@ -394,6 +394,14 @@ impl Watcher for FsEventWatcher { let _ = self.run(); result } + + fn configure(&self, config: Config) -> Result<()> { + let mut debounced_event = self.event_tx.lock().unwrap(); + if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event { + debounce.configure_debounced_mode(config); + } + Ok(()) + } } impl Drop for FsEventWatcher { diff --git a/src/inotify.rs b/src/inotify.rs index 084fecfb..a8b96f51 100644 --- a/src/inotify.rs +++ b/src/inotify.rs @@ -11,7 +11,7 @@ extern crate walkdir; use self::inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask}; use self::walkdir::WalkDir; use super::debounce::{Debounce, EventTx}; -use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher}; +use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher, Config}; use mio; use mio_extras; use std::collections::HashMap; @@ -54,6 +54,7 @@ enum EventLoopMsg { RemoveWatch(PathBuf, Sender>), Shutdown, RenameTimeout(u32), + Configure(Config), } #[inline] @@ -201,6 +202,11 @@ impl EventLoop { send_pending_rename_event(&mut self.rename_event, &mut self.event_tx); } } + EventLoopMsg::Configure(config) => { + if let EventTx::Debounced {ref tx,ref mut debounce} = self.event_tx { + debounce.configure_debounced_mode(config); + } + } } } } @@ -486,6 +492,12 @@ impl Watcher for INotifyWatcher { self.0.lock().unwrap().send(msg).unwrap(); rx.recv().unwrap() } + + fn configure(&self, config: Config) -> Result<()> { + let msg = EventLoopMsg::Configure(config); + self.0.lock().unwrap().send(msg).unwrap(); + Ok(()) + } } impl Drop for INotifyWatcher { diff --git a/src/lib.rs b/src/lib.rs index 6f77e6d6..feae9c2b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -348,6 +348,7 @@ mod op_test { fn new_bitflags_form() { let op = super::op::Op::CHMOD | super::op::Op::WRITE; assert!(op.contains(super::op::Op::WRITE)); + assert!(op.contains(super::op::Op::CHMOD)); } #[test] @@ -446,6 +447,9 @@ pub enum DebouncedEvent { /// /// This event may contain a path for which the error was detected. Error(Error, Option), + + /// Event emitted when a file being watched is to be tailed. + OnGoingWrite(PathBuf), } impl PartialEq for DebouncedEvent { @@ -611,6 +615,25 @@ pub trait Watcher: Sized { /// Returns an error in the case that `path` has not been watched or if removing the watch /// fails. fn unwatch>(&mut self, path: P) -> Result<()>; + + /// Configure notify with Configs. + fn configure(&self, option: Config) -> Result<()> { + // Default implementation because null and poll watcher are not configurable (but can be in future) + Ok(()) + } +} + +/// Configurations that can be used when watching a file/directory. +pub enum Config { + /// In debounced mode a WRITE event is fired every X unit of time if no WRITE occurs before X. + /// But in some scenarios (like when tailing a file) we would never receive the WRITE event + /// because the watchee is being written to every Y unit of time where Y < X. + /// Use this config to let notify emit DebouncedEvent::OnGoingWrite event before emitting a + /// WRITE event. Once a WRITE event is emitted notify will cancel OnGoingWrite (but still emit + /// OnGoingWrite in the future) + /// Hence the Duration of this config should be less than watchers delay. + /// To stop emitting OnGoingWrite, pass this config with None. + OngoingWrites(Option), } /// The recommended `Watcher` implementation for the current platform diff --git a/src/windows.rs b/src/windows.rs index 4b28333f..b9f6d2cb 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -16,7 +16,7 @@ use winapi::um::winbase::{self, INFINITE, WAIT_OBJECT_0}; use winapi::um::winnt::{self, FILE_NOTIFY_INFORMATION, HANDLE}; use super::debounce::{Debounce, EventTx}; -use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher}; +use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher, Config}; use std::collections::HashMap; use std::env; use std::ffi::OsString; @@ -54,6 +54,7 @@ enum Action { Watch(PathBuf, RecursiveMode), Unwatch(PathBuf), Stop, + Configure(Config), } pub enum MetaEvent { @@ -118,6 +119,12 @@ impl ReadDirectoryChangesServer { stop_watch(ws, &self.meta_tx); } break; + }, + Action::Configure(config) => { + let mut debounced_event = self.event_tx.lock().unwrap(); + if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event { + debounce.configure_debounced_mode(config); + } } } } @@ -562,6 +569,11 @@ impl Watcher for ReadDirectoryChangesWatcher { self.wakeup_server(); res } + + fn configure(&self, config: Config) -> Result<()> { + self.tx.send(Action::Configure(config)); + Ok(()) + } } impl Drop for ReadDirectoryChangesWatcher {