Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Frequently written files may get debounced indefinitely #183

Merged
merged 16 commits into from
Mar 28, 2019
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion src/debounce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, Instant};

pub type OperationsBuffer =
Arc<Mutex<HashMap<PathBuf, (Option<op::Op>, Option<PathBuf>, Option<u64>)>>>;
Expand Down Expand Up @@ -97,6 +97,11 @@ impl Debounce {
}
}

pub fn set_on_going_write_duration(&mut self, duration: Duration) {
self.timer.set_on_going_write_duration(duration);
}


fn check_partial_rename(&mut self, path: PathBuf, op: op::Op, cookie: Option<u32>) {
if let Ok(mut op_buf) = self.operations_buffer.lock() {
// the previous event was a rename event, but this one isn't; something went wrong
Expand Down Expand Up @@ -250,6 +255,7 @@ impl Debounce {
// it already was a write event
Some(op::Op::WRITE) => {
restart_timer(timer_id, path.clone(), &mut self.timer);
handle_on_going_write_event(&self.timer, path.clone(), &self.tx);
}

// upgrade to write event
Expand Down Expand Up @@ -507,3 +513,31 @@ fn restart_timer(timer_id: &mut Option<u64>, path: PathBuf, timer: &mut WatchTim
}
*timer_id = Some(timer.schedule(path));
}

fn handle_on_going_write_event(timer: &WatchTimer, path: PathBuf, tx: &mpsc::Sender<DebouncedEvent>) {
let mut on_going_write_event = timer.on_going_write_event.lock().unwrap();
let mut emitted = false;
let mut to_be_scheduled = false;
if let Some(ref i) = *on_going_write_event {
let now = Instant::now();
if i.0 <= now {
//fire event
let _ = tx.send(DebouncedEvent::OnGoingWrite((i.1).clone()));
emitted = true;
}
} else {
//schedule event
if let Some(_) = timer.on_going_write_duration {
to_be_scheduled = true;
}
}

if to_be_scheduled {
let duration = timer.on_going_write_duration.unwrap();
let tt = Instant::now() + duration;
*on_going_write_event = Some((tt, path));
}
if emitted {
*on_going_write_event = None;
}
passcod marked this conversation as resolved.
Show resolved Hide resolved
}
19 changes: 18 additions & 1 deletion src/debounce/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct ScheduleWorker {
tx: mpsc::Sender<DebouncedEvent>,
operations_buffer: OperationsBuffer,
stopped: Arc<AtomicBool>,
worker_on_going_write_event: Arc<Mutex<Option<(Instant, PathBuf)>>>,
}

impl ScheduleWorker {
Expand Down Expand Up @@ -56,7 +57,12 @@ impl ScheduleWorker {
}
let message = match op {
Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)),
Some(op::Op::WRITE) => Some(DebouncedEvent::Write(path)),
Some(op::Op::WRITE) => {
//disable on_going_write
let mut on_going_write_event = self.worker_on_going_write_event.lock().unwrap();
*on_going_write_event = None;
Some(DebouncedEvent::Write(path))
},
Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)),
Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)),
Some(op::Op::RENAME) if is_partial_rename => {
Expand Down Expand Up @@ -116,6 +122,8 @@ pub struct WatchTimer {
delay: Duration,
events: Arc<Mutex<VecDeque<ScheduledEvent>>>,
stopped: Arc<AtomicBool>,
pub on_going_write_event: Arc<Mutex<Option<(Instant, PathBuf)>>>,
pub on_going_write_duration: Option<Duration>,
}

impl WatchTimer {
Expand All @@ -133,6 +141,8 @@ impl WatchTimer {
let worker_stop_trigger = stop_trigger.clone();
let worker_events = events.clone();
let worker_stopped = stopped.clone();
let on_going_write_event = Arc::new(Mutex::new(None));
let worker_on_going_write_event = on_going_write_event.clone();
thread::spawn(move || {
ScheduleWorker {
new_event_trigger: worker_new_event_trigger,
Expand All @@ -141,6 +151,7 @@ impl WatchTimer {
tx,
operations_buffer,
stopped: worker_stopped,
worker_on_going_write_event: worker_on_going_write_event,
}
.run();
});
Expand All @@ -152,9 +163,15 @@ impl WatchTimer {
delay,
events,
stopped,
on_going_write_event,
on_going_write_duration: None,
}
}

pub fn set_on_going_write_duration(&mut self, duration: Duration) {
self.on_going_write_duration = Some(duration);
}

pub fn schedule(&mut self, path: PathBuf) -> u64 {
self.counter = self.counter.wrapping_add(1);

Expand Down
7 changes: 7 additions & 0 deletions src/fsevent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,13 @@ impl Watcher for FsEventWatcher {
let _ = self.run();
result
}

fn set_on_going_write_duration(&self, duration: Duration) {
let mut debounced_event = self.event_tx.lock().unwrap();
if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event {
debounce.set_on_going_write_duration(duration);
}
}
}

impl Drop for FsEventWatcher {
Expand Down
11 changes: 11 additions & 0 deletions src/inotify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ enum EventLoopMsg {
RemoveWatch(PathBuf, Sender<Result<()>>),
Shutdown,
RenameTimeout(u32),
OnGoingWriteDelay(Duration),
}

#[inline]
Expand Down Expand Up @@ -201,6 +202,11 @@ impl EventLoop {
send_pending_rename_event(&mut self.rename_event, &mut self.event_tx);
}
}
EventLoopMsg::OnGoingWriteDelay(duration) => {
if let EventTx::Debounced {ref tx,ref mut debounce} = self.event_tx {
debounce.set_on_going_write_duration(duration);
}
}
}
}
}
Expand Down Expand Up @@ -486,6 +492,11 @@ impl Watcher for INotifyWatcher {
self.0.lock().unwrap().send(msg).unwrap();
rx.recv().unwrap()
}

