Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,13 @@ func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event)
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
if w.minRev < compactionRev {
// Skip the watcher that failed to send compacted watch response due to w.ch is full.
// watcher is behind compaction revision
if w.compacted {
// compaction notification was successfully sent in chooseAll;
// remove from the original unsynced group
s.unsynced.delete(w)
}
// else: failed to send compacted watch response due to w.ch is full.
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
continue
}
Expand Down
17 changes: 12 additions & 5 deletions server/storage/mvcc/watcher_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,12 @@ func (wg *watcherGroup) delete(wa *watcher) bool {
return true
}

// choose selects watchers from the watcher group to update
// choose selects watchers from the watcher group to update.
// It always returns a new watcherGroup containing the selected watchers,
// never the original group. This is critical because chooseAll() may delete
// watchers from the returned group, and we must not modify the original
// unsynced group while the caller (syncWatchers) iterates over it.
func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
if len(wg.watchers) < maxWatchers {
return wg, wg.chooseAll(curRev, compactRev)
}
ret := newWatcherGroup()
for w := range wg.watchers {
if maxWatchers <= 0 {
Expand All @@ -249,10 +250,16 @@ func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
w.restore = false
}
if w.minRev < compactRev {
// Skip if already marked as compacted (notification already sent).
// This can happen when multiple goroutines race to process the same watcher.
if w.compacted {
continue
}
select {
case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
w.compacted = true
wg.delete(w)
// Note: we don't delete from wg here. The caller (syncWatchers)
// will delete from the original unsynced group after iterating.
default:
// retry next time
}
Expand Down