-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdedupequeue.go
More file actions
169 lines (151 loc) · 4.45 KB
/
dedupequeue.go
File metadata and controls
169 lines (151 loc) · 4.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package inflight
import (
"container/list"
"sync"
)
// OpQueue is a thread-safe duplicate operation suppression queue, that combines
// duplicate operations (queue entires) into sets that will be dequeued together.
//
// For example, If you enqueue an item with a key that already exists, then that
// item will be appended to that key's set of items. Otherwise the item is
// inserted into the head of the list as a new item.
//
// On Dequeue a SET is returned of all items that share a key in the queue.
// It blocks on dequeue if the queue is empty, but returns an error if the
// queue is full during enqueue.
type DedupeQueue struct {
mu sync.Mutex
cond sync.Cond
depth int
width int
q *list.List
entries map[ID]*OpSet
backup map[ID]*OpSet
closed bool
}
// NewOpQueue create a new OpQueue.
func NewDedupeQueue(depth, width int) *DedupeQueue {
q := DedupeQueue{
depth: depth,
width: width,
q: list.New(),
entries: map[ID]*OpSet{},
backup: map[ID]*OpSet{},
}
q.cond.L = &q.mu
return &q
}
// Close releases resources associated with this callgroup, by canceling the context.
// The owner of this OpQueue should either call Close or cancel the context, both are
// equivalent.
func (q *DedupeQueue) Close() {
q.mu.Lock()
q.closed = true
q.mu.Unlock()
q.cond.Broadcast() // alert all dequeue calls that they should wake up and return.
}
// Len returns the number of uniq IDs in the queue, that is the depth of the queue.
func (q *DedupeQueue) Len() int {
q.mu.Lock()
defer q.mu.Unlock()
return q.q.Len()
}
// Enqueue add the op to the queue. If the ID already exists then the Op
// is added to the existing OpSet for this ID, otherwise it's inserted as a new
// OpSet.
//
// Enqueue doesn't block if the queue if full, instead it returns a ErrQueueSaturated
// error.
func (q *DedupeQueue) Enqueue(id ID, op *Op) error {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return ErrQueueClosed
}
if set, ok := q.backup[id]; ok {
if len(set.Ops()) >= q.width {
return ErrQueueSaturatedWidth
}
set.append(op)
return nil
}
set, ok := q.entries[id]
if !ok {
// This is a new item, so we need to insert it into the queue.
if q.q.Len() >= q.depth {
return ErrQueueSaturatedDepth
}
q.newEntry(id, op)
// Signal one waiting go routine to wake up and Dequeue
// I believe we only need to signal if we enqueue a new item.
// Consider the following possible states the queue could be in :
// 1. if no one is currently waiting in Dequeue, the signal isn't
// needed and all items will be dequeued on the next call to
// Dequeue.
// 2. One or Many go-routines are waiting in Dequeue because it's
// empty, and calling Signal will wake up one. Which will dequeue
// the item and return.
// 3. At most One go-routine is in the act of Dequeueing existing items
// from the queue (i.e. only one can have the lock and be in the "if OK"
// condition within the forloop in Dequeue). In which cause the signal
// is ignored and after returning we return to condition (1) above.
// Note signaled waiting go-routines will not be able the acquire
// the condition lock until this method call returns, finishing
// its append of the new operation.
q.cond.Signal()
return nil
}
if len(set.Ops()) >= q.width {
return ErrQueueSaturatedWidth
}
set.append(op)
return nil
}
// Dequeue removes the oldest OpSet from the queue and returns it.
// Dequeue will block if the Queue is empty. An Enqueue will wake the
// go routine up and it will continue on.
//
// If the OpQueue is closed, then Dequeue will return false
// for the second parameter.
func (q *DedupeQueue) Dequeue(callback func(*OpSet)) bool {
q.mu.Lock()
for {
if id, set, ok := q.dequeue(); ok {
q.mu.Unlock()
callback(set)
q.mu.Lock()
defer q.mu.Unlock()
if q.backup[id].Len() > 0 {
q.entries[id] = q.backup[id]
q.q.PushBack(id)
}
delete(q.backup, id)
return true
}
if q.closed {
q.mu.Unlock()
return false
}
q.cond.Wait()
}
}
func (q *DedupeQueue) newEntry(id ID, op *Op) {
set := newOpSet(op)
q.entries[id] = set
q.q.PushBack(id)
}
func (q *DedupeQueue) dequeue() (ID, *OpSet, bool) {
elem := q.q.Front()
if elem == nil {
return 0, nil, false
}
idt := q.q.Remove(elem)
id := idt.(ID)
set, ok := q.entries[id]
if !ok {
panic("invariant broken: we dequeued a value that isn't in the map")
}
delete(q.entries, id)
q.backup[id] = &OpSet{}
return id, set, true
}