fn set_on_going_write_duration(&self, duration: Duration) {
let msg = EventLoopMsg::OnGoingWriteDelay(duration);
self.0.lock().unwrap().send(msg).unwrap();
}
}

impl Drop for INotifyWatcher {
Expand Down
24 changes: 17 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,19 +310,20 @@ pub mod op {
/// Multiple actions may be delivered in a single event.
pub struct Op: u32 {
/// Attributes changed
const CHMOD = 0b0000001;
const CHMOD = 0b00000001;
/// Created
const CREATE = 0b0000010;
const CREATE = 0b00000010;
/// Removed
const REMOVE = 0b0000100;
const REMOVE = 0b00000100;
/// Renamed
const RENAME = 0b0001000;
const RENAME = 0b00001000;
/// Written
const WRITE = 0b0010000;
const WRITE = 0b00010000;
/// File opened for writing was closed
const CLOSE_WRITE = 0b0100000;
const CLOSE_WRITE = 0b00100000;
/// Directories need to be rescanned
const RESCAN = 0b1000000;
const RESCAN = 0b01000000;
const ON_GOING_WRITE = 0b10000000;
}
}

Expand All @@ -333,6 +334,7 @@ pub mod op {
pub const WRITE: Op = Op::WRITE;
pub const CLOSE_WRITE: Op = Op::CLOSE_WRITE;
pub const RESCAN: Op = Op::RESCAN;
pub const ON_GOING_WRITE: Op = Op::ON_GOING_WRITE;
}

#[cfg(test)]
Expand All @@ -348,6 +350,7 @@ mod op_test {
fn new_bitflags_form() {
let op = super::op::Op::CHMOD | super::op::Op::WRITE;
assert!(op.contains(super::op::Op::WRITE));
assert!(op.contains(super::op::Op::CHMOD));
}

#[test]
Expand Down Expand Up @@ -446,6 +449,9 @@ pub enum DebouncedEvent {
///
/// This event may contain a path for which the error was detected.
Error(Error, Option<PathBuf>),

/// Event emitted when a file being watched is to be tailed.
OnGoingWrite(PathBuf),
}

impl PartialEq for DebouncedEvent {
Expand Down Expand Up @@ -611,6 +617,10 @@ pub trait Watcher: Sized {
/// Returns an error in the case that `path` has not been watched or if removing the watch
/// fails.
fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()>;

///Sets the duration for DebouncedEvent::OnGoingWrite. When set, OnGoingWrite event will be
/// fired every "duration" units.
fn set_on_going_write_duration(&self, duration: Duration);
passcod marked this conversation as resolved.
Show resolved Hide resolved
}

/// The recommended `Watcher` implementation for the current platform
Expand Down
4 changes: 4 additions & 0 deletions src/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ impl Watcher for NullWatcher {
fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
Ok(())
}

fn set_on_going_write_duration(&self, duration: Duration) {

}
}
4 changes: 4 additions & 0 deletions src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ impl Watcher for PollWatcher {
Err(Error::WatchNotFound)
}
}

fn set_on_going_write_duration(&self, duration: Duration) {

}
}

impl Drop for PollWatcher {
Expand Down
11 changes: 11 additions & 0 deletions src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ enum Action {
Watch(PathBuf, RecursiveMode),
Unwatch(PathBuf),
Stop,
SetOnGoingWriteEventDuration(Duration),
}

pub enum MetaEvent {
Expand Down Expand Up @@ -118,6 +119,12 @@ impl ReadDirectoryChangesServer {
stop_watch(ws, &self.meta_tx);
}
break;
},
Action::SetOnGoingWriteEventDuration(duration) => {
let mut debounced_event = self.event_tx.lock().unwrap();
if let EventTx::Debounced {ref tx,ref mut debounce} = *debounced_event {
debounce.set_on_going_write_duration(duration);
}
}
}
}
Expand Down Expand Up @@ -562,6 +569,10 @@ impl Watcher for ReadDirectoryChangesWatcher {
self.wakeup_server();
res
}

fn set_on_going_write_duration(&self, duration: Duration) {
self.tx.send(Action::SetOnGoingWriteEventDuration(duration));
}
}

impl Drop for ReadDirectoryChangesWatcher {
Expand Down