Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions notify/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
- FIX: [macOS] prevent handler panicking in the FSEvents callback panics [#790]
- FIX: [macOS] prevent panicking when path contains non UTF-8 chars [#790]
- FIX: [linux] fix a TOCTOU issue when adding a watcher [#792]
- FIX: [macOS] remove `FSEventsPurgeEventsForDeviceUpToEventId` call [#795]

[#705]: https://github.com/notify-rs/notify/pull/705
[#767]: https://github.com/notify-rs/notify/pull/767
[#790]: https://github.com/notify-rs/notify/pull/790
[#792]: https://github.com/notify-rs/notify/pull/792
[#795]: https://github.com/notify-rs/notify/pull/795

## notify 9.0.0-rc.1 (2026-01-25)

Expand Down
152 changes: 135 additions & 17 deletions notify/src/fsevent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::fmt;
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
use std::ptr::{self, NonNull};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

Expand Down Expand Up @@ -67,11 +68,18 @@ pub struct FsEventWatcher {
latency: cf::CFTimeInterval,
flags: fs::FSEventStreamCreateFlags,
event_handler: Arc<Mutex<dyn EventHandler>>,
runloop: Option<(cf::CFRetained<cf::CFRunLoop>, thread::JoinHandle<()>)>,
runloop: Option<RunLoopHandle>,
recursive_info: HashMap<PathBuf, bool>,
event_kinds: EventKindMask,
}

#[derive(Debug)]
struct RunLoopHandle {
runloop: cf::CFRetained<cf::CFRunLoop>,
stop_flag: Arc<AtomicBool>,
thread_handle: thread::JoinHandle<()>,
}

impl fmt::Debug for FsEventWatcher {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("FsEventWatcher")
Expand Down Expand Up @@ -354,13 +362,17 @@ impl FsEventWatcher {
return;
}

if let Some((runloop, thread_handle)) = self.runloop.take() {
while !runloop.is_waiting() {
thread::yield_now();
}

if let Some(RunLoopHandle {
runloop,
stop_flag,
thread_handle,
}) = self.runloop.take()
{
// Don't wait for the runloop to become "waiting" before stopping; if the
// stream is under heavy load that can delay shutdown indefinitely.
stop_flag.store(true, Ordering::Release);
runloop.stop();

runloop.wake_up();
// Wait for the thread to shut down.
thread_handle.join().expect("thread to shut down");
}
Expand Down Expand Up @@ -481,6 +493,11 @@ impl FsEventWatcher {
// channel to pass runloop around
let (rl_tx, rl_rx) = unbounded();

// Used to stop the runloop thread without relying on privileged APIs or
// on `CFRunLoopIsWaiting()` becoming true under heavy event load.
let stop_flag = Arc::new(AtomicBool::new(false));
let stop_flag_thread = Arc::clone(&stop_flag);

let thread_handle = thread::Builder::new()
.name("notify-rs fsevents loop".to_string())
.spawn(move || {
Expand Down Expand Up @@ -509,27 +526,29 @@ impl FsEventWatcher {
return;
}

// the calling to CFRunLoopRun will be terminated by CFRunLoopStop call in drop()
// `stop()` will call `CFRunLoopStop` + `CFRunLoopWakeUp` and then join this
// thread.
rl_tx
.send(Ok(CFRunLoopSendWrapper(cur_runloop)))
.expect("Unable to send runloop to watcher");

cf::CFRunLoop::run();
// Avoid polling the runloop: block indefinitely until `CFRunLoopStop` is
// called (or until the runloop is otherwise finished).
if !stop_flag_thread.load(Ordering::Acquire) {
cf::CFRunLoop::run();
}
fs::FSEventStreamStop(stream);
// There are edge-cases, when many events are pending,
// despite the stream being stopped, that the stream's
// associated callback will be invoked. Purging events
// is intended to prevent this.
let event_id = fs::FSEventsGetCurrentEventId();
let device = fs::FSEventStreamGetDeviceBeingWatched(stream);
fs::FSEventsPurgeEventsForDeviceUpToEventId(device, event_id);
fs::FSEventStreamInvalidate(stream);
fs::FSEventStreamRelease(stream);
}
})?;
// block until runloop has been sent
let runloop_wrapper = rl_rx.recv().unwrap()?;
self.runloop = Some((runloop_wrapper.0, thread_handle));
self.runloop = Some(RunLoopHandle {
runloop: runloop_wrapper.0,
stop_flag,
thread_handle,
});

Ok(())
}
Expand Down Expand Up @@ -719,6 +738,105 @@ mod tests {
channel()
}

#[test]
fn stop_does_not_wait_for_runloop_to_be_waiting() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::time::Instant;

// Regression test for a shutdown hang where `stop()` waited for
// `CFRunLoopIsWaiting()` to become true. If the runloop is busy (e.g. under
// heavy event load), it may never enter a waiting state.

let dir = tempfile::tempdir().unwrap();

let (tx, _rx) = mpsc::channel::<crate::Result<Event>>();
let mut watcher = FsEventWatcher::new(tx, Default::default()).unwrap();
watcher.watch(dir.path(), RecursiveMode::Recursive).unwrap();

let runloop = watcher
.runloop
.as_ref()
.expect("watcher to be running")
.runloop
.clone();
let mode = unsafe { cf::kCFRunLoopDefaultMode.expect("default runloop mode") };

// Keep the runloop continuously "busy" by creating a source that signals itself in its
// perform callback.
struct SourceHammer {
source: *const cf::CFRunLoopSource,
fires: AtomicUsize,
}

unsafe extern "C-unwind" fn hammer_source(info: *mut std::ffi::c_void) {
let Some(hammer) = (info as *const SourceHammer).as_ref() else {
return;
};
hammer.fires.fetch_add(1, Ordering::Relaxed);

// Signal the source again so the runloop has more work to do.
let Some(source) = (hammer.source as *const cf::CFRunLoopSource).as_ref() else {
return;
};
source.signal();
}

let mut hammer = Box::new(SourceHammer {
source: std::ptr::null(),
fires: AtomicUsize::new(0),
});

let mut ctx = cf::CFRunLoopSourceContext {
version: 0,
info: (&mut *hammer as *mut SourceHammer).cast(),
retain: None,
release: None,
copyDescription: None,
equal: None,
hash: None,
schedule: None,
cancel: None,
perform: Some(hammer_source),
};

let source = unsafe {
cf::CFRunLoopSource::new(cf::kCFAllocatorDefault, 0, &mut ctx)
.expect("source to be created")
};
hammer.source = cf::CFRetained::as_ptr(&source).as_ptr();

runloop.add_source(Some(&source), Some(mode));
source.signal();
runloop.wake_up();

// Ensure our setup actually made the runloop busy.
let setup_start = Instant::now();
while hammer.fires.load(Ordering::Relaxed) == 0
&& setup_start.elapsed() < Duration::from_secs(1)
{
std::thread::yield_now();
}
assert!(
hammer.fires.load(Ordering::Relaxed) > 0,
"runloop source never fired; test setup failed"
);

let (done_tx, done_rx) = mpsc::channel::<()>();
std::thread::spawn(move || {
drop(watcher);
let _ = done_tx.send(());
});

// If shutdown regresses, this would hang indefinitely; keep the test bounded.
done_rx
.recv_timeout(Duration::from_secs(5))
.expect("dropping FsEventWatcher timed out (possible shutdown hang)");

// No cleanup: The source is owned by the runloop; removing sources cross-thread can be
// sensitive on some systems. Dropping the last reference to the runloop will release it.
}

#[test]
fn test_fsevent_watcher_drop() {
use super::*;
Expand Down