Skip to content

Commit

Permalink
[ongoing] Rewrite ongoing events to use a map
Browse files Browse the repository at this point in the history
The original implementation used a single Option in a Mutex to hold the
current path undergoing ongoing writes. The issue brought up by Sujit
Joshi in #191 was that this fails to perform correctly when more than
one path is undergoing ongoing writes.

The solution is to instead use a map to hold path -> instant pairs.
Because the map has to be shared between threads, and besides needs to
be held from two structs at once, a CHashMap is used. An ad-hoc
construction with stdlib data structures and sync primitives could have
been set up, but the CHashMap saves that boilerplate and offers elegant
usage patterns that make the code clearer and more understandable.

Additionally, another issue was identified in that the ongoing event
state was not cleared when the feature is disabled.

Finally, the terminology was changed from ongoing duration -> delay,
and various identifiers internally were made more concise.
  • Loading branch information
passcod committed Apr 12, 2019
1 parent 0eb4f2e commit c4d192f
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 65 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ filetime = "^0.2.1"
libc = "^0.2.4"
serde = { version = "1.0.89", features = ["derive"], optional = true }
walkdir = "^2.0.1"
chashmap = "2.2.2"

[target.'cfg(target_os="linux")'.dependencies]
inotify = { version = "^0.7", default-features = false }
Expand Down
34 changes: 8 additions & 26 deletions src/debounce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crossbeam_channel::Sender;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::time::Duration;

pub type OperationsBuffer =
Arc<Mutex<HashMap<PathBuf, (Option<op::Op>, Option<PathBuf>, Option<u64>)>>>;
Expand Down Expand Up @@ -45,7 +45,10 @@ impl EventTx {
}

pub fn new_debounced(tx: Sender<DebouncedEvent>, debounce: Debounce) -> Self {
EventTx::Debounced { tx, debounce: Arc::new(Mutex::new(debounce)) }
EventTx::Debounced {
tx,
debounce: Arc::new(Mutex::new(debounce)),
}
}

