Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Frequently written files may get debounced indefinitely #183

Merged
merged 16 commits into from
Mar 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions src/debounce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

mod timer;

use super::{op, DebouncedEvent, RawEvent};
use super::{op, DebouncedEvent, RawEvent, Config};

use self::timer::WatchTimer;

use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, Instant};

pub type OperationsBuffer =
Arc<Mutex<HashMap<PathBuf, (Option<op::Op>, Option<PathBuf>, Option<u64>)>>>;
Expand Down Expand Up @@ -97,6 +97,15 @@ impl Debounce {
}
}

pub fn configure_debounced_mode(&mut self, config: Config) {
match config {
Config::OngoingWrites(c) => {
self.timer.set_ongoing_write_duration(c);
}
}
}


fn check_partial_rename(&mut self, path: PathBuf, op: op::Op, cookie: Option<u32>) {
if let Ok(mut op_buf) = self.operations_buffer.lock() {
// the previous event was a rename event, but this one isn't; something went wrong
Expand Down Expand Up @@ -250,6 +259,7 @@ impl Debounce {
// it already was a write event
Some(op::Op::WRITE) => {
restart_timer(timer_id, path.clone(), &mut self.timer);
handle_ongoing_write_event(&self.timer, path.clone(), &self.tx);
}

// upgrade to write event
Expand Down Expand Up @@ -507,3 +517,24 @@ fn restart_timer(timer_id: &mut Option<u64>, path: PathBuf, timer: &mut WatchTim
}
*timer_id = Some(timer.schedule(path));
}

fn handle_ongoing_write_event(timer: &WatchTimer, path: PathBuf, tx: &mpsc::Sender<DebouncedEvent>) {
let mut ongoing_write_event = timer.ongoing_write_event.lock().unwrap();
let mut event_details = Option::None;
if let Some(ref i) = *ongoing_write_event {
let now = Instant::now();
if i.0 <= now {
//fire event
let _ = tx.send(DebouncedEvent::OnGoingWrite((i.1).clone()));
} else {
event_details = Some((i.0, i.1.clone()));
}
} else {
//schedule event
if let Some(d) = timer.ongoing_write_duration {
let fire_at = Instant::now() + d;
event_details = Some((fire_at, path));
}
}
*ongoing_write_event = event_details;
}
19 changes: 18 additions & 1 deletion src/debounce/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct ScheduleWorker {
tx: mpsc::Sender<DebouncedEvent>,
operations_buffer: OperationsBuffer,
stopped: Arc<AtomicBool>,
worker_ongoing_write_event: Arc<Mutex<Option<(Instant, PathBuf)>>>,
}

impl ScheduleWorker {
Expand Down Expand Up @@ -56,7 +57,12 @@ impl ScheduleWorker {
}
let message = match op {
Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)),
Some(op::Op::WRITE) => Some(DebouncedEvent::Write(path)),
Some(op::Op::WRITE) => {
//disable ongoing_write
let mut ongoing_write_event = self.worker_ongoing_write_event.lock().unwrap();
*ongoing_write_event = None;
Some(DebouncedEvent::Write(path))
},
Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)),
Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)),
Some(op::Op::RENAME) if is_partial_rename => {
Expand Down Expand Up @@ -116,6 +122,8 @@ pub struct WatchTimer {
delay: Duration,
events: Arc<Mutex<VecDeque<ScheduledEvent>>>,
stopped: Arc<AtomicBool>,
pub ongoing_write_event: Arc<Mutex<Option<(Instant, PathBuf)>>>,
pub ongoing_write_duration: Option<Duration>,
}

impl WatchTimer {
Expand All @@ -133,6 +141,8 @@ impl WatchTimer {
let worker_stop_trigger = stop_trigger.clone();
let worker_events = events.clone();
let worker_stopped = stopped.clone();
let ongoing_write_event = Arc::new(Mutex::new(None));
let worker_ongoing_write_event = ongoing_write_event.clone();
thread::spawn(move || {
ScheduleWorker {
new_event_trigger: worker_new_event_trigger,
Expand All @@ -141,6 +151,7 @@ impl WatchTimer {
tx,
operations_buffer,
stopped: worker_stopped,
worker_ongoing_write_event,
}
.run();
});
Expand All @@ -152,9 +163,15 @@ impl WatchTimer {
delay,
events,
stopped,
ongoing_write_event,
ongoing_write_duration: None,
}
}

pub fn set_ongoing_write_duration(&mut self, duration: Option<Duration>) {
self.ongoing_write_duration = duration;
}

pub fn schedule(&mut self, path: PathBuf) -> u64 {
self.counter = self.counter.wrapping_add(1);

Expand Down
10 changes: 9 additions & 1 deletion src/fsevent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
extern crate fsevent as fse;

use super::debounce::{Debounce, EventTx};
use super::{op, DebouncedEvent, Error, RawEvent, RecursiveMode, Result, Watcher};
use super::{op, DebouncedEvent, Error, RawEvent, RecursiveMode, Result, Watcher, Config};
use fsevent_sys::core_foundation as cf;
use fsevent_sys::fsevent as fs;
use libc;
Expand Down Expand Up @@ -394,6 +394,14 @@ impl Watcher for FsEventWatcher {
let _ = self.run();
result
}

fn configure(&self, config: Config) -> Result<()> {
let mut debounced_event = self.event_tx.lock().unwrap();
if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event {
debounce.configure_debounced_mode(config);
}
Ok(())
}
}

impl Drop for FsEventWatcher {
Expand Down
14 changes: 13 additions & 1 deletion src/inotify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ extern crate walkdir;
use self::inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
use self::walkdir::WalkDir;
use super::debounce::{Debounce, EventTx};
use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher};
use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher, Config};
use mio;
use mio_extras;
use std::collections::HashMap;
Expand Down Expand Up @@ -54,6 +54,7 @@ enum EventLoopMsg {
RemoveWatch(PathBuf, Sender<Result<()>>),
Shutdown,
RenameTimeout(u32),
Configure(Config),
}

#[inline]
Expand Down Expand Up @@ -201,6 +202,11 @@ impl EventLoop {
send_pending_rename_event(&mut self.rename_event, &mut self.event_tx);
}
}
EventLoopMsg::Configure(config) => {
if let EventTx::Debounced {ref tx,ref mut debounce} = self.event_tx {
debounce.configure_debounced_mode(config);
}
}
}
}
}
Expand Down Expand Up @@ -486,6 +492,12 @@ impl Watcher for INotifyWatcher {
self.0.lock().unwrap().send(msg).unwrap();
rx.recv().unwrap()
}