pub fn debounced_tx(&self) -> Self {
Expand All @@ -59,7 +62,7 @@ impl EventTx {
match self {
EventTx::Debounced { ref debounce, .. } => {
debounce.lock().unwrap().configure(config, tx);
},
}
_ => {}
}
}
Expand Down Expand Up @@ -135,7 +138,7 @@ impl Debounce {

pub fn configure(&mut self, config: Config, tx: Sender<Result<bool>>) {
tx.send(match config {
Config::OngoingWrites(c) => self.timer.set_ongoing_write_duration(c),
Config::OngoingWrites(c) => self.timer.set_ongoing_writes(c),
_ => Ok(false),
})
.expect("configuration channel disconnected");
Expand Down Expand Up @@ -294,7 +297,7 @@ impl Debounce {
// it already was a write event
Some(op::Op::WRITE) => {
restart_timer(timer_id, path.clone(), &mut self.timer);
handle_ongoing_write_event(&self.timer, path.clone(), &self.tx);
self.timer.handle_ongoing_write(&path, &self.tx);
}

// upgrade to write event
Expand Down Expand Up @@ -552,24 +555,3 @@ fn restart_timer(timer_id: &mut Option<u64>, path: PathBuf, timer: &mut WatchTim
}
*timer_id = Some(timer.schedule(path));
}

fn handle_ongoing_write_event(timer: &WatchTimer, path: PathBuf, tx: &Sender<DebouncedEvent>) {
let mut ongoing_write_event = timer.ongoing_write_event.lock().unwrap();
let mut event_details = Option::None;
if let Some(ref i) = *ongoing_write_event {
let now = Instant::now();
if i.0 <= now {
//fire event
let _ = tx.send(DebouncedEvent::OngoingWrite((i.1).clone()));
} else {
event_details = Some((i.0, i.1.clone()));
}
} else {
//schedule event
if let Some(d) = timer.ongoing_write_duration {
let fire_at = Instant::now() + d;
event_details = Some((fire_at, path));
}
}
*ongoing_write_event = event_details;
}
47 changes: 31 additions & 16 deletions src/debounce/timer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::super::{op, DebouncedEvent, Error, Result};
use chashmap::CHashMap;
use crossbeam_channel::Sender;
use std::collections::VecDeque;
use std::path::PathBuf;
Expand Down Expand Up @@ -26,7 +27,7 @@ struct ScheduleWorker {
tx: Sender<DebouncedEvent>,
operations_buffer: OperationsBuffer,
stopped: Arc<AtomicBool>,
worker_ongoing_write_event: Arc<Mutex<Option<(Instant, PathBuf)>>>,
ongoing_writes: Arc<CHashMap<PathBuf, Instant>>,
}

impl ScheduleWorker {
Expand Down Expand Up @@ -58,10 +59,7 @@ impl ScheduleWorker {
let message = match op {
Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)),
Some(op::Op::WRITE) => {
//disable ongoing_write
let mut ongoing_write_event =
self.worker_ongoing_write_event.lock().unwrap();
*ongoing_write_event = None;
self.ongoing_writes.remove(&path);
Some(DebouncedEvent::Write(path))
}
Some(op::Op::METADATA) => Some(DebouncedEvent::Metadata(path)),
Expand Down Expand Up @@ -124,8 +122,8 @@ pub struct WatchTimer {
delay: Duration,
events: Arc<Mutex<VecDeque<ScheduledEvent>>>,
stopped: Arc<AtomicBool>,
pub ongoing_write_event: Arc<Mutex<Option<(Instant, PathBuf)>>>,
pub ongoing_write_duration: Option<Duration>,
ongoing_writes: Arc<CHashMap<PathBuf, Instant>>,
ongoing_delay: Option<Duration>,
}

impl WatchTimer {
Expand All @@ -138,13 +136,13 @@ impl WatchTimer {
let new_event_trigger = Arc::new(Condvar::new());
let stop_trigger = Arc::new(Condvar::new());
let stopped = Arc::new(AtomicBool::new(false));
let ongoing_writes = Arc::new(CHashMap::new());

let worker_new_event_trigger = new_event_trigger.clone();
let worker_stop_trigger = stop_trigger.clone();
let worker_events = events.clone();
let worker_stopped = stopped.clone();
let ongoing_write_event = Arc::new(Mutex::new(None));
let worker_ongoing_write_event = ongoing_write_event.clone();
let worker_ongoing_writes = ongoing_writes.clone();
thread::spawn(move || {
ScheduleWorker {
new_event_trigger: worker_new_event_trigger,
Expand All @@ -153,7 +151,7 @@ impl WatchTimer {
tx,
operations_buffer,
stopped: worker_stopped,
worker_ongoing_write_event,
ongoing_writes: worker_ongoing_writes,
}
.run();
});
Expand All @@ -165,22 +163,39 @@ impl WatchTimer {
delay,
events,
stopped,
ongoing_write_event,
ongoing_write_duration: None,
ongoing_writes,
ongoing_delay: None,
}
}

pub fn set_ongoing_write_duration(&mut self, duration: Option<Duration>) -> Result<bool> {
if let Some(duration) = duration {
if duration > self.delay {
pub fn set_ongoing_writes(&mut self, delay: Option<Duration>) -> Result<bool> {
if let Some(delay) = delay {
if delay > self.delay {
return Err(Error::InvalidConfigValue);
}
} else if self.ongoing_delay.is_some() {
// Reset the current ongoing state when disabling
self.ongoing_writes.clear();
}

self.ongoing_write_duration = duration;
self.ongoing_delay = delay;
Ok(true)
}

pub fn handle_ongoing_write(&self, path: &PathBuf, tx: &Sender<DebouncedEvent>) {
if let Some(delay) = self.ongoing_delay {
self.ongoing_writes.upsert(
path.clone(),
|| Instant::now() + delay,
|fire_at| {
if fire_at <= &mut Instant::now() {
tx.send(DebouncedEvent::OngoingWrite(path.clone())).ok();
}
},
);
}
}

pub fn schedule(&mut self, path: PathBuf) -> u64 {
self.counter = self.counter.wrapping_add(1);

Expand Down
5 changes: 1 addition & 4 deletions src/fsevent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,7 @@ impl Watcher for FsEventWatcher {
since_when: fs::kFSEventStreamEventIdSinceNow,
latency: 0.0,
flags: fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer,
event_tx: Arc::new(EventTx::new_debounced(
tx.clone(),
Debounce::new(delay, tx),
)),
event_tx: Arc::new(EventTx::new_debounced(tx.clone(), Debounce::new(delay, tx))),
runloop: None,
context: None,
recursive_info: HashMap::new(),
Expand Down
10 changes: 2 additions & 8 deletions src/inotify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,7 @@ impl EventLoop {
}

if !o.is_empty() {
send_pending_rename_event(
&mut self.rename_event,
&self.event_tx,
);
send_pending_rename_event(&mut self.rename_event, &self.event_tx);

self.event_tx.send(RawEvent {
path: path,
Expand Down Expand Up @@ -460,10 +457,7 @@ impl Watcher for INotifyWatcher {

fn new(tx: Sender<DebouncedEvent>, delay: Duration) -> Result<INotifyWatcher> {
let inotify = Inotify::init()?;
let event_tx = EventTx::new_debounced(
tx.clone(),
Debounce::new(delay, tx),
);
let event_tx = EventTx::new_debounced(tx.clone(), Debounce::new(delay, tx));
let event_loop = EventLoop::new(inotify, event_tx)?;
let channel = event_loop.channel();
event_loop.run();
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
extern crate anymap;
#[macro_use]
extern crate bitflags;
extern crate chashmap;
extern crate crossbeam_channel;
extern crate filetime;
#[cfg(target_os = "macos")]
Expand Down
10 changes: 5 additions & 5 deletions src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use filetime::FileTime;
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, atomic::{AtomicBool, Ordering}, Mutex};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
};
use std::thread;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -197,10 +200,7 @@ impl Watcher for PollWatcher {
}

fn new(tx: Sender<DebouncedEvent>, delay: Duration) -> Result<PollWatcher> {
let event_tx = EventTx::new_debounced(
tx.clone(),
Debounce::new(delay, tx),
);
let event_tx = EventTx::new_debounced(tx.clone(), Debounce::new(delay, tx));
let mut p = PollWatcher {
event_tx: event_tx.debounced_tx(),
watches: Arc::new(Mutex::new(HashMap::new())),
Expand Down
18 changes: 12 additions & 6 deletions tests/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,8 @@ fn poll_watch_recursive_create_directory() {
let tdir = TempDir::new("temp_dir").expect("failed to create temporary directory");

let (tx, rx) = unbounded();
let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)).expect("failed to create poll watcher");
let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50))
.expect("failed to create poll watcher");
watcher
.watch(tdir.mkpath("."), RecursiveMode::Recursive)
.expect("failed to watch directory");
Expand Down Expand Up @@ -738,7 +739,8 @@ fn poll_watch_recursive_move() {
tdir.create_all(vec!["dir1a"]);

let (tx, rx) = unbounded();
let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)).expect("failed to create poll watcher");
let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50))
.expect("failed to create poll watcher");
watcher
.watch(tdir.mkpath("."), RecursiveMode::Recursive)
.expect("failed to watch directory");
Expand Down Expand Up @@ -802,7 +804,8 @@ fn poll_watch_recursive_move_in() {
tdir.create_all(vec!["watch_dir", "dir1a/dir1"]);

let (tx, rx) = unbounded();
let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)).expect("failed to create poll watcher");
let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50))
.expect("failed to create poll watcher");
watcher
.watch(tdir.mkpath("watch_dir"), RecursiveMode::Recursive)
.expect("failed to watch directory");
Expand Down Expand Up @@ -855,7 +858,8 @@ fn poll_watch_recursive_move_out() {
tdir.create_all(vec!["watch_dir/dir1a/dir1"]);

let (tx, rx) = unbounded();
let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)).expect("failed to create poll watcher");
let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50))
.expect("failed to create poll watcher");
watcher
.watch(tdir.mkpath("watch_dir"), RecursiveMode::Recursive)
.expect("failed to watch directory");
Expand Down Expand Up @@ -909,7 +913,8 @@ fn poll_watch_nonrecursive() {
tdir.create_all(vec!["dir1"]);

let (tx, rx) = unbounded();
let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)).expect("failed to create poll watcher");
let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50))
.expect("failed to create poll watcher");
watcher
.watch(tdir.mkpath("."), RecursiveMode::NonRecursive)
.expect("failed to watch directory");
Expand Down Expand Up @@ -939,7 +944,8 @@ fn poll_watch_file() {
tdir.create_all(vec!["file1"]);

let (tx, rx) = unbounded();
let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50)).expect("failed to create poll watcher");
let mut watcher = PollWatcher::with_delay(tx, Duration::from_millis(50))
.expect("failed to create poll watcher");
watcher
.watch(tdir.mkpath("file1"), RecursiveMode::Recursive)
.expect("failed to watch directory");
Expand Down

0 comments on commit c4d192f

Please sign in to comment.