fn configure(&self, config: Config) -> Result<()> {
let msg = EventLoopMsg::Configure(config);
self.0.lock().unwrap().send(msg).unwrap();
Ok(())
}
}

impl Drop for INotifyWatcher {
Expand Down
23 changes: 23 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ mod op_test {
fn new_bitflags_form() {
let op = super::op::Op::CHMOD | super::op::Op::WRITE;
assert!(op.contains(super::op::Op::WRITE));
assert!(op.contains(super::op::Op::CHMOD));
}

#[test]
Expand Down Expand Up @@ -446,6 +447,9 @@ pub enum DebouncedEvent {
///
/// This event may contain a path for which the error was detected.
Error(Error, Option<PathBuf>),

/// Event emitted when a file being watched is to be tailed.
OnGoingWrite(PathBuf),
}

impl PartialEq for DebouncedEvent {
Expand Down Expand Up @@ -611,6 +615,25 @@ pub trait Watcher: Sized {
/// Returns an error in the case that `path` has not been watched or if removing the watch
/// fails.
fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()>;

/// Configure notify with Configs.
fn configure(&self, option: Config) -> Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking more about this...should I make this a collection of Config rather than just a Config?

// Default implementation because null and poll watcher are not configurable (but can be in future)
Ok(())
}
}

/// Configurations that can be used when watching a file/directory.
pub enum Config {
/// In debounced mode a WRITE event is fired every X unit of time if no WRITE occurs before X.
/// But in some scenarios (like when tailing a file) we would never receive the WRITE event
/// because the watchee is being written to every Y unit of time where Y < X.
/// Use this config to let notify emit DebouncedEvent::OnGoingWrite event before emitting a
/// WRITE event. Once a WRITE event is emitted notify will cancel OnGoingWrite (but still emit
/// OnGoingWrite in the future)
/// Hence the Duration of this config should be less than watchers delay.
/// To stop emitting OnGoingWrite, pass this config with None.
OngoingWrites(Option<Duration>),
}

/// The recommended `Watcher` implementation for the current platform
Expand Down
14 changes: 13 additions & 1 deletion src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use winapi::um::winbase::{self, INFINITE, WAIT_OBJECT_0};
use winapi::um::winnt::{self, FILE_NOTIFY_INFORMATION, HANDLE};

use super::debounce::{Debounce, EventTx};
use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher};
use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher, Config};
use std::collections::HashMap;
use std::env;
use std::ffi::OsString;
Expand Down Expand Up @@ -54,6 +54,7 @@ enum Action {
Watch(PathBuf, RecursiveMode),
Unwatch(PathBuf),
Stop,
Configure(Config),
}

pub enum MetaEvent {
Expand Down Expand Up @@ -118,6 +119,12 @@ impl ReadDirectoryChangesServer {
stop_watch(ws, &self.meta_tx);
}
break;
},
Action::Configure(config) => {
let mut debounced_event = self.event_tx.lock().unwrap();
if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event {
debounce.configure_debounced_mode(config);
}
}
}
}
Expand Down Expand Up @@ -562,6 +569,11 @@ impl Watcher for ReadDirectoryChangesWatcher {
self.wakeup_server();
res
}

fn configure(&self, config: Config) -> Result<()> {
self.tx.send(Action::Configure(config));
Ok(())
}
}

impl Drop for ReadDirectoryChangesWatcher {
Expand